Beam2.61.0版本消費kafka重復問題排查

1.問題出現過程

在測試環境測試flink的job的任務消費kafka的情況,通過往job任務發送一條消息,然后flink web ui上消費出現了兩條。然后通過重啟JobManager和TaskManager后,任務從checkpoint恢復后就會出現重復消費。當任務不從checkpoint恢復的時候,任務不會出現重復消費的情況。由此可見是beam從checkpoint恢復的時候出現了重復消費的問題。

2.任務排查過程

由于我們beam使用的是FlinkRunner,所以Beam消費Kafka會基于Flink的Source的規范實現相關的Source。

Flink中的Source實現的幾個重要的類:Source:工廠類負責實例化以下的幾個組件SourceSplit:封裝數據源的邏輯分片(如文件塊、Kafka 分區區間)。SplitEnumerator:負責分片發現與分配邏輯。SourceReader:處理分片數據讀取與反序列化。

在Beam中分別實現的Flink的KafkaSource是以下這幾個類:

FlinkUnboundedSource
FlinkSourceSplit
FlinkSourceSplitEnumerator
FlinkSourceReaderBase <- FlinkUnboundedSourceReader

其中在Flink中Source算子的執行和SourceOpearator和SourceCoordinator這兩個類有關,他們的執行順序如下:

  1. 初始化階段

    • SourceCoordinator 優先啟動:在 JobMaster(JobManager)啟動時,SourceCoordinator 作為獨立組件被創建,并負責初始化 SplitEnumerator(分片枚舉器)。

    • SourceOperator 后續啟動:在 TaskManager 上,每個并行任務實例(Task)啟動時,會初始化 SourceOperator,并在其open()方法中創建 SourceReader(數據讀取器)。

  2. 運行時協作

    • 分片分配:SourceCoordinator 的 SplitEnumerator 通過 RPC 響應 SourceOperator 的分片請求(如AddSplitEvent),動態分配分片(Split)。

    • 數據讀取:SourceOperator 將分配到的分片交給內部的 SourceReader,通過pollNext()方法讀取數據并發送到下游。

SourceOperator類邏輯

@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>implements OperatorEventHandler,PushingAsyncDataInput<OUT>,TimestampsAndWatermarks.WatermarkUpdateListener {
?/** The state that holds the currently assigned splits. */// 狀態存儲當前被分配的分片信息private ListState<SplitT> readerState;@Overridepublic void open() throws Exception {// 初始化Reader操作initReader();
?// in the future when we this one is migrated to the "eager initialization" operator// (StreamOperatorV2), then we should evaluate this during operator construction.if (emitProgressiveWatermarks) {eventTimeLogic =TimestampsAndWatermarks.createProgressiveEventTimeLogic(watermarkStrategy,sourceMetricGroup,getProcessingTimeService(),getExecutionConfig().getAutoWatermarkInterval());} else {eventTimeLogic =TimestampsAndWatermarks.createNoOpEventTimeLogic(watermarkStrategy, sourceMetricGroup);}
?// restore the state if necessary.// 從checkpoint狀態中恢復出上一次被分配的分片信息final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());if (!splits.isEmpty()) {LOG.info("Restoring state for {} split(s) to reader.", splits.size());// 然后把分片信息添加到Reader中sourceReader.addSplits(splits);}
?// Register the reader to the coordinator.registerReader();
?sourceMetricGroup.idlingStarted();// Start the reader after registration, sending messages in start is allowed.sourceReader.start();
?eventTimeLogic.startPeriodicWatermarkEmits();}// SourceOperator處理算子的對應事件public void handleOperatorEvent(OperatorEvent event) {if (event instanceof WatermarkAlignmentEvent) {updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);checkWatermarkAlignment();checkSplitWatermarkAlignment();} else if (event instanceof AddSplitEvent) {// 處理新增分片的事件:對應任務第一次消費,或者有心的分片增加了(對應到kafka中就是分區數增加了)handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));} else if (event instanceof SourceEventWrapper) {sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());} else if (event instanceof NoMoreSplitsEvent) {sourceReader.notifyNoMoreSplits();} else if (event instanceof IsProcessingBacklogEvent) {if (eventTimeLogic != null) {eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());}output.emitRecordAttributes(new RecordAttributesBuilder(Collections.emptyList()).setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog()).build());} else {throw new IllegalStateException("Received unexpected operator event " + event);}}private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {try {List<SplitT> newSplits = event.splits(splitSerializer);numSplits += newSplits.size();if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {// For splits arrived before the main output is initialized, store them into the// pending list. Outputs of these splits will be created once the main output is// ready.outputPendingSplits.addAll(newSplits);} else {// Create output directly for new splits if the main output is already initialized.createOutputForSplits(newSplits);}// 將新增的分片信息添加到reader中。sourceReader.addSplits(newSplits);} catch (IOException e) {throw new FlinkRuntimeException("Failed to deserialize the splits.", e);}}
}

