1. How the problem surfaced
While testing a Flink job's Kafka consumption in the test environment, we sent a single message to the job, yet the Flink web UI showed two records consumed. After restarting the JobManager and TaskManager, the job consumed duplicates whenever it was restored from a checkpoint; when it was not restored from a checkpoint, there was no duplication. This pointed to Beam duplicating consumption on checkpoint restore.
2. Troubleshooting
Since our Beam pipelines run on the FlinkRunner, Beam consumes Kafka through a Source implemented against Flink's Source API.
The key classes in a Flink Source implementation:
- Source: the factory class that instantiates the components below.
- SourceSplit: encapsulates a logical slice of the data source (e.g. a file block or a Kafka partition range).
- SplitEnumerator: responsible for split discovery and assignment.
- SourceReader: reads and deserializes the data of the assigned splits.
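To make the division of labour concrete, here is a much-abridged, paraphrased sketch of how the four roles relate; the My* names and signatures are simplified stand-ins for illustration only, not Flink's real interfaces:

// A logical slice of the data source, e.g. one Kafka partition.
interface MySplit {
  String splitId();
}

// Runs on the JobManager side: discovers splits and assigns them to subtasks.
interface MySplitEnumerator<SplitT extends MySplit> {
  void start();
  void addSplitsBack(java.util.List<SplitT> splits, int subtaskId); // splits returned on failover/restore
}

// Runs inside each parallel source task: consumes the splits it was assigned.
interface MySourceReader<T, SplitT extends MySplit> {
  void addSplits(java.util.List<SplitT> splits);
  T pollNext();
}

// The factory tying it together: one enumerator per job, one reader per subtask.
interface MySource<T, SplitT extends MySplit> {
  MySplitEnumerator<SplitT> createEnumerator();                        // fresh start
  MySplitEnumerator<SplitT> restoreEnumerator(Object checkpointState); // restore path
  MySourceReader<T, SplitT> createReader();
}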
In Beam, the corresponding implementations of Flink's Kafka source are the following classes:
FlinkUnboundedSource
FlinkSourceSplit
FlinkSourceSplitEnumerator
FlinkSourceReaderBase (extended by FlinkUnboundedSourceReader)
In Flink, the execution of the source operator involves two classes, SourceOperator and SourceCoordinator. They are executed in the following order:
Initialization phase
- SourceCoordinator starts first: when the JobMaster (JobManager) starts, the SourceCoordinator is created as an independent component and initializes the SplitEnumerator.
- SourceOperator starts afterwards: on the TaskManager, each parallel task instance initializes a SourceOperator when it starts and creates the SourceReader in its open() method.

Runtime cooperation
- Split assignment: the SplitEnumerator inside the SourceCoordinator responds to split requests from the SourceOperator via RPC (e.g. AddSplitEvent) and assigns splits dynamically.
- Data reading: the SourceOperator hands its assigned splits to the internal SourceReader, which reads records via pollNext() and emits them downstream.
The SourceOperator class
@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
    implements OperatorEventHandler,
        PushingAsyncDataInput<OUT>,
        TimestampsAndWatermarks.WatermarkUpdateListener {

  /** The state that holds the currently assigned splits. */
  // State holding the splits currently assigned to this operator
  private ListState<SplitT> readerState;

  @Override
  public void open() throws Exception {
    // Initialize the reader
    initReader();

    // in the future when we this one is migrated to the "eager initialization" operator
    // (StreamOperatorV2), then we should evaluate this during operator construction.
    if (emitProgressiveWatermarks) {
      eventTimeLogic =
          TimestampsAndWatermarks.createProgressiveEventTimeLogic(
              watermarkStrategy,
              sourceMetricGroup,
              getProcessingTimeService(),
              getExecutionConfig().getAutoWatermarkInterval());
    } else {
      eventTimeLogic =
          TimestampsAndWatermarks.createNoOpEventTimeLogic(watermarkStrategy, sourceMetricGroup);
    }

    // restore the state if necessary.
    // Recover the splits assigned in the previous run from the checkpointed state
    final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
    if (!splits.isEmpty()) {
      LOG.info("Restoring state for {} split(s) to reader.", splits.size());
      // ...and hand them to the reader
      sourceReader.addSplits(splits);
    }

    // Register the reader to the coordinator.
    registerReader();

    sourceMetricGroup.idlingStarted();
    // Start the reader after registration, sending messages in start is allowed.
    sourceReader.start();

    eventTimeLogic.startPeriodicWatermarkEmits();
  }

  // SourceOperator handles the operator events sent to it
  public void handleOperatorEvent(OperatorEvent event) {
    if (event instanceof WatermarkAlignmentEvent) {
      updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);
      checkWatermarkAlignment();
      checkSplitWatermarkAlignment();
    } else if (event instanceof AddSplitEvent) {
      // Newly added splits: either the first assignment for this task, or new splits
      // appearing later (for Kafka, an increase in the number of partitions)
      handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));
    } else if (event instanceof SourceEventWrapper) {
      sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
    } else if (event instanceof NoMoreSplitsEvent) {
      sourceReader.notifyNoMoreSplits();
    } else if (event instanceof IsProcessingBacklogEvent) {
      if (eventTimeLogic != null) {
        eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());
      }
      output.emitRecordAttributes(
          new RecordAttributesBuilder(Collections.emptyList())
              .setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog())
              .build());
    } else {
      throw new IllegalStateException("Received unexpected operator event " + event);
    }
  }

  private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
    try {
      List<SplitT> newSplits = event.splits(splitSerializer);
      numSplits += newSplits.size();
      if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
        // For splits arrived before the main output is initialized, store them into the
        // pending list. Outputs of these splits will be created once the main output is
        // ready.
        outputPendingSplits.addAll(newSplits);
      } else {
        // Create output directly for new splits if the main output is already initialized.
        createOutputForSplits(newSplits);
      }
      // Add the new splits to the reader.
      sourceReader.addSplits(newSplits);
    } catch (IOException e) {
      throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
    }
  }
}
From the above we can see that there are two places in SourceOperator where splits are handed to the SourceReader: the splits restored from the checkpoint in open(), and the splits delivered via handleAddSplitsEvent(). Both paths call sourceReader.addSplits(newSplits), which in Beam resolves to FlinkSourceReaderBase#addSplits(newSplits), so let's look at that method next.
Because Beam's Flink Kafka source readers come in a bounded and an unbounded variant, the shared logic lives in the abstract class FlinkSourceReaderBase.
The FlinkSourceReaderBase class
public abstract class FlinkSourceReaderBase<T, OutputT>
    implements SourceReader<OutputT, FlinkSourceSplit<T>> {

  // A queue holding the splits assigned to this reader
  private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();

  @Override
  public void addSplits(List<FlinkSourceSplit<T>> splits) {
    checkExceptionAndMaybeThrow();
    LOG.info("Adding splits {}", splits);
    // Enqueue the newly assigned splits
    sourceSplits.addAll(splits);
    waitingForSplitChangeFuture.get().complete(null);
  }

  protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
    // Take the next split from the queue
    FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
    if (sourceSplit != null) {
      // Create a reader for the split; the reader does the actual Kafka consumption
      Source.Reader<T> reader = createReader(sourceSplit);
      ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);
      beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
      return Optional.of(readerAndOutput);
    }
    return Optional.empty();
  }
}
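Why two independent call paths into addSplits() are dangerous is easiest to see with a toy model of this split queue: if the same split is enqueued once by open() (checkpoint restore) and once more by handleAddSplitsEvent(), createAndTrackNextReader() will happily build two readers for the same partition. A minimal, self-contained illustration (SplitQueueDemo and the split labels are invented for this sketch):

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class SplitQueueDemo {
  public static void main(String[] args) {
    // Mimics FlinkSourceReaderBase's internal split queue.
    Queue<String> sourceSplits = new ArrayDeque<>();

    // Path 1: open() restores the split from the checkpointed reader state.
    sourceSplits.addAll(List.of("demo-topic-0 (from checkpoint)"));
    // Path 2: the enumerator recomputes splits and sends an AddSplitEvent for the same partition.
    sourceSplits.addAll(List.of("demo-topic-0 (from AddSplitEvent)"));

    // createAndTrackNextReader() polls the queue and builds one Kafka reader per entry,
    // so the same partition ends up with two readers and every record is read twice.
    String split;
    while ((split = sourceSplits.poll()) != null) {
      System.out.println("creating reader for split: " + split);
    }
  }
}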
At this point the picture is fairly clear: the duplicate Kafka consumption is very likely caused by the same splits being added twice. And because a KafkaConsumer that is given explicit partitions and offsets bypasses group-based assignment, multiple consumers in the same consumer group can consume the same partition, so Kafka itself does nothing to prevent the duplication.
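As a reminder of why Kafka does not save us here: a consumer that uses assign() plus seek() skips the group coordinator's partition assignment entirely, so two consumers sharing a group.id can read the same partition at the same offsets without triggering a rebalance. A minimal sketch, with placeholder broker, topic, and group values:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DuplicateAssignDemo {
  private static KafkaConsumer<String, String> newConsumer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("group.id", "same-group");              // both consumers share the same group
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<>(props);
  }

  public static void main(String[] args) {
    TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic

    // Both consumers assign() the same partition and seek to the same offset;
    // no rebalance happens because assign() bypasses group-based assignment.
    KafkaConsumer<String, String> c1 = newConsumer();
    KafkaConsumer<String, String> c2 = newConsumer();
    c1.assign(List.of(tp));
    c2.assign(List.of(tp));
    c1.seek(tp, 0);
    c2.seek(tp, 0);

    // Each consumer independently reads the same records -> duplicates downstream.
    ConsumerRecords<String, String> r1 = c1.poll(Duration.ofSeconds(1));
    ConsumerRecords<String, String> r2 = c2.poll(Duration.ofSeconds(1));
    System.out.printf("consumer1 read %d records, consumer2 read %d records%n",
        r1.count(), r2.count());
    c1.close();
    c2.close();
  }
}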
Next we used Arthas to watch the call sites that feed sourceReader.addSplits(newSplits):
// Watch the restore path in SourceOperator#open() (it calls CollectionUtil.iterableToList)
watch org.apache.flink.util.CollectionUtil iterableToList '{params,returnObj,throwExp}' -n 5 -x 3

// Watch SourceOperator#handleAddSplitsEvent()
watch org.apache.flink.streaming.api.operators.SourceOperator handleAddSplitsEvent '{params,returnObj,throwExp}' -n 5 -x 3
Both call sites fired. So the problem is that the splits were added once when restoring from the checkpoint, and then added again when the SourceCoordinator invoked FlinkSourceSplitEnumerator to recompute the splits, which is what ultimately caused the duplicate Kafka consumption.
The FlinkSourceSplitEnumerator class
public class FlinkSourceSplitEnumerator<T>
    implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

  private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);
  private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
  private final Source<T> beamSource;
  private final PipelineOptions pipelineOptions;
  private final int numSplits;
  private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;
  // Flags whether the splits have already been computed
  private boolean splitsInitialized;

  public FlinkSourceSplitEnumerator(
      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
      Source<T> beamSource,
      PipelineOptions pipelineOptions,
      int numSplits) {
    this.context = context;
    this.beamSource = beamSource;
    this.pipelineOptions = pipelineOptions;
    this.numSplits = numSplits;
    this.pendingSplits = new HashMap<>(numSplits);
    // Always false here, so start() recomputes the splits once,
    // whether or not the job was restored from a checkpoint.
    this.splitsInitialized = false;
  }

  @Override
  public void start() {
    context.callAsync(
        () -> {
          // Compute the splits, i.e. which Kafka partitions go to which parallel subtask
          try {
            LOG.info("Starting source {}", beamSource);
            List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
            Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
            int i = 0;
            for (Source<T> beamSplitSource : beamSplitSourceList) {
              int targetSubtask = i % context.currentParallelism();
              List<FlinkSourceSplit<T>> splitsForTask =
                  flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());
              splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
              i++;
            }
            return flinkSourceSplitsList;
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        },
        (sourceSplits, error) -> {
          if (error != null) {
            throw new RuntimeException("Failed to start source enumerator.", error);
          } else {
            pendingSplits.putAll(sourceSplits);
            // The flag is only set to true here
            splitsInitialized = true;
            // Send the assigned splits to the SourceOperators via RPC;
            // each subtask picks up the splits for its own parallel index.
            sendPendingSplitsToSourceReaders();
          }
        });
  }
}
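The assignment in start() is a plain round-robin over the split index (split i goes to subtask i % parallelism), so the partition-to-subtask mapping is easy to predict. A tiny standalone sketch of the same arithmetic, assuming for example 5 partitions and a parallelism of 2:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinAssignmentDemo {
  public static void main(String[] args) {
    int numSplits = 5;   // e.g. 5 Kafka partitions
    int parallelism = 2; // e.g. 2 source subtasks

    // Same computation as the enumerator: split i goes to subtask i % parallelism.
    Map<Integer, List<Integer>> assignment = new HashMap<>();
    for (int i = 0; i < numSplits; i++) {
      int targetSubtask = i % parallelism;
      assignment.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>()).add(i);
    }
    // Subtask 0 -> [0, 2, 4], subtask 1 -> [1, 3]
    System.out.println(assignment);
  }
}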
Back in the enumerator, the constructor unconditionally sets splitsInitialized to false. Next, look at FlinkSource, which is where FlinkSourceSplitEnumerator gets instantiated.
public abstract class FlinkSource<T, OutputT>
    implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

  // Called when the job starts without a checkpoint
  @Override
  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
      createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
    return new FlinkSourceSplitEnumerator<>(
        enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
  }

  // Called when the job is restored from a checkpoint
  @Override
  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
      restoreEnumerator(
          SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
          Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
          throws Exception {
    // FlinkSourceSplitEnumerator is instantiated here with the same constructor,
    // so splitsInitialized is again false
    FlinkSourceSplitEnumerator<T> enumerator =
        new FlinkSourceSplitEnumerator<>(
            enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
    checkpoint.forEach(
        (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
    return enumerator;
  }
}
So at the point where FlinkSourceSplitEnumerator is instantiated, if the splitsInitialized flag were set to true whenever the enumerator is restored from a checkpoint, the splits would not be recomputed and re-added on restore, and the duplicate consumption would disappear.
3. The fix
It turns out this bug has already been fixed in Beam 2.64.0: the missing check has been added on the restoreEnumerator path of FlinkSource.
public abstract class FlinkSource<T, OutputT>
    implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

  @Override
  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
      restoreEnumerator(
          SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
          Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
          throws Exception {
    // The splitInitialized flag is now passed in as true on the restore path
    SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> enumerator =
        createEnumerator(enumContext, true);
    checkpoint.forEach(
        (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
    return enumerator;
  }

  public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
      createEnumerator(
          SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext, boolean splitInitialized)
          throws Exception {
    if (boundedness == Boundedness.BOUNDED) {
      return new LazyFlinkSourceSplitEnumerator<>(
          enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);
    } else {
      return new FlinkSourceSplitEnumerator<>(
          enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);
    }
  }
}
public class FlinkSourceSplitEnumerator<T>
    implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

  public FlinkSourceSplitEnumerator(
      SplitEnumeratorContext<FlinkSourceSplit<T>> context,
      Source<T> beamSource,
      PipelineOptions pipelineOptions,
      int numSplits,
      boolean splitsInitialized) {
    this.context = context;
    this.beamSource = beamSource;
    this.pipelineOptions = pipelineOptions;
    this.numSplits = numSplits;
    this.pendingSplits = new HashMap<>(numSplits);
    this.splitsInitialized = splitsInitialized;
  }

  @Override
  public void start() {
    // The added guard: when restoring from a checkpoint the flag is true,
    // so the splits are not recomputed.
    if (!splitsInitialized) {
      initializeSplits();
    }
  }

  private void initializeSplits() {
    context.callAsync(
        () -> {
          try {
            LOG.info("Starting source {}", beamSource);
            List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
            Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
            int i = 0;
            for (Source<T> beamSplitSource : beamSplitSourceList) {
              int targetSubtask = i % context.currentParallelism();
              List<FlinkSourceSplit<T>> splitsForTask =
                  flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());
              splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
              i++;
            }
            return flinkSourceSplitsList;
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        },
        (sourceSplits, error) -> {
          if (error != null) {
            throw new RuntimeException("Failed to start source enumerator.", error);
          } else {
            pendingSplits.putAll(sourceSplits);
            splitsInitialized = true;
            sendPendingSplitsToSourceReaders();
          }
        });
  }
}
4. Other issues
From all of this it is also apparent that Beam's Kafka source still has functional gaps compared with Flink's native KafkaSource, for example:
1. When a Beam job is restored from a checkpoint, Kafka partitions that were added manually in the meantime will not be consumed.
2. Beam's Kafka source has no dynamic partition discovery, i.e. it cannot pick up newly added partitions unless the job is restarted without restoring from the checkpoint (Flink's native KafkaSource supports this via a discovery interval, sketched below).
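For contrast, a minimal sketch of Flink's native KafkaSource with partition discovery turned on; the bootstrap servers, topic, and group id are placeholders, and partition.discovery.interval.ms is the documented property for periodic partition discovery:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NativeKafkaSourceDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    KafkaSource<String> source =
        KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092") // placeholder
            .setTopics("demo-topic")               // placeholder
            .setGroupId("native-source-group")     // placeholder
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // Check for newly added partitions every 60s; they are picked up without a restart.
            .setProperty("partition.discovery.interval.ms", "60000")
            .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
    env.execute("native-kafka-source-demo");
  }
}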