手把手教你用 Flink + CDC 實現 MySQL 數據實時導入 StarRocks(干貨)

手把手教你用 Flink + CDC 實現 MySQL 數據實時導入 StarRocks(干貨)

如何利用 Apache Flink 結合 CDC(Change Data Capture,變更數據捕獲)技術,將 MySQL 的數據實時導入 StarRocks,打造高效的實時數倉。這不僅是企業數字化轉型的利器,也是技術人提升競爭力的絕佳實戰場景!

在數據驅動的時代,實時性是企業的核心競爭力。傳統的批量 ETL(抽取-轉換-加載)方式往往因為延遲高、效率低而無法滿足實時分析需求。而 Flink 作為流處理的王者,搭配 CDC 捕獲 MySQL 的增量變更,再結合 StarRocks 的高性能分析能力,形成了一個強大的實時數據入湖方案。無論你是數據庫工程師、數據分析師,還是對數倉建設感興趣的初學者,這篇文章都將手把手帶你完成從環境搭建到整庫同步的實戰流程。

通過這篇博文,你將學會如何安裝 Flink 和 MySQL CDC 連接器,編寫 YAML 文件實現整庫同步,并將數據無縫導入 StarRocks。準備好服務器,泡杯咖啡,咱們一起開啟這場實時入湖的實戰之旅吧!

**準備好你的服務器,泡杯咖啡,咱們一起“上代碼、上步驟、上實戰”,開啟這場實時入湖的硬核之旅!
**


文章目錄

  • 手把手教你用 Flink + CDC 實現 MySQL 數據實時導入 StarRocks(干貨)
  • 第一步:為什么選擇 Flink + CDC + StarRocks?
  • 第二步:環境準備與工具安裝
    • 具體配置
    • 2.0 MySql 配置
      • 驗證是否是 binlog 模式
      • 如果沒有 需要 啟動 binlog 模式
      • 創建數據庫和表 用來同步
    • 2.1 java 安裝
    • 2.2 下載 Flink 1.20.1
    • 2.3 解壓 Flink 1.20.1
    • 2.4 配置 Flink-1.20.1/conf/config.yaml
    • 2.5 下載 Flink-CDC-3.3.0
    • 2.6 解壓 Flink-CDC-3.3.0
    • 2.7 下載驅動 Flink-cdc-pipeline-connector-mysql-3.3.0.jar
    • 2.8 下載驅動 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar
    • 2.9 移到 Flink-cdc-3.3.0/lib 下
    • 2.10 下載 MySQL JDBC 驅動
    • 2.11 啟動 Flink-1.20.1
    • 2.12 網絡和數據庫連接 測試
    • 2.13 創建 yaml
    • 2.14 運行 yaml
  • 最后驗證 是否同步
    • 查看starrocks
    • 連接 192.168.5.128:8030 ,并輸入 用戶名密碼


第一步:為什么選擇 Flink + CDC + StarRocks?

Flink 是一個強大的流處理框架,擅長處理實時數據流,吞吐量高、延遲低,非常適合實時數倉場景。而 CDC 技術能捕獲 MySQL 數據庫的增量變更(比如插入、更新、刪除操作),讓我們無需全量掃描數據庫,就能實時獲取數據變化。至于 StarRocks,它是一個高性能的分析型數據庫,查詢速度快,支持實時分析場景。把這三者結合起來,簡直是實時數據入湖的“黃金三角”!


第二步:環境準備與工具安裝

先是vm 上 安裝了 4臺 linux centos 8 (如下圖)
配置了 4臺 VM 虛擬機
3臺 安裝 StarRocks
1臺安裝 mysql +flink+flinkcdc

具體可以參考Streaming ELT 同步 MySQL 到 StarRocks

但官方的例子 需要安裝 docker 。當你真實配置你會發現,環境各有不同 ,不同
java 版本 ,Flink 和 cdc 用什么版本 ,有沒有什么依賴性都需要考慮 問題。
包括 mysql 和 starocks 版本等.
我在實踐中也出現的很多問題,經過很多嘗試最終完成.

具體配置

