手把手教你用 Flink + CDC 實現 MySQL 數據實時導入 StarRocks(干貨)
如何利用 Apache Flink 結合 CDC(Change Data Capture,變更數據捕獲)技術,將 MySQL 的數據實時導入 StarRocks,打造高效的實時數倉。這不僅是企業數字化轉型的利器,也是技術人提升競爭力的絕佳實戰場景!
在數據驅動的時代,實時性是企業的核心競爭力。傳統的批量 ETL(抽取-轉換-加載)方式往往因為延遲高、效率低而無法滿足實時分析需求。而 Flink 作為流處理的王者,搭配 CDC 捕獲 MySQL 的增量變更,再結合 StarRocks 的高性能分析能力,形成了一個強大的實時數據入湖方案。無論你是數據庫工程師、數據分析師,還是對數倉建設感興趣的初學者,這篇文章都將手把手帶你完成從環境搭建到整庫同步的實戰流程。
通過這篇博文,你將學會如何安裝 Flink 和 MySQL CDC 連接器,編寫 YAML 文件實現整庫同步,并將數據無縫導入 StarRocks。準備好服務器,泡杯咖啡,咱們一起開啟這場實時入湖的實戰之旅吧!
**準備好你的服務器,泡杯咖啡,咱們一起“上代碼、上步驟、上實戰”,開啟這場實時入湖的硬核之旅!
**
文章目錄
- 手把手教你用 Flink + CDC 實現 MySQL 數據實時導入 StarRocks(干貨)
- 第一步:為什么選擇 Flink + CDC + StarRocks?
- 第二步:環境準備與工具安裝
- 具體配置
- 2.0 MySql 配置
- 驗證是否是 binlog 模式
- 如果沒有 需要 啟動 binlog 模式
- 創建數據庫和表 用來同步
- 2.1 java 安裝
- 2.2 下載 Flink 1.20.1
- 2.3 解壓 Flink 1.20.1
- 2.4 配置 Flink-1.20.1/conf/config.yaml
- 2.5 下載 Flink-CDC-3.3.0
- 2.6 解壓 Flink-CDC-3.3.0
- 2.7 下載驅動 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
- 2.8 下載驅動 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
- 2.9 移到 Flink-cdc-3.3.0/lib 下
- 2.10 下載 MySQL JDBC 驅動
- 2.11 啟動 Flink-1.20.1
- 2.12 網絡和數據庫連接 測試
- 2.13 創建 yaml
- 2.14 運行 yaml
- 最后驗證 是否同步
- 查看starrocks
- 連接 192.168.5.128:8030 ,并輸入 用戶名密碼
第一步:為什么選擇 Flink + CDC + StarRocks?
Flink 是一個強大的流處理框架,擅長處理實時數據流,吞吐量高、延遲低,非常適合實時數倉場景。而 CDC 技術能捕獲 MySQL 數據庫的增量變更(比如插入、更新、刪除操作),讓我們無需全量掃描數據庫,就能實時獲取數據變化。至于 StarRocks,它是一個高性能的分析型數據庫,查詢速度快,支持實時分析場景。把這三者結合起來,簡直是實時數據入湖的“黃金三角”!
第二步:環境準備與工具安裝
先是vm 上 安裝了 4臺 linux centos 8 (如下圖)
配置了 4臺 VM 虛擬機
3臺 安裝 StarRocks
1臺安裝 mysql +flink+flinkcdc
具體可以參考Streaming ELT 同步 MySQL 到 StarRocks
但官方的例子 需要安裝 docker 。當你真實配置你會發現,環境各有不同 ,不同
java 版本 ,Flink 和 cdc 用什么版本 ,有沒有什么依賴性都需要考慮 問題。
包括 mysql 和 starocks 版本等.
我在實踐中也出現的很多問題,經過很多嘗試最終完成.
具體配置
組件 | 安裝 IP 地址 | 角色/說明 |
---|---|---|
Apache Flink | 192.168.5.131 | Flink 集群(JobManager + TaskManager) |
Flink CDC 連接器 | 192.168.5.131 | 部署于 Flink 的 lib 目錄,用于捕獲 MySQL 變更數據 |
MySQL | 192.168.5.131 | 源數據庫,提供數據并啟用 Binlog |
StarRocks FE + BE | 192.168.5.128 | StarRocks 前端(FE)+ 后端(BE) |
StarRocks BE | 192.168.5.129 | StarRocks 后端(BE)節點 2 |
StarRocks BE | 192.168.5.130 | StarRocks 后端(BE)節點 3 |
說明
Flink 和 Flink CDC:均部署在 192.168.5.131,Flink CDC 連接器通常作為 JAR 文件放置在 Flink 的 lib 目錄下,與 Flink 共享同一節點。
MySQL:與 Flink 部署在同一服務器(192.168.5.131),需確保 Binlog 已啟用。
StarRocks:分布式部署,FE 和一個 BE 節點在 192.168.5.128,另外兩個 BE 節點分別在 192.168.5.129 和 192.168.5.130,形成一個典型的多節點集群。
網絡要求:確保所有 IP 地址之間網絡互通,特別是 MySQL(3306 端口)、Flink(8081 等端口)、StarRocks(8030、9030 等端口)需開放相關端口。
關于 怎么安裝 starrocks 可以訪問 Starrocks 中文論壇
關于 怎么安裝 mysql 可以訪問 這篇三步搞定 mysql 8.0的安裝
本案列不需要安裝 Docker
CentOS8_cd_Flink
可以看到 配置不高,主要用來完成這個實驗 ,
如果是生產環境建議以下配置
Flink 是一個資源密集型的流處理框架,對 CPU、內存、磁盤和網絡有一定要求。以下是推薦的硬件配置:
開發/測試環境(單節點或小型集群)
: CPU:4 核 ~ 8 核(如 Intel Xeon 或 AMD EPYC,推薦 2.5 GHz
以上)。 內存:16 GB ~ 32 GB(Flink 作業和 JVM 堆內存需至少 8 GB)。 磁盤:500 GB
SSD(用于存儲檢查點、日志和臨時數據)。 網絡:千兆網卡(1 Gbps),確保低延遲數據傳輸。
生產環境(分布式集群):
CPU:16 核 ~ 32 核 per TaskManager(推薦多核 CPU 以支持高并行度)。 內存:64 GB ~ 128 GB
per TaskManager(建議為 Flink 分配 70%~80% 的內存,剩余用于操作系統)。 磁盤:1 TB ~ 2 TB
NVMe SSD(高 IOPS,適合檢查點和狀態存儲)。 網絡:萬兆網卡(10 Gbps),支持高吞吐量數據傳輸。 節點數:至少 3
個節點(1 個 JobManager + 2 個 TaskManager),可根據任務規模擴展。
2.0 MySql 配置
驗證是否是 binlog 模式
SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE '%binlog%';
如果沒有 需要 啟動 binlog 模式
# /etc/my.cnf
[mysqld]
log-bin=mysql-bin # 啟用
binlog-format=ROW
– 1. 檢查 binlog 是否啟用 SHOW VARIABLES LIKE ‘log_bin’; – 必須是 ON
– 2. 設置格式為 ROW(最重要) SET GLOBAL binlog_format = ‘ROW’;
– 3. 設置合理的過期時間 SET GLOBAL binlog_expire_logs_seconds = 604800; – 7 天
創建數據庫和表 用來同步
-- 創建數據庫
CREATE DATABASE app_db;USE app_db;-- 創建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入數據
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 創建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入數據
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 創建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入數據
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
2.1 java 安裝
可以看到 java 已經安裝好 版本 11
2.2 下載 Flink 1.20.1
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz
2.3 解壓 Flink 1.20.1
tar -xzf flink-1.20.1-bin-scala_2.12.tgz
cd flink-1.20.1
ll
2.4 配置 Flink-1.20.1/conf/config.yaml
vim flink-1.20.1/conf/config.yaml
改為 rest.address: 0.0.0.0
改為 rest.bind-address: 0.0.0.0
rest.address 和 rest.bind-address 是與 Flink 的 REST API 和 Web UI 相關的配置項,用于控制 Flink JobManager 的 REST 服務監聽地址。這些配置決定了 Flink 的 Web UI 和客戶端如何訪問 JobManager。
2.5 下載 Flink-CDC-3.3.0
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz
2.6 解壓 Flink-CDC-3.3.0
tar -xzf flink-cdc-3.3.0-bin.tar.gz
cd flink-cdc-3.3.0
2.7 下載驅動 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
wget "https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar?Expires=1753349291&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=jnckjewN8vqzaE3tbupo691o8YY%3D" -O lib/flink-cdc-pipeline-connector-mysql-3.3.0.jar
2.8 下載驅動 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
wget "https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.3.0/flink-cdc-pipeline-connector-starrocks-3.3.0.jar?Expires=1753349371&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=Fd0JxnlDxr1nKkP8wOoIHHhGV2c%3D" -O lib/flink-cdc-pipeline-connector-starrocks-3.3.0.jar
2.9 移到 Flink-cdc-3.3.0/lib 下
2.10 下載 MySQL JDBC 驅動
wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
2.11 啟動 Flink-1.20.1
./flink-1.20.1/bin/start-cluster.sh
打開 網頁
192.168.5.131 :8081,如果可以打開說明啟動成功
點擊 Task Managers ,說明基本正常
2.12 網絡和數據庫連接 測試
ping 192.168.5.131
ping 192.168.5.128telnet 192.168.5.131 3306
telnet 192.168.5.128 9030
2.13 創建 yaml
source:type: mysqlhostname: 192.168.5.131 # 修改為實際 MySQL 地址port: 3306username: rootpassword: 123456tables: app_db.\.* #app_db修改為庫名server-id: 5400-5404sink:type: starrocksjdbc-url: jdbc:mysql://192.168.5.128:9030 # 修改為實際 StarRocks 地址load-url: 192.168.5.128:8030username: rootpassword: 123456table.create.properties.replication_num: 1pipeline:name: MySQL to StarRocks Pipelineparallelism: 1
source:
type: mysql
hostname: 192.168.5.131 # MySQL 服務器 IP 地址
port: 3306 # MySQL 端口
username: root # 連接
MySQL 的用戶名
password: 123456 # 連接 MySQL 的密碼 tables:
app_db…* # 需要同步的表,支持正則,這里是 app_db 庫下的所有表
server-id:
5400-5404 # MySQL binlog server_id 范圍(Flink CDC 會隨機選一個)
sink:
type: starrocks
jdbc-url: jdbc:mysql://192.168.5.128:9030 # StarRocks JDBC 連接地址
load-url: 192.168.5.128:8030 # StarRocks Stream Load 地址
username: root # StarRocks 用戶名
password: 123456 # StarRocks 密碼
table.create.properties.replication_num: 1 # 表副本數設置為1
pipeline:
name: MySQL to StarRocks Pipeline # 管道名稱
parallelism: 1 # 并行度設置為1
2.14 運行 yaml
因為有依賴關系 需要放在 flink-cdc-3.3.0 目錄下面
./bin/flink-cdc.sh --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml
./bin/flink-cdc.sh - 這是 Flink CDC 的啟動腳本
–flink-home /root/flink-1.20.1 - 指定 Flink 的安裝目錄
mysql-to-starrocks-pipeline.yaml - 配置文件路徑
頁面如下
設置
./bin/flink-cdc.sh -Dexecution.checkpointing.interval=3000 --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml
-Dexecution.checkpointing.interval=3000
-D: 這是 JVM 參數前綴,用于設置系統屬性
execution.checkpointing.interval: Flink 配置參數名
3000: 時間間隔,單位是毫秒,即 3000ms = 3秒
最后驗證 是否同步
查看starrocks
可以從圖中看到原來 starrocks 是都沒有數據庫的
現在有數據庫,表也自動同步好了。
連接 192.168.5.128:8030 ,并輸入 用戶名密碼