TDengine Kafka Connector 包含 TDengine Source Connector 和 TDengine Sink Connector 兩個插件。用戶只需提供簡單的配置文件,就可以將 Kafka 中指定 topic 的數據(批量或實時)同步到 TDengine,或將 TDengine 中指定數據庫的數據(批量或實時)同步到 Kafka。
什么是 Kafka Connect?
Kafka Connect 是 Apache Kafka 的一個組件,用于使其它系統,比如數據庫、云服務、文件系統等能方便地連接到 Kafka。數據既可以通過 Kafka Connect 從其它系統流向 Kafka, 也可以通過 Kafka Connect 從 Kafka 流向其它系統。從其它系統讀數據的插件稱為 Source Connector, 寫數據到其它系統的插件稱為 Sink Connector。Source Connector 和 Sink Connector 都不會直接連接 Kafka Broker,Source Connector 把數據轉交給 Kafka Connect。Sink Connector 從 Kafka Connect 接收數據。
TDengine Source Connector 用于把數據實時地從 TDengine 讀出來發送給 Kafka Connect。TDengine Sink Connector 用于 從 Kafka Connect 接收數據并寫入 TDengine。
前置條件
運行本教程中示例的前提條件。
- Linux 操作系統
- 已安裝 Java 8 和 Maven
- 已安裝 Git、curl、vi
- 已安裝并啟動 TDengine。如果還沒有可參考 安裝和卸載
安裝 Kafka
-
在任意目錄下執行:
curl -O https://dlcdn.apache.org/kafka/4.0.0/kafka_2.13-4.0.0.tgz tar xzf kafka_2.13-3.4.0.tgz -C /opt/ ln -s /opt/kafka_2.13-3.4.0 /opt/kafka
-
然后需要把
$KAFKA_HOME/bin
目錄加入 PATH。export KAFKA_HOME=/opt/kafka export PATH=$PATH:$KAFKA_HOME/bin
以上腳本可以追加到當前用戶的 profile 文件(~/.profile 或 ~/.bash_profile)
安裝 TDengine Connector 插件
編譯插件
git clone --branch 3.0 https://github.com/taosdata/kafka-connect-tdengine.git
cd kafka-connect-tdengine
mvn clean package -Dmaven.test.skip=true
unzip -d $KAFKA_HOME/components/ target/components/packages/taosdata-kafka-connect-tdengine-*.zip
以上腳本先 clone 項目源碼,然后用 Maven 編譯打包。打包完成后在 target/components/packages/
目錄生成了插件的 zip 包。把這個 zip 包解壓到安裝插件的路徑即可。上面的示例中使用了內置的插件安裝路徑: $KAFKA_HOME/components/
。
配置插件
將 kafka-connect-tdengine 插件加入 $KAFKA_HOME/config/connect-distributed.properties
配置文件 plugin.path 中
plugin.path=/usr/share/java,/opt/kafka/components
啟動 Kafka
zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.propertieskafka-server-start.sh -daemon $KAFKA_HOME/config/server.propertiesconnect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
驗證 kafka Connect 是否啟動成功
輸入命令:
curl http://localhost:8083/connectors
如果各組件都啟動成功,會得到如下輸出:
[]
TDengine Sink Connector 的使用
TDengine Sink Connector 的作用是同步指定 topic 的數據到 TDengine。用戶無需提前創建數據庫和超級表。可手動指定目標數據庫的名字(見配置參數 connection.database),也可按一定規則生成 (見配置參數 connection.database.prefix)。
TDengine Sink Connector 內部使用 TDengine 無模式寫入接口 寫數據到 TDengine,目前支持三種格式的數據:InfluxDB 行協議格式,OpenTSDB Telnet 協議格式,和 OpenTSDB JSON 協議格式。
下面的示例將主題 meters 的數據,同步到目標數據庫 power。數據格式為 InfluxDB Line 協議格式。
添加 Sink Connector 配置文件
mkdir ~/test
cd ~/test
vi sink-demo.json
sink-demo.json 內容如下:
{"name": "TDengineSinkConnector","config": {"connector.class":"com.taosdata.kafka.connect.sink.TDengineSinkConnector","tasks.max": "1","topics": "meters","connection.url": "jdbc:TAOS://127.0.0.1:6030","connection.user": "root","connection.password": "taosdata","connection.database": "power","db.schemaless": "line","data.precision": "ns","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter","errors.tolerance": "all","errors.deadletterqueue.topic.name": "dead_letter_topic","errors.deadletterqueue.topic.replication.factor": 1}
}
關鍵配置說明:
"topics": "meters"
和"connection.database": "power"
, 表示訂閱主題 meters 的數據,并寫入數據庫 power。"db.schemaless": "line"
, 表示使用 InfluxDB Line 協議格式的數據。
創建 Sink Connector 實例
curl -X POST -d @sink-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
若以上命令執行成功,則有如下輸出:
{"name": "TDengineSinkConnector","config": {"connection.database": "power","connection.password": "taosdata","connection.url": "jdbc:TAOS://127.0.0.1:6030","connection.user": "root","connector.class": "com.taosdata.kafka.connect.sink.TDengineSinkConnector","data.precision": "ns","db.schemaless": "line","key.converter": "org.apache.kafka.connect.storage.StringConverter","tasks.max": "1","topics": "meters","value.converter": "org.apache.kafka.connect.storage.StringConverter","name": "TDengineSinkConnector","errors.tolerance": "all","errors.deadletterqueue.topic.name": "dead_letter_topic","errors.deadletterqueue.topic.replication.factor": "1", },"tasks": [],"type": "sink"
}
寫入測試數據
準備測試數據的文本文件,內容如下:
meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000
meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250000000
meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249000000
meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000
使用 kafka-console-producer 向主題 meters 添加測試數據。
cat test-data.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic meters
:::note
如果目標數據庫 power 不存在,那么 TDengine Sink Connector 會自動創建數據庫。自動創建數據庫使用的時間精度為納秒,這就要求寫入數據的時間戳精度也是納秒。如果寫入數據的時間戳精度不是納秒,將會拋異常。
:::
驗證同步是否成功
使用 TDengine CLI 驗證同步是否成功。
taos> use power;
Database changed.taos> select * from meters;_ts | current | voltage | phase | groupid | location |
===============================================================================================================================================================2022-03-28 09:56:51.249000000 | 11.800000000 | 221.000000000 | 0.280000000 | 2 | California.LosAngeles |2022-03-28 09:56:51.250000000 | 13.400000000 | 223.000000000 | 0.290000000 | 2 | California.LosAngeles |2022-03-28 09:56:51.249000000 | 10.800000000 | 223.000000000 | 0.290000000 | 3 | California.LosAngeles |2022-03-28 09:56:51.250000000 | 11.300000000 | 221.000000000 | 0.350000000 | 3 | California.LosAngeles |
Query OK, 4 row(s) in set (0.004208s)
若看到了以上數據,則說明同步成功。若沒有,請檢查 Kafka Connect 的日志。配置參數的詳細說明見 “配置參考” 一節。
TDengine Source Connector 的使用
TDengine Source Connector 的作用是將 TDengine 某個數據庫某一時刻之后的數據全部推送到 Kafka。TDengine Source Connector 的實現原理是,先分批拉取歷史數據,再用定時查詢的策略同步增量數據。同時會監控表的變化,可以自動同步新增的表。如果重啟 Kafka Connect, 會從上次中斷的位置繼續同步。
TDengine Source Connector 會將 TDengine 數據表中的數據轉換成 InfluxDB Line 協議格式 或 OpenTSDB JSON 協議格式然后寫入 Kafka。
下面的示例程序同步數據庫 test 中的數據到主題 tdengine-test-meters。
添加 Source Connector 配置文件
vi source-demo.json
輸入以下內容:
{"name":"TDengineSourceConnector","config":{"connector.class": "com.taosdata.kafka.connect.source.TDengineSourceConnector","tasks.max": 1,"subscription.group.id": "source-demo","connection.url": "jdbc:TAOS://127.0.0.1:6030","connection.user": "root","connection.password": "taosdata","connection.database": "test","connection.attempts": 3,"connection.backoff.ms": 5000,"topic.prefix": "tdengine","topic.delimiter": "-","poll.interval.ms": 1000,"fetch.max.rows": 100,"topic.per.stable": true,"topic.ignore.db": false,"out.format": "line","data.precision": "ms","key.converter": "org.apache.kafka.connect.storage.StringConverter","value.converter": "org.apache.kafka.connect.storage.StringConverter"}
}
準備測試數據
準備生成測試數據的 SQL 文件。
DROP DATABASE IF EXISTS test;
CREATE DATABASE test;
USE test;
CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT);INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:05.000',10.30000,219,0.31000) \d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:15.000',12.60000,218,0.33000) \d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-03 14:38:16.800',12.30000,221,0.31000) \d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES('2018-10-03 14:38:16.650',10.30000,218,0.25000) \d1003 USING meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:05.500',11.80000,221,0.28000) \d1003 USING meters TAGS('California.LosAngeles', 2) VALUES('2018-10-03 14:38:16.600',13.40000,223,0.29000) \d1004 USING meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:05.000',10.80000,223,0.29000) \d1004 USING meters TAGS('California.LosAngeles', 3) VALUES('2018-10-03 14:38:06.500',11.50000,221,0.35000);
使用 TDengine CLI, 執行 SQL 文件。
taos -f prepare-source-data.sql
創建 Source Connector 實例
curl -X POST -d @source-demo.json http://localhost:8083/connectors -H "Content-Type: application/json"
查看 topic 數據
使用 kafka-console-consumer 命令行工具監控主題 tdengine-test-meters 中的數據。一開始會輸出所有歷史數據,往 TDengine 插入兩條新的數據之后,kafka-console-consumer 也立即輸出了新增的兩條數據。輸出數據 InfluxDB line protocol 的格式。
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic tdengine-test-meters
輸出:
......
meters,location="California.SanFrancisco",groupid=2i32 current=10.3f32,voltage=219i32,phase=0.31f32 1538548685000000000
meters,location="California.SanFrancisco",groupid=2i32 current=12.6f32,voltage=218i32,phase=0.33f32 1538548695000000000
......
此時會顯示所有歷史數據。切換到 TDengine CLI,插入兩條新的數據:
USE test;
INSERT INTO d1001 VALUES (now, 13.3, 229, 0.38);
INSERT INTO d1002 VALUES (now, 16.3, 233, 0.22);
再切換回 kafka-console-consumer,此時命令行窗口已經打印出剛插入的 2 條數據。
unload 插件
測試完畢之后,用 unload 命令停止已加載的 connector。
查看當前活躍的 connector:
curl http://localhost:8083/connectors
如果按照前述操作,此時應有兩個活躍的 connector。使用下面的命令 unload:
curl -X DELETE http://localhost:8083/connectors/TDengineSinkConnector
curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
性能調優
如果在從 TDengine 同步數據到 Kafka 的過程中發現性能不達預期,可以嘗試使用如下參數提升 Kafka 的寫入吞吐量。
- 打開 KAFKA_HOME/config/producer.properties 配置文件。
- 參數說明及配置建議如下:
參數 參數說明 設置建議 producer.type 此參數用于設置消息的發送方式,默認值為 sync
表示同步發送,async
表示異步發送。采用異步發送能夠提升消息發送的吞吐量。async request.required.acks 參數用于配置生產者發送消息后需要等待的確認數量。當設置為 1 時,表示只要領導者副本成功寫入消息就會給生產者發送確認,而無需等待集群中的其他副本寫入成功。這種設置可以在一定程度上保證消息的可靠性,同時也能保證一定的吞吐量。因為不需要等待所有副本都寫入成功,所以可以減少生產者的等待時間,提高發送消息的效率。 1 max.request.size 該參數決定了生產者在一次請求中可以發送的最大數據量。其默認值為 1048576,也就是 1M。如果設置得太小,可能會導致頻繁的網絡請求,降低吞吐量。如果設置得太大,可能會導致內存占用過高,或者在網絡狀況不佳時增加請求失敗的概率。建議設置為 100M。 104857600 batch.size 此參數用于設定 batch 的大小,默認值為 16384,即 16KB。在消息發送過程中,發送到 Kafka 緩沖區中的消息會被劃分成一個個的 batch。故而減小 batch 大小有助于降低消息延遲,而增大 batch 大小則有利于提升吞吐量,可根據實際的數據量大小進行合理配置。可根據實際情況進行調整,建議設置為 512K。 524288 buffer.memory 此參數用于設置生產者緩沖待發送消息的內存總量。較大的緩沖區可以允許生產者積累更多的消息后批量發送,提高吞吐量,但也會增加延遲和內存使用。可根據機器資源來配置,建議配置為 1G。 1073741824
配置參考
通用配置
以下配置項對 TDengine Sink Connector 和 TDengine Source Connector 均適用。
name
:connector 名稱。connector.class
:connector 的完整類名,例如如 com.taosdata.kafka.connect.sink.TDengineSinkConnector。tasks.max
:最大任務數,默認 1。topics
:需要同步的 topic 列表,多個用逗號分隔,如topic1,topic2
。connection.url
:TDengine JDBC 連接字符串,如jdbc:TAOS://127.0.0.1:6030
。connection.user
:TDengine 用戶名,默認 root。connection.password
:TDengine 用戶密碼,默認 taosdata。connection.attempts
:最大嘗試連接次數。默認 3。connection.backoff.ms
:創建連接失敗重試時間隔時間,單位為 ms。默認 5000。data.precision
:使用 InfluxDB 行協議格式時,時間戳的精度。可選值為:- ms:表示毫秒
- us:表示微秒
- ns:表示納秒
TDengine Sink Connector 特有的配置
connection.database
:目標數據庫名。如果指定的數據庫不存在會則自動創建。自動建庫使用的時間精度為納秒。默認值為 null。為 null 時目標數據庫命名規則參考connection.database.prefix
參數的說明connection.database.prefix
:當 connection.database 為 null 時,目標數據庫的前綴。可以包含占位符 'KaTeX parse error: Expected 'EOF', got '}' at position 8: \{topic}?'。比如 kafka_{topic}, 對于主題 ‘orders’ 將寫入數據庫 ‘kafka_orders’。默認 null。當為 null 時,目標數據庫的名字和主題的名字是一致的。batch.size
:分批寫入每批記錄數。當 Sink Connector 一次接收到的數據大于這個值時將分批寫入。max.retries
:發生錯誤時的最大重試次數。默認為 1。retry.backoff.ms
:發送錯誤時重試的時間間隔。單位毫秒,默認為 3000。db.schemaless
:數據格式,可選值為:- line:代表 InfluxDB 行協議格式
- json:代表 OpenTSDB JSON 格式
- telnet:代表 OpenTSDB Telnet 行協議格式
TDengine Source Connector 特有的配置
connection.database
:源數據庫名稱,無缺省值。topic.prefix
:數據導入 kafka 時使用的 topic 名稱的前綴。默認為空字符串 “”。timestamp.initial
:數據同步起始時間。格式為’yyyy-MM-dd HH:mm:ss’,若未指定則從指定 DB 中最早的一條記錄開始。poll.interval.ms
:檢查是否有新建或刪除的表的時間間隔,單位為 ms。默認為 1000。fetch.max.rows
:檢索數據庫時最大檢索條數。默認為 100。query.interval.ms
:從 TDengine 一次讀取數據的時間跨度,需要根據表中的數據特征合理配置,避免一次查詢的數據量過大或過小;在具體的環境中建議通過測試設置一個較優值,默認值為 0,即獲取到當前最新時間的所有數據。out.format
:結果集輸出格式。line
表示輸出格式為 InfluxDB Line 協議格式,json
表示輸出格式是 json。默認為 line。topic.per.stable
:如果設置為 true,表示一個超級表對應一個 Kafka topic,topic 的命名規則<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>
;如果設置為 false,則指定的 DB 中的所有數據進入一個 Kafka topic,topic 的命名規則為<topic.prefix><topic.delimiter><connection.database>
topic.ignore.db
:topic 命名規則是否包含 database 名稱,true 表示規則為<topic.prefix><topic.delimiter><stable.name>
,false 表示規則為<topic.prefix><topic.delimiter><connection.database><topic.delimiter><stable.name>
,默認 false。此配置項在topic.per.stable
設置為 false 時不生效。topic.delimiter
:topic 名稱分割符,默認為-
。read.method
:從 TDengine 讀取數據方式,query 或是 subscription。默認為 subscription。subscription.group.id
:指定 TDengine 數據訂閱的組 id,當read.method
為 subscription 時,此項為必填項。subscription.from
:指定 TDengine 數據訂閱起始位置,latest 或是 earliest。默認為 latest。
其他說明
- 關于如何在獨立安裝的 Kafka 環境使用 Kafka Connect 插件,請參考官方文檔:https://kafka.apache.org/documentation/#connect。
問題反饋
無論遇到任何問題,都歡迎在本項目的 Github 倉庫反饋:https://github.com/taosdata/kafka-connect-tdengine/issues。
參考
- https://kafka.apache.org/documentation/