以上可以看到在SourceOperator中,SourceReader新增分片的地方有兩個:Open()函數中從checkpoint中恢復的和handleAddSplitsEvent()中添加的分片信息,然后繼續看看sourceReader.addSplits(newSplits)中調用的是FlinkSourceReaderBase#addSplits(newSplits)方法。

由于Beam中kafka的FlinkSourceReader分別對應有界和無界,所以中間有一個抽象的類FlinkSourceReaderBase

FlinkSourceReaderBase類

public abstract class FlinkSourceReaderBase<T, OutputT>implements SourceReader<OutputT, FlinkSourceSplit<T>> {// 這是一個隊列,存儲的是分片信息 private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();@Overridepublic void addSplits(List<FlinkSourceSplit<T>> splits) {checkExceptionAndMaybeThrow();LOG.info("Adding splits {}", splits);// 往隊列中添加了分片信息sourceSplits.addAll(splits);waitingForSplitChangeFuture.get().complete(null);}protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {// 從隊列中消費分片 FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();if (sourceSplit != null) {// 然后根據分片創建對應的Reader,進行消費Kafka的數據。Source.Reader<T> reader = createReader(sourceSplit);ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);return Optional.of(readerAndOutput);}return Optional.empty();}
}

所以看到以上的代碼其實很清楚了,消費kafka重復很有可能是因為分片被重復添加導致的,由于在Kafka中KafkaConsumer在指定分區和Offset的情況下,是可以多個消費者在同一個消費者組中消費同一個分區的。

接下來使用arthas去監控sourceReader.addSplits(newSplits)的地方的調用情況:

// 監控SourceOperator#open()方法
watch org.apache.flink.util.CollectionUtil iterableToList '{params,returnObj,throwExp}' ?-n 5 ?-x 3 
?
// 監控SourceOperator#handleAddSplitsEvent()方法
watch org.apache.flink.streaming.api.operators.SourceOperator handleAddSplitsEvent '{params,returnObj,throwExp}' ?-n 5 ?-x 3 

最終觀察到這兩個地方都被調用了,所以問題就是因為checkpoint恢復的時候添加了分片信息,而從SourceCoordinator中調用FlinkSourceSplitEnumerator()計算分片的地方又添加了一次導致最終kafka消費重復了。

FlinkSourceSplitEnumerator類

public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;private final Source<T> beamSource;private final PipelineOptions pipelineOptions;private final int numSplits;private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;// 這里標識split計算是否被初始化過private boolean splitsInitialized; ?public FlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> context,Source<T> beamSource,PipelineOptions pipelineOptions,int numSplits) {this.context = context;this.beamSource = beamSource;this.pipelineOptions = pipelineOptions;this.numSplits = numSplits;this.pendingSplits = new HashMap<>(numSplits);// 這里看到永遠都是false,所以無論有沒有從checkpoint恢復過,這里都會執行過一次。 this.splitsInitialized = false;}@Overridepublic void start() {context.callAsync(() -> {// 執行分片計算的操作,計算哪些kafka分區被分配給哪個并行度try {LOG.info("Starting source {}", beamSource);List<? extends Source<T>> beamSplitSourceList = splitBeamSource();Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();int i = 0;for (Source<T> beamSplitSource : beamSplitSourceList) {int targetSubtask = i % context.currentParallelism();List<FlinkSourceSplit<T>> splitsForTask =flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));i++;}return flinkSourceSplitsList;} catch (Exception e) {throw new RuntimeException(e);}},(sourceSplits, error) -> {if (error != null) {throw new RuntimeException("Failed to start source enumerator.", error);} else {pendingSplits.putAll(sourceSplits);// 這里標識設置為true了 splitsInitialized = true;// 將分配好的分片信息通過rpc發送給SourceOpeartor,對應并行度的task取對應并行度的分片信息。sendPendingSplitsToSourceReaders();}});}}

