1.FlinkCDC概述
1.1FlinkCDC是什么?
? ? ? ? FlinkCDC(Flink Change Data Capture)是一個用于實時捕獲數據庫變更日志的工具,它可以將數據庫的變更實時同步到ApacheFlink系統中。
1.2 FlinkCDC的三個版本?
? ? ? ? 1.x 這個版本的FlinkCDC的提供了DataStream以及FlinkSQL的方式實現數據的動態獲取
? ? ? ? 存在的問題:
? ? ? ? 就是在生產環境中,我們在同步環境的時候,萬一前腳剛同步了數據,后腳就被修改了怎么辦呢?這樣我們不就是讀到了,不正確的數據了。因此,在FlinkCDC1.x的版本中的解決方案是,在讀表的過程中,鎖住整張表,這時候不會有新的數據寫入了。但是由此又帶來了一個新的問題,生產環境中時時刻刻就是會有新的數據寫入的,如果鎖住整張表,就會對線上產生很多問題。所以迎來了2.x版本。
? ? ? ? 2.x 這個版本提供了豐富的數據庫對接以及增加全量的同步鎖表的解決問題的解決方案。
? ? ? ? 提供API或者FlinkSql去進行操作,打包代碼上傳到集群去進行操作,但是我們的本身任務并不復雜,就是一個導數據的任務,所以需要更簡單的方法去實現數據導入的作用。
? ? ? ? 3.x 這個版本提供了StreamingETL方式導入數據方案。?
? ? ? ? FlinkCDC在這個版本形成了自己的框架,可以像平時的那些框架文件如果spark,hadoop一樣又bin,conf等文件夾,所以我們在使用FlinkCDC的時候就是可以直接在Conf中配置Resource(要導入的數據庫)sink(目標文件),可以通過命令啟動來進行同步。
順帶提一下兩種CDC的同步方式:
? ? ? ? CDC一種是通過查詢的方式和通過Binlog兩種方式,簡單說一下兩種的不同
基于查詢的CDC | 基于binlog的cdc | |
產品 | Sqoop、DataX | Canal |
執行模式 | Batch | Streaming |
是否可以檢測到所有變化 | 否(同步最終態) | 是 |
延遲性 | 延遲高(按天進行同步) | 低延遲 |
增加數據庫壓力 | 是 | 否 |
2.flinkCDC 同步mysql數據庫數據到doris
? ? ? ? 2.1 環境準備
? ? ? ? ? ?1)安裝FlinkCDC
? ? ? ? ? ? ? ?flinkCDC下載地址?https://pan.baidu.com/s/1_BKPxommK5dsY3hD7rYVUA 提取碼: pisv?
tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/
? ? ? ? 2)向FlinkCDC的目錄下的lib目錄下傳入Mysql以及Doris 的依賴包
? ? ? ? doris的jar包
????????https://pan.baidu.com/s/1pgtsYT9VyXD1U4RbjYA6rg 提取碼: kx2q?
? ? ? ? mysql的jar包
????????https://pan.baidu.com/s/1pxCy0-iSutqN9YjdzGAfZw 提取碼: p65d?
? ? ? ? 還需要一個mysql-connector的jar
????????https://pan.baidu.com/s/1lzJuQRPL3KtDqDXaoGBSMQ 提取碼: dwnw?
? ? ? ? 為啥還需要已經有了mysql的jar包了還需要一個mysql-connector?
????????是因為mysql的jar包依賴于mysql-connector,
? ? ? ? 為啥不封裝在一起?
? ? ? ? 首先來說就是MySQL的jar相當于對數據庫一個能力的封裝底層可以用別的connector,也是為了解耦,還有一個原因就是兩個包所用的協議不一樣,上面的這個msyql的jar包是用的apache的協議。??
? ? ? ? 為啥用了mysql的驅動包,不用doris的驅動包?
? ? ? ? 因為doris兼容mysql的協議。
? ? ? ? 2.2 同步變更
? ? ? ? 編寫 MySQL同步到doris的配置文件
? ? ? ? 可以選擇在FlinkCDC中創建一個單獨的文件夾寫配置文件,也可以寫在conf的目錄下。
vim mysql-to-doris.yaml
source:
?#數據源的數據庫類型
??type: mysql
?#地址/主機名稱
??hostname: hadoop103
?#端口號
??port: 3306
?#數據庫用戶名
??username: root
? #數據庫密碼
??password: "000000"
? #要同步的表名
? tables: test.\.*
? #ServerID 下面詳細解釋
??server-id: 5400-5404
?#時區
??server-time-zone: UTC+8
sink:
? #目標數據庫類型
??type: doris
? #目標數據庫物理存儲主機名加端口號
??fenodes: hadoop102:7030
? #數據庫用戶名
??username: root
??#數據庫密碼
??password: "000000"
?#是否同步表的初始變化,就是類似新增字段之類的
??table.create.properties.light_schema_change: true
?#副本數
??table.create.properties.replication_num: 1
pipeline:
? #任務名稱
??name: Sync MySQL Database to Doris
? #并行任務數量
??parallelism: 1
server-id 的作用:
MySQL復制標識:在MySQL主從復制中,每個從庫必須有一個唯一的server-id來標識自己。同樣,當你的CDC工具連接MySQL時,它實際上扮演了一個MySQL從庫的角色,通過binlog來獲取數據變更。
避免沖突:如果你有多個CDC工具或從庫連接同一個MySQL主庫,每個實例必須有不同的server-id,否則會導致沖突和數據不一致。
????????這種配置方式通常在分布式或并行環境中使用,允許多個任務實例使用不同的server-id(在你的配置中parallelism: 4表示并行度為4,所以需要4個不同的server-id)。
? ? ? ? 啟動環境
? ? ? ? 1)開啟Flink集群
? ? ? ? 首先要添加如下配置
vim conf/flink-conf.yaml
添加如下配置信息
execution.checkpointing.interval: 5000
#啟動集群
bin/start-cluster.sh
? ? ? ? 2)開啟doris的FE
bin/start_fe.sh
? ? ? ? 3)? 開啟Doris的BE
bin/start_be.sh
? ? ? ? 4)啟動FlinkCDC同步變更任務
? ? ? ? 尚硅谷給的是這個命令,但是我用這個命令不行
flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml
? ? ? ? 我用的這個可以
bin/flink-cdc.sh config/你的配置文件 --jar lib/mysql....
? ? ? ? 然后刷新數據庫觀察結果
? ? ? ? 以上情況適用于就是我們的主庫mysql的數據庫名.表名,在doris中的數據庫名.表名是一樣。如果doris中的表名不一樣就用到下面的路由變更。
2.3路由變更
ource:
??type: mysql
??hostname: hadoop103
??port: 3306
??username: root
??password: "000000"
??tables: test_route.\.*
??server-id: 5400-5404
??server-time-zone: UTC+8
sink:
??type: doris
??fenodes: hadoop102:7030
??benodes: hadoop102:7040
??username: root
??password: "000000"
??table.create.properties.light_schema_change: true
??table.create.properties.replication_num: 1
#增加了路由規則
route:
??- source-table: test_route.t1
????sink-table: doris_test_route.doris_t1
??- source-table: test_route.t2
????sink-table: doris_test_route.doris_t1
??- source-table: test_route.t3
????sink-table: doris_test_route.doris_t3
pipeline:
??name: Sync MySQL Database to Doris
??parallelism: 1