Flink-Source算子點位提交問題(Earliest)

背景

最近在做 Flink 任務數據源切換時遇到 offset 消費問題,遂寫篇文章記錄下來。

切換時只修改了 source 算子的 topic,uid 等其他信息保持不變:

  1. 發布時,發現算子的消費者點位重置為earliest,導致消息積壓。
  2. 消息積壓后,打算通過時間戳重置點位到發布前,但是發現點位重置失效。

原因分析

source算子點位初始化模式

source算子點位初始化有兩種方式:1)消費者組偏移量:setStartFromGroupOffsets;2)時間戳:setStartFromTimestamp。

消費組偏移量(FromGroupOffsets)

該方式會將 startupMode 初始化為 StartupMode.GROUP_OFFSETS:

startupMode枚舉:

時間戳(FromTimestamp)

該方式會將 startupMode 初始化為 StartupMode.TIMESTAMP:

source 算子初始化

示例代碼:

public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");// configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxx");configuration.setString("execution.savepoint.path", "xxx");configuration.setBoolean("execution.savepoint.ignore-unclaimed-state", true);// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 啟用checkpointenv.enableCheckpointing(5000);env.setParallelism(1);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);ParameterTool argTools = ParameterTool.fromArgs(args);env.getConfig().setGlobalJobParameters(argTools);// 添加數據源// "old_topic", "new_topic"FlinkKafkaConsumer consumer = KafkaConfig.getConsumer();consumer.setStartFromGroupOffsets();DataStream<String> stream = env.addSource(consumer).uid("kafka-source").name("kafka-source");SingleOutputStreamOperator<HeartEntity> heart = stream.map(new MapFunction<String, HeartEntity>() {@Overridepublic HeartEntity map(String value) throws Exception {HeartEntity heartEntity = JSON.parseObject(value, HeartEntity.class);return heartEntity;}}).uid("map-heart").name("map-heart");// 使用狀態計數DataStream<Long> countStream = heart.keyBy(HeartEntity::getCommandNo).map(new RichMapFunction<HeartEntity, Long>() {private transient ValueState<Long> countState;private long count = 0;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化狀態ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count-state", TypeInformation.of(Long.class));countState = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(HeartEntity value) throws Exception {count++;countState.update(count);return countState.value();}}).uid("count-map").name("count-map");// 打印計數結果countStream.print().uid("print").name("print");// 啟動Flink任務env.execute("Flink Kafka State Count Example");}
從狀態啟動
initializeState

狀態初始化時,FlinkKafkaConsumerBase 執行 initializeState 方法中:當 source topic 從 sg_lock_heart_msg_topic 切換為 sg_tw_common_com_lock_heart_report_topic 時,可以看到新 topic 綁定的 source 算子仍然是從老 topic 的算子狀態啟動的,因為 uid 沒變。

initializeState 往下走可以看到,restoreState 的是老 topic 分區的狀態;

open

算子初始化時,如果狀態不為空且 topic 分區不在狀態中,那么就會把新的 topic 分區加入到狀態中,并設置算子消費新分區的 startupMode 為 EARLIEST_OFFSET,即從最早的消息開始消費。

老的 topic 分區不會再消費,會被移除訂閱。

訂閱的 topic 分區

從指定時間戳啟動

setStartFromTimestamp 設置啟動模式為時間戳

然而在算子初始化時,由于從狀態啟動,新 topic分區 仍然會從 earliest 消費:

也就是說,checkpoint/savepoint 中存儲的 source 點位狀態在恢復時大于設置的時間戳。

解決方案

嘗試一(修改 uid)

從 source 算子初始化的 open 過程可知,既然從狀態啟動時會將已存在 source 算子(uid在狀態中)的新 topic 點位設置為最早,那么如果將新 topic 的 uid 改成與老 topic 的 uid 不一致,是否就能避免從 earliest 恢復:因為從狀態恢復時新的 uid 并不在狀態中,那么就不會走 open 中將新 topic 點位置為 earliest 的流程。

FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");

可以看到在狀態初始化階段(initializeState),source 算子的狀態 (restoreState)被置為空集合,而不是 null。為什么?

當在算子初始化時,因為 restoreState 不為 null,仍然會進入點位重置的流程:

可以看到這里將新 topic 分區放入了 restoreState 中,且點位置為 earliest(StarupMode 枚舉中,EARLIEST_OFFSET = -915623761775L)。

再往下走,restoreState 會將其中的新 topic 分區放入訂閱的分區中

從此,新 topic 又從最早開始消費😓。那么方案嘗試一是失敗的!

在線上實際操作時,消費點位確實被重置到了 earliest,又導致積壓了😦。

嘗試二(修改消費者組)

有沒有辦法讓 restoreState 置為 null 呢,那就真的不會走到點位重置的流程了🎊

突然看到 restoreState 的注釋:

如果消費者從狀態恢復,就會設置 restoreState。那怎么讓消費者不從狀態恢復?無狀態啟動肯定是不行的,不能讓其他算子的狀態丟了。那我直接換個消費組名!試一試呢