以上看到FlinkSourceSplitEnumerator被初始化的時候splitsInitialized被設置為false,然后接著看實例化FlinkSourceSplitEnumerator的FlinkSource中的邏輯。

public abstract class FlinkSource<T, OutputT>implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {// 這里是沒有checkpoint的時候執行的 @Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {return new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits);}
?// 這里是從checkppoint中恢復的地方@Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>restoreEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)throws Exception {// 在這里實例化了FlinkSourceSplitEnumeratorFlinkSourceSplitEnumerator<T> enumerator =new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits);checkpoint.forEach((subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));return enumerator;}}

以上看到在實例化FlinkSourceSplitEnumerator的地方,只要是從checkpoint中恢復的時候,將標識splitsInitialized設置為true,那么就不會從checkpoint中恢復的時候,去重復計算和添加分片從而導致重復消費了。

3.問題解決

后來在Beam的2.64.0版本中,發現這個bug已經被修復了,FlinkSource中restoreEnumerator的地方已經加上了判斷邏輯了。

public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {
?@Overridepublic SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>restoreEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)throws Exception {// 這里將splitInitialized標識設置為了trueSplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> enumerator =createEnumerator(enumContext, true);checkpoint.forEach((subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));return enumerator;}public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext, boolean splitInitialized)throws Exception {
?if (boundedness == Boundedness.BOUNDED) {return new LazyFlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);} else {return new FlinkSourceSplitEnumerator<>(enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);}}}
?
?
public class FlinkSourceSplitEnumerator<T>implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {public FlinkSourceSplitEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> context,Source<T> beamSource,PipelineOptions pipelineOptions,int numSplits,boolean splitsInitialized) {
?this.context = context;this.beamSource = beamSource;this.pipelineOptions = pipelineOptions;this.numSplits = numSplits;this.pendingSplits = new HashMap<>(numSplits);this.splitsInitialized = splitsInitialized;}
?@Overridepublic void start() {// 這里加上了判斷邏輯了,為true不會執行了if (!splitsInitialized) {initializeSplits();}}
?private void initializeSplits() {context.callAsync(() -> {try {LOG.info("Starting source {}", beamSource);List<? extends Source<T>> beamSplitSourceList = splitBeamSource();Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();int i = 0;for (Source<T> beamSplitSource : beamSplitSourceList) {int targetSubtask = i % context.currentParallelism();List<FlinkSourceSplit<T>> splitsForTask =flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));i++;}return flinkSourceSplitsList;} catch (Exception e) {throw new RuntimeException(e);}},(sourceSplits, error) -> {if (error != null) {throw new RuntimeException("Failed to start source enumerator.", error);} else {pendingSplits.putAll(sourceSplits);splitsInitialized = true;sendPendingSplitsToSourceReaders();}});}
}

4.其他問題

從上可以看到Beam的KafkaSource實際上對比Flink原生的KafkaSource其實還有很多功能上的不足,比如說:

1.Beam中KafkaSource當從checkpoint恢復任務時,且這時候手動增加了Kafka的分區數實際上是不會被消費到的。

2.Beam中KafkaSource沒有動態分區發現的功能,既不能在不手動重啟任務且不從checkpoint恢復的情況下下消費到新分區的。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87074.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87074.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87074.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

關于 java:9. Java 網絡編程

一、Socket 編程 Socket&#xff08;套接字&#xff09;是網絡通信的端點&#xff0c;是對 TCP/IP 協議的編程抽象&#xff0c;用于實現兩臺主機間的數據交換。 通俗來說&#xff1a; 可以把 Socket 理解為“電話插口”&#xff0c;插上后客戶端和服務端才能“通話”。 Sock…

