隨著越來越多的用戶使用 DolphinDB,各種不同的應用的場景也對 DolphinDB 的數據接入提出了不同的要求。部分用戶需要將 PostgreSQL 的數據實時同步到 DolphinDB 中來,以滿足在 DolphinDB 中使用數據的實時性需求。本篇教程將介紹使用 Debezium 來實時捕獲和發布 PostgreSQL 的數據庫更改事件,并完成 PostgreSQL 到 DolphinDB 的實時數據同步的完整的解決方案。
1. Debezium 同步方案概述
Debezium 是一個開源的分布式平臺,用于實時捕獲和發布數據庫更改事件。它可以將關系型數據庫(如MySQL、PostgreSQL、Oracle 等)的變更事件轉化為可觀察的流數據,以供其他應用程序實時消費和處理。本教程中將采用 Debezium 與 Kafka 組合的方式來實現從 PosgreSQL12 到 DolphinDB 的數據同步。
Kafka +Debezium 的數據同步方案需要部署 4 個服務,如下所示:
- ZooKeeper:kafka 的依賴部署。
- Kafka:數據存儲。
- Kafka-Connect:用于接入數據插件 source-connetor, sink-connector 的框架,可以提供高可用,也可以部署單實例版本。
- Schema-Registry?:提供實時同步的數據的元數據注冊功能 ,支持數據序列化。
基于 Debezium 同步 PostgreSQL 數據到 DolphinDB 的架構圖如下:
圖 1-1 同步架構圖
接下來,本教程將逐一介紹這些服務的下載、安裝,以及配置數據同步任務。
2. 部署 Kafka 單實例實時數據存儲
Kafka 單實例實時數據存儲部署方案在 MySQL 和 Oracle 數據同步教程中均有有介紹,可以查看:Debezium+Kafka 實時同步 Oracle 11g 數據到 DolphinDB 教程,部署好 ZooKeeper,Kafka,Kafka-Connect , Schema-Registry 的數據同步、存儲運行環境。
3. 從 PostgreSQL 到 Kafka 的數據同步:部署程序與配置任務
在部署好 Kafka 和 Kafka Connect 環境后,即可配置 PostgreSQL 的實時數據同步任務。
注意:PostgreSQL 的數據實時同步僅支持 UTF-8 字符集。
本節涉及的操作,若未作特殊說明,均使用操作系統用戶 kafka
執行。kafka 用戶的權限等配置,詳見章節 2 部署 Kafka 單實例實時數據存儲 部分。
3.1 配置 PostgreSQL 數據庫
對于 Source 數據庫是 PostgresSQL,本教程采用 PostgreSQL 自帶的邏輯復制流插件 pgoutput 插件來捕獲實時數據變化。pgoutput 插件是 PostgreSQL 在版本 PostgreSQL 10+ 推出并內置的插件,無須額外配置。所以本教程介紹的 PostgresSQL 到 DolphinDB 的實時數據同步方案 需要 PostgreSQL 10+ 以上版本。
本教程中使用的數據庫是配置于 CentOS 7 下的 PostgresSQL 12 版本。
第一步: 配置邏輯解碼和數據庫訪問權限
PostgreSQL 的實時變更數據捕獲是通過配置 replication slots ,開啟邏輯復制流實現。配置以下參數,可以管理當前數據庫的邏輯解碼進程。
通過 vim 命令打開 /var/lib/pgsql/12/data/postgresql.conf 文件,添加或者調整以下參數。wal_level 參數必須設置。
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
PostgreSQL 中的數據庫訪問權限需要在 /var/lib/pgsql/12/data/pg_hba.conf 中進行配置。可以如下配置 IP4 的所有主機的 md5 加密訪問權限。
host all all 0.0.0.0/0 md5
修改配置文件需要重啟數據庫生效。
第二步: 創建數據庫和用戶
在使用數據庫時,通常會為業務數據創建專門的用戶和數據庫進行數據管理。這里我們創建業務數據的存儲數據庫和業務用戶,再創建一個專門進行邏輯復制的用戶。
使用超級用戶 postgres ,創建業務數據用戶和數據庫,并授予權限。
CREATE USER factoruser WITH PASSWORD '111111';
CREATE DATABASE factordb;
GRANT ALL PRIVILEGES ON DATABASE factordb TO factoruser ;
創建邏輯復制用戶,并授予該用戶邏輯復制權限和連接業務數據庫權限。
CREATE USER datasyn WITH PASSWORD '111111';
ALTER USER datasyn WITH REPLICATION;
GRANT CONNECT ON DATABASE factordb TO datasyn;
接下來我們創建業務數據庫的 schema 并授予邏輯復制用戶對業務數據庫的數據訪問權限,這需要登錄數據庫,并切換到業務數據庫(database)后執行。切換 database,使用 psql 連接可以按 圖 3-1 操作。
圖 3-1 PostgreSQL 切換 database 操作
創建業務數據 schema,需要當前數據庫是 factordb。
create schema factorsch authorization factoruser;
授予 datasyn 用戶對 業務數據庫 factordb 的 factorsch 的具體使用權限。
GRANT USAGE ON SCHEMA factorsch TO datasyn;
GRANT SELECT ON ALL TABLES IN SCHEMA factorsch TO datasyn;
配置邏輯復制用戶的邏輯復制訪問權限,修改文件 /var/lib/pgsql/12/data/pg_hba.conf。
host replication datasyn 0.0.0.0/0 md5
重啟數據庫,并測試邏輯解碼插件。
--創建復制槽
SELECT * FROM pg_create_logical_replication_slot('pgoutput_demo', 'pgoutput');
--查看相應復制槽是否存在
SELECT * FROM pg_replication_slots;
--刪除復制槽
SELECT * FROM pg_drop_replication_slot('pgoutput_demo');
3.2 安裝 Debezium-PostgreSQL 連接器插件
PostgreSQL 的 Debezium 數據同步插件需要安裝在 Kafka Connect 程序部署文件路徑下。
配置啟動 Debezium-PostgreSQL 連接器,需要以下兩步:
-
下載 Debezium-PostgreSQL-Connector 插件,將插件解壓并放到 Kafka Connect 的插件路徑下
-
重新啟動 Kafka Connect 程序,以加載插件
第一步: 下載安裝 Debezium-PostgreSQL 插件
前往官方網站?Debezium,選擇 2.5.1.Final 版本進行下載,程序名為 debezium-connector-postgres-2.5.1.Final-plugin.tar.gz,本教程測試時采用的此版本,也可以使用 2.5 系列的最新版本。如果想使用 Debezium 的最新穩定版本,需要仔細閱讀對應的版本需求,并進行詳細測試。
圖 3-2 Debezium 官網下載鏈接
在 confluent 的安裝路徑下創建插件路徑,在此路徑下解壓 Debezium 的 PostgreSQL 插件包,請確保 kafka 用戶對此路徑具有讀權限,如果下面代碼中的下載鏈接失效,請按 圖 3-2 官網示例位置下載。
sudo mkdir -p /opt/confluent/share/java/plugin
cd /opt/confluent/share/java/plugin
sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.5.1.Final/debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo tar -xvf ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo rm ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
第二步: 配置 Kafka Connect 加載插件
修改 Kafka Connect 的配置文件,添加插件路徑配置。若已配置插件路徑,則跳過該步驟。
cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties
添加或修改參數 plugin.path 如下。
plugin.path=/opt/confluent/share/java/plugin
重新啟動 Kafka Connect。
sudo systemctl restart kafka-connect
查看 kafka connect 的日志輸出,能查詢到信息則說明插件加載成功。
cat /KFDATA/kafka-connect/logs/connect.log | grep PostgresConnector
圖 3-3 Debezium-PostgreSQL 插件加載成功信息
3.3 配置 PostgreSQL 數據同步連接任務
數據庫基礎配置和 PostgreSQL 同步插件安裝好,我們就可以開始配置 PostgreSQL 的數據同步任務。配置同步任務及檢查的很多命令都要帶上 url 等參數。為了操作快捷,本教程封裝了一些加載配置文件的操作腳本在 kafka-tools.tar 包中,詳情參見附錄。下載當前包,解壓縮到 /KFDATA 目錄下。后續的很多操作,包括檢查 Kafka 的 topic、查看數據和配置同步任務等都會使用 kafka-tools.tar 包中的腳本。包中的腳本在無參數運行時會輸出 help 文檔。
cd /KFDATA
sudo tar -xvf kafka-tools.tar
sudo chown kafka:kafka kafka-tools
rm ./kafka-tools.tar
修改 /KFDATA/kafka-tools/config/config.properties 配置參數。
按照本機的路徑、IP 等對應修改 Kafka 和 Kafka Connect 的啟動 IP 地址,以及安裝目錄。
示例如下:
#kafka parameters
kafka_home=/opt/kafka
confluent_home=/opt/confluent
bootstrap_server=192.168.189.130:9092#kafka-connect parameters
connect_rest_url=192.168.1.178:8083
#rest_pd means restful request password,This is not necessary
#rest_pd=appsdba:passwd
schema_ip=192.168.189.130
schema_port=8081
第一步:準備 PostgreSQL 數據庫表
本教程的同步方案支持 DolphinDB 的 TSDB 和 OLAP 兩種存儲引擎的數據同步。其中 TSDB 引擎對于單字段主鍵和多字段復合主鍵有不同的處理方式。所以這里我們創建三張表來展示不同的情況的配置操作。
-
單主鍵同步到 TSDB 引擎 : factorsch.stock_example。
-
復合主鍵同步到 TSDB 引擎 : factorsch.index_example_tsdb。
-
復合主鍵同步到 OLAP 引擎 : factorsch.index_example_olap。
創建業務數據庫表、數據,均使用 factoruser ,登錄 factordb。
創建表 factorsch.stock_example。
create table factorsch.stock_example (id bigint,ts_code varchar(20),symbol_id varchar(20),name varchar(20),area varchar(20),industry varchar(20),list_date date,primary key (id)
);
插入數據。
insert into factorsch.stock_example(id,ts_code,symbol_id,name,area,industry,list_date)
values (1,'000001.SZ','000001','平安銀行','深圳','銀行','1991-04-03'),
(2,'000002.SZ','000002','萬科A','深圳','地產','1991-01-29'),
(3,'000004.SZ','000004','ST國華','深圳','軟件服務','1991-01-14');
創建表 factorsch.index_example_tsdb。
create table factorsch.index_example_tsdb (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);
插入數據。
insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000905', '中百集團', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000906', '中百集團', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上證180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);
創建表 factorsch.index_example_olap。
create table factorsch.index_example_olap (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);
插入數據。
insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000905', '中百集團', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中證500', '000906', '中百集團', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上證180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);
第二步:配置訂閱發布
使用 PostgreSQL 的 pgoutput 插件進行邏輯復制,需要配置 publication 來定義哪些表的數據變更需要被邏輯復制。配置 publication 在 PostgreSQL 的不同版本支持的腳本寫法不盡相同,本教程以 PostgreSQL 12 版本為例介紹通用的兩種配置方式。
方式一:普通用戶配置(推薦)
factoruser 是普通的業務數據用戶,不具備超級用戶權限。配置發布時需要逐一配置每張表,也可以對單一指定表進行取消配置,當有新增表時需要進行添加。這里我們將上文中創建的三張表全部配置。
創建一個發布,先發布兩張表 factorsch.index_example_tsdb 和 factorsch.stock_example。
CREATE PUBLICATION factordb_publication FOR TABLE factorsch.index_example_tsdb,factorsch.stock_example
查看發布和發布的具體表。
select * from pg_publication
select * from pg_publication_tables
圖 3-4 PostgreSQL 中發布列表查詢
圖 3-5 PostgreSQL 中的發布表明細查詢
對指定 publication ,添加一張表 factorsch.index_example_olap。
ALTER PUBLICATION factordb_publication ADD TABLE factorsch.index_example_olap;
圖 3-6 查看增加的發布表
對指定 publication ,刪除一張表。
ALTER PUBLICATION factordb_publication DROP TABLE factorsch.index_example_olap;
刪除發布。.
drop publication factordb_publication
方式二:超級用戶配置
需要先提升 factoruser 權限為超級用戶,可以使用超級用戶 postgres 操作。
ALTER USER factoruser WITH SUPERUSER;
使用超級用戶 factoruser 創建全表發布。
CREATE PUBLICATION all_tables_publication FOR ALL TABLES;
如 圖 3-7 所示,所有的表都會被發布,增加新表也會直接變成發布表。
圖 3-7 全表訂閱發布表列表查看
對于以上兩種方式,推薦使用方式一,可以精確進行邏輯復制,控制資源使用,控制用戶權限。只有在確實需要整庫同步,且表的數據量眾多的場景可以使用方式二。
第三步:準備連接器配置文件,并啟動連接任務
創建連接 PostgreSQL 的 source 連接器配置文件。
mkdir -p /KFDATA/datasyn-config
vim /KFDATA/datasyn-config/source-postgres.json
錄入以下配置,hostname 和 kafka 啟動地址需對應修改。注意這里的 publication.name 參數要和配置發布名一致,且要提前配置好。
{"name": "postgresTask","config": {"connector.class" : "io.debezium.connector.postgresql.PostgresConnector","snapshot.mode" : "initial","tasks.max" : "1","topic.prefix" : "pg_factorDB","database.hostname" : "192.168.189.130","database.port" : 5432,"database.user" : "datasyn","database.password" : "111111","database.dbname" : "factordb","table.include.list" : "factorsch.index_example_tsdb,factorsch.index_example_olap,factorsch.stock_example","decimal.handling.mode": "string","plugin.name": "pgoutput","publication.name": "factordb_publication","slot.name": "datasyn_slot","heartbeat.interval.ms":"20000"}
}
重要參數說明:
表 3-1 source 連接器重要參數說明
更多詳細參數說明可以參看?Debezium 2.5,不同Debezium版本的參數配置不同,若使用其他版本的Debezium,需找到對應文檔做修改。
第三步: 啟動 PostgreSQL 的數據同步任務
通過 REST API 啟動 PostgreSQL 的 source 連接器。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @/KFDATA/datasyn-config/source-postgres.json
也可以通過 kafka-tools 中的腳本啟動。
cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/source-postgres.json
圖 3-8 source 連接器啟動成功信息
第四步:查看 PostgreSQL 數據同步任務狀態
查看同步任務列表。list 參數展示任務名列表,showall 參數會顯示全部同步任務狀態。
cd /KFDATA/kafka-tools/bin
./rest.sh showall
當前只有一個同步任務,如下圖所示。
圖 3-9 查看全部同步任務狀態信息
查看 kafka 中的 topic 列表:
./kafka.sh tplist
圖 3-10 當前 kafka 中的 topic 列表
查看 表 stock_example、index_example_tsdb、index_example_olap 目前進入到 kafka 中的數據條數:
./kafka.sh get_offsets pg_factorDB.factorsch.stock_example
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_tsdb
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_olap
結果如圖所示,當前配置的三張數據數據表的歷史數據都已經寫入了 kafka 中的對應 topic 中。
圖 3-11 對應 topic 中數據條數信息
4. 從Kafka 到 DolphinDB 的數據同步:部署程序與配置任務
4.1 安裝 Kafka-DolphinDB 數據同步連接器插件
配置啟動 Kafka-DolphinDB 連接器,需要以下兩步:
-
下載 Kafka-DolphinDB-Connector 插件,將插件解壓并放到 Kafka Connect 的插件路徑下。
-
重新啟動 Kafka Connect 程序,以加載插件。
第一步:下載 Kafka-DolphinDB 插件
-
jdbc-1.30.22.5-CDC.jar :該包為 DolphinDB JDBC 包為數據同步做了一些專門修改,為特殊版本。
-
kafka-connect-jdbc-4.00.jar:是基于kafka-connect-jdbc-10.7.4 開發的 DolphinDB 連接器,后續會進行代碼開源。
創建插件路徑,在此路徑下放置 Kafka-DolphinDB 插件包,將上述兩個 jar 包放在此目錄下。請確保 kafka 用戶包含對這兩個文件的讀權限。(文件包見附件)
sudo mkdir -p /opt/confluent/share/java/plugin/kafka-connect-jdbc
sudo cp ~/jdbc-1.30.22.5-CDC.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/
sudo cp ~/kafka-connect-jdbc-10.7.4-ddb1.04.OLAP.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/
如果上面的操作碰到權限問題,則可以使用以下命令賦予權限。
sudo chmod o+rx /opt/confluent/share/java/plugin/kafka-connect-jdbc/*
第二步: 重啟 kafka-connect
sudo systemctl restart kafka-connect
查看 kafka-connect 路徑的日志輸出
cat /KFDATA/kafka-connect/logs/connect.log | grep JdbcSinkConnector
如下圖所示,則插件加載成功。
圖 4 -1 Kafka-DolphinDB 插件加載成功信息
4.2 配置 DolphinDB 的數據同步連接任務
第一步:創建同步的 DolphinDB 庫、表
根據 PostgreSQL 表結構,創建與 PostgreSQL 表結構一致的表,PostgreSQL 數據類型轉換為 DolphinDB 數據類型對照表可以參考5.2節。
創建單主鍵表 stock_example 的 DolphinDB 對應表:
//創建 dfs://stock_data 數據庫
if (existsDatabase("dfs://stock_data"))dropDatabase("dfs://stock_data")
dbName = "dfs://stock_data"
db=database(directory=dbName, partitionType=HASH, partitionScheme=[LONG, 3], engine="TSDB", atomic="CHUNK")
//創建 stock_example 表
tbName = "stock_example"
colNames = `id`ts_code`symbol_id`name`area`industry`list_date`dummySortKey__
colTypes = `LONG`SYMBOL`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DATE`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`id, sortColumns=`id`dummySortKey__, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,100}], softDelete=true)
創建復合主鍵表 index_example_olap 的 DolphinDB 對應表:
//創建 dfs://index_data_olap 數據庫
if (existsDatabase("dfs://index_data_olap"))dropDatabase("dfs://index_data_olap")
dbName = "dfs://index_data_olap"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12, atomic="CHUNK")
//創建 olap 引擎數據庫下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date)
創建復合主鍵表 index_example_tsdb 的 DolphinDB 對應表:
//創建 dfs://index_data_tsdb 數據庫
if (existsDatabase("dfs://index_data_tsdb"))dropDatabase("dfs://index_data_tsdb")
dbName = "dfs://index_data_tsdb"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12, engine="TSDB", atomic="CHUNK")
//創建 tsdb 引擎數據庫下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date, sortColumns=`stock_code`indexCode`flag`trade_date, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,10},hashBucket{,10},hashBucket{,1}], softDelete=true)
注:建表時的軟刪除功能,即 softDelete 選項需要 DolphinDB 2.00.11 及以上的版本。舊版本 DolphinDB 建表時可以去除該選項。
第二步: 配置同步配置表
在DolphinDB 中創建一張配置表,記錄 kafka topic 和 DolphinDB 庫表之間的映射關系。配置表的庫表名可以自行調整,并在 DolphinDB 的同步任務中設置相應的庫表名稱。配置表中字段名是固定的,需和示例保持一致。
數據庫名:dfs://ddb_sync_config
表名:sync_config
db = database("dfs://ddb_sync_config", HASH, [SYMBOL, 2])
t = table(1:0, `connector_name`topic_name`target_db`target_tab`add_sortcol_flag`primary_key,
[SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL])
db.createTable(t, "sync_config")
kafka topic 名可以通過之前介紹的 ./kafka.sh tplist 的命令查看,當前配置的三張表對應的 topic 如下:
-
單主鍵同步到 TSDB 引擎 : stock_example -> pg_factorDB.factorsch.stock_example。
-
復合主鍵同步到 TSDB 引擎 : index_example_tsdb->pg_factorDB.factorsch.index_example_tsdb
-
復合主鍵同步到 OLAP 引擎 : index_example_olap ->pg_factorDB.factorsch.index_example_olap。
插入配置信息表,將 kafka 中的 topic 和 DolphinDB 庫表名稱一一對應。
def addSyncConfig(connector_name, topic_name, dbname, tbname, add_sortcol_flag="0",primary_key=NULL) {loadTable("dfs://ddb_sync_config", "sync_config").append!(table([connector_name] as col1, [topic_name] as col2, [dbname] as col3,[tbname] as col4, [add_sortcol_flag] as col5,[primary_key] as col6))
}addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.stock_example", "dfs://stock_data", "stock_example", "1", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_tsdb", "dfs://index_data_tsdb", "index_example", "0", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_olap", "dfs://index_data_olap", "index_example", "0", "stock_code,indexCode,flag,trade_date")
以下是配置表的各個字段說明:
表 4-1 同步配置表字段說明
第三步: 準備連接器配置文件,并啟動連接任務
創建 DolphinDB 數據同步任務配置文件。
cd /KFDATA/datasyn-config
vim ddb-sink-postgres.json
配置如下
{"name": "ddb-sink-postgres","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "pg_factorDB.factorsch.stock_example,pg_factorDB.factorsch.index_example_tsdb,pg_factorDB.factorsch.index_example_olap","connection.url": "jdbc:dolphindb://192.168.189.130:8848?user=admin&password=123456","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.evolve": "false","insert.mode": "upsert","delete.enabled": "true","batch.size":"10000","pk.mode": "record_key","ddbsync.config.table":"dfs://ddb_sync_config,sync_config","ddbsync.addSortColFlag":"true","ddbsync.config.engineTypes" :"TSDB,OLAP"}
}-
表 4-2 sink 連接器重要參數說明
參數說明:以上參數項為同步 DolphinDB 所需參數。如果對 Confluent 的JDBC Sink Connect 有經驗可適當調節。
通過 REST API 啟動source連接器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://183.134.101.144:8083/connectors/ -d @ddb-sink-postgres.json
也可以通過 kafka-tools 中的腳本啟動:
cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/ddb-sink-postgres.json
查看同步任務狀態, ddb-sink-postgres 是 DolphinDB 的數據同步任務,可以看到現在我們有兩個同步任務,這樣構成了從 PostgreSQL 到 DolphinDB 的數據同步鏈。
./rest.sh showall
圖 4-2 同步任務狀態信息
第四步: 查看表初始數據同步進度
在設置 PostgreSQL 同步任務時,將 snapshot.mode 選項值設置為 ”initial” ,該選項意味著 PostgreSQL 會同步表的初始數據到 Kafka 中,設置完下游的 DolphinDB 任務后,可以檢查初始數據的同步情況。
通過 kafka.sh 腳本查看消費者列表:
圖 4-3 Kafka 消費者列表信息
查看 DolphinDB 同步任務對應的 Kafka 消費組中的每一個 consumer 的消費進度,通過此命令可以查看同步程序中每一張的表同步進度。 Lag 為 0 則表示 Kafka 中 topic 當前沒有未消費的數據,即 Kafka 中的數據與對應表的數據是一致的。
./kafka.sh cm_detail connect-ddb-sink-postgres
圖 4-4 connect-ddb-sink 中每張表同步進度
如 圖 4-4 顯示,數據已被 DolphinDB 同步任務消費完畢,此時在 DolphinDB 的 web 界面查看表中數據,表數據和 PostgreSQL 表中數據是一致的。
圖 4-5 DolphinDB中 index_example 表數據
圖 4-6 DolphinDB 中 stock_example表數據
4.3 實時同步驗證
第一步:插入數據
向 PostgreSQL 中 factorsch.index_example_tsdb 表中插入兩條新數據:
insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600054',
to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH:MI:SS'), '上證180', '000010', '三一重工',
'XXXB', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600055
',
to_date('2018-06-30 06:48:02', 'YYYY-MM-DD HH:MI:SS'), '滬深300', '000300', '三一重工',
'XSHG', 0.0029, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);
查看 DolphinDB 對應的表數據:
select * from loadTable("dfs://index_data_tsdb","index_example")
可以看到新數據已寫入:
圖4-7 數據寫入成功
第二步:更新數據
更新 PostgreSQL 中 factorsch.index_example_olap 表的 tmp_stamp 和 secshortname 的值。
update factorsch.index_example_olap
set tm_stamp = to_date('2025-02-28 16:00:00', 'YYYY-MM-DD HH24:MI:SS'),
secshortname ='測試修改名稱'
where stock_code='000759'
查看 DolphinDB 中數據,數據已被修改:
select * from loadTable("dfs://index_data_olap","index_example")
圖4-8 數據更新成功
第三步:刪除數據
在 PostgreSQL 中的 factorsch.stock_example 刪除一條數據:
delete from factorsch.stock_example where ts_code='000004.SZ'
再查看 DolphinDB 中數據,數據已被刪除:
select * from loadTable("dfs://index_data_tsdb","index_example")
圖4-9 數據刪除成功
5. 部署注意事項
5.1 實時同步須知
DolphinDB 是一款支持海量數據的分布式時序數據庫。針對不同的數據處理需求,在底層架構上天然上與通常的關系型數據庫不同,所以需要有以下限制:
-
DolphinDB 的表沒有主鍵設計,若使用 TSDB 引擎,需將主鍵設置為 sortColumn 字段,并設置 keepDuplicates=LAST 來進行去重,以確保數據唯一性。TSDB 引擎的 sortColumn 是分區內去重,如果使用的是分區表,需要至少將其中一個主鍵列設置為分區列。若使用 OLAP 引擎,需在同步配置表中設置目標庫的主鍵。
-
PostgreSQL 表的主鍵可能不滿足 TSDB 引擎的 sortColumn 設置規則,有以下三種情況:
?1. PostgreSQL 表中有兩個及以上的主鍵,其中一個主鍵為整數類型或時間類型,但末尾列不是整數類型或時間類型:
-
該情況需要調整 sortColumn 設置的順序,將整數類型或時間類型的主鍵移動到末尾。
2. PostgreSQL 表中只有一個主鍵,或者 PostgreSQL 表中的主鍵的數據類型均不包含整數類型或時間類型:
-
該情況需要建表時在末尾補充一個 dummySortKey__ 列,值均設置為0,對應同步程序的配置表中需要將 add_sortcol_flag 列的值設置為“1”,并將 DolphinDB 同步連接任務中“ddbsync.addSortColFlag” 設置為 “true”。若使用 DataX 等工具進行初始全表同步,則需要做數據轉換,提供該字段對應初始值。
3.?PostgreSQL 表中的主鍵類型包含 DolphinDB 不支持的類型。
-
DolphinDB TSDB 引擎的 sortColumns 支持整數、日期或時間、字符串類型,暫時不支持小數類型,但在后續的版本里可能提供支持,請關注版本更新。
DDL 語句相關:
-
當前不支持 DDL 語句同步。
-
若表結構發生更改,需進行單表修復,具體操作后續會在實時同步的運維手冊文檔中給出。
其他:
-
表字段命名時,請盡量規避一些簡單的名字,比如 code, timestamp 等,這種命名與 DolphinDB 內關鍵字重復,可能會導致無法正確同步。
5.2 PostgreSQL-DolphinDB 數據類型對應表
以下的類型對應表為推薦設置的 DolphinDB 類型,注意兩者數據類型表示的精度范圍,確保 DolphinDB 數據類型的精度可以覆蓋原 PostgreSQL 類型。
表 5-1 PostgreSQL-DolphinDB 數據類型對應表
在浮點數數據處理上,PostgreSQL 的 DECIMAL 類型是精確值,DolphinDB 的 DOUBLE 類型的精度為15-16位有效數字,如果轉換成 DolphinDB 的 DOUBLE 類型,會存在浮點數精度丟失問題。因此推薦用戶轉換成 DolphinDB 的 DECIMAL 類型,確保浮點數精度。
在時間類型轉換上,請參照表中的類型映射,以保證 DolphinDB 中的時間類型字段在精度上可以覆蓋 PostgreSQL 中時間類型字段的精度。對于帶有時區信息的時間類型,DolphinDB 統一以 UTC 時區存儲。
6. 同步性能測試
6.1 性能測試配置
建表語句
PostgreSQL 建表,并生成測試數據代碼:
DROP TABLE if EXISTS factorsch.performance_test1;
CREATE TABLE factorsch.performance_test1 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成100w行數據,每天1000行
INSERT INTO factorsch.performance_test1 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 1000), ((gs-1)%1000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 1000000) gs;ALTER TABLE factorsch.performance_test1
ADD CONSTRAINT pk_performance_test1 PRIMARY KEY (id, dt);DROP TABLE if EXISTS factorsch.performance_test2;
CREATE TABLE debezium.performance_test2 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成1億行數據,每天100000行
INSERT INTO factorsch.performance_test2 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 100000), ((gs-1)%100000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 100000000) gs;ALTER TABLE factorsch.performance_test2
ADD CONSTRAINT pk_performance_test2 PRIMARY KEY (id, dt);grant select on factorsch.performance_test1 to datasyn;
grant select on factorsch.performance_test2 to datasyn;
DolphinDB 建表代碼:
dbName = "dfs://performance_test1"
tbName = "performance_test1"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
db = database(dbName, HASH, [SYMBOL, 2], , 'TSDB', 'CHUNK')
db.createTable(t, tbName, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)dbName = "dfs://performance_test2"
tbName = "performance_test2"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
partitionCols = `dt`id
db1 = database(, RANGE, date(datetimeAdd(1990.01M, 0..100*12, 'M')))
db2 = database(, HASH, [SYMBOL, 50])
db = database(dbName, COMPO, [db1, db2], , `TSDB, `CHUNK)
db.createPartitionedTable(t, tbName, partitionColumns=partitionCols, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)
6.2 性能測試結果
性能測試結果如下表所示,其中總耗時等于 DolphinDB 更新完成時間減去 PostgreSQL 更新完成時間,因此總耗時包含了以下數據同步的完整鏈路:
-
Debezium 挖掘 PostgreSQL 日志到 Kafka
-
Kafka 推送數據給相應 topic 的消費者
-
下游的 DolphinDB Connector 消費 Kafka 中數據,解析為相應的 DolphinDB 更新語句,并執行寫入 DolphinDB 完成
Kafka 每次推送的變更數據在 3000-4000 條,具體條數和 Kafka 的日志大小配置相關。對于 insert 和 update 類型的操作,DolphinDB 的處理效率很高。對于 delete 類型操作,由于 delete 操作涉及數據查找, DolphinDB 的處理效率和具體表的數據行數、分區方式相關。
表 6-1 使用 pgoutput 插件進行數據同步性能測試結果
7. 常見問題解答(FAQ)
7.1 創建同步任務時報錯
(1)json 文件格式錯誤
圖 7-1 json文件格式錯誤報錯信息
造成上述問題的原因可能是多了逗號、少了逗號或者括號不正確,需要檢查并修訂 json 文件。
(2)Failed to find any class that implements Connector
圖 7-2 PostgreSQL 數據庫無法正常連接報錯信息
該報錯提示意味著 PostgreSQL 數據庫無法正常連接。造成上述問題的可能原因:
-
未將 debezium-connector-postgres-2.5.1.Final.jar 包放置到插件目錄下
-
Kafka 用戶對 debezium-connector-postgres-2.5.1.Final.jar 文件沒有讀權限
(3)Can’t find JdbcSinkConnector
查看日志提示沒有 JdbcSinkConnector 包的加載。JdbcSinkConnector 是包含在 kafka-connect-jdbc-4.00.jar 包內,需要確認該 jar 包是否放置在 kafka connect 的插件路徑下,確認 kafka 對該文件的讀權限。再通過 java --version
查看 Java 版本是否是17,Java 版本較低時,可能無法正確加載插件。目前已知使用 Java 8 時無法正確加載該插件。
7.2 數據未同步或者未正確同步
當數據未同步或者未正確同步時,請先按以下兩步進行檢查。然后對照后面的提供的錯誤列表進行參考調整。
step1 查看同步任務狀態
先查看同步任務是否報錯:
cd /KFDATA/kafka-tools/bin
./rest.sh showall
再看 kafka connect 的日志中是否出現 ERROR:
cd /KFDATA/kafka-connect/logs
cat connect.log | grep ERROR
如果有出現 ERROR,看 ERROR 顯示的日志是 PostgreSQL 報錯還是 ddb-sink 報錯,查看具體的報錯信息。如果同步任務未報錯,也沒有 ERROR,再通過以下方式排查。
step2 查看 PostgreSQL 數據是否同步到 Kafka
查看 Kafka 所有的 topic:
cd /KFDATA/kafka-tools/bin
./kafka.sh tplist
?圖 7-3 kafka 中 topic 信息
在查看該 topic 對應的數據條數:
./kafka.sh get_offsets postgres_service.debezium.index_example
-
一張表出現兩個 topic 名字
這說明 PostgreSQL source 任務的 topic.prefix 或 DolphinDB sink 任務的 topics 配置項拼寫有誤,請檢查這兩項。DolphinDB sink 任務的 topics 必須為 {topic.prefix}.{schema}.{tablename} 的格式。創建 sink 任務時,如果 topic 不存在,則會自動創建 topic,因此拼寫錯誤會導致出現兩個 topic 。
-
沒有表對應的 topic / 有對應的 topic,但數據條數為0
這說明 PostgreSQL 數據未正常同步到 Kafka 中,請在同步任務的 table.include.list 中檢查 PostgreSQL 表名的拼寫,或者檢查用戶是否擁有 REPLICATION 和 LOGIN 權限和對對應表的 SELECT 權限。
-
有對應的 topic,有數據條數,但 DolphinDB 未同步
檢查 DolphinDB Sink 任務中 topics 配置項中的拼寫,檢查同步任務配置表中是否有相同的條數。
查看 Kafka 中數據是否與 PostgreSQL 變更數據一致:
./tpconsumer.sh --op=2 --topic=postgres_service.debezium.index_example --offset=0 --max-messages=20
在顯示的結果中,初始數據同步的消息數據 op = r,新插入數據 op = c,更新數據 op = u
7.3 同步任務運行報錯
-
Java.lang.OutOfMemoryError
Kafka Connect 的默認內存為 1 GB,當數據更新量較大時會出現 JVM 內存不足,需要調整 JVM 大小。根據之前配置的安裝路徑,修改 kafka connect 的配置文件:
vim /KFDATA/kafka-connect/etc/kafka-connect.env
在末尾加入 JVM 選項,內存大小根據實際需要調整:
KAFKA_HEAP_OPTS="-Xms10G -Xmx10G"
-
permission denied for database postgres at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection
使用 pgoutput 插件同步時未預先創建發布,會報此錯誤。
CREATE PUBLICATION dbz_publication FOR TABLE debezium.index_example,debezium.stock_example;
-
Creation of replication slot failed; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each
每個 Debezium 連接器需要使用唯一的復制槽名稱。如果多個連接器使用相同的復制槽名稱,就會導致沖突。可以為每個連接器使用不同的復制槽名稱或者刪除原先復制槽。
SELECT * FROM pg_drop_replication_slot('your_slot_name');
8. 附錄
- DolphinDB 的 Kafka-Connect 插件包:kafka-connect-jdbc-4.00.jar
- DolphinDB 的 JDBC 包:jdbc-1.30.22.5-CDC.jar
- 運維腳本包 kafka-tools:kafka-tools.tar