一、MySQL 服務器配置詳解
1. 啟用二進制日志(Binlog)
MySQL CDC 依賴二進制日志獲取增量數據,需在 MySQL 配置文件(my.cnf
或 my.ini
)中添加以下配置:
# 啟用二進制日志
log-bin=mysql-bin
# 二進制日志格式(推薦ROW模式,記錄行級變更)
binlog-format=ROW
# 啟用GTID(高可用必備)
gtid-mode=ON
enforce-gtid-consistency=ON
# 從庫同步時記錄binlog(主從架構需要)
log-slave-updates=ON
# 避免長連接超時(大表快照時需要)
interactive_timeout=3600
wait_timeout=3600
配置說明:
log-bin
:指定二進制日志文件名前綴,MySQL 會自動生成如mysql-bin.000001
的文件binlog-format=ROW
:相比 STATEMENT 模式,ROW 模式能精確記錄每行數據的變更gtid-mode
:全局事務標識符,用于主從切換時保證數據一致性log-slave-updates
:若使用從庫同步,需開啟此配置讓從庫也記錄 binlog
2. 創建專用用戶并授權
-- 創建用戶(替換為實際用戶名和密碼)
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'flink123';-- 授予必要權限(重要:REPLICATION SLAVE 用于讀取binlog)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';-- 刷新權限
FLUSH PRIVILEGES;
權限說明:
SELECT
:讀取表數據(快照階段需要)SHOW DATABASES
:獲取數據庫列表(用于正則匹配監控庫)REPLICATION SLAVE
:讀取 binlog 必備權限REPLICATION CLIENT
:獲取服務器狀態(如binlog位置)
3. 配置唯一 Server ID
每個 Flink 作業需配置不同的 Server ID(避免 binlog 位置沖突):
# 在my.cnf中添加
server-id=1001 # 任意唯一整數,建議范圍5400-6400
說明:若 Flink 作業并行度為 N,則 Server ID 可設為范圍(如 5400-5400+N
),例如:
-- Flink SQL 中通過Hints設置Server ID范圍
SELECT * FROM mysql_table /*+ OPTIONS('server-id'='5401-5404') */;
二、Flink 環境配置步驟
1. 添加依賴(Maven 項目)
在 pom.xml
中添加 MySQL CDC 連接器依賴:
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version><!-- 若使用Flink 1.14+,無需添加scope --><scope>provided</scope>
</dependency>
2. SQL Client 部署(非Maven環境)
- 下載連接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
- 將 JAR 包放入
$FLINK_HOME/lib/
目錄 - 重啟 Flink 集群使依賴生效
三、Flink MySQL CDC 表定義與參數詳解
1. 完整建表示例(Flink SQL)
-- 設置checkpoint間隔(可選)
SET 'execution.checkpointing.interval' = '3s';-- 創建MySQL CDC表
CREATE TABLE mysql_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,-- 可選:添加元數據列db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,row_kind STRING METADATA FROM 'row_kind' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.1.100','port' = '3306','username' = 'flink_cdc','password' = 'flink123','database-name' = 'mydb','table-name' = 'orders',-- 可選參數詳解'server-id' = '5401','scan.incremental.snapshot.enabled' = 'true','scan.incremental.snapshot.chunk.size' = '8096','scan.startup.mode' = 'initial','heartbeat.interval' = '30s','debezium.binary.handling.mode' = 'base64'
);
2. 核心參數詳解
參數名 | 必選 | 默認值 | 類型 | 說明 |
---|---|---|---|---|
connector | 是 | 無 | String | 固定為 mysql-cdc |
hostname | 是 | 無 | String | MySQL 服務器IP或域名 |
username | 是 | 無 | String | 連接MySQL的用戶名 |
password | 是 | 無 | String | 連接MySQL的密碼 |
database-name | 是 | 無 | String | 監控的數據庫名,支持正則表達式(如 ^(test).* 匹配以test開頭的庫) |
table-name | 是 | 無 | String | 監控的表名,支持正則表達式(如 `orders |
server-id | 否 | 5400-6400隨機 | String | Flink作業的唯一標識,需與其他MySQL客戶端(如主從復制)不同,并行作業建議設為范圍(如 5401-5404 ) |
scan.incremental.snapshot.enabled | 否 | true | Boolean | 啟用增量快照(并行讀取大表,無需全局鎖),建議保持默認 |
scan.startup.mode | 否 | initial | String | 啟動模式:initial (快照+binlog)、earliest-offset (從最早binlog開始)、latest-offset (從最新binlog開始) |
heartbeat.interval | 否 | 30s | Duration | 心跳間隔,用于更新binlog位置,避免長時間無變更時binlog被清理 |
debezium.binary.handling.mode | 否 | none | String | 二進制數據處理模式:base64 (轉Base64字符串)、hex (轉十六進制),適用于BLOB/VARBINARY類型 |
四、環境驗證與測試
1. 準備測試數據(MySQL)
-- 創建測試數據庫和表
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2),order_status BOOLEAN
);-- 插入測試數據
INSERT INTO orders VALUES
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
2. 使用Flink SQL驗證
-- 查詢MySQL CDC表數據
SELECT * FROM mysql_orders;-- 觀察輸出:應顯示插入的兩條記錄
-- 后續在MySQL中更新數據,Flink會實時捕獲變更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 驗證示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;public class MySqlCdcExample {public static void main(String[] args) throws Exception {// 創建MySQL SourceMySqlSource<String> source = MySqlSource.<String>builder().hostname("192.168.1.100").port(3306).databaseList("mydb").tableList("mydb.orders").username("flink_cdc").password("flink123").deserializer(new JsonDebeziumDeserializationSchema()) // 轉為JSON格式.startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog).build();// 配置Flink環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000); // 5秒checkpointenv.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print(); // 打印到控制臺env.execute("MySQL CDC Test");}
}
4. 驗證關鍵點
-
日志檢查:
- Flink 日志應包含
Binlog offset on checkpoint
字樣,表明成功獲取 binlog 位置 - 無
Access denied
或Permission denied
錯誤,確認MySQL權限正確
- Flink 日志應包含
-
數據變更測試:
- 在MySQL中執行
INSERT/UPDATE/DELETE
操作,Flink 應實時輸出變更數據 - 查看輸出中的
row_kind
字段:+I
(插入)、-D
(刪除)、+U
(更新后)、-U
(更新前)
- 在MySQL中執行
-
增量快照驗證:
- 若表數據量大,查看Flink Web UI的并行度,增量快照模式下多個任務應并行讀取
- 日志中無
FLUSH TABLES WITH READ LOCK
相關記錄,確認未獲取全局鎖
五、常見問題與解決方案
-
權限不足錯誤:
ERROR: Access denied for user 'flink_cdc'@'localhost' (using password: YES)
- 解決方案:確認MySQL用戶密碼正確,重新執行授權語句,確保包含
REPLICATION SLAVE
權限
- 解決方案:確認MySQL用戶密碼正確,重新執行授權語句,確保包含
-
Server ID沖突:
ERROR: Another MySQL binlog client is using the same server id
- 解決方案:修改
server-id
為唯一值,或在Flink SQL中通過'server-id'='5401-5404'
設置范圍
- 解決方案:修改
-
增量快照失敗:
ERROR: Table has no primary key, cannot split snapshot chunks
- 解決方案:為表添加主鍵,或設置
scan.incremental.snapshot.chunk.key-column
為非空列(如'scan.incremental.snapshot.chunk.key-column'='unique_id'
)
- 解決方案:為表添加主鍵,或設置
-
binlog未啟用:
ERROR: Binary logging is not enabled
- 解決方案:檢查MySQL配置文件,確認
log-bin
已啟用,重啟MySQL服務
- 解決方案:檢查MySQL配置文件,確認
通過以上步驟,可完成Flink MySQL CDC的環境配置與驗證。生產環境中建議結合實際需求調整并行度、checkpoint策略和GTID配置,以確保數據一致性和系統穩定性。