一、DB2 數據庫核心配置
1. 啟用數據庫日志記錄與CDC支持
-- Connect to the database as the DB2 administrator.
CONNECT TO mydb USER db2inst1 USING password;

-- Enable archive logging on the database (required by CDC).
UPDATE DATABASE CONFIGURATION USING LOGARCHMETH1 DISK:/db2log/archive;

-- Switching the log mode puts the database into BACKUP PENDING state:
-- quiesce, take a full backup, then release the quiesce.
QUIESCE DATABASE IMMEDIATE FORCE CONNECTIONS;
BACKUP DATABASE mydb;
UNQUIESCE DATABASE;

-- Verify the log mode.
-- NOTE(review): filtering with grep is a shell idiom, not SQL — run
-- `db2 get db cfg for mydb | grep LOGARCHMETH1` from the OS shell instead
-- of piping inside an interactive SQL session.
GET DATABASE CONFIGURATION FOR mydb;
-- Expected output: LOGARCHMETH1 (Log archive method 1) = DISK:/db2log/archive

-- Create the capture schema and its control table.
CREATE SCHEMA cdc;
SET SCHEMA cdc;

-- Control table used to track the capture process.
-- NOTE: DB2 requires primary-key columns to be declared NOT NULL
-- explicitly (SQL0542N otherwise).
CREATE TABLE cdc.control (
    id               INTEGER NOT NULL PRIMARY KEY,
    last_commit_time TIMESTAMP
);

-- Seed the single bookkeeping row, with an explicit column list.
INSERT INTO cdc.control (id, last_commit_time)
VALUES (1, CURRENT_TIMESTAMP);
2. 為捕獲表啟用變更數據捕獲
-- Enable change capture for a target table (example: the products table).
SET SCHEMA myschema;

-- Capture buffer table (adjust columns to match the source table).
-- Created FIRST: the trigger below references it, so it must already exist.
CREATE TABLE cdc.products_cdc_buffer (
    operation   CHAR(1),
    op_ts       TIMESTAMP,
    id          INT,
    name        VARCHAR(100),
    description VARCHAR(255),
    weight      DECIMAL(10,3)
);

-- Row-level trigger that copies every change on products into the buffer.
-- NOTE(review): multi-event triggers with INSERTING/UPDATING/DELETING event
-- predicates require Db2 LUW 11.1+; on older releases split this into three
-- single-event triggers — confirm against the target Db2 version.
CREATE TRIGGER products_cdc_trg
    AFTER INSERT OR UPDATE OR DELETE ON products
    REFERENCING NEW AS n OLD AS o
    FOR EACH ROW MODE DB2SQL
