Flink MySQL CDC 環境配置與驗證

一、MySQL 服務器配置詳解
1. 啟用二進制日志(Binlog)

MySQL CDC 依賴二進制日志獲取增量數據,需在 MySQL 配置文件(my.cnfmy.ini)中添加以下配置:

# 啟用二進制日志
log-bin=mysql-bin
# 二進制日志格式(推薦ROW模式,記錄行級變更)
binlog-format=ROW
# 啟用GTID(高可用必備)
gtid-mode=ON
enforce-gtid-consistency=ON
# 從庫同步時記錄binlog(主從架構需要)
log-slave-updates=ON
# 避免長連接超時(大表快照時需要)
interactive_timeout=3600
wait_timeout=3600

配置說明

  • log-bin:指定二進制日志文件名前綴,MySQL 會自動生成如 mysql-bin.000001 的文件
  • binlog-format=ROW:相比 STATEMENT 模式,ROW 模式能精確記錄每行數據的變更
  • gtid-mode:全局事務標識符,用于主從切換時保證數據一致性
  • log-slave-updates:若使用從庫同步,需開啟此配置讓從庫也記錄 binlog
2. 創建專用用戶并授權
-- 創建用戶(替換為實際用戶名和密碼)
CREATE USER 'flink_cdc'@'localhost' IDENTIFIED BY 'flink123';-- 授予必要權限(重要:REPLICATION SLAVE 用于讀取binlog)
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'localhost';-- 刷新權限
FLUSH PRIVILEGES;

權限說明

  • SELECT:讀取表數據(快照階段需要)
  • SHOW DATABASES:獲取數據庫列表(用于正則匹配監控庫)
  • REPLICATION SLAVE:讀取 binlog 必備權限
  • REPLICATION CLIENT:獲取服務器狀態(如binlog位置)
3. 配置唯一 Server ID

每個 Flink 作業需配置不同的 Server ID(避免 binlog 位置沖突):

# 在my.cnf中添加
server-id=1001  # 任意唯一整數,建議范圍5400-6400

說明:若 Flink 作業并行度為 N,則 Server ID 可設為范圍(如 5400-5400+N),例如:

-- Flink SQL 中通過Hints設置Server ID范圍
SELECT * FROM mysql_table /*+ OPTIONS('server-id'='5401-5404') */;
二、Flink 環境配置步驟
1. 添加依賴(Maven 項目)

pom.xml 中添加 MySQL CDC 連接器依賴:

<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>3.0.1</version><!-- 若使用Flink 1.14+,無需添加scope --><scope>provided</scope>
</dependency>
2. SQL Client 部署(非Maven環境)
  1. 下載連接器 JAR 包:flink-sql-connector-mysql-cdc-3.0.1.jar
  2. 將 JAR 包放入 $FLINK_HOME/lib/ 目錄
  3. 重啟 Flink 集群使依賴生效