組件安裝 IP 地址角色/說明
Apache Flink192.168.5.131Flink 集群(JobManager + TaskManager)
Flink CDC 連接器192.168.5.131部署于 Flink 的 lib 目錄,用于捕獲 MySQL 變更數據
MySQL192.168.5.131源數據庫,提供數據并啟用 Binlog
StarRocks FE + BE192.168.5.128StarRocks 前端(FE)+ 后端(BE)
StarRocks BE192.168.5.129StarRocks 后端(BE)節點 2
StarRocks BE192.168.5.130StarRocks 后端(BE)節點 3

說明
Flink 和 Flink CDC:均部署在 192.168.5.131,Flink CDC 連接器通常作為 JAR 文件放置在 Flink 的 lib 目錄下,與 Flink 共享同一節點。

MySQL:與 Flink 部署在同一服務器(192.168.5.131),需確保 Binlog 已啟用。

StarRocks:分布式部署,FE 和一個 BE 節點在 192.168.5.128,另外兩個 BE 節點分別在 192.168.5.129 和 192.168.5.130,形成一個典型的多節點集群。

網絡要求:確保所有 IP 地址之間網絡互通,特別是 MySQL(3306 端口)、Flink(8081 等端口)、StarRocks(8030、9030 等端口)需開放相關端口。

關于 怎么安裝 starrocks 可以訪問 Starrocks 中文論壇
關于 怎么安裝 mysql 可以訪問 這篇三步搞定 mysql 8.0的安裝

本案列不需要安裝 Docker
在這里插入圖片描述

CentOS8_cd_Flink
可以看到 配置不高,主要用來完成這個實驗 ,
在這里插入圖片描述
如果是生產環境建議以下配置
Flink 是一個資源密集型的流處理框架,對 CPU、內存、磁盤和網絡有一定要求。以下是推薦的硬件配置:

開發/測試環境(單節點或小型集群)

: CPU:4 核 ~ 8 核(如 Intel Xeon 或 AMD EPYC,推薦 2.5 GHz
以上)。 內存:16 GB ~ 32 GB(Flink 作業和 JVM 堆內存需至少 8 GB)。 磁盤:500 GB
SSD(用于存儲檢查點、日志和臨時數據)。 網絡:千兆網卡(1 Gbps),確保低延遲數據傳輸。

生產環境(分布式集群):

CPU:16 核 ~ 32 核 per TaskManager(推薦多核 CPU 以支持高并行度)。 內存:64 GB ~ 128 GB
per TaskManager(建議為 Flink 分配 70%~80% 的內存,剩余用于操作系統)。 磁盤:1 TB ~ 2 TB
NVMe SSD(高 IOPS,適合檢查點和狀態存儲)。 網絡:萬兆網卡(10 Gbps),支持高吞吐量數據傳輸。 節點數:至少 3
個節點(1 個 JobManager + 2 個 TaskManager),可根據任務規模擴展。

2.0 MySql 配置

驗證是否是 binlog 模式

SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE '%binlog%'; 

在這里插入圖片描述
在這里插入圖片描述

如果沒有 需要 啟動 binlog 模式

# /etc/my.cnf
[mysqld]
log-bin=mysql-bin          # 啟用
binlog-format=ROW

– 1. 檢查 binlog 是否啟用 SHOW VARIABLES LIKE ‘log_bin’; – 必須是 ON

– 2. 設置格式為 ROW(最重要) SET GLOBAL binlog_format = ‘ROW’;

– 3. 設置合理的過期時間 SET GLOBAL binlog_expire_logs_seconds = 604800; – 7 天

創建數據庫和表 用來同步

-- 創建數據庫
CREATE DATABASE app_db;USE app_db;-- 創建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入數據
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 創建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入數據
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 創建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入數據
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

2.1 java 安裝

可以看到 java 已經安裝好 版本 11
在這里插入圖片描述

2.2 下載 Flink 1.20.1

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.20.1/flink-1.20.1-bin-scala_2.12.tgz

在這里插入圖片描述

2.3 解壓 Flink 1.20.1

tar -xzf flink-1.20.1-bin-scala_2.12.tgz
cd flink-1.20.1
ll

在這里插入圖片描述

在這里插入圖片描述

2.4 配置 Flink-1.20.1/conf/config.yaml

vim flink-1.20.1/conf/config.yaml
改為 rest.address: 0.0.0.0
改為 rest.bind-address: 0.0.0.0