BEGIN ATOMIC
    IF INSERTING THEN
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('I', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
    ELSEIF UPDATING THEN
        -- An update records both the before image (OLD) and the after image (NEW).
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('U', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('U', CURRENT_TIMESTAMP, n.id, n.name, n.description, n.weight);
    ELSEIF DELETING THEN
        INSERT INTO cdc.products_cdc_buffer (operation, op_ts, id, name, description, weight)
        VALUES ('D', CURRENT_TIMESTAMP, o.id, o.name, o.description, o.weight);
    END IF;
END;
二、Flink 環境集成配置
1. 添加Maven依賴
<!-- Flink DB2 CDC connector (scope "provided": supplied by the Flink
     runtime classpath, not bundled into the job jar). -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-db2-cdc</artifactId>
    <version>3.0.1</version>
    <scope>provided</scope>
</dependency>

<!-- DB2 JDBC driver. -->
<dependency>
    <groupId>com.ibm.db2</groupId>
    <artifactId>jcc</artifactId>
    <version>11.5.0.0</version>
</dependency>
2. SQL Client部署
- 下載JAR包:
- flink-sql-connector-db2-cdc-3.0.1.jar
- db2jcc4.jar
- 將JAR包放入
$FLINK_HOME/lib/
目錄后重啟Flink集群。
三、Flink SQL 表定義與參數詳解
1. 完整建表示例(含元數據列)
-- Optional: enable periodic checkpoints so the CDC source can commit progress.
SET 'execution.checkpointing.interval' = '5s';

-- Flink table backed by the DB2 CDC connector.
CREATE TABLE db2_products (
    id          INT,
    name        STRING,
    description STRING,
    weight      DECIMAL(10, 3),
    -- Metadata columns exposing change-event context.
    db_name     STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    op_ts       TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'db2-cdc',
    'hostname' = '192.168.1.100',
    'port' = '50000',
    'username' = 'db2inst1',
    'password' = 'password',
    'database-name' = 'mydb',
    'schema-name' = 'myschema',
    'table-name' = 'products',
    'server-time-zone' = 'Asia/Shanghai',
    'scan.startup.mode' = 'initial'
);
2. 核心參數詳解
參數名 | 必選 | 默認值 | 類型 | 說明 |
---|---|---|---|---|
connector | 是 | 無 | String | 固定為db2-cdc |
hostname | 是 | 無 | String | DB2服務器IP或域名 |
username | 是 | 無 | String | 連接數據庫的用戶名 |
password | 是 | 無 | String | 連接數據庫的密碼 |
database-name | 是 | 無 | String | 數據庫名稱(如mydb ) |
schema-name | 是 | 無 | String | 模式名稱(如myschema ) |
table-name | 是 | 無 | String | 表名(如products ) |
port | 否 | 50000 | Integer | 數據庫端口號 |
scan.startup.mode | 否 | initial | String | 啟動模式:initial (首次啟動時執行快照)、latest-offset (僅讀取最新變更) |
server-time-zone | 否 | 系統時區 | String | 數據庫服務器時區(如Asia/Shanghai ),影響TIMESTAMP轉換 |
四、環境驗證與測試
1. 準備測試數據(DB2)
-- Create the test table (skip this statement if it already exists).
CONNECT TO mydb USER db2inst1 USING password;
SET SCHEMA myschema;

-- NOTE: DB2 requires primary-key columns to be declared NOT NULL explicitly.
CREATE TABLE products (
    id          INT NOT NULL PRIMARY KEY,
    name        VARCHAR(100),
    description VARCHAR(255),
    weight      DECIMAL(10,3)
);

-- Insert test rows with explicit column lists.
INSERT INTO products (id, name, description, weight) VALUES (1, '產品A', '測試產品A', 1.5);
INSERT INTO products (id, name, description, weight) VALUES (2, '產品B', '測試產品B', 2.3);
COMMIT;
2. Flink SQL 驗證
-- Query the DB2 CDC table (the first run triggers the snapshot read).
SELECT * FROM db2_products;

-- Then, in DB2, change a row so a change event is produced.
UPDATE myschema.products SET weight = 1.8 WHERE id = 1;
COMMIT;

-- Watch the Flink output: the updated record should appear, with op_ts
-- carrying the change timestamp.
3. DataStream API 驗證
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.db2.Db2Source;public class Db2SourceExample {public static void main(String[] args) throws Exception {// 配置DB2 SourceSourceFunction<String> sourceFunction = Db2Source.<String>builder().hostname("192.168.1.100").port(50000).database("mydb").tableList("myschema.products").username("db2inst1").password("password").deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.addSource(sourceFunction).print().setParallelism(1);env.execute("DB2 CDC Test");}
}
五、常見問題與解決方案
-
日志模式未啟用
ERROR: DB2 CDC requires archive logging to be enabled
- 解決方案:執行 UPDATE DATABASE CONFIGURATION 啟用歸檔日志,并重啟數據庫。
-
觸發器權限不足
ERROR: User does not have permission to create triggers
- 解決方案:授予用戶創建觸發器所需的權限:GRANT CREATETAB, BINDADD, IMPLICIT_SCHEMA, CREATE_NOT_FENCED_ROUTINE TO db2inst1;
-
數據類型不支持(BOOLEAN)
ERROR: BOOLEAN type is not supported in SQL Replication on DB2
- 解決方案:將BOOLEAN列替換為SMALLINT(0/1)或CHAR(1)(‘Y’/‘N’)。
-
時間戳轉換異常
- 解決方案:顯式設置 server-time-zone 參數:'server-time-zone' = 'Asia/Shanghai'
六、生產環境優化建議
-
性能調優
- 調整 debezium.poll.interval.ms(如 500)控制輪詢間隔;調整 debezium.snapshot.fetch.size(如 2048)優化快照讀取。
-
高可用配置
- 使用DB2 HADR(高可用性災難恢復)集群,Flink作業連接主節點,確保日志復制正常。
-
監控與維護
- 定期清理CDC緩沖區表:DELETE FROM cdc.products_cdc_buffer WHERE op_ts < CURRENT_TIMESTAMP - 1 DAY;
通過以上步驟,可完成Flink DB2 CDC的全流程配置與驗證。生產環境中需特別注意DB2日志模式配置、觸發器權限管理及BOOLEAN類型的兼容性問題,以確保數據一致性和系統穩定性。