Flink cdc 使用總結

Flink 與 Flink CDC 版本兼容對照表

Flink 版本支持的 Flink CDC 版本關鍵說明
Flink 1.11.xFlink CDC 1.2.x早期版本,需注意 Flink 1.11.0 的 Bug(如 Upsert 寫入問題),建議使用 1.11.1 及以上。
Flink 1.12.xFlink CDC 2.0.x(MySQL 使用?flink-connector-mysql-cdcFlink 1.12.x 支持 CDC 2.0.x,MySQL 使用新版 Connector。
Flink 1.13.xFlink CDC 2.2.x, 2.3.x, 2.4.x2.2.x 起支持 Flink 1.13.x,2.4.x 兼容性更廣(支持到 Flink 1.15.x)。
Flink 1.14.xFlink CDC 2.2.x, 2.3.x, 2.4.x同 Flink 1.13.x,需注意 2.4.x 對 1.14.x 的支持。
Flink 1.15.xFlink CDC 2.3.x, 2.4.x2.4.x 是 Flink 1.15.x 的推薦版本,支持增量快照框架。
Flink 1.16.xFlink CDC 2.3.x, 2.4.x2.4.x 支持 Flink 1.16.x,但需注意部分功能可能受限。
Flink 1.17.xFlink CDC 2.5.x 及以上(如 2.5.0)官方未聲明 2.4.x 支持 Flink 1.17.x,需升級 Flink CDC 至 2.5+ 或降級 Flink 至 1.15.x。
Flink 2.0.x未明確說明(需參考最新 Flink CDC 文檔)Flink 2.0 為新版本,建議關注 Flink CDC 官方文檔的最新支持情況。
Flink CDC 3.x僅支持 Flink 1.13.x 及以上(具體版本需看文檔)Flink CDC 3.x 是新一代數據集成框架,需與 Flink 1.13+ 配合使用。

1.flink? cdc 的兩種使用方式

source:type: mysql-cdchostname: localhostport: 3306username: rootpassword: "123456"database-list: app_dbtable-list: app_db.*scan.startup.mode: initialscan.incremental.snapshot.enabled: truescan.newly-added-table.enabled: truesink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL to Dorisparallelism: 2execution.runtime-mode: STREAMING

./bin/flink-cdc.sh run mysql-to-doris.yaml

2. flink cdc 另一種使用方式

        <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-postgres-cdc</artifactId><version>${flink.cdc.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.cdc.version}</version></dependency>

package com.example.demo.cdc;import com.example.demo.ConnectionConstants;
import com.example.demo.deserial.SafeStringKafkaDeserializationSchema;
import com.example.demo.domain.TableData;
import com.example.demo.dynamic.ExtractKafaRowAndTableName;
import com.example.demo.sink.DynamicJdbcSink;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.*;public class FlinkKafkaSource {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);Properties kafkaProps = new Properties();kafkaProps.setProperty("bootstrap.servers", "192.168.64.141:9092");SafeStringKafkaDeserializationSchema schema = new SafeStringKafkaDeserializationSchema();//CustomKafkaDeserializationSchema schema = new CustomKafkaDeserializationSchema();FlinkKafkaConsumer<ConsumerRecord<String, String>> kafkaSource = new FlinkKafkaConsumer<>("part.t_part", // 匹配所有 testdb 下的表schema,kafkaProps);kafkaSource.setStartFromEarliest();DataStreamSource<ConsumerRecord<String, String>> ds = env.addSource(kafkaSource);ExtractKafaRowAndTableName  extractRowAndTableName = new ExtractKafaRowAndTableName();SingleOutputStreamOperator<TableData> mapStream = ds.map(extractRowAndTableName);JdbcExecutionOptions options = JdbcExecutionOptions.builder().withBatchSize(1000).withBatchIntervalMs(2000).withMaxRetries(2).build();DynamicJdbcSink dynamicJdbcSink = new DynamicJdbcSink(ConnectionConstants.PG_DRIVER_CLSSNAME,ConnectionConstants.PG_URL,ConnectionConstants.PG_USER_NAME,ConnectionConstants.PG_PASSWORD);mapStream.addSink(dynamicJdbcSink);env.enableCheckpointing(5000); // 每 5 秒做一次 checkpointkafkaSource.setCommitOffsetsOnCheckpoints(true);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000); // 最小間隔env.getCheckpointConfig().setCheckpointTimeout(6000); // 超時時間env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 并行數env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.execute("Multi-table CDC to PostgreSQL via DataStream");}
}