主流零信任安全產品深度介紹

騰訊 iOA 零信任安全管理系統 功能&#xff1a;提供零信任接入、終端安全、數據防泄密等十余種功能模塊。可實現基于身份的動態訪問控制、終端安全一體化防護、數據防泄密體系等。核心優勢&#xff1a;基于騰訊內部千萬級終端實踐打磨&#xff0c;沉淀豐富場景方案&#xff0c…

LabVIEW裝配車體撓度無線測量

針對軌道交通車輛裝配過程中車體撓度測量需求&#xff0c;基于LabVIEW開發無線快速測量系統&#xff0c;采用品牌硬件構建高精度數據采集與傳輸架構。系統通過 ZigBee 無線傳輸技術、高精度模數轉換模塊及激光位移傳感器&#xff0c;實現裝配車體撓度的實時、自動、非接觸測量&…

java微服務-linux單機CPU接近100%優化

你這個場景&#xff1a; 4核16G 機器 同時運行了 8個 Spring Boot 微服務&#xff0c;每個 JAR 文件 100多 MB 導致 CPU 接近100% 確實是一個常見但資源緊繃的部署情境。下面是分層的優化建議&#xff0c;包括 JVM、系統、服務架構等多個方面&#xff0c;幫助你 降 CPU、穩…

MySQL表的約束和基本查詢

