- Test conclusions
The source-side incremental fetch modes are bulk, incrementing, timestamp, and incrementing+timestamp (hybrid):
bulk: copies the entire table on every poll
incrementing: uses a strictly increasing column to identify new rows; updates and deletes of existing rows are not captured
timestamp: uses a timestamp column to identify changed rows; every update must bump the timestamp, and the timestamp must be strictly increasing
timestamp+incrementing: uses two columns, an incrementing column plus a timestamp column, combining the behaviour of the incrementing and timestamp modes
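The difference between the modes can be sketched as the WHERE clauses the connector effectively builds against the stored offset. The following is a conceptual Python illustration only, not the connector's actual implementation:

```python
# Conceptual sketch of the incremental-query logic behind each fetch mode.
# "offset" stands for the last (id, ts) position the connector committed.

rows = [
    {"id": 1, "ts": 100, "b": "aaa"},
    {"id": 2, "ts": 105, "b": "bbb"},
    {"id": 3, "ts": 105, "b": "ccc"},
]

def poll_bulk(rows, offset):
    # bulk: re-read the whole table on every poll
    return rows

def poll_incrementing(rows, offset):
    # incrementing: WHERE id > :last_id  -- updates/deletes stay invisible
    return [r for r in rows if r["id"] > offset["id"]]

def poll_timestamp(rows, offset):
    # timestamp: WHERE ts > :last_ts  -- rows must bump ts on every update
    return [r for r in rows if r["ts"] > offset["ts"]]

def poll_timestamp_incrementing(rows, offset):
    # hybrid: ts > :last_ts OR (ts = :last_ts AND id > :last_id)
    # the id column disambiguates rows that share one timestamp
    return [r for r in rows
            if r["ts"] > offset["ts"]
            or (r["ts"] == offset["ts"] and r["id"] > offset["id"])]

offset = {"id": 2, "ts": 105}
print([r["id"] for r in poll_incrementing(rows, offset)])            # [3]
print([r["id"] for r in poll_timestamp(rows, offset)])               # []
print([r["id"] for r in poll_timestamp_incrementing(rows, offset)])  # [3]
```

Note how pure timestamp mode misses row 3 here (same timestamp as the committed offset), which is exactly the gap the hybrid mode closes.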
- Environment
This document runs Kafka in standalone mode and uses the Kafka JDBC Connector to replicate data from a source MySQL database into KADB in near real time, verifying incremental data capture and the available fetch modes.
- kadb version: V8R3
- mysql version: 5.7
- OS: CentOS 7.6
- jdbc connector version: 10.8.3. Download: JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data
- mysql driver: mysql-connector-java-5.1.39-bin.jar
- kadb driver: postgresql-42.7.4.jar
- java version: 17.0.12 (Kafka requires Java 17 or 18; other versions cause errors during Kafka setup)
- kafka version: kafka_2.13-4.0.0
- kafka jdbc connector reference:
JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation
- kafka connector reference:
https://kafka.apache.org/documentation/
- Environment setup
- Kafka deployment
Unpack the tarball:
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0
Generate a cluster UUID:
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
Format the log directories:
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
Start Kafka:
bin/kafka-server-start.sh config/server.properties
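Before wiring up connectors it is worth confirming that the broker is actually listening. A minimal sketch, assuming the default plaintext listener on localhost:9092:

```python
import socket

def broker_reachable(host: str = "localhost", port: int = 9092,
                     timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the broker listener succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(broker_reachable())  # True once the broker is up
```

This only checks TCP reachability; for a real health check, listing topics with kafka-topics.sh (see the command reference at the end) confirms the broker answers protocol requests too.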
- JDBC connector deployment
Download the JDBC connector and unpack it into a plugins directory under the Kafka installation directory (create the plugins directory yourself). Its contents look like this:
[root@nanri plugins]# ls -l
total 8
drwxr-xr-x. 2 root root   43 Apr 17 21:50 assets
drwxr-xr-x. 3 root root  108 Apr 17 21:50 doc
drwxr-xr-x. 2 root root   90 Apr 17 21:50 etc
drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib
-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json
[root@nanri plugins]# pwd
/root/kafka_2.13-4.0.0/plugins
- Source/target JDBC drivers
Copy the source-side MySQL JDBC driver and the target-side KADB driver into the libs directory of the Kafka installation:
[root@nanri libs]# ls -l mysql* postgres*
-rw-r--r--. 1 root root  989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar
-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar
[root@nanri libs]# pwd
/root/kafka_2.13-4.0.0/libs
- Configuration changes
- Worker configuration: connect-standalone.properties
Add the plugin path parameter (absolute paths):
plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar
- Source configuration: connect-mysql-source.properties. For the meaning of each parameter see:
https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
# Connector (producer-side) name
name=connect-mysql-source
# Fixed value: the JDBC source connector class
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
# Topic the events go to; must line up with the topic the sink subscribes to.
# Note: the JDBC source connector documents this as topic.prefix
# (topic name = prefix + table name) rather than topics.
topics=test
# JDBC connection string
connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false
# Incremental fetch mode: bulk, incrementing, timestamp, or timestamp+incrementing
mode=incrementing
- Sink configuration: connect-kadb-sink.properties. For the meaning of each parameter see:
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html
# Connector (consumer-side) name
name=connect-kadb-sink
# Maximum number of tasks created for this connector
tasks.max=1
# Fixed value: the JDBC sink connector class
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
# List of topics to consume
topics=test
# JDBC connection
connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink
connection.user=mppadmin
# Create the target table automatically
auto.create=true
# Write mode
insert.mode=insert
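What auto.create=true plus insert.mode=insert amount to can be sketched as generating DDL/DML from the record's schema. The helpers below are hypothetical and heavily simplified (the real sink handles SQL dialects, quoting, schema evolution, and batching):

```python
# Hypothetical sketch: derive CREATE TABLE and INSERT statements from a
# Connect record schema, as the JDBC sink conceptually does with
# auto.create=true and insert.mode=insert. Simplified type mapping.

TYPE_MAP = {"int32": "INTEGER", "int64": "BIGINT",
            "string": "TEXT", "boolean": "BOOLEAN"}

def create_table_sql(table, fields):
    cols = ", ".join(
        f'"{f["field"]}" {TYPE_MAP[f["type"]]}'
        + ("" if f.get("optional", True) else " NOT NULL")
        for f in fields
    )
    return f'CREATE TABLE IF NOT EXISTS "{table}" ({cols})'

def insert_sql(table, fields):
    names = ", ".join(f'"{f["field"]}"' for f in fields)
    holders = ", ".join("?" for _ in fields)
    return f'INSERT INTO "{table}" ({names}) VALUES ({holders})'

# Schema of the test table used later in this document
fields = [
    {"type": "int32", "optional": False, "field": "a"},
    {"type": "string", "optional": True, "field": "b"},
]
print(create_table_sql("test", fields))
print(insert_sql("test", fields))
```

With insert.mode=insert every record becomes a plain INSERT, which is why this mode pairs naturally with the append-only incrementing fetch mode; upsert/update modes exist for the timestamp-based modes.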
- Starting Connect
Pass the worker configuration first, then the source and sink connector configurations:
bin/connect-standalone.sh \
    config/connect-standalone.properties \
    config/connect-mysql-source.properties \
    config/connect-kadb-sink.properties
- Test
- Create the table on the MySQL source; the matching table is created automatically on the target
mysql> desc test;
+-------+-------------+------+-----+---------+----------------+
| Field | Type        | Null | Key | Default | Extra          |
+-------+-------------+------+-----+---------+----------------+
| a     | int(11)     | NO   | PRI | NULL    | auto_increment |
| b     | varchar(10) | YES  |     | NULL    |                |
+-------+-------------+------+-----+---------+----------------+
2 rows in set (0.00 sec)
Column a is an auto-increment column, which the incrementing fetch mode requires.
- Insert data on the source
mysql> insert into test(b) values('dddd');
Query OK, 1 row affected (0.00 sec)
- Connect log:
[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)
[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)
[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)
[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)
[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)
[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)
- Inspect the events in the topic with kafka-console-consumer.sh
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}
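The event above is the envelope produced by the JsonConverter with schemas.enable=true: a schema block describing the columns plus a payload block with the row values. It can be unpacked with a few lines of Python:

```python
import json

# JsonConverter envelope (schemas.enable=true): "schema" describes the
# columns, "payload" carries the row. Same event as captured above.
event = '''{"schema":{"type":"struct","fields":[
{"type":"int32","optional":false,"field":"a"},
{"type":"string","optional":true,"field":"b"}],
"optional":false,"name":"test"},
"payload":{"a":5,"b":"dddd"}}'''

record = json.loads(event)
columns = {f["field"]: f["type"] for f in record["schema"]["fields"]}
row = record["payload"]
print(columns)  # {'a': 'int32', 'b': 'string'}
print(row)      # {'a': 5, 'b': 'dddd'}
```

The optional flags mirror the source table definition: a (the primary key) is NOT NULL, b is nullable; the sink uses exactly this schema when auto-creating the target table.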
- Data on the target
 1 | aaa
 2 | bbb
 3 | ccc
 4 | ddd
 5 | dddd
(844 rows)
test_sink=#
- Data on the source
mysql> select * from test;
+---+------+
| a | b    |
+---+------+
| 5 | dddd |
+---+------+
1 row in set (0.00 sec)
- Command reference
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink --state
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092