三、Flink MySQL CDC 表定義與參數詳解
1. 完整建表示例(Flink SQL)
-- 設置checkpoint間隔(可選)
SET 'execution.checkpointing.interval' = '3s';-- 創建MySQL CDC表
CREATE TABLE mysql_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,-- 可選:添加元數據列db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,row_kind STRING METADATA FROM 'row_kind' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.1.100','port' = '3306','username' = 'flink_cdc','password' = 'flink123','database-name' = 'mydb','table-name' = 'orders',-- 可選參數詳解'server-id' = '5401','scan.incremental.snapshot.enabled' = 'true','scan.incremental.snapshot.chunk.size' = '8096','scan.startup.mode' = 'initial','heartbeat.interval' = '30s','debezium.binary.handling.mode' = 'base64'
);
2. 核心參數詳解
參數名必選默認值類型說明
connectorString固定為 mysql-cdc
hostnameStringMySQL 服務器IP或域名
usernameString連接MySQL的用戶名
passwordString連接MySQL的密碼
database-nameString監控的數據庫名,支持正則表達式(如 ^(test).* 匹配以test開頭的庫)
table-nameString監控的表名,支持正則表達式(如 `orders
server-id5400-6400隨機StringFlink作業的唯一標識,需與其他MySQL客戶端(如主從復制)不同,并行作業建議設為范圍(如 5401-5404
scan.incremental.snapshot.enabledtrueBoolean啟用增量快照(并行讀取大表,無需全局鎖),建議保持默認
scan.startup.modeinitialString啟動模式:initial(快照+binlog)、earliest-offset(從最早binlog開始)、latest-offset(從最新binlog開始)
heartbeat.interval30sDuration心跳間隔,用于更新binlog位置,避免長時間無變更時binlog被清理
debezium.binary.handling.modenoneString二進制數據處理模式:base64(轉Base64字符串)、hex(轉十六進制),適用于BLOB/VARBINARY類型
四、環境驗證與測試
1. 準備測試數據(MySQL)
-- 創建測試數據庫和表
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2),order_status BOOLEAN
);-- 插入測試數據
INSERT INTO orders VALUES 
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
2. 使用Flink SQL驗證
-- 查詢MySQL CDC表數據
SELECT * FROM mysql_orders;-- 觀察輸出:應顯示插入的兩條記錄
-- 后續在MySQL中更新數據,Flink會實時捕獲變更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
3. DataStream API 驗證示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;public class MySqlCdcExample {public static void main(String[] args) throws Exception {// 創建MySQL SourceMySqlSource<String> source = MySqlSource.<String>builder().hostname("192.168.1.100").port(3306).databaseList("mydb").tableList("mydb.orders").username("flink_cdc").password("flink123").deserializer(new JsonDebeziumDeserializationSchema()) // 轉為JSON格式.startupOptions(StartupOptions.initial()) // 初始模式(快照+binlog).build();// 配置Flink環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(5000); // 5秒checkpointenv.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source").print(); // 打印到控制臺env.execute("MySQL CDC Test");}
}
4. 驗證關鍵點
  1. 日志檢查

    • Flink 日志應包含 Binlog offset on checkpoint 字樣,表明成功獲取 binlog 位置
    • Access deniedPermission denied 錯誤,確認MySQL權限正確
  2. 數據變更測試

    • 在MySQL中執行 INSERT/UPDATE/DELETE 操作,Flink 應實時輸出變更數據
    • 查看輸出中的 row_kind 字段:+I(插入)、-D(刪除)、+U(更新后)、-U(更新前)
  3. 增量快照驗證

    • 若表數據量大,查看Flink Web UI的并行度,增量快照模式下多個任務應并行讀取
    • 日志中無 FLUSH TABLES WITH READ LOCK 相關記錄,確認未獲取全局鎖
五、常見問題與解決方案
  1. 權限不足錯誤

    ERROR: Access denied for user 'flink_cdc'@'localhost' (using password: YES)
    
    • 解決方案:確認MySQL用戶密碼正確,重新執行授權語句,確保包含 REPLICATION SLAVE 權限
  2. Server ID沖突

    ERROR: Another MySQL binlog client is using the same server id
    
    • 解決方案:修改 server-id 為唯一值,或在Flink SQL中通過 'server-id'='5401-5404' 設置范圍
  3. 增量快照失敗

    ERROR: Table has no primary key, cannot split snapshot chunks
    
    • 解決方案:為表添加主鍵,或設置 scan.incremental.snapshot.chunk.key-column 為非空列(如 'scan.incremental.snapshot.chunk.key-column'='unique_id'
  4. binlog未啟用

    ERROR: Binary logging is not enabled
    
    • 解決方案:檢查MySQL配置文件,確認 log-bin 已啟用,重啟MySQL服務

通過以上步驟,可完成Flink MySQL CDC的環境配置與驗證。生產環境中建議結合實際需求調整并行度、checkpoint策略和GTID配置,以確保數據一致性和系統穩定性。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87552.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87552.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87552.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何查看自己電腦的CUDA版本?

在搜索欄輸入命令提示符 打開 輸入 nvidia-smi圖片中的兩個是CUDA版本和顯卡的信息

opencv使用 GStreamer 硬解碼和 CUDA 加速的方案

在Conda環境中從源代碼編譯OpenCV&#xff08;支持CUDA和GStreamer&#xff09; 以下是完整的方案步驟&#xff0c;包括必要的依賴庫安裝過程&#xff1a; 1. 安裝Miniconda&#xff08;如果尚未安裝&#xff09; # 下載Miniconda安裝腳本 wget https://repo.anaconda.com/m…

Java面試寶典:多線程一

1. run() vs start() 陷阱題 下面程序的運行結果 public static void main(String[] args) {Thread t = new Thread(

【CSS-14-基礎樣式表Base.css】如何編寫高質量的Base.css:前端樣式重置與基礎規范指南

在前端開發中&#xff0c;Base.css&#xff08;也稱為重置樣式表或基礎樣式表&#xff09;是整個項目樣式的基石。它負責消除瀏覽器默認樣式的差異&#xff0c;建立統一的樣式基準&#xff0c;為后續開發提供一致的起點。一個精心設計的Base.css能夠顯著提高開發效率&#xff0…

探索Python數據科學工具鏈NumPyPandas與Scikit-learn

NumPy&#xff1a;數值計算的基石 NumPy是Python中用于科學計算的核心庫&#xff0c;它提供了一個強大的N維數組對象&#xff0c;以及大量的數學函數庫&#xff0c;能夠高效地進行向量和矩陣運算。對于數據科學家而言&#xff0c;掌握NumPy是進行數據處理和算法實現的基礎。 創…

八股學習(三)---MySQL

一、MySQL中的回表是什么&#xff1f;我的回答&#xff1a;MySQL回表指的是在查詢使用非聚簇索引也就是二級索引時&#xff0c;葉子節點只存儲了索引列的值和主鍵Id&#xff0c;若要查詢其他字段&#xff0c;就要根據主鍵去聚簇索引查詢完整的數據。這個過程就是回表。比如用na…

NeighborGeo:基于鄰居的IP地理定位(一)

NeighborGeo:基于neighbors的IP地理定位 X. Wang, D. Zhao, X. Liu, Z. Zhang, T. Zhao, NeighborGeo: IP geolocation based on neighbors, Comput. Netw. 257 (2025) 110896, Abstract IP地址定位在網絡安全、電子商務、社交媒體等領域至關重要。當前主流的圖神經網絡方法…

MySQL 8.0:窗口函數

一、基礎知識 定義 窗口函數&#xff08;Window Function&#xff09;對查詢結果集的子集&#xff08;“窗口”&#xff09;進行計算&#xff0c;保留原始行而非聚合為單行&#xff0c;適合復雜分析&#xff08;如排名、累積和&#xff09;。 基本語法&#xff1a; 函數名() OV…

AI 深度學習面試題學習

1.神經網絡 1.1各個激活函數的優缺點? 1.2為什么ReLU常用于神經網絡的激活函數? 1.在前向傳播和反向傳播過程中,ReLU相比于Sigmoid等激活函數計算量小; 2.避免梯度消失問題。對于深層網絡,Sigmoid函數反向傳播時,很容易就會出現梯度消失問題(在Sigmoid接近飽和區時,變換…

遇到該問題:kex_exchange_identification: read: Connection reset`的解決辦法

kex_exchange_identification: read: Connection reset 是一個非常常見的 SSH 連接錯誤。它表明在 SSH 客戶端和服務器建立安全連接的初始階段&#xff08;密鑰交換&#xff0c;Key Exchange&#xff09;&#xff0c;連接就被對方&#xff08;服務器&#xff09;強制關閉了。 …

(論文蒸餾)語言模型中的多模態思維鏈推理

&#xff08;論文總結&#xff09;語言模型中的多模態思維鏈推理 論文名稱研究背景動機主要貢獻研究細節兩階段框架實驗結果促進收斂性擺脫人工標注錯誤分析與未來前景 論文名稱 Multimodal Chain-of-Thought Reasoning in Language Models http://arxiv.org/abs/2302.00923 …

React Native 接入 eCharts

React Native 圖表接入指南 概述 本文檔詳細介紹了在React Native項目中接入ECharts圖表的完整步驟&#xff0c;包括依賴安裝、組件配置、數據獲取、圖表渲染等各個環節。 目錄 1. 環境準備2. 依賴安裝3. 圖表組件創建4. 數據獲取Hook5. 圖表配置6. 組件集成7. 國際化支持8…

基于C#的OPCServer應用開發,引用WtOPCSvr.dll

操作流程&#xff1a; 1.引入WtOPCSvr.dll文件 2.注冊服務&#xff1a;使用UpdateRegistry方法注冊&#xff0c;注意關閉應用時使用UnregisterServer取消注冊。 3.初始化服務&#xff1a;使用InitWTOPCsvr初始化 4.使用CreateTag方法&#xff0c;創建標簽 5.讀寫參數使用下面三…

Java類加載器getResource行為簡單分析

今天嘗試集成一個第三方SDK&#xff0c;在IDE里運行正常&#xff0c;放到服務器上卻遇到了NPE&#xff0c;反編譯一看&#xff0c;原來在這一行&#xff1a;String path Test.class.getClassLoader().getResource("").getPath(); // Test.class.getClassLoader().ge…

【CodeTop】每日練習 2025.7.4

Leetcode 1143. 最長公共子序列 動態規劃解決&#xff0c;比較當前位置目標和實際字符串的字母&#xff0c;再根據不同情況計算接下來的情形。 class Solution {public int longestCommonSubsequence(String text1, String text2) {char[] t1 text1.toCharArray();char[] t2…

ES6從入門到精通:Promise與異步

Promise 基礎概念Promise 是 JavaScript 中處理異步操作的一種對象&#xff0c;代表一個異步操作的最終完成或失敗及其結果值。它有三種狀態&#xff1a;Pending&#xff08;進行中&#xff09;、Fulfilled&#xff08;已成功&#xff09;、Rejected&#xff08;已失敗&#xf…

數據結構:二維數組(2D Arrays)

目錄 什么是二維數組&#xff1f; 二維數組的聲明方式 方式 1&#xff1a;靜態二維數組 方式 2&#xff1a;數組指針數組&#xff08;數組中存放的是指針&#xff09; 方式 3&#xff1a;雙指針 二級堆分配 &#x1f4a1; 補充建議 如何用“第一性原理”去推導出 C 中…

HAProxy 和 Nginx的區別

HAProxy 和 Nginx 都是優秀的負載均衡工具&#xff0c;但它們在設計目標、適用場景和功能特性上有顯著區別。以下是兩者的詳細對比&#xff1a;1. 核心定位特性HAProxyNginx主要角色專業的負載均衡器/代理Web 服務器 反向代理/負載均衡設計初衷高性能流量分發高并發 HTTP 服務…

基于Java+SpringBoot的健身房管理系統

源碼編號&#xff1a;S586源碼名稱&#xff1a;基于SpringBoot的健身房管理系統用戶類型&#xff1a;多角色&#xff0c;用戶、教練、管理員數據庫表數量&#xff1a;13 張表主要技術&#xff1a;Java、Vue、ElementUl 、SpringBoot、Maven運行環境&#xff1a;Windows/Mac、JD…

【MySQL安裝-yum/手動安裝,卸載,問題排查處理完整文檔(linux)】

一.使用Yum倉庫自動安裝 步驟1:添加MySQL Yum倉庫 sudo rpm -Uvh https://dev.mysql.com/get/mysql80-community-release-el7-6.noarch.rpm步驟2:安裝MySQL服務器 sudo yum install mysql-server -y步驟3:啟動并設置開機自啟 sudo systemctl start mysqld sudo systemct…