一.表的約束 1.1空屬性 當我們填寫問卷的時候,經常會有不允許為空的問題,比如電話號,姓名等等.而mysql上我們可以在創建表的時候,如果想要某一列不允許為空,可以加上not null來加以限制: mysql> create table myclass( -> class_name varchar(20) not null, -> cla…

VBA代碼解決方案第二十六講:如何新建EXCEL工作簿文件

《VBA代碼解決方案》(版權10028096)這套教程是我最早推出的教程&#xff0c;目前已經是第三版修訂了。這套教程定位于入門后的提高&#xff0c;在學習這套教程過程中&#xff0c;側重點是要理解及掌握我的“積木編程”思想。要靈活運用教程中的實例像搭積木一樣把自己喜歡的代碼…

【unity游戲開發——網絡】套接字Socket的重要API

注意&#xff1a;考慮到熱更新的內容比較多&#xff0c;我將熱更新的內容分開&#xff0c;并全部整合放在【unity游戲開發——網絡】專欄里&#xff0c;感興趣的小伙伴可以前往逐一查看學習。 文章目錄 1、Socket套接字的作用2、Socket類型與創建3、核心屬性速查表4、關鍵方法指…

計算機網絡(二)應用層HTTP協議

目錄 1、HTTP概念 ?編輯2、工作流程?? 3、HTTP vs HTTPS?? 4、HTTP請求特征總結? 5、持久性和非持久性連接 非持久連接&#xff08;HTTP/1.0&#xff09;?? ??持久連接&#xff08;HTTP/1.1&#xff09;?? 1、HTTP概念 HTTP&#xff08;HyperText Transfer …

c# IO密集型與CPU密集型任務詳解,以及在異步編程中的使用示例

文章目錄 IO密集型與CPU密集型任務詳解&#xff08;C#示例&#xff09;一、基本概念1. IO密集型任務2. CPU密集型任務 二、C#示例1. IO密集型示例1.1 文件操作異步示例1.2 網絡請求異步示例1.3 數據庫操作異步示例 2. CPU密集型示例2.1 基本CPU密集型異步處理2.2 并行處理CPU密…

用lines_gauss的width屬性提取缺陷

自己做了一個圖&#xff0c;這個圖放在資源里了 結果圖是這樣&#xff08;這里只結算了窄區&#xff09; 代碼和備注如下 read_image (Image11, C:/Users/Administrator/Desktop/分享/15/11.png) rgb1_to_gray (Image11, GrayImage) invert_image (GrayImage, ImageInvert) thr…

從0到100:房產中介小程序開發筆記(中)

背景調研 為中介帶來諸多優勢&#xff0c;能借助它打造專屬小程序&#xff0c;方便及時更新核實租賃信息&#xff0c;確保信息準確無誤&#xff0c;像房屋的大致地址、租金數額、租賃條件、房源優缺點等關鍵信息都能清晰呈現。還可上傳房屋拍攝照片&#xff0c;這樣用戶能提前…

【AI 時代的網絡爬蟲新形態與防護思路研究】

網絡爬蟲原理與攻擊防護的深度研究報告 網絡爬蟲技術已進入AI驅動的4.0時代&#xff0c;全球自動化請求流量占比突破51%&#xff0c;傳統防御手段在面對高度仿真的AI爬蟲時已顯疲態。基于2025年最新數據&#xff0c;深入剖析網絡爬蟲的基本原理、工作流程、分類與攻擊方式&…

低代碼平臺架構設計與關鍵組件

低代碼平臺的架構設計是其核心能力的關鍵支撐&#xff0c;需要平衡可視化開發的便捷性、生成應用的健壯性與性能、可擴展性以及企業級需求&#xff08;如安全、多租戶、集成&#xff09;。以下是一個典型的企業級低代碼平臺架構概覽及其關鍵組件&#xff1a; https://example.…

電商 ERP 系統集成接口指南

電商 ERP 系統的高效運行依賴于與多個業務系統的無縫對接&#xff0c;需要集成的核心接口包括&#xff1a;商品管理、訂單處理、庫存同步、物流配送、客戶管理、財務結算等。這些接口是實現數據互通、業務協同的關鍵橋梁。 一、電商 ERP 系統集成所需接口類型 &#xff08;一…

Python實現對WPS協作群進行群消息自動推送

前言 本文是該專欄的第59篇,后面會持續分享python的各種干貨知識,值得關注。 相信有些同學在工作或者項目中,都會使用到“WPS協作”作為辦公聊天軟件。如果說,有些項目的監控預警正好需要你同步到WPS協作群,這個時候需要怎么去做呢? 而本文,筆者將基于WPS協作,通過Py…

js嚴格模式和非嚴格模式

好的&#xff0c;這是一個非常基礎且重要的概念。我們來詳細解析一下 JavaScript 中的嚴格模式&#xff08;Strict Mode&#xff09;和非嚴格模式&#xff08;Sloppy Mode&#xff09;。 可以把它想象成參加一場考試&#xff1a; 非嚴格模式&#xff1a;就像是開卷、不計時的…

板凳-------Mysql cookbook學習 (十一--------1)

第11章&#xff1a;生成和使用序列 11.0 引言 11.1 創建一個序列列并生成序列值 CREATE TABLE insect ( id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY (id)&#xff0c;name VARCHAR(30) NOT NULL,date DATE NOT NULL,origin VARCHAR(30) NOT NULL); 字段說明 ?id…

Vue3 中 Excel 導出的性能優化與實戰指南

文章目錄 Vue3 中 Excel 導出的性能優化與實戰指南引言&#xff1a;為什么你的導出功能會卡死瀏覽器&#xff1f;一、前端導出方案深度剖析1.1 xlsx (SheetJS) - 輕量級冠軍1.2 exceljs - 功能強大的重量級選手 二、后端導出方案&#xff1a;大數據處理的救星2.1 為什么大數據需…

安卓RecyclerView實現3D滑動輪播效果全流程實戰

安卓RecyclerView實現3D滑動輪播效果全流程實戰 1. 前言 作為一名學習安卓的人,在接觸之前和之后兩種完全不同的想法: 好看和怎么實現 當初接觸到RecyclerView就覺得這個控件就可以把關于列表的所有UI實現,即便不能,也是功能十分強大 放在現在依然是應用最廣的滑動列表控…

電機控制——電機位置傳感器零位標定

在有感FOC算法中電機位置是一個重要的輸入&#xff0c;電機位置傳感器的作用就是測量電機的旋轉角度&#xff0c;通常是輸出sin(Theta)和cos(Theta)兩路模擬信號&#xff0c;根據這兩路模擬信號測得電機旋轉絕對角度。注意傳感器測量的是機械角度&#xff0c;不是電角度。 關于…