在 Flink + Kafka 實時數倉中,如何確保端到端的 Exactly-Once

在 Flink + Kafka 構建實時數倉時,確保端到端的 Exactly-Once(精確一次) 需要從 數據消費(Source)、處理(Processing)、寫入(Sink) 三個階段協同設計,結合 Flink 的 檢查點機制(Checkpoint) 和 Kafka 的 事務支持。以下是具體實現方法及示例配置:


1. 核心機制

(1) Flink Checkpoint
  • 作用:定期將算子的狀態(State)和 Kafka 消費偏移量(Offset)持久化到可靠存儲(如 HDFS、S3)。

  • 配置
    ?

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000); // 60秒觸發一次Checkpoint
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // Checkpoint間最小間隔

(2) Kafka 事務
  • 兩階段提交(2PC):Flink 的 Kafka Producer 在 Checkpoint 完成時提交事務,確保數據僅寫入一次。

  • 關鍵參數

    • transactional.id:唯一事務標識,需確保每個 Producer 實例的 ID 唯一。

    • transaction.timeout.ms:需大于 Flink Checkpoint 間隔(避免事務超時)。


2. 端到端 Exactly-Once 實現步驟

(1) Source 端:Kafka Consumer 偏移量管理
  • Flink 的 Kafka Consumer 會在 Checkpoint 時將 消費偏移量 存入狀態后端,恢復時從該偏移量重新消費。

  • 配置

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("group.id", "flink-group");
    props.setProperty("isolation.level", "read_committed"); // 只讀取已提交的事務數據
    ?
    FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props
    );

(2) 處理階段:狀態一致性
  • Flink 的算子狀態(如 KeyedStateOperatorState)通過 Checkpoint 持久化,確保故障恢復后狀態一致。

(3) Sink 端:Kafka Producer 事務寫入
  • 事務性 Producer:在 Checkpoint 完成時提交事務,確保數據僅寫入一次。

  • 配置

    Properties sinkProps = new Properties();
    sinkProps.setProperty("bootstrap.servers", "kafka:9092");
    sinkProps.setProperty("transaction.timeout.ms", "600000"); // 大于 Checkpoint 間隔
    ?
    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>("output-topic",new SimpleStringSchema(),sinkProps,FlinkKafkaProducer.Semantic.EXACTLY_ONCE // 啟用Exactly-Once模式
    );
    ?
    stream.addSink(sink);


3. 端到端流程詳解

  1. Checkpoint 觸發

    • JobManager 向所有 TaskManager 發送 Checkpoint 信號。

    • Kafka Consumer 提交當前消費偏移量到狀態后端。

    • Flink 算子狀態持久化。

    • Kafka Producer 預提交事務(寫入數據但未提交)。

  2. Checkpoint 完成

    • 所有算子確認狀態保存成功后,JobManager 標記 Checkpoint 完成。

    • Kafka Producer 提交事務(數據對下游可見)。

  3. 故障恢復

    • Flink 回滾到最近一次成功的 Checkpoint。

    • Kafka Consumer 從 Checkpoint 中的偏移量重新消費。

    • Kafka Producer 回滾未提交的事務(避免數據重復)。


4. 關鍵注意事項

  • 事務超時時間:確保 transaction.timeout.ms > checkpoint間隔 + max checkpoint duration

  • 唯一 Transactional ID:每個 Kafka Producer 實例需分配唯一 ID(可通過算子ID + 子任務ID生成)。

  • 冪等性 Sink:若 Sink 為非 Kafka 系統(如數據庫),需支持冪等寫入或事務(如 MySQL 的 INSERT ... ON DUPLICATE KEY UPDATE)。


5. 示例場景:實時交易風控

  • 需求:從 Kafka 讀取交易流水,實時計算用戶交易頻次(1分鐘內超過10次觸發風控),結果寫回 Kafka。

  • 實現

    DataStream<Transaction> transactions = env.addSource(kafkaSource).map(parseTransaction); // 解析交易數據
    ?
    DataStream<Alert> alerts = transactions.keyBy(Transaction::getUserId).window(TumblingEventTimeWindows.of(Time.minutes(1))).process(new FraudDetectionProcessFunction()); // 檢測高頻交易
    ?
    alerts.addSink(kafkaSink); // 事務性寫入告警結果

  • Exactly-Once 保障

    • 消費偏移量由 Checkpoint 管理。

    • 窗口計數狀態由 Flink 持久化。

    • 告警結果通過 Kafka 事務寫入。


6. 常見問題與調優

  • 問題1:事務超時導致數據丟失 解決:增大 transaction.timeout.ms(默認15分鐘)并監控 Checkpoint 耗時。

  • 問題2:Checkpoint 失敗 解決:優化反壓(如增加并行度)、調大 checkpoint timeout

  • 問題3:Kafka Producer 緩沖區滿 解決:增大 buffer.memorybatch.size