Properties props = new Properties();props.put("bootstrap.servers", "uat-kafka1.ttbike.com.cn:9092,uat-kafka2.ttbike.com.cn:9092,uat-kafka3.ttbike.com.cn:9092");props.put("group.id", "flink-label-engine-new");

還是不行,直到目前發現只要從狀態啟動,context 上下文會讓代碼走進給 restoreState 賦值的位置。

isRestored分析

isRestore分析

嘗試三(新增拓撲圖)

根據算子狀態恢復可知,只要新增的 source 算子跟其他已有算子形成了算子鏈,如果以狀態啟動,那么 source 的點位就會被置為 earliest。

  1. 新增一個新 topic 的 source 算子和 sink 算子(要保證新增的算子與已有算子隔離,不會形成算子鏈),然后修改老 source 算子的 uid 和 topic 與新的一致。
// old: sg_lock_heart_msg_topic
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_lock_heart_msg_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// new: sg_tw_common_com_lock_heart_report_topic
FlinkKafkaConsumer consumer_new = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
consumer_new.setStartFromGroupOffsets();
DataStream<String> stream_old = env.addSource(consumer_new).uid("kafka-source-new").name("kafka-source-new");
stream_old.print().uid("print-new").name("print-new");

由于從狀態啟動,且新加入的算子與其他算子隔離,老 source 算子的點位從狀態啟動;新 source 算子的點位被置為 GROUP_OFFSET。

1. 暫停并保存狀態;

2. 修改老 source 算子的 uid 和 topic 與 新算子保持一致,同時刪除新算子;

3. 然后從狀態啟動(/061c986d19612ae413ba794f68ff7727/chk-9),修改后的 source 算子點位從狀態恢復:

4. 下游 “count-map”的狀態是否正常:發送測試消息,可以看出狀態沒丟失

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87654.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87654.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87654.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

如何錄制帶備注的演示文稿(LaTex Beamer + Pympress)

參考文獻&#xff1a; Pympress 官網Avidemux 官網Audacity 官網FFmpeg 官網2025年度25大視頻剪輯軟件推薦2025最新音頻降噪軟件盤點&#xff0c;從入門到專業的6個高效工具如何用一段音頻替換mp4視頻格式的原有音頻&#xff1f;免費簡單易用的視頻剪切編輯工具—AvidemuxFFmp…

VS Code 的 Copilot Chat 擴展程序

安裝與啟用 Copilot Chat 擴展 在 VS Code 中打開擴展市場&#xff08;快捷鍵 CtrlShiftX 或點擊左側活動欄的擴展圖標&#xff09;。搜索“GitHub Copilot Chat”&#xff0c;點擊安裝。安裝完成后需登錄 GitHub 賬戶并授權 Copilot 權限。確保已訂閱 GitHub Copilot 服務&am…

bash 腳本比較 100 個程序運行時間,精確到毫秒,腳本

腳本如下&#xff1a; #!/bin/bash# 設置測試次數 NUM_TESTS100 # 設置要測試的程序路徑 PROGRAM"./your_program" # 替換為你的程序路徑 # 設置程序參數&#xff08;如果沒有參數則留空&#xff09; ARGS"" # 例如: "input.txt output.txt"#…

【Linux學習】Linux安裝并配置Redis

安裝Redis在Linux系統上安裝Redis可以通過包管理器或源碼編譯兩種方式進行。以下是兩種方法的詳細步驟。使用包管理器安裝Redis&#xff08;以Ubuntu為例&#xff09;&#xff1a;sudo apt update sudo apt install redis-server通過源碼編譯安裝Redis&#xff1a;wget https:/…

redis每種數據結構對應的底層數據結構原理

Redis 的每種數據結構(String、List、Hash、Set、Sorted Set)在底層都采用了不同的實現方式,根據數據規模和特性動態選擇最優的編碼(encoding)以節省內存和提高性能。以下是詳細原理分析: 1. String(字符串) 底層實現: int:當存儲整數值且可用 long 表示時,直接使用…

WPF控件大全:核心屬性詳解

WPF常用控件及核心屬性 以下是WPF開發中最常用的控件及其關鍵屬性&#xff08;按功能分類&#xff09;&#xff1a; 基礎布局控件 Grid&#xff08;網格布局&#xff09; RowDefinitions&#xff1a;行定義集合&#xff08;如Height"Auto"&#xff09;ColumnDefinit…

馬斯克腦機接口(Neuralink)技術進展,已經實現癱瘓患者通過BCI控制電腦、玩視頻游戲、學習編程,未來盲人也能恢復視力了

目錄 圖片總結文字版總結1. 核心目標與愿景1.1 增強人類能力1.2 解決腦部疾病1.3 理解意識1.4 應對AI風險 2. 技術進展與產品2.1 Telepathy&#xff08;意念操控&#xff09;功能與目標技術細節參與者案例 2.2 Blindsight&#xff08;視覺恢復&#xff09;**功能與目標**技術細…

Vuex身份認證