rest.address 和 rest.bind-address 是與 Flink 的 REST API 和 Web UI 相關的配置項,用于控制 Flink JobManager 的 REST 服務監聽地址。這些配置決定了 Flink 的 Web UI 和客戶端如何訪問 JobManager。
在這里插入圖片描述

2.5 下載 Flink-CDC-3.3.0

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-cdc-3.3.0/flink-cdc-3.3.0-bin.tar.gz

在這里插入圖片描述

2.6 解壓 Flink-CDC-3.3.0

tar -xzf flink-cdc-3.3.0-bin.tar.gz
cd flink-cdc-3.3.0

在這里插入圖片描述

2.7 下載驅動 Flink-cdc-pipeline-connector-mysql-3.3.0.jar

wget "https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.3.0/flink-cdc-pipeline-connector-mysql-3.3.0.jar?Expires=1753349291&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=jnckjewN8vqzaE3tbupo691o8YY%3D" -O lib/flink-cdc-pipeline-connector-mysql-3.3.0.jar

2.8 下載驅動 Flink-cdc-pipeline-connector-starrocks-3.3.0.jar

wget "https://aliyun-osm-maven.oss-cn-shanghai.aliyuncs.com/repository/central/org/apache/flink/flink-cdc-pipeline-connector-starrocks/3.3.0/flink-cdc-pipeline-connector-starrocks-3.3.0.jar?Expires=1753349371&OSSAccessKeyId=LTAI5tQeTg2SkYgiUPXMyK7t&Signature=Fd0JxnlDxr1nKkP8wOoIHHhGV2c%3D" -O lib/flink-cdc-pipeline-connector-starrocks-3.3.0.jar

2.9 移到 Flink-cdc-3.3.0/lib 下

在這里插入圖片描述

2.10 下載 MySQL JDBC 驅動

wget https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar

2.11 啟動 Flink-1.20.1

./flink-1.20.1/bin/start-cluster.sh

在這里插入圖片描述

打開 網頁
192.168.5.131 :8081,如果可以打開說明啟動成功
在這里插入圖片描述

點擊 Task Managers ,說明基本正常

在這里插入圖片描述

2.12 網絡和數據庫連接 測試

ping 192.168.5.131
ping 192.168.5.128telnet 192.168.5.131 3306
telnet 192.168.5.128 9030

在這里插入圖片描述

2.13 創建 yaml

在這里插入圖片描述

source:type: mysqlhostname: 192.168.5.131  # 修改為實際 MySQL 地址port: 3306username: rootpassword: 123456tables: app_db.\.*  #app_db修改為庫名server-id: 5400-5404sink:type: starrocksjdbc-url: jdbc:mysql://192.168.5.128:9030  # 修改為實際 StarRocks 地址load-url: 192.168.5.128:8030username: rootpassword: 123456table.create.properties.replication_num: 1pipeline:name: MySQL to StarRocks Pipelineparallelism: 1

source:
type: mysql
hostname: 192.168.5.131 # MySQL 服務器 IP 地址
port: 3306 # MySQL 端口
username: root # 連接
MySQL 的用戶名
password: 123456 # 連接 MySQL 的密碼 tables:
app_db…* # 需要同步的表,支持正則,這里是 app_db 庫下的所有表
server-id:
5400-5404 # MySQL binlog server_id 范圍(Flink CDC 會隨機選一個)

sink:
type: starrocks
jdbc-url: jdbc:mysql://192.168.5.128:9030 # StarRocks JDBC 連接地址
load-url: 192.168.5.128:8030 # StarRocks Stream Load 地址
username: root # StarRocks 用戶名
password: 123456 # StarRocks 密碼
table.create.properties.replication_num: 1 # 表副本數設置為1

pipeline:
name: MySQL to StarRocks Pipeline # 管道名稱
parallelism: 1 # 并行度設置為1

2.14 運行 yaml

因為有依賴關系 需要放在 flink-cdc-3.3.0 目錄下面
在這里插入圖片描述

./bin/flink-cdc.sh --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml

./bin/flink-cdc.sh - 這是 Flink CDC 的啟動腳本
–flink-home /root/flink-1.20.1 - 指定 Flink 的安裝目錄
mysql-to-starrocks-pipeline.yaml - 配置文件路徑

在這里插入圖片描述

頁面如下
在這里插入圖片描述
在這里插入圖片描述
設置