總結

通過 Flink Checkpoint + Kafka 事務 的協同機制,可以實現從 Kafka 消費到 Kafka 寫入的端到端 Exactly-Once。核心在于:

  1. Flink 統一管理消費偏移量和狀態快照;

  2. Kafka Producer 通過事務提交保證數據原子性寫入;

  3. 合理配置超時參數與資源,避免因超時或反壓導致的一致性中斷。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/79265.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/79265.shtml
英文地址,請注明出處:http://en.pswp.cn/web/79265.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

當可視化遇上 CesiumJS:突破傳統,打造前沿生產配套方案

CesiumJS 技術基礎介紹 CesiumJS 是一款基于 JavaScript 的開源庫&#xff0c;專門用于創建動態、交互式的地理空間可視化。它利用 WebGL 技術&#xff0c;能夠在網頁瀏覽器中流暢地渲染高分辨率的三維地球和地圖場景。CesiumJS 支持多種地理空間數據格式&#xff0c;包括但不…

RabbitMQ深入學習

繼續上一節的學習&#xff0c;上一節學習了RabbitMQ的基本內容&#xff0c;本節學習RabbitMQ的高級特性。 RocketMQ的高級特性學習見這篇博客 目錄 1.消息可靠性1.1生產者消息確認1.2消息持久化1.3消費者消息確認1.4消費失敗重試機制1.5消息可靠性保證總結 2.什么是死信交換機…

Linux系統:虛擬文件系統與文件緩沖區(語言級內核級)

本節重點 初步理解一切皆文件理解文件緩沖區的分類用戶級文件緩沖區與內核級文件緩沖區用戶級文件緩沖區的刷新機制兩級緩沖區的分層協作 一、虛擬文件系統 1.1 理解“一切皆文件” 我們都知道操作系統訪問不同的外部設備&#xff08;顯示器、磁盤、鍵盤、鼠標、網卡&#…

在c++中老是碰到string,這是什么意思?

定義一個string類型變量的引用&#xff0c;相當于給現有變量起個別名&#xff0c;與指針還是不一樣的。比如string a;string& ba;這兩句&#xff0c;b與a實際上是一回事&#xff0c;表示的是同一塊內存。 std是系統的一個命名空間(有關命名空間可以參閱namespace_百度百科)…

Day21 奇異值分解(SVD)全面解析

一、奇異值分解概述 奇異值分解是線性代數中一個重要的矩陣分解方法&#xff0c;對于任何矩陣&#xff0c;無論是結構化數據轉化成的“樣本 * 特征”矩陣&#xff0c;還是天然以矩陣形式存在的圖像數據&#xff0c;都能進行等價的奇異值分解&#xff08;SVD&#xff09;。 二…

akshare爬蟲限制,pywencai頻繁升級個人做量化,穩定數據源和券商的選擇

做量化&#xff0c;數據和交易接口是策略和自動化交易的基石&#xff0c;而穩定的數據和快人一步的交易接口是個人做量化的催化劑。 之前寫過一篇文章&#xff1a;個人做量化常用的數據&#xff0c;多以爬蟲為主&#xff0c;最近akshare爬蟲限制&#xff0c;pywencai頻繁升級。…

數字簽名與證書

1. 數字簽名與證書 摘要算法用來實現完整性&#xff0c;能夠為數據生成獨一無二的“指紋”&#xff0c;常用的算法是 SHA-2&#xff1b;數字簽名是私鑰對摘要的加密&#xff0c;可以由公鑰解密后驗證&#xff0c;實現身份認證和不可否認&#xff1b;公鑰的分發需要使用數字證書…

Ubuntu22.04安裝顯卡驅動/卸載顯卡驅動

報錯 今日輸入nvidia-smi報錯,在安裝了535和550,包括560都沒辦法解決,但是又怕亂搞導致環境損壞,打算把顯卡卸載然后重新安裝系統默認推薦版本的顯卡驅動 qinqin:~$ nvidia-smi Failed to initialize NVML: Driver/library version mismatch NVML library version: 560.35卸載…

Web 架構之負載均衡全解析

文章目錄 一、引言二、思維導圖三、負載均衡的定義與作用定義作用1. 提高可用性2. 增強性能3. 實現擴展性 四、負載均衡類型硬件負載均衡代表設備優缺點 軟件負載均衡應用層負載均衡代表軟件優缺點 網絡層負載均衡代表軟件優缺點 五、負載均衡算法輪詢算法&#xff08;Round Ro…

linux下的Redis的編譯安裝與配置

