flink cdc的source數據流如何配置事件時間,如何設置時間語義,分配時間戳并生成水位線

在 Flink CDC 中為 Source 數據流配置事件時間需要結合時間語義設置時間戳分配水位線生成三個核心步驟。以下是具體配置方法及注意事項:


1. 設置時間語義

Flink 默認使用處理時間(Processing Time),需顯式指定事件時間語義:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 設置為事件時間

若使用 Flink 1.12+ 版本,事件時間已是默認語義,但仍建議顯式設置以避免混淆。


2. 分配時間戳

(1) 從 CDC 數據中提取時間戳

CDC 數據(如 MySQL Binlog)通常包含變更時間字段(如 update_time),需通過 TimestampAssigner 提取:

DataStream<ChangeEvent> cdcStream = env.addSource(MySqlSource.create(...));DataStream<ChangeEvent> timedStream = cdcStream.assignTimestampsAndWatermarks(WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp() // 從事件中提取時間戳(毫秒))
);

關鍵點

  • 字段選擇:優先使用業務字段(如訂單創建時間)或數據庫的 update_time 作為事件時間戳。
  • 類型轉換:若時間戳為字符串(如 "2023-10-01 12:00:00"),需先轉換為毫秒值。

(2) 通過 DDL 定義時間屬性(Table API)

若使用 Flink SQL/Table API,可在 DDL 中直接定義時間屬性:

CREATE TABLE orders (id INT,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ('connector' = 'mysql-cdc',...
);

此方式通過 WATERMARK 語句隱式分配時間戳并生成水位線。


3. 生成水位線

水位線用于處理亂序事件,需根據業務容忍的延遲設置策略:

(1) 固定延遲策略(BoundedOutOfOrderness)

WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(...);

此策略允許最大 5 秒的亂序延遲,適用于大多數業務場景。

(2) 單調遞增策略(MonotonousTimestamps)

WatermarkStrategy.<ChangeEvent>forMonotonousTimestamps();

若數據嚴格有序(如 Kafka 分區有序),可直接使用此策略。

(3) 自定義水位線生成器

對于復雜邏輯(如動態調整延遲),需實現 WatermarkGenerator 接口:

public class CustomWatermarkStrategy implements WatermarkGenerator<ChangeEvent> {@Overridepublic void onEvent(ChangeEvent event, long eventTimestamp, WatermarkOutput output) {// 動態計算最大事件時間maxTimestamp = Math.max(maxTimestamp, eventTimestamp);}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - 5000)); // 延遲5秒}
}

4. CDC 源的特殊處理

(1) MySQL CDC 的時間戳提取

MySQL Binlog 中的 ts_sec 字段表示事務提交時間,可將其作為事件時間戳:

.withTimestampAssigner((event, recordTimestamp) -> event.getSource().get("ts_sec") // 提取Binlog中的時間戳字段
)

(2) 處理無時間戳的 CDC 數據

若 CDC 數據無時間戳字段,可回退到處理時間或攝取時間:

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 切換為處理時間

5. 注意事項

  1. 水位線生成位置:盡量在 Source 后第一個算子分配時間戳,避免因并行度變化導致亂序。
  2. 水位線間隔調整:默認 200ms 生成一次,可通過 env.getConfig().setAutoWatermarkInterval(1000) 調整為 1 秒。
  3. 狀態 TTL:若 CDC 數據量極大,需設置狀態 TTL 防止 OOM:
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(1)).build();
    

完整示例(DataStream API)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 定義 MySQL CDC Source
MySqlSource<ChangeEvent> source = MySqlSource.<ChangeEvent>builder().hostname("localhost").port(3306).databaseList("mydb").tableList("mydb.orders").username("user").password("pass").deserializer(new JsonDebeziumDeserializationSchema()).build();// 分配時間戳與水位線
DataStream<ChangeEvent> stream = env.fromSource(source,WatermarkStrategy.<ChangeEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, ts) -> event.getUpdateTime()),"MySQL Source"
);// 后續窗口處理
stream.keyBy(event -> event.getOrderId()).window(TumblingEventTimeWindows.of(Time.minutes(5))).aggregate(...);

通過以上配置,Flink CDC 數據流即可正確使用事件時間語義,處理亂序數據并觸發窗口計算。具體策略需根據業務延遲容忍度和數據特征調整。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/75884.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/75884.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/75884.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++ 指針類型轉換全面解析與最佳實踐