./bin/flink-cdc.sh -Dexecution.checkpointing.interval=3000 --flink-home /root/flink-1.20.1 mysql-to-starrocks-pipeline.yaml

-Dexecution.checkpointing.interval=3000
-D: 這是 JVM 參數前綴,用于設置系統屬性
execution.checkpointing.interval: Flink 配置參數名
3000: 時間間隔,單位是毫秒,即 3000ms = 3秒

最后驗證 是否同步

查看starrocks

可以從圖中看到原來 starrocks 是都沒有數據庫的
現在有數據庫,表也自動同步好了。
在這里插入圖片描述

連接 192.168.5.128:8030 ,并輸入 用戶名密碼

在這里插入圖片描述
在這里插入圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/92975.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/92975.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/92975.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Rust:anyhow 高效錯誤處理庫核心用法詳解

以下是 anyhow 庫在 Rust 中的核心用法詳解(結合最佳實踐和示例): 🔰 一、anyhow 的核心價值 用于簡化錯誤處理,尤其適合: 需要快速原型開發的應用需要豐富錯誤上下文(Context)的場…

阿里云服務linux安裝單機版

一、單機安裝Redis 阿里教程 下載地址:redis下載地址 1、首先需要安裝Redis所需要的依賴: yum install -y gcc tcl 2、下載Redis 注:也可以自己下好然后上傳到云服務 wget https://gitcode.net/weixin_44624117/software/-/raw/master/software/Li…

python之uv使用

文章目錄安裝與更新standalonepip 安裝創建以及初始化項目依賴管理uv run直接在命令行運行python代碼片段直接運行項目中可執行腳本文件運行python包中快捷指令uv項目本地運行調試細節vscode 中運行調試uv項目命令行運行深入理解 uv lock, uv sync, uv lockuv lock 行為解析:uv…

【CV 目標檢測】①——目標檢測概述

一、目標檢測概述 1.目標檢測 目標檢測(Object Detection)的任務是找出圖像中所有感興趣的目標,并確定它們的類別(分類任務)和位置(回歸任務) 目標檢測中能檢測出來的物體取決于當前任務&…

C#圖形庫SciChart與ScottPlot及LiveCharts2對比

一.概述 1.SciChart SciChart 是一個專為企業級應用設計的高性能數據可視化庫,提供跨平臺的圖表解決方案,支持 .NET、JavaScript、iOS 和 Android 等多個平臺。它以卓越的渲染性能、豐富的專業圖表類型和強大的交互功能著稱, 廣泛應用于金…

Win10電腦密碼忘記如何進入操作系統

http://xq128.com/zj.htmlhttps://share.feijipan.com/s/LbFdbUKl下載后,準備一個空的U盤,大于4G。將U盤制作為PE盤。之后將制作好的PE盤插入到電腦中,啟動待去除密碼的電腦臺式機,啟動后一直按住F12,進入BIOS。選擇下…

[免費]基于Python的網易云音樂熱門歌單可視化大屏項目(flask+pandas+echarts+request庫)【論文+源碼+SQL腳本】

大家好,我是python222_小鋒老師,看到一個不錯的基于Python的網易云音樂熱門歌單可視化大屏項目(flaskpandasechartsrequest庫),分享下哈。 項目視頻演示 【免費】基于Python的網易云音樂熱門歌單可視化大屏項目(flaskpandasecharts爬蟲) Py…

AR 智能眼鏡:從入門到未來

從零看懂 AR 智能眼鏡:未來 10 年技術演進與新手入門指南 在這個數字技術飛速迭代的時代,AR 智能眼鏡正從科幻電影走進現實。從 2025 年重量不足 35 克的消費級產品,到 2030 年成為 “第二大腦” 的生活剛需,再到 2040 年進化為神經接口終端,AR 智能眼鏡的發展將重塑人類…

初識Vue2及MVVM理解

1、什么是Vue Vue是一款用于構建用戶界面的JavaScript框架。它基于標準HTML、CSS和JavaScript構建,并提供了一套聲明式的、組件化的編程模型,可以高效地開發用戶界面。 Vue.js是一套構建用戶界面的漸進式框架,采用自底向上增量開發的設計&…

Rust:專業級錯誤處理工具 thiserror 詳解

