What’s Flink-cdc?
Flink CDC 是基于Apache Flink的一種數據變更捕獲技術,用于從數據源(如數據庫)中捕獲和處理數據的變更事件。CDC技術允許實時地捕獲數據庫中的增、刪、改操作,將這些變更事件轉化為流式數據,并能夠對這些事件進行實時處理和分析。
Flink CDC提供了與各種數據源集成的功能,包括常見的關系型數據庫(如MySQL、PostgreSQL、Oracle等)以及NoSQL數據庫(如MongoDB、HBase等)。它通過監控數據庫的日志或輪詢方式來捕獲數據變更,并將變更事件作為數據流發送到Flink的任務中進行處理。
Flink CDC 深度集成并由 Apache Flink 驅動,提供以下核心功能:
? 端到端的數據集成框架
? 為數據集成的用戶提供了易于構建作業的 API
? 支持在 Source 和 Sink 中處理多個表
? 整庫同步
?具備表結構變更自動同步的能力(Schema Evolution)
在使用者的角度,就是Flink-cdc可以簡化流處理的流程:
-
引入Flink-cdc之前流處理流程
-
引入Flink-cdc之后后流處理流程
如上所示,在flink-cdc被引入后大大簡化了流處理流程
Flink-cdc支持的鏈接及對應的版本
Pipeline Connectors
Source Connectors
截止目前(2024-05-23)
Flink-cdc與Flink對應對影版本的關系
截止目前(2024-05-23)
flink-connector-mysql-cdc 實例分析
示例代碼
demo代碼:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MySqlSourceDemo {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("mysql-server-host").port(3306).databaseList("mydb") // 設置捕獲的數據庫.tableList("mydb.products") // 設置捕獲的表,如果需要同步整個數據庫,請將 tableList 設置為 ".*".
// .tableList(".*") // 捕獲整個數據庫的表
// .tableList("^(?!mysql|information_schema|performance_schema).*") // 設置捕獲的表,排除系統庫
// .tableList("mydb.(?!products|orders).*") // 同步排除products和orders表之外的整個my_db庫.username("flink-cdc").password("xxx").serverId("5400-5405").deserializer(new JsonDebeziumDeserializationSchema()) // 將 SourceRecord 轉換為 JSON 字符串.serverTimeZone("Asia/Shanghai") // 設置時區.startupOptions(StartupOptions.initial()).scanNewlyAddedTableEnabled(true) // 啟用掃描新添加的表功能
// .includeSchemaChanges(true) // 包括 schema 變更.build();org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();config.setString("rest.port", "8081");
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); //本地環境,調試用StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 設置 3s 的 checkpoint 間隔env.enableCheckpointing(3000);env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("file:///tmp/ck");//本地文件系統
// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本開始支持env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// 設置 source 節點的并行度為 4.setParallelism(5).print().setParallelism(1); // 設置 sink 節點并行度為 1env.execute("Print MySQL Snapshot + Binlog");}
}
maven依賴:
<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.5</flink.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!-- 將 Apache Flink 的 Web 運行時模塊添加到項目中 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope> <!--provided生命周期在test模式才可以運行,在main模式會找不到包--></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>provided</scope></dependency></dependencies>
日志配置文件:
log4j.properties
log4j.rootCategory=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n
啟動standalone Flink級群
# jobmanager
docker run -d \
--name flink-jm \
--hostname flink-jm \
-p 8082:8081 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
jobmanager# taskmanager
docker run -d \
--name flink-tm \
--hostname flink-tm \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
taskmanager \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=5 \
-Drest.flamegraph.enabled=true
分析說明
為每個 Reader 設置不同的 Server id
每個用于讀取 binlog 的 MySQL 數據庫客戶端都應該有一個唯一的 id,稱為 Server id。 MySQL 服務器將使用此 id 來維護網絡連接和 binlog 位置。 因此,如果不同的作業共享相同的 Server id, 則可能導致從錯誤的 binlog 位置讀取數據。 因此,建議通過為每個 Reader 設置不同的 Server id , 假設 Source 并行度為 4,server id 配置必須:serverId(“5400-5405”),5405-5400=5 >= 4。來為 4 個 Source readers 中的每一個分配唯一的 Server id。
查看mysql鏈接發現
select * from information_schema.processlist where user = ‘flink-cdc’;
Flink-cdc對mysql的影響
正常情況下,Flink-cdc是No-lock Read,主庫可以繼續處理事務和查詢,而不會導致主庫進程阻塞,不會對主庫產生直接影響。但是,在某些情況下數據同步的過程中可能會對主庫產生一些間接影響,比如:網絡、IO、CPU負載以及mysql的并發連接數等資源消耗。但這些對主庫的開銷影響相對較小(全量同步階段可能比較耗能,但時間相對比較短)。
斷點續傳
通過從checkpoint/savepoint 恢復,flink-cdc可以保證斷點續傳。
-
從checkpoint/savepoint恢復,縮小同步范圍,例如:從tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 縮小到 tableList(“mydb.products”),應用更新生效。
-
應用從checkpoint/savepoint恢復,擴大同步范圍的部分不會生效,例如:從tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”),應用更新不生效生效。若想使動態加表生效,可以顯示制定scanNewlyAddedTableEnabled(true) ,來啟用掃描新添加的表功能。如沒有特殊情況,建議在開發環境開啟此配置。
flink-cdc包名變更
Flink CDC 項目 從 2.0.0 版本將 group id 從com.alibaba.ververica 改成 com.ververica, 自 3.1 版本從將 group id 從 com.ververica 改成 org.apache.flink。 這是為了讓項目更加社區中立,讓各個公司的開發者共建時更方便。所以在maven倉庫找 2.x 的包時,路徑是 /com/ververica;找3.1及以上版本的包時,路徑是/org/apache/flink
參考:
flink-cdc
flink-cdc docs