文章目錄 C 指針類型轉換全面解析與最佳實踐1. 隱式轉換基類和派生類指針 2. 顯式轉換(1) static_cast(2) dynamic_cast(3) reinterpret_cast(4) const_cast 3. C 風格轉換4. 常見問題與注意事項5. 總結最佳實踐 C 指針類型轉換全面解析與最佳實踐 在 C 中&#xff0c;指針類型…

批量將 txt/html/json/xml/csv 等文本拆分成多個文件

我們的文本文件太大的時候&#xff0c;我們通常需要對文本文件進行拆分&#xff0c;比如按多少行一個文件將一個大的文本文件拆分成多個小的文本文件。這樣我們在打開或者傳輸的時候都比較方便。今天就給大家介紹一種同時對多個文本文件進行批量拆分的方法&#xff0c;可以快速…

ARM 匯編啟動代碼詳解:從中斷向量表到中斷處理

ARM 匯編啟動代碼詳解&#xff1a;從中斷向量表到中斷處理 引言 在嵌入式系統開發中&#xff0c;ARM 處理器&#xff08;如 Cortex-A 系列&#xff09;的啟動代碼是系統初始化和運行的基礎。啟動代碼通常包括中斷向量表的創建、初始化硬件狀態&#xff08;如關閉緩存和 MMU&a…

4.7學習總結 可變參數+集合工具類Collections+不可變集合

可變參數&#xff1a; 示例&#xff1a; public class test {public static void main(String[] args) {int sumgetSum(1,2,3,4,5,6,7,8,9,10);System.out.println(sum);}public static int getSum(int...arr){int sum0;for(int i:arr){sumi;}return sum;} } 細節&#xff1a…

2023年藍橋杯第十四屆CC++大學B組真題及代碼

目錄 1A&#xff1a;日期統計 解析代碼_暴力_正解 2B&#xff1a;01串的熵 解析代碼_暴力_正解 3C&#xff1a;冶煉金屬 解析代碼_暴力_正解 4D&#xff1a;飛機降落 解析代碼_暴力dfs_正解 5E&#xff1a;接龍數列 解析代碼_dp_正解 6F&#xff1a;島嶼個數 解析代…

rom定制系列------小米10pro機型定制解鎖固件 原生安卓15批量線刷固件 操作解析與界面預覽

注意;固件用于自己機型忘記密碼或者手機號注銷等出現設備鎖 過保修期 售后無視的機型&#xff0c;勿用于非法途徑 目前有粉絲聯系&#xff0c;自己的機型由于手機號注銷導致手機更新系統后出現設備鎖界面。另外也沒有解鎖bl。目前無法使用手機。經過詢問是小米10pro機型。根據…

信息學奧賽一本通 1861:【10NOIP提高組】關押罪犯 | 洛谷 P1525 [NOIP 2010 提高組] 關押罪犯

【題目鏈接】 ybt 1861&#xff1a;【10NOIP提高組】關押罪犯 洛谷 P1525 [NOIP 2010 提高組] 關押罪犯 【題目考點】 1. 圖論&#xff1a;二分圖 2. 二分答案 3. 種類并查集 【解題思路】 解法1&#xff1a;種類并查集 一個囚犯是一個頂點&#xff0c;一個囚犯對可以看…

我的NISP二級之路-01

目錄 一.SSE-CMM系統安全工程-能力成熟度模型(Systems Security Engineering - Capability Maturity Model) 二.ISMS 即信息安全管理體系(Information Security Management System),是一種基于風險管理的、系統化的管理體系 三.Kerberos協議 1. 用戶登錄與 AS 請求 2…

WEB安全--內網滲透--利用Net-NTLMv2 Hash

一、前言 在前兩篇文章中分析了NTLM協議中Net-NTLMv2 Hash的生成、如何捕獲Net-NTLMv2 Hash&#xff0c;現在就來探討一下在內網環境中&#xff0c;如何利用Net-NTLMv2 Hash進行滲透。 二、Net-NTLM Hash的破解 工具&#xff1a;hashcat 原理&#xff1a;利用其內部的字典對…

如何正確使用 `apiStore` 進行 API 管理