使用flink cdc 寫代碼的時候jar 包方式,你需要考慮不同數據庫的序列化和反序列化問題, yaml 方式就是沒提供的功能你無法用不夠靈活。

一、判斷Flink CDC同步完成的常見方法

Flink CDC的同步分為全量同步和增量同步階段,完成標志如下:

  1. 監控 currentEmitEventTimeLag 指標

    • 這是核心判斷依據。該指標表示數據從數據庫產生到離開Source節點的時間延遲。
    • 全量同步完成標志:當?currentEmitEventTimeLag?從?≤0?變為?>0?時,表示已從全量階段進入增量(Binlog)讀取階段。
    • 原理:全量階段該指標為0(無延遲),進入增量階段后延遲值變為正數。
    • 實現:通過Flink的Metrics系統(如Prometheus、Grafana)實時監控該指標。
  2. 檢查日志輸出

    • 在日志中搜索關鍵詞?BinlogSplitReader is created?或?全量同步結束,這通常標志全量階段完成。
    • 全量同步完成后,日志會顯示Binlog讀取開始。
  3. 觀察作業狀態和指標

    • 作業狀態:通過Flink Web UI或API檢查Job狀態,若為?FINISHED(僅限批處理任務),表示同步完成。
    • 其他指標
      • sourceIdleTime:源空閑時間增加,可能表示無新數據。
      • currentFetchEventTimeLag:類似?currentEmitEventTimeLag,監控數據讀取延遲。
  4. 驗證目標數據

    • 對比源數據庫和目標存儲(如數據湖)的數據一致性:
      • 全量同步后,目標數據應包含源數據庫的所有記錄。
      • 使用數據校驗工具(如比對哈希值)確保一致性。

二、為什么不用數據條數判斷?

  • 動態性:增量同步中數據持續流入,條數無法作為靜態終點。
  • 準確性問題
    • 數據刪除、更新可能導致條數波動。
    • 分布式系統中,分片同步可能不同步完成。
  • 替代方案:上述指標和狀態監控更實時可靠。

三、實踐建議

  • 實時監控:優先使用?currentEmitEventTimeLag,結合Prometheus等工具告警。
  • 自動化驗證:在ETL管道中加入數據校驗步驟,確保同步質量。
  • 日志審計:定期審查日志,輔助異常排查。

如果您有具體同步場景(如MySQL到數據湖),可進一步優化方案。

Flink中的滑動窗口和滾動窗口

1. 滑動窗口(Sliding Window):

  • 定義:滑動窗口有一個固定的大小,并且可以有重疊。這意味著數據項可能會被包含在一個或多個窗口中。
  • 用途:適用于需要分析一段時間內的趨勢或模式的情況,例如計算過去5分鐘內每1分鐘的數據平均值。
  • 特點
    • 窗口大小和滑動步長可以獨立配置。
    • 可能導致較高的計算成本,因為它涉及到更多的窗口操作。

2. 滾動窗口(Tumbling Window):

  • 定義:滾動窗口是滑動窗口的一種特殊情況,其中窗口之間沒有重疊(即滑動步長等于窗口大小)。每個數據項只會屬于一個特定的窗口。
  • 用途:適合于定期匯總數據的場景,比如每天統計一次用戶活動量。
  • 特點
    • 簡單易懂,實現起來相對直接。
    • 數據不會跨窗口重復處理,減少了計算負擔。

限流熔斷機制中的滑動窗口

3. 限流熔斷機制中的滑動窗口:

  • 定義:在分布式系統或微服務架構中,為了防止某個服務過載而采取的一種保護措施。這里的滑動窗口通常用于監控請求速率,以便決定是否應該限制請求或觸發熔斷。
  • 用途:主要用于控制流量、保護下游服務免受突發流量的影響。
  • 特點
    • 主要關注點在于時間間隔內的請求數量或錯誤率。
    • 實現方式可能包括固定大小的時間桶(buckets),隨著時間推移,新的請求會進入最新的時間桶,而舊的時間桶會被丟棄。
    • 目的是快速響應流量變化,提供即時反饋以調整系統的負載能力。

