Flink 與 Flink CDC 版本兼容對照表
Flink 版本 | 支持的 Flink CDC 版本 | 關鍵說明 |
---|---|---|
Flink 1.11.x | Flink CDC 1.2.x | 早期版本,需注意 Flink 1.11.0 的 Bug(如 Upsert 寫入問題),建議使用 1.11.1 及以上。 |
Flink 1.12.x | Flink CDC 2.0.x(MySQL 使用?flink-connector-mysql-cdc ) | Flink 1.12.x 支持 CDC 2.0.x,MySQL 使用新版 Connector。 |
Flink 1.13.x | Flink CDC 2.2.x, 2.3.x, 2.4.x | 2.2.x 起支持 Flink 1.13.x,2.4.x 兼容性更廣(支持到 Flink 1.15.x)。 |
Flink 1.14.x | Flink CDC 2.2.x, 2.3.x, 2.4.x | 同 Flink 1.13.x,需注意 2.4.x 對 1.14.x 的支持。 |
Flink 1.15.x | Flink CDC 2.3.x, 2.4.x | 2.4.x 是 Flink 1.15.x 的推薦版本,支持增量快照框架。 |
Flink 1.16.x | Flink CDC 2.3.x, 2.4.x | 2.4.x 支持 Flink 1.16.x,但需注意部分功能可能受限。 |
Flink 1.17.x | Flink CDC 2.5.x 及以上(如 2.5.0) | 官方未聲明 2.4.x 支持 Flink 1.17.x,需升級 Flink CDC 至 2.5+ 或降級 Flink 至 1.15.x。 |
Flink 2.0.x | 未明確說明(需參考最新 Flink CDC 文檔) | Flink 2.0 為新版本,建議關注 Flink CDC 官方文檔的最新支持情況。 |
Flink CDC 3.x | 僅支持 Flink 1.13.x 及以上(具體版本需看文檔) | Flink CDC 3.x 是新一代數據集成框架,需與 Flink 1.13+ 配合使用。 |
1.flink? cdc 的兩種使用方式
source:type: mysql-cdchostname: localhostport: 3306username: rootpassword: "123456"database-list: app_dbtable-list: app_db.*scan.startup.mode: initialscan.incremental.snapshot.enabled: truescan.newly-added-table.enabled: truesink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL to Dorisparallelism: 2execution.runtime-mode: STREAMING
./bin/flink-cdc.sh run mysql-to-doris.yaml
2. flink cdc 另一種使用方式
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>${flink.cdc.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.cdc.version}</version></dependency>
package com.example.demo.cdc;import com.example.demo.ConnectionConstants;
import com.example.demo.deserial.SafeStringKafkaDeserializationSchema;
import com.example.demo.domain.TableData;
import com.example.demo.dynamic.ExtractKafaRowAndTableName;
import com.example.demo.sink.DynamicJdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.*;public class FlinkKafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);Properties kafkaProps = new Properties();kafkaProps.setProperty("bootstrap.servers", "192.168.64.141:9092");SafeStringKafkaDeserializationSchema schema = new SafeStringKafkaDeserializationSchema();//CustomKafkaDeserializationSchema schema = new CustomKafkaDeserializationSchema();FlinkKafkaConsumer<ConsumerRecord<String, String>> kafkaSource = new FlinkKafkaConsumer<>("part.t_part", // 匹配所有 testdb 下的表schema,kafkaProps);kafkaSource.setStartFromEarliest();DataStreamSource<ConsumerRecord<String, String>> ds = env.addSource(kafkaSource);ExtractKafaRowAndTableName extractRowAndTableName = new ExtractKafaRowAndTableName();SingleOutputStreamOperator<TableData> mapStream = ds.map(extractRowAndTableName);JdbcExecutionOptions options = JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(2000).withMaxRetries(2).build();DynamicJdbcSink dynamicJdbcSink = new DynamicJdbcSink(ConnectionConstants.PG_DRIVER_CLSSNAME,ConnectionConstants.PG_URL,ConnectionConstants.PG_USER_NAME,ConnectionConstants.PG_PASSWORD);mapStream.addSink(dynamicJdbcSink);env.enableCheckpointing(5000); // 每 5 秒做一次 checkpointkafkaSource.setCommitOffsetsOnCheckpoints(true);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 最小間隔env.getCheckpointConfig().setCheckpointTimeout(6000); // 超時時間env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 并行數env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.execute("Multi-table CDC to PostgreSQL via DataStream");}
}
使用flink cdc 寫代碼的時候jar 包方式,你需要考慮不同數據庫的序列化和反序列化問題, yaml 方式就是沒提供的功能你無法用不夠靈活。
一、判斷Flink CDC同步完成的常見方法
Flink CDC的同步分為全量同步和增量同步階段,完成標志如下:
監控
currentEmitEventTimeLag
指標:- 這是核心判斷依據。該指標表示數據從數據庫產生到離開Source節點的時間延遲。
- 全量同步完成標志:當?
currentEmitEventTimeLag
?從?≤0?變為?>0?時,表示已從全量階段進入增量(Binlog)讀取階段。 - 原理:全量階段該指標為0(無延遲),進入增量階段后延遲值變為正數。
- 實現:通過Flink的Metrics系統(如Prometheus、Grafana)實時監控該指標。
檢查日志輸出:
- 在日志中搜索關鍵詞?
BinlogSplitReader is created
?或?全量同步結束
,這通常標志全量階段完成。 - 全量同步完成后,日志會顯示Binlog讀取開始。
- 在日志中搜索關鍵詞?
觀察作業狀態和指標:
- 作業狀態:通過Flink Web UI或API檢查Job狀態,若為?
FINISHED
(僅限批處理任務),表示同步完成。 - 其他指標:
sourceIdleTime
:源空閑時間增加,可能表示無新數據。currentFetchEventTimeLag
:類似?currentEmitEventTimeLag
,監控數據讀取延遲。
- 作業狀態:通過Flink Web UI或API檢查Job狀態,若為?
驗證目標數據:
- 對比源數據庫和目標存儲(如數據湖)的數據一致性:
- 全量同步后,目標數據應包含源數據庫的所有記錄。
- 使用數據校驗工具(如比對哈希值)確保一致性。
- 對比源數據庫和目標存儲(如數據湖)的數據一致性:
二、為什么不用數據條數判斷?
- 動態性:增量同步中數據持續流入,條數無法作為靜態終點。
- 準確性問題:
- 數據刪除、更新可能導致條數波動。
- 分布式系統中,分片同步可能不同步完成。
- 替代方案:上述指標和狀態監控更實時可靠。
三、實踐建議
- 實時監控:優先使用?
currentEmitEventTimeLag
,結合Prometheus等工具告警。 - 自動化驗證:在ETL管道中加入數據校驗步驟,確保同步質量。
- 日志審計:定期審查日志,輔助異常排查。
如果您有具體同步場景(如MySQL到數據湖),可進一步優化方案。
Flink中的滑動窗口和滾動窗口
1. 滑動窗口(Sliding Window):
- 定義:滑動窗口有一個固定的大小,并且可以有重疊。這意味著數據項可能會被包含在一個或多個窗口中。
- 用途:適用于需要分析一段時間內的趨勢或模式的情況,例如計算過去5分鐘內每1分鐘的數據平均值。
- 特點:
- 窗口大小和滑動步長可以獨立配置。
- 可能導致較高的計算成本,因為它涉及到更多的窗口操作。
2. 滾動窗口(Tumbling Window):
- 定義:滾動窗口是滑動窗口的一種特殊情況,其中窗口之間沒有重疊(即滑動步長等于窗口大小)。每個數據項只會屬于一個特定的窗口。
- 用途:適合于定期匯總數據的場景,比如每天統計一次用戶活動量。
- 特點:
- 簡單易懂,實現起來相對直接。
- 數據不會跨窗口重復處理,減少了計算負擔。
限流熔斷機制中的滑動窗口
3. 限流熔斷機制中的滑動窗口:
- 定義:在分布式系統或微服務架構中,為了防止某個服務過載而采取的一種保護措施。這里的滑動窗口通常用于監控請求速率,以便決定是否應該限制請求或觸發熔斷。
- 用途:主要用于控制流量、保護下游服務免受突發流量的影響。
- 特點:
- 主要關注點在于時間間隔內的請求數量或錯誤率。
- 實現方式可能包括固定大小的時間桶(buckets),隨著時間推移,新的請求會進入最新的時間桶,而舊的時間桶會被丟棄。
- 目的是快速響應流量變化,提供即時反饋以調整系統的負載能力。
區別總結
- 應用場景不同:Flink的窗口函數主要用于流處理任務中的數據分析;而限流熔斷機制中的滑動窗口則用于保障系統穩定性和可用性。
- 技術細節差異:Flink中的窗口涉及復雜的數據聚合邏輯,可能跨越多個節點進行分布式計算;相比之下,限流熔斷機制中的滑動窗口更注重實時性和效率,通常在單個服務實例內部執行。
- 目標不同:前者旨在提取有價值的信息,如統計信息、模式識別等;后者的目標是通過限制請求頻率來維持系統的健康狀態。