在現代前端開發中&#xff0c;API 管理是一個非常重要的環節。apiStore 是一個基于 Pinia 的狀態管理工具&#xff0c;它可以幫助我們更高效地管理和調用 API。本文將詳細介紹如何正確使用 apiStore&#xff0c;包括如何創建 API 配置文件、在組件中使用 apiStore 以及如何配置…

瓦片數據合并方法

影像數據 假如有兩份影像數據 1.全球底層影像0-5級別如下&#xff1a; 2.局部高清影像數據級別9-14如下&#xff1a; 合并方法 將9-14文件夾復制到全球底層0-5的目錄下 如下&#xff1a; 然后合并xml文件 使得Tileset設置到最高級&#xff08;包含所有級別&#xff09;&…

C++中的類和對象(上)

1 類的定義 1.1 類定義的格式 1 class為定義類的關鍵字&#xff0c;Stack為類的名字&#xff0c;{}中為類的主體&#xff0c;注意類定義結束時后面分號不能省 略》。類體中內容稱為類的成員&#xff1a;類中的變量稱為類的屬性或成員變量; 類中的函數稱為類的方法或者成員函數…

【Tauri2】013——前端Window Event與創建Window

前言 【Tauri2】012——on_window_event函數-CSDN博客https://blog.csdn.net/qq_63401240/article/details/146909801?spm1001.2014.3001.5501 前面介紹了on_window_event&#xff0c;這個在Builder中的方法&#xff0c;里面有許多事件&#xff0c;比如Moved&#xff0c;Res…

【問題處理】webpack4升webpack5,報錯Uncaught ReferrnceError: process is not defined

問題 正在做webpack4升webpack5&#xff0c;項目構建項目成功后在瀏覽器打開時報錯 Uncaught ReferrnceError: process is not defined。 原因 webpack 5 不再自動 polyfill Node.js 的核心模塊。 如果你在瀏覽器運行的代碼中使用它&#xff0c;需要從 NPM 中安裝兼容模塊…

軟件工程師減肥計劃

一、目標設定 在 3 個月內減輕體重 5-7kg&#xff0c;改善身體代謝水平和體脂率&#xff0c;增強身體活力和精神狀態&#xff0c;以更好地適應工作強度。 二、飲食調整 &#xff08;一&#xff09;基本原則 控制熱量攝入&#xff0c;保證每天攝入熱量低于消耗熱量 500-800 …

即時訪問成為降低風險的關鍵

云計算和軟件即服務 (SaaS) 解決方案的廣泛采用從根本上重塑了企業的數字格局。 不同行業的組織越來越多地利用云固有的可擴展性和成本效益來推動創新和簡化運營。 這種向基于云的環境的轉變也帶來了一系列新的復雜安全挑戰&#xff0c;需要仔細考慮并制定強有力的緩解策略。…

[環境配置] 1. 開發環境搭建

開發環境搭建 本文檔將詳細介紹如何搭建深度學習開發環境&#xff0c;包括 Python 環境配置、IDE 選擇與配置以及虛擬環境管理。 也會介紹一下最近比較流行的 uv 工具。它是一個用 Rust 編寫的極其快速的 Python 包和項目管理工具。 uv 是一個非常強大的工具&#xff0c;它可…

rust 同時處理多個異步任務,并在一個任務完成退出

use std::thread; use tokio::{sync::mpsc,time::{sleep, Duration}, };async fn check_for_one() {// 該函數會每秒打印一次 "write"loop {println!("write");sleep(Duration::from_secs(1)).await;} }async fn start_print_task() -> Result<(), (…

“群芳爭艷”:CoreData 4 種方法計算最大值的效率比較(上)

概覽 在 CoreData 支持的 App 中&#xff0c;一種常見操作就是計算數據庫表中指定字段的最大值&#xff08;或最小值&#xff09;。就是這樣一種看起來“不足掛齒”的任務&#xff0c;可能稍不留神就會“馬失前蹄”。 在實際的代碼中&#xff0c;我們怎樣才能既迅速又簡潔的…

skynet網絡包庫(lua-netpack.c)的作用解析

目錄 網絡包庫&#xff08;lua-netpack.c&#xff09;的作用解析1. 數據包的分片與重組2. 網絡事件處理3. 內存管理4. 數據打包與解包 動態庫&#xff08;.so&#xff09;在 Lua 中的使用1. 編譯為動態庫2. Lua 中加載與調用(1) 加載模塊(2) 核心方法(3) 使用示例 3. 注意事項 …