Rust:專業級錯誤處理工具 thiserror 詳解 thiserror 是 Rust 中用于高效定義自定義錯誤類型的庫,特別適合庫開發。相比 anyhow 的應用級錯誤處理,thiserror 提供更精確的錯誤控制,讓庫用戶能模式匹配具體錯誤。 📦 基…

Python網絡爬蟲(一) - 爬取靜態網頁

文章目錄一、靜態網頁概述1. 靜態網頁介紹2. 靜態網頁爬取技術Requests介紹二、安裝 Requests 庫三、發送請求并獲取響應1. 發送 GET 請求1.1 get() 方法介紹1.2 get() 方法簽名介紹1.3 get() 方法參數介紹1.4 示例:發送get請求2. 發送 POST 請求2.1 post() 方法介紹…

.NET/C# webapi框架下給swagger的api文檔中顯示注釋(可下載源碼)

bg&#xff1a;.NET/C#真的是越來越涼了。用的是.net9&#xff0c;創建完自帶一個天氣預報api拿來測試就行 1、在Controllers中弄多幾個&#xff0c;并寫上注釋 /// <summary> /// Post注釋 /// </summary> /// <returns></returns> [HttpPost] publ…

2508C++,檢測S模式

原文 可用Windows.System.Profile.WindowsIntegrityPolicy類檢測S模式. //C# using Windows.System.Profile; if (WindowsIntegrityPolicy.IsEnabled) {//系統在S模式if (WindowsIntegrityPolicy.CanDisable) {//系統在S模式,但可退出S模式suggestCompanion true;} else {//系…

Coding Exercising Day 9 of “Code Ideas Record“:StackQueue part 01

文章目錄1. Theoretical basisThe C standard library has multiple versions. To understand the implementation principles of stack and queue, we must know which STL version we are using.The stack and queue discussed next are data structures in *SGI STL*. Only …

Mysql數據倉庫備份腳本

Mysql數據倉庫備份腳本 #!/bin/bash# MySQL數據庫完整備份腳本 # 功能: 查詢所有數據庫 -> 分別導出 -> 壓縮打包# 配置區域 # MySQL連接信息 MYSQL_USER"root" MYSQL_PASSWORD"root" MYSQL_HOST"localhost" MYSQL_PORT"3306"…

基于嵌入式Linux RK3568 qt 車機系統開發

嵌入式系統、Qt/QML 與車機系統的發展趨勢分析 1. RK3568 開發板與 OpenGL ES 3 支持&#xff0c;為圖形應用打下堅實基礎 RK3568 是瑞芯微&#xff08;Rockchip&#xff09;推出的一款高性能、低功耗的64位處理器&#xff0c;廣泛用于工業控制、智能終端、嵌入式車載系統等領…

OceanBase架構設計

本文主要參考《大規模分布式存儲系統》 基本結構客戶端&#xff1a;發起請求。 RootServer&#xff1a;管理集群中的所有服務器&#xff0c;子表數據分布及副本管理&#xff0c;一般為一主一備&#xff0c;數據強同步。 UpdateServer&#xff1a;存儲增量變更數據&#xff0c;一…

[Element-plus]動態設置組件的語言

nuxt element-plus國際化vue element-plus國際化<template><div class"container"> <!-- <LangSwitcher />--><button click"toggle(zh-cn)">中文</button><button click"toggle(en)">English<…

【VS Code - Qt】如何基于Docker Linux配置Windows10下的VS Code,開發調試ARM 版的Qt應用程序?

如何在Windows 10上配置VS Code以開發和調試ARM版Qt應用程序。這需要設置一個基于Docker的Linux環境。首先&#xff0c;讓我們了解一下你的具體需求和環境&#xff1a;你有一個Qt項目&#xff08;看起來是醫學設備相關的設置程序&#xff09;目標平臺是ARM架構你希望在Windows …

linux常見故障系列文章 1-linux進程掛掉原因總結和排查思路

問題一 &#xff1a;運行時常見的進程崩潰原因 內存不足&#xff09; **0. 內存不足 內存不足&#xff08;OOM Killer&#xff09; 排查 OOM&#xff1a;free -h → dmesg → ps aux --sort-%mem 預防 OOM&#xff1a;限制關鍵進程內存、調整 OOM Killer 策略、增加 swap 長期優…