Flink CDC 是一個基于流的數據集成工具,旨在為用戶提供一套功能更加全面的編程接口(API),它基于數據庫日志的 CDC(變更數據捕獲)技術實現了統一的增量和全量數據讀取。 該工具使得用戶能夠以 YAML 配置文件的形式,優雅地定義其 ETL(Extract, Transform, Load)流程,并協助用戶自動化生成定制化的 Flink 算子并且提交 Flink 作業。 Flink CDC 在任務提交過程中進行了優化,并且增加了一些高級特性,如表結構變更自動同步(Schema Evolution)、數據轉換(Data Transformation)、整庫同步(Full Database Synchronization)以及 精確一次(Exactly-once)語義。
Flink CDC 深度集成并由 Apache Flink 驅動,提供以下核心功能:
- ? 端到端的數據集成框架
- ? 為數據集成的用戶提供了易于構建作業的 API
- ? 支持在 Source 和 Sink 中處理多個表
- ? 整庫同步
- ?具備表結構變更自動同步的能力(Schema Evolution)
一、如何使用 Flink CDC
Flink CDC 提供了基于?YAML
?格式的用戶 API,更適合于數據集成場景。以下是一個?YAML
?文件的示例,它定義了一個數據管道(Pipeline),該Pipeline從 MySQL 捕獲實時變更,并將它們同步到 Apache Doris:
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2
通過使用?flink-cdc.sh
?提交 YAML 文件,一個 Flink 作業將會被編譯并部署到指定的 Flink 集群。
二、理解核心概念
1、Data Pipeline
由于Flink CDC中的事件以管道方式從上游流向下游,因此整個ETL任務被稱為數據管道。
我們可以使用下面的yaml文件來定義一個簡潔的數據管道,描述將MySQL app_db數據庫下的所有表同步到Doris:
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2
?我們可以使用下面的yaml文件定義一個復雜的數據管道,描述將MySQL app_db數據庫下的所有表同步到Doris,并給出特定的目標數據庫名稱ods_db和特定的目標表名稱前綴ods_:
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""transform:- source-table: adb.web_order01projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 10 AND order_id > 100description: project fields and filter- source-table: adb.web_order02projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 20 AND order_id > 200description: project fields and filterroute:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2user-defined-function:- name: addoneclasspath: com.example.functions.AddOneFunctionClass- name: formatclasspath: com.example.functions.FormatFunctionClass
Pipeline 配置:?
支持數據管道級別的以下配置選項:
parameter | meaning | optional/required |
---|---|---|
name | 管道的名稱,將作為作業名稱提交到Flink集群。 | optional |
parallelism | 管道的全局并行性。默認為1。 | optional |
local-time-zone | 本地時區定義當前會話時區id。 | optional |
2、Data Source
數據源用于訪問元數據,并從外部系統讀取更改的數據。數據源可以同時從多個表中讀取數據。
要描述數據源,需要以下內容:
parameter | meaning | optional/required |
---|---|---|
type | 數據源的類型,如mysql。 | required |
name | 數據源的名稱,由用戶定義(提供默認值)。 | optional |
configurations of Data Source | 用于構建數據源的配置,例如連接配置和源表屬性。 | optional |
source:type: mysqlname: mysql-source #optional,description informationhost: localhostport: 3306username: adminpassword: passtables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_\.*
3、Data Sink
數據接收器用于應用架構更改并將更改數據寫入外部系統。數據接收器可以同時寫入多個表。
Parameters?
要描述數據接收器,需要以下內容:
parameter | meaning | optional/required |
---|---|---|
type | The type of the sink, such as doris or starrocks. | required |
name | The name of the sink, which is user-defined (a default value provided). | optional |
configurations of Data Sink | Configurations to build the Data Sink e.g. connection configurations and sink table properties. | optional |
Example?
我們可以使用此yaml文件定義doris接收器:
sink:type: dorisname: doris-sink # Optional parameter for description purposefenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.replication_num: 1 # Optional parameter for advanced functionalities