Download the Debezium PostgreSQL connector plugin: https://debezium.io/documentation/reference/2.7/install.html
Version 2.7 supports PostgreSQL 12.
debezium-connector-postgres-2.7.4.Final-plugin.tar.gz
Upload the plugin and extract it:
mkdir /usr/local/kafka/kafka_2.12-2.2.1/connector
cd /usr/local/kafka/kafka_2.12-2.2.1/connector
tar -zxvf debezium-connector-postgres-2.7.4.Final-plugin.tar.gz
Go to the Kafka config directory and configure the connector (standalone Kafka):
cd /usr/local/kafka/kafka_2.12-2.2.1/config
# Edit the worker configuration
vi connect-standalone.properties
# Kafka broker address
bootstrap.servers=192.168.159.100:9092
# Format in which the captured data is stored in Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Whether to store schemas with each message; enabling this makes the message format more complex
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# Path to the plugin directory
plugin.path=/usr/local/kafka/kafka_2.12-2.2.1/connector
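Before starting the worker, it can help to confirm that plugin.path really points at the extracted connector. The following is a minimal sketch, not part of the original setup: the helper names plugin_path_of and has_debezium_postgres are hypothetical, and it assumes plugin.path holds a single directory rather than a comma-separated list.

```shell
# Print the plugin.path value from a Kafka Connect worker properties file.
plugin_path_of() {
  # Use the last uncommented plugin.path= line and strip the key.
  grep -E '^plugin\.path=' "$1" | tail -n 1 | cut -d= -f2-
}

# Succeed if at least one Debezium PostgreSQL connector jar exists under plugin.path.
has_debezium_postgres() {
  [ -n "$(find "$(plugin_path_of "$1")" -name 'debezium-connector-postgres*.jar' 2>/dev/null)" ]
}
```

Usage: has_debezium_postgres /usr/local/kafka/kafka_2.12-2.2.1/config/connect-standalone.properties && echo "connector jars found"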
Start the Kafka Connect service (standalone Kafka):
/usr/local/kafka/kafka_2.12-2.2.1/bin/connect-standalone.sh -daemon /usr/local/kafka/kafka_2.12-2.2.1/config/connect-standalone.properties /usr/local/kafka/kafka_2.12-2.2.1/config/connect-file-source.properties /usr/local/kafka/kafka_2.12-2.2.1/config/connect-file-sink.properties
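Once the worker is up, its REST interface can report which connector plugins were loaded (GET /connector-plugins). The sketch below only inspects that response; the function name lists_postgres_connector is hypothetical, and port 8083 is assumed because it is the port used for the registration request in this guide.

```shell
# Read the JSON returned by GET /connector-plugins on stdin and
# succeed if the Debezium PostgreSQL connector class is listed.
lists_postgres_connector() {
  grep -q 'io\.debezium\.connector\.postgresql\.PostgresConnector'
}
```

Usage: curl -s 192.168.159.100:8083/connector-plugins | lists_postgres_connector && echo "PostgreSQL connector plugin loaded"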
Configure the Debezium connector:
On the PostgreSQL side, edit postgresql.conf:
listen_addresses = '*'
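# Enable logical replication
wal_level = logical
# Maximum number of streaming-replication connections allowed. Each connection starts one walsender process. This value must not be lower than the value configured on the master.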
max_wal_senders = 2
max_replication_slots = 1
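These settings only take effect after PostgreSQL has been restarted, so it is worth checking the live value afterwards. A minimal sketch, assuming psql is available on the machine where it runs and reusing the host, port, user and database from the connector configuration in this guide; the function name wal_level_is_logical is hypothetical.

```shell
# Read the output of `SHOW wal_level;` on stdin and succeed only if it is "logical".
wal_level_is_logical() {
  read -r level
  [ "$level" = "logical" ]
}
```

Usage: psql -h 192.168.159.103 -p 15432 -U postgres -d db_test -Atc 'SHOW wal_level;' | wal_level_is_logical && echo "logical replication enabled"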
Edit pg_hba.conf:
# Add these entries
host all all 0.0.0.0/0 trust
host replication all 0.0.0.0/0 trust
Register the connector:
{
  "name": "my-postgresql-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "192.168.159.103",
    "database.port": "15432",
    "database.dbname": "db_test",
    "database.user": "postgres",
    "database.password": "123456",
    "plugin.name": "pgoutput",
    "database.server.name": "server5",
    "topic.prefix": "linging"
  }
}
Run:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 192.168.159.100:8083/connectors/ -d '{"name":"my-postgresql-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"192.168.159.103","database.port":"15432","database.dbname":"db_test","database.user":"postgres","database.password":"123456","plugin.name":"pgoutput","database.server.name":"server5","topic.prefix":"linging"}}'
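After registration, the Kafka Connect REST API exposes the connector's health at GET /connectors/my-postgresql-connector/status. The sketch below pulls the state values out of that response with plain grep and sed, so it needs no JSON tooling; the function name connector_states is hypothetical, and the parsing is a rough text match rather than a real JSON parser.

```shell
# Read the JSON from GET /connectors/<name>/status on stdin and print
# each "state" value (connector first, then tasks), one per line.
connector_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}
```

Usage: curl -s 192.168.159.100:8083/connectors/my-postgresql-connector/status | connector_states

Every line should read RUNNING; a FAILED line usually means the status response also carries a trace field worth reading.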
Modify a table in the database, then check the Kafka messages:
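One way to watch the change events is the console consumer that ships with Kafka. Debezium's PostgreSQL connector writes each table's events to a topic named <topic.prefix>.<schema>.<table>. The sketch below builds that name; the helper topic_for and the table name t_user in the usage line are hypothetical and stand in for whichever table was modified.

```shell
# Build the Debezium topic name for a table: <topic.prefix>.<schema>.<table>
topic_for() {
  printf '%s.%s.%s\n' "$1" "$2" "$3"
}
```

Usage: /usr/local/kafka/kafka_2.12-2.2.1/bin/kafka-console-consumer.sh --bootstrap-server 192.168.159.100:9092 --topic "$(topic_for linging public t_user)" --from-beginning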