區別總結

  • 應用場景不同:Flink的窗口函數主要用于流處理任務中的數據分析;而限流熔斷機制中的滑動窗口則用于保障系統穩定性和可用性。
  • 技術細節差異:Flink中的窗口涉及復雜的數據聚合邏輯,可能跨越多個節點進行分布式計算;相比之下,限流熔斷機制中的滑動窗口更注重實時性和效率,通常在單個服務實例內部執行。
  • 目標不同:前者旨在提取有價值的信息,如統計信息、模式識別等;后者的目標是通過限制請求頻率來維持系統的健康狀態。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/89176.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/89176.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/89176.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

企業培訓筆記:axios 發送 ajax 請求

文章目錄axios 簡介一&#xff0c;Vue工程中安裝axios二&#xff0c;編寫app.vue三&#xff0c;編寫HomeView.vue四&#xff0c;Idea打開后臺項目五&#xff0c;創建HelloController六&#xff0c;配置web訪問端口七&#xff0c;運行項目&#xff0c;查看效果&#xff08;一&am…

Maven下載與配置對Java項目的理解

目錄 一、背景 二、JAVA項目與Maven的關系 2.1標準java項目 2.2 maven 2.2.1 下載maven 1、下載 2、配置環境 2.2.2 setting.xml 1、配置settings.xml 2、IDEA配置maven 一、背景 在java項目中&#xff0c;新手小白很有可能看不懂整體的目錄結構&#xff0c;以及每個…

Mars3d的走廊只能在一個平面的無法折疊的解決方案

問題場景&#xff1a;1. Mars3d的CorridorEntity只能在一個平面修改高度值&#xff0c;無法根據坐標點位制作有高度值的走廊效果&#xff0c;想要做大蜀山盤山走廊的效果實現不了。解決方案&#xff1a;1.使用原生cesium實現對應的走廊的截面形狀、走廊的坐標點&#xff0c;包括…

LeetCode 每日一題 2025/7/7-2025/7/13

記錄了初步解題思路 以及本地實現代碼&#xff1b;并不一定為最優 也希望大家能一起探討 一起進步 目錄7/7 1353. 最多可以參加的會議數目7/8 1751. 最多可以參加的會議數目 II7/9 3439. 重新安排會議得到最多空余時間 I7/10 3440. 重新安排會議得到最多空余時間 II7/11 3169. …

Bash常見條件語句和循環語句

以下是 Bash 中常用的條件語句和循環語句分類及語法說明&#xff0c;附帶典型用例&#xff1a;一、條件語句 1. if 語句 作用&#xff1a;根據條件執行不同代碼塊 語法&#xff1a; if [ 條件 ]; then# 條件為真時執行 elif [ 其他條件 ]; then# 其他條件為真時執行 else# 所有…

uni-app 選擇國家區號

uni-app選擇國家區號組件 hy-countryPicker 我們在做登錄注冊功能的時候&#xff0c;可能會遇到選擇區號來使用不同國家手機號來登錄或者注冊的功能。這里我就介紹下我這個uni-app中使用的選擇區號的組件&#xff0c;包含不同國家國旗圖標。 效果圖 別的不說&#xff0c;先來…

客戶端主機宕機,服務端如何處理 TCP 連接?詳解

文章目錄一、客戶端主機宕機后迅速重啟1、服務端有數據發送2、服務端開啟「保活」機制3、服務端既沒有數據發送&#xff0c;也沒有開啟「保活」機制二、客戶端主機宕機后一直沒有重啟1、服務端有數據發送2、服務端開啟「保活」機制3、服務端既沒有數據發送&#xff0c;也沒有開…

《大數據技術原理與應用》實驗報告五 熟悉 Hive 的基本操作

目 錄 一、實驗目的 二、實驗環境 三、數據集 四、實驗內容與完成情況 4.1 創建一個內部表 stocks&#xff0c;字段分隔符為英文逗號&#xff0c;表結構下所示。 4.2 創建一個外部分區表 dividends&#xff08;分區字段為 exchange 和symbol&#xff09;&#xff0c;字段…

【橘子分布式】Thrift RPC(編程篇)

一、簡介 之前我們研究了一下thrift的一些知識&#xff0c;我們知道他是一個rpc框架&#xff0c;他作為rpc自然是提供了客戶端到服務端的訪問以及兩端數據傳輸的消息序列化&#xff0c;消息的協議解析和傳輸&#xff0c;所以我們今天就來了解一下他是如何實現這些功能&#xff…