配合做開發經常會用到redis&#xff0c;整理下編譯安裝配置過程&#xff0c;僅供參考&#xff01; --------------------------------------Redis的安裝與配置-------------------------------------- 下載 wget https://download.redis.io/releases/redis-6.2.6.tar.gz tar…

A2A大模型協議及Java示例

A2A大模型協議概述 1. 協議作用 A2A協議旨在解決以下問題&#xff1a; 數據交換&#xff1a;不同應用程序之間的數據格式可能不一致&#xff0c;A2A協議通過定義統一的接口和數據格式解決這一問題。模型調用&#xff1a;提供標準化的接口&#xff0c;使得外部應用可以輕松調…

關鍵點檢測--使用YOLOv8對Leeds Sports Pose(LSP)關鍵點檢測

目錄 1. Leeds Sports Pose數據集下載2. 數據集處理2.1 獲取標簽2.2 將圖像文件和標簽文件處理成YOLO能使用的格式 3. 用YOLOv8進行訓練3.1 訓練3.2 預測 1. Leeds Sports Pose數據集下載 從kaggle官網下載這個數據集&#xff0c;地址為link&#xff0c;下載好的數據集文件如下…

20250508在WIN10下使用移遠的4G模塊EC200A-CN直接上網

1、在WIN10/11下安裝驅動程序&#xff1a;Quectel_Windows_USB_DriverA_Customer_V1.1.13.zip 2、使用移遠的專用串口工具&#xff1a;QCOM_V1.8.2.7z QCOM_V1.8.2_win64.exe 3、配置串口UART42/COM42【移遠會自動生成連續三個串口&#xff0c;最小的那一個】 AT命令&#xf…

第J7周:ResNeXt解析

&#x1f368; 本文為&#x1f517;365天深度學習訓練營 中的學習記錄博客&#x1f356; 原作者&#xff1a;K同學啊 目標 具體實現 &#xff08;一&#xff09;環境 語言環境&#xff1a;Python 3.10 編 譯 器: PyCharm 框 架: Tensorflow &#xff08;二&#xff09;具體…

C++之類和對象:初始化列表,static成員,友元,const成員 ……

目錄 const成員函數&#xff1a; 前置和后置重載&#xff1a; 取地址及const取地址操作符重載&#xff1a; 初始化列表&#xff1a; explicit關鍵字&#xff1a; static成員&#xff1a; 友元&#xff1a; 友元函數&#xff1a; 友元類&#xff1a; 內部類&#xff1a…

uni-app 中的條件編譯與跨端兼容

uni-app 為了實現一套代碼編譯到多個平臺&#xff08;包括小程序&#xff0c;App&#xff0c;H5 等&#xff09;&#xff0c;引入了條件編譯機制。 通過條件編譯&#xff0c;我們可以針對不同的平臺編寫特定的代碼&#xff0c;從而實現跨端兼容。 一、條件編譯的作用 平臺差異…

Linux平臺下SSH 協議克隆Github遠程倉庫并配置密鑰

目錄 注意&#xff1a;先提前配置好SSH密鑰&#xff0c;然后再git clone 1. 檢查現有 SSH 密鑰 2. 生成新的 SSH 密鑰 3. 將 SSH 密鑰添加到 ssh-agent 4. 將公鑰添加到 GitHub 5. 測試 SSH 連接 6. 配置 Git 使用 SSH 注意&#xff1a;先提前配置好SSH密鑰&#xff0c;然…

[C++] 大數減/除法

目錄 高精度博客 - 前兩講高精度減法高精度除法高精度系列函數完整版 高精度博客 - 前兩講 講次名稱鏈接高精加法[C] 高精度加法(作用 模板 例題)高精乘法[C] 高精度乘法 高精度減法 void subBIG(int x[], int y[], int z[]){z[0] max(x[0], y[0]);for(int i 1; i < …

視頻添加字幕腳本分享

腳本簡介 這是一個給視頻添加字幕的腳本&#xff0c;可以方便的在指定的位置給視頻添加不同大小、字體、顏色的文本字幕&#xff0c;添加方式可以直接修改腳本中的文本信息&#xff0c;或者可以提前編輯好.srt字幕文件。腳本執行環境&#xff1a;windowsmingwffmpeg。本方法僅…

ubuntu nobel + qt5.15.2 設置qss語法識別正確

問題展示 解決步驟 首選項里面的高亮怎么編輯選擇都沒用。如果已經有generic-highlighter和css.xml&#xff0c;直接修改css.xml文件最直接&#xff01; 在generic-highlighter目錄下找到css.xml文件&#xff0c;位置是&#xff1a;/opt/Qt/Tools/QtCreator/share/qtcreator/…