雖說上一節我們實現了登錄功能&#xff0c;但是實際上還是可以通過瀏覽器的地址來跳過登錄訪問到后臺&#xff0c;這種可有可無的登錄功能使得系統沒有安全性&#xff0c;而且沒有意義 為了讓登錄這個功能有意義&#xff0c;我們應該&#xff1a; 應當在用戶登錄成功之后給用戶…

springboot中使用線程池

1.什么場景下使用線程池&#xff1f; 在異步的場景下&#xff0c;可以使用線程池 不需要同步等待&#xff0c; 不需要管上一個方法是否執行完畢&#xff0c;你當前的方法就可以立即執行 我們來模擬一下&#xff0c;在一個方法里面執行3個子任務&#xff0c;不需要相互等待 …

Flask+LayUI開發手記(十):構建統一的選項集合服務

作為前端最主要的組件&#xff0c;無論是layui-table表格還是layui-form表單&#xff0c;其中都涉及到選項列的處理。如果是普通編程&#xff0c;一個任務對應一個程序&#xff0c;自然可以就事論事地單對單處理&#xff0c;前后端都配制好選項&#xff0c;手工保證兩者的一致性…

redis的數據初始化或增量更新的方法

做系統開發的時候&#xff0c;經常需要切換環境&#xff0c;做一些數據的初始化的工作&#xff0c;而redis的初始化&#xff0c;假如通過命令來執行&#xff0c;又太復雜&#xff0c;因為redis有很多種數據類型&#xff0c;全部通過敲擊命令來初始化的話&#xff0c;打的命令實…

【PaddleOCR】OCR表格識別數據集介紹,包含PubTabNet、好未來表格識別、WTW中文場景表格等數據,持續更新中......

&#x1f9d1; 博主簡介&#xff1a;曾任某智慧城市類企業算法總監&#xff0c;目前在美國市場的物流公司從事高級算法工程師一職&#xff0c;深耕人工智能領域&#xff0c;精通python數據挖掘、可視化、機器學習等&#xff0c;發表過AI相關的專利并多次在AI類比賽中獲獎。CSDN…

sparkjar任務運行

mainclass&#xff1a; test.sparkjar.SparkJarTest

Web攻防-文件下載文件讀取文件刪除目錄遍歷路徑穿越

知識點&#xff1a; 1、WEB攻防-文件下載&讀取&刪除-功能點&URL 2、WEB攻防-目錄遍歷&穿越-功能點&URL 黑盒分析&#xff1a; 1、功能點 文件上傳&#xff0c;文件下載&#xff0c;文件刪除&#xff0c;文件管理器等地方 2、URL特征 文件名&#xff1a; d…

使用LIMIT + OFFSET 分頁時,數據重復的風險

在使用 LIMIT OFFSET 分頁時&#xff0c;數據重復的風險不僅與排序字段的唯一性有關&#xff0c;還與數據變動&#xff08;插入、刪除、更新&#xff09;密切相關。以下是詳細分析&#xff1a; 一、數據變動如何導致分頁異常 1. 插入新數據 場景&#xff1a;用戶在瀏覽第 1 頁…

Excel 數據透視表不夠用時,如何處理來自多個數據源的數據?

當數據透視表感到“吃力”時&#xff0c;我們該怎么辦&#xff1a; 數據量巨大&#xff1a;Excel工作表有104萬行的限制&#xff0c;當有幾十萬行數據時&#xff0c;透視表和公式就會變得非常卡頓。數據來源多樣&#xff1a;數據分散在多個Excel文件、CSV文件、數據庫甚至網頁…

cf(1034)Div3(補題A B C D E F)

哈&#xff0c;這個比賽在開了不久之后&#xff0c;不知道為啥卡了差不多20來分鐘&#xff0c;后面卡著卡著就想睡覺了。實在是太困了.... 題目意思&#xff1a; Alice做一次操作&#xff0c;刪除任意數字a,而Bob做一次操作刪除b使得ab對4取余是3。 獲勝條件&#xff0c;有人…

瀏覽器與服務器的交互

瀏覽器地址欄輸入URL&#xff08;網址??&#xff09; ????(1) 服務器進行URL解析??&#xff1a;驗證URL格式&#xff0c;提取協議、域名等 ????(2) 服務器進行DNS查詢??&#xff1a;將域名轉換為IP地址&#xff08;可能涉及緩存或DNS預取&#xff09; ????…

Spring Boot中POST請求參數校驗的實戰指南

在現代的Web開發中&#xff0c;數據校驗是確保應用程序穩定性和安全性的關鍵環節。Spring Boot提供了強大而靈活的校驗機制&#xff0c;能夠幫助開發者輕松地對POST請求參數進行校驗。本文將詳細介紹如何在Spring Boot中實現POST請求參數的校驗&#xff0c;并通過具體的代碼示例…

Spring Boot + MyBatis/MyBatis Plus:XML中循環處理List參數的終極指南

重要提醒&#xff1a;使用Param注解時&#xff0c;務必導入正確的包&#xff01; import org.apache.ibatis.annotations.Param; 很多開發者容易錯誤導入Spring的Param&#xff0c;導致參數綁定失敗&#xff01; 一、為什么需要傳遞List參數&#xff1f; 最常見的場景是動態構…