清理C盤--辦法

c盤經常爆紅1、命令行2、屬性3、臨時文件

Java-71 深入淺出 RPC Dubbo 上手 父工程配置編寫 附詳細POM與代碼

點一下關注吧&#xff01;&#xff01;&#xff01;非常感謝&#xff01;&#xff01;持續更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持續更新中&#xff01;&#xff08;長期更新&#xff09; AI煉丹日志-29 - 字節跳動 DeerFlow 深度研究框斜體樣式架 私有…

創客匠人:創始人 IP 打造的內核,藏在有效的精神成長里

當創始人 IP 成為企業增長的重要引擎&#xff0c;許多人急于尋找 “爆款公式”&#xff0c;卻忽略了一個更本質的問題&#xff1a;IP 的生命力&#xff0c;終究源于創始人的精神成長。創客匠人在深耕知識付費賽道的過程中&#xff0c;見證了無數案例&#xff1a;那些能持續實現…

GPT和MBR分區

GPT&#xff08;GUID分區表&#xff09;和MBR&#xff08;主引導記錄&#xff09;是兩種不同的磁盤分區表格式&#xff0c;用于定義硬盤上分區的布局、位置及啟動信息&#xff0c;二者在設計、功能和適用場景上有顯著差異。以下從多個維度詳細對比&#xff1a; 一、核心定義與起…

c#進階之數據結構(字符串篇)----String

1、String介紹首先我們得明白&#xff0c;string和String代表的實際上是同一個類型&#xff0c;string是C#中的關鍵字&#xff0c;代表String類型&#xff0c;因此我們直接來學習String類型。從官方的底層實現代碼可以看出&#xff0c;當前String類型實際上就是一個Char類型的聚…

快速排序遞歸和非遞歸方法的簡單介紹

基本思想為&#xff1a;任取待排序元素序列中 的某元素作為基準值&#xff0c;按照該排序碼將待排序集合分割成兩子序列&#xff0c;左子序列中所有元素均小于基準值&#xff0c;右 子序列中所有元素均大于基準值&#xff0c;然后最左右子序列重復該過程&#xff0c;直到所有元…

從零開始的云計算生活——第三十二天,四面楚歌,HAProxy負載均衡

目錄 一.HAProxy簡介 二.HAProxy特點和優點&#xff1a; 三.HAProxy保持會話的三種解決方法 四.HAProxy的balance 8種負載均衡算法 1&#xff09;RR&#xff08;Round Robin&#xff09; 2&#xff09;LC&#xff08;Least Connections&#xff09; 3&#xff09;SH&am…

策略模式及優化

策略模式&#xff08;Strategy Pattern&#xff09;是一種行為設計模式&#xff0c;其核心思想是將算法的定義與使用分離&#xff0c;使算法可以獨立于客戶端進行變化。它通過定義一系列算法&#xff0c;將每個算法封裝到獨立的類中&#xff0c;并使它們可以互相替換&#xff0…

微信小程序開發-桌面端和移動端UI表現不一致問題記錄

桌面端和移動端UI表現不一致零、引擎說明一、樣式不同1、text 單行&#xff1a;1.1 空格開發者工具不展示&#xff0c;手機/PC端正常1.2 正常展示省略號&#xff0c;需要2、點擊按鈕z-index: -1。webview - 桌面端不行&#xff0c; skyline - 移動端可以&#xff1b;3、其他說明…

極限狀態下函數開根號的計算理解(含示意圖)

遇到一個挺有意思的題做個記錄&#xff1a; 求曲線y (x21)(x2?1)0.5\frac{\left(x^{2}1\right)}{\left(x^{2}-1\right)^{0.5}}(x2?1)0.5(x21)?漸近線的條數 比較明顯的x 1是無定義點。但是在求極限的時候發現1和1-得到的極限值似乎不一樣。似乎是1是趨向于∞&#xff0c;1…

C++——模版(函數模版和類模版)

C 模板&#xff08;Templates&#xff09;完整介紹模板是 C 中一種強大的泛型編程機制&#xff0c;允許開發者編寫與類型無關的代碼&#xff0c;從而提高代碼的復用性和靈活性。通過模板&#xff0c;可以避免為不同數據類型重復編寫相似的函數或類&#xff0c;實現真正的代碼復…