Phenomenon
While reading the Flink source code, I noticed that the Kafka sink has a transactional commit mechanism based on a two-phase commit strategy, and the transaction is only committed after a checkpoint completes. So with a relatively long checkpoint interval, downstream consumers should not see any data before the transaction is committed. In practice, however, the data written to Kafka could be consumed immediately.
Test case
Test procedure
- Write job 1 with a long checkpoint interval and CheckpointingMode.EXACTLY_ONCE, writing its output to Kafka.
- Write job 2 to consume job 1's result topic and print it to the console to verify the result.
- Based on the observed behaviour, read the source code and analyse the cause.
Test code
Test job 1
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000 * 60L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
// Timeout: a checkpoint that does not complete within this time is discarded
env.getCheckpointConfig().setCheckpointTimeout(50000L); // 50 seconds
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
// Minimum pause between checkpoints (from the end of one to the start of the next)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// Retain externally stored checkpoint data when the Flink job is cancelled
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("127.0.0.1:9092")
        .setTopics("test001")
        .setGroupId("my-group")
        // .setStartingOffsets(OffsetsInitializer())
        .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStreamSource<String> kafkaSource =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Read data from a file
// DataStream<SensorReading> dataStream = env.addSource(new SourceTest4.MySensorSource());
DataStream<String> map = kafkaSource.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        return s;
    }
});
Properties properties = new Properties();
// Work out this timeout yourself based on the notes below; any value that satisfies the constraint is fine
properties.setProperty("transaction.timeout.ms", "900000");
// properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.65.128:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("test002")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setKafkaProducerConfig(properties)
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-xhaodream-")
        .build();
map.sinkTo(sink);
env.execute();
Test job 2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000 * 150L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///flink/checkpoint");
// Retain externally stored checkpoint data when the Flink job is cancelled
Properties properties1 = new Properties();
// properties1.put("isolation.level", "read_committed");
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("127.0.0.1:9092")
        .setTopics("test002")
        .setGroupId("my-group2")
        .setProperties(properties1)
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStreamSource<String> kafkaSource =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Print to the console
kafkaSource.print("test2 received");
env.execute();
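Both jobs need some input in topic test001. A minimal feeder sketch using the plain Kafka Java client; the class name and the one-record-per-second loop are illustrative assumptions, not part of the original test:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestDataProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; ; i++) {
                // One record per second, so the visibility delay is easy to observe in job 2
                producer.send(new ProducerRecord<>("test001", "msg-" + i));
                Thread.sleep(1000);
            }
        }
    }
}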
Test result analysis
Test result:
After job 1 starts, job 2 can consume the data right away, regardless of whether a checkpoint has been executed. This does not match the expectation.
Root cause investigation
Looking at the KafkaSink source for the code related to two-phase commit: in the 1.18 source, TwoPhaseCommittingSink has been reworked. KafkaSink implements the TwoPhaseCommittingSink interface and creates a Committer and a Writer.
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {

    PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;

    Committer<CommT> createCommitter() throws IOException;

    SimpleVersionedSerializer<CommT> getCommittableSerializer();

    @PublicEvolving
    public interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}

--------------------------------------

public class KafkaSink<IN>
        implements StatefulSink<IN, KafkaWriterState>, TwoPhaseCommittingSink<IN, KafkaCommittable> {

    private final DeliveryGuarantee deliveryGuarantee;
    private final KafkaRecordSerializationSchema<IN> recordSerializer;
    private final Properties kafkaProducerConfig;
    private final String transactionalIdPrefix;

    KafkaSink(
            DeliveryGuarantee deliveryGuarantee,
            Properties kafkaProducerConfig,
            String transactionalIdPrefix,
            KafkaRecordSerializationSchema<IN> recordSerializer) {
        this.deliveryGuarantee = deliveryGuarantee;
        this.kafkaProducerConfig = kafkaProducerConfig;
        this.transactionalIdPrefix = transactionalIdPrefix;
        this.recordSerializer = recordSerializer;
    }

    /**
     * Create a {@link KafkaSinkBuilder} to construct a new {@link KafkaSink}.
     *
     * @param <IN> type of incoming records
     * @return {@link KafkaSinkBuilder}
     */
    public static <IN> KafkaSinkBuilder<IN> builder() {
        return new KafkaSinkBuilder<>();
    }

    // Creates the Committer
    @Internal
    @Override
    public Committer<KafkaCommittable> createCommitter() throws IOException {
        return new KafkaCommitter(kafkaProducerConfig);
    }

    @Internal
    @Override
    public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
        return new KafkaCommittableSerializer();
    }

    // Creates the Writer
    @Internal
    @Override
    public KafkaWriter<IN> createWriter(InitContext context) throws IOException {
        return new KafkaWriter<IN>(
                deliveryGuarantee,
                kafkaProducerConfig,
                transactionalIdPrefix,
                context,
                recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                Collections.emptyList());
    }

    @Internal
    @Override
    public KafkaWriter<IN> restoreWriter(InitContext context, Collection<KafkaWriterState> recoveredState)
            throws IOException {
        return new KafkaWriter<>(
                deliveryGuarantee,
                kafkaProducerConfig,
                transactionalIdPrefix,
                context,
                recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                recoveredState);
    }

    @Internal
    @Override
    public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
        return new KafkaWriterStateSerializer();
    }

    @VisibleForTesting
    protected Properties getKafkaProducerConfig() {
        return kafkaProducerConfig;
    }
}
KafkaWriter and KafkaCommitter source: KafkaWriter's snapshotState() method contains the check that a transaction is only begun when deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE.
class KafkaWriter<IN>
        implements StatefulSink.StatefulSinkWriter<IN, KafkaWriterState>,
                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, KafkaCommittable> {

    // ... code omitted ...

    @Override
    public Collection<KafkaCommittable> prepareCommit() {
        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
            return Collections.emptyList();
        }

        // only return a KafkaCommittable if the current transaction has been written some data
        if (currentProducer.hasRecordsInTransaction()) {
            final List<KafkaCommittable> committables =
                    Collections.singletonList(KafkaCommittable.of(currentProducer, producerPool::add));
            LOG.debug("Committing {} committables.", committables);
            return committables;
        }

        // otherwise, we commit the empty transaction as is (no-op) and just recycle the producer
        currentProducer.commitTransaction();
        producerPool.add(currentProducer);
        return Collections.emptyList();
    }

    @Override
    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
        // A transaction is only begun for EXACTLY_ONCE
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            currentProducer = getTransactionalProducer(checkpointId + 1);
            currentProducer.beginTransaction();
        }
        return Collections.singletonList(kafkaWriterState);
    }

    // ...
}
KafkaCommitter's commit() method is where producer.commitTransaction() is invoked:
/**
 * Committer implementation for {@link KafkaSink}
 *
 * <p>The committer is responsible to finalize the Kafka transactions by committing them.
 */
class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);

    public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE =
            "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\n"
                    + "To avoid data loss, the application will restart.";

    private final Properties kafkaProducerConfig;

    @Nullable private FlinkKafkaInternalProducer<?, ?> recoveryProducer;

    KafkaCommitter(Properties kafkaProducerConfig) {
        this.kafkaProducerConfig = kafkaProducerConfig;
    }

    @Override
    public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<KafkaCommittable> request : requests) {
            final KafkaCommittable committable = request.getCommittable();
            final String transactionalId = committable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", transactionalId);
            Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> recyclable =
                    committable.getProducer();
            FlinkKafkaInternalProducer<?, ?> producer;
            try {
                producer =
                        recyclable
                                .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                .orElseGet(() -> getRecoveryProducer(committable));
                // The transaction is committed here
                producer.commitTransaction();
                producer.flush();
                recyclable.ifPresent(Recyclable::close);
            } catch (RetriableException e) {
                LOG.warn("Encountered retriable exception while committing {}.", transactionalId, e);
                request.retryLater();
            } catch (ProducerFencedException e) {
                // ......
            }
        }
    }

    // ...
}
Analysis
It turns out that enabling checkpointing alone is not enough: the KafkaSink must be configured separately before transactions are opened on the output side. So setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) was added to job 1:
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("192.168.65.128:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.<String>builder()
                .setTopic("test002")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setKafkaProducerConfig(properties)
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("flink-xhaodream-")
        .build();
Running the test again, job 2 could still consume the data immediately, which was puzzling at first. Then it occurred to me that since a transaction is now open, a transaction isolation level must be involved. Kafka has two consumer isolation levels, read committed and read uncommitted, and consumers default to read uncommitted.
Kafka's transaction isolation levels:
- Read committed (read_committed): the consumer only reads messages that have already been committed; messages inside a transaction are invisible until the transaction commits. This prevents the consumer from reading uncommitted transactional messages and guarantees it only sees durably committed data.
- Read uncommitted (read_uncommitted): the consumer may read messages before the transaction commits, i.e. transactional messages are visible immediately. This gives lower latency, but the consumer may read messages from transactions that are never committed.
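To make the difference concrete, here is a minimal sketch with the plain Kafka Java client (not Flink); the broker address and topic come from the test above, while the group id and the rest are illustrative assumptions:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class IsolationLevelDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "isolation-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Default is "read_uncommitted": records of still-open transactions are returned immediately.
        // With "read_committed" the consumer only sees records whose transaction has already committed.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test002"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}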
Adding isolation.level=read_committed to job 2, so that it consumes at the read-committed isolation level, and testing again: job 2 no longer sees any data until job 1 completes a checkpoint, while a command-line consumer of job 1's output topic (kafka-console-consumer defaults to read_uncommitted) still sees the data immediately. This matches the expectation.
Properties properties1 = new Properties();
properties1.put("isolation.level", "read_committed");
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("127.0.0.1:9092")
        .setTopics("test002")
        .setGroupId("my-group2")
        .setProperties(properties1)
        // ... rest of the builder unchanged from job 2 above
Notes
From the connector documentation (Kafka | Apache Flink): FlinkKafkaProducer has been deprecated and will be removed in Flink 1.15; please use KafkaSink instead.
Official documentation
Kafka | Apache Flink
Kafka Consumer offset committing
The Flink Kafka Consumer allows configuring how offsets are committed back to the Kafka broker. Note that the Flink Kafka Consumer does not rely on the committed offsets for its fault-tolerance guarantees; the committed offsets are only a means of exposing the consumer's progress for monitoring purposes.
The way to configure offset committing differs depending on whether checkpointing is enabled for the job:
- Checkpointing disabled: if checkpointing is disabled, the Flink Kafka Consumer relies on the automatic periodic offset committing of the internally used Kafka client. To disable or enable offset committing, simply set the enable.auto.commit and auto.commit.interval.ms keys to appropriate values in the provided Properties configuration.
- Checkpointing enabled: if checkpointing is enabled, the Flink Kafka Consumer commits the offsets stored in the checkpointed state when the checkpoint completes. This ensures that the offsets committed to the Kafka broker are consistent with the offsets in the checkpointed state. Offset committing can be disabled or enabled by calling setCommitOffsetsOnCheckpoints(boolean) on the consumer (by default it is true). Note that in this scenario, the automatic periodic offset committing settings in Properties are completely ignored. (A hedged configuration sketch for both cases follows this list.)
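A hedged sketch of the two cases above, using the legacy FlinkKafkaConsumer that this documentation passage refers to; env is assumed to be a StreamExecutionEnvironment as in the test jobs, and the concrete values are illustrative:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "127.0.0.1:9092");
props.setProperty("group.id", "my-group");
// Case 1: checkpointing disabled -> offsets are committed by the Kafka client's auto-commit
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "5000");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("test001", new SimpleStringSchema(), props);

// Case 2: checkpointing enabled -> offsets are committed when a checkpoint completes;
// this toggle (default: true) turns that behaviour on or off, and the auto-commit keys above are ignored
consumer.setCommitOffsetsOnCheckpoints(true);

env.addSource(consumer);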
Delivery guarantees supported by KafkaSink
KafkaSink supports three delivery guarantees (DeliveryGuarantee). For DeliveryGuarantee.AT_LEAST_ONCE and DeliveryGuarantee.EXACTLY_ONCE, Flink checkpointing must be enabled. By default KafkaSink uses DeliveryGuarantee.NONE. The guarantees are:
- DeliveryGuarantee.NONE: no guarantee; messages may be lost because of Kafka broker issues and may be duplicated because of Flink failures.
- DeliveryGuarantee.AT_LEAST_ONCE: at checkpoint time the sink waits until all data in the Kafka buffer has been acknowledged by the Kafka producer. Messages are not lost due to broker-side events, but may be duplicated when Flink restarts, because Flink reprocesses old data.
- DeliveryGuarantee.EXACTLY_ONCE: in this mode the Kafka sink writes all data through transactions that are committed on checkpoints. Therefore, if the consumer only reads committed data (see the Kafka consumer setting isolation.level), no duplicates appear when Flink restarts. However, the data only becomes visible once the checkpoint completes, so adjust the checkpoint interval accordingly. Make sure the transactional ID prefix (transactionIdPrefix) is unique across applications so that transactions of different jobs do not interfere with each other. In addition, it is strongly recommended to set the Kafka transaction timeout to a value much larger than the maximum checkpoint interval plus the maximum restart time, otherwise Kafka's expiration of uncommitted transactions will lead to data loss. (A hedged sizing sketch follows this list.)
It is worth reading the 1.14 and 1.18 versions side by side, since some details are handled differently (Kafka | Apache Flink).
Other source code notes
If the 1.18 source makes two-phase commit hard to follow, the 1.14.5 source is easier to understand: there, FlinkKafkaProducer is marked as deprecated (please use KafkaSink instead; it will be removed in Flink 1.15), and TwoPhaseCommitSinkFunction is an abstract class with explicit abstract methods for beginning a transaction, pre-committing and committing.
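To illustrate those abstract methods, here is a minimal, hypothetical subclass sketch: a toy sink that buffers strings per transaction and only "publishes" them in commit(). The class name and behaviour are invented for illustration and are not Flink source code; only the method signatures follow TwoPhaseCommitSinkFunction (1.14.x):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class ToyTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, List<String>, Void> {

    public ToyTwoPhaseCommitSink() {
        // Serializers for the transaction handle (TXN) and the context (CONTEXT)
        super(new ListSerializer<>(StringSerializer.INSTANCE), VoidSerializer.INSTANCE);
    }

    @Override
    protected List<String> beginTransaction() {
        // A fresh transaction; also called from snapshotState() for the next checkpoint period
        return new ArrayList<>();
    }

    @Override
    protected void invoke(List<String> transaction, String value, Context context) {
        // Records go into the open transaction; nothing is visible downstream yet
        transaction.add(value);
    }

    @Override
    protected void preCommit(List<String> transaction) {
        // Phase 1, called during snapshotState(): flush buffered data so it can survive a failure
    }

    @Override
    protected void commit(List<String> transaction) {
        // Phase 2, called after notifyCheckpointComplete(): only now does the data become visible
        transaction.forEach(System.out::println);
    }

    @Override
    protected void abort(List<String> transaction) {
        // Called when the transaction must be rolled back; discard the buffered data
        transaction.clear();
    }
}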
Looking at the dependencies of the 1.14.5 KafkaSink, it does not use TwoPhaseCommitSinkFunction directly; the source shows that it still works with a Committer and a KafkaWriter:
public class KafkaSink<IN> implements Sink<IN, KafkaCommittable, KafkaWriterState, Void> {

    public static <IN> KafkaSinkBuilder<IN> builder() {
        return new KafkaSinkBuilder<>();
    }

    // KafkaWriter decides whether a transaction needs to be opened
    @Override
    public SinkWriter<IN, KafkaCommittable, KafkaWriterState> createWriter(
            InitContext context, List<KafkaWriterState> states) throws IOException {
        final Supplier<MetricGroup> metricGroupSupplier =
                () -> context.metricGroup().addGroup("user");
        return new KafkaWriter<>(
                deliveryGuarantee,
                kafkaProducerConfig,
                transactionalIdPrefix,
                context,
                recordSerializer,
                new InitContextInitializationContextAdapter(
                        context.getUserCodeClassLoader(), metricGroupSupplier),
                states);
    }

    // The transaction is committed in KafkaCommitter
    @Override
    public Optional<Committer<KafkaCommittable>> createCommitter() throws IOException {
        return Optional.of(new KafkaCommitter(kafkaProducerConfig));
    }

    @Override
    public Optional<GlobalCommitter<KafkaCommittable, Void>> createGlobalCommitter()
            throws IOException {
        return Optional.empty();
    }

    // ...
}
KafkaWriter source (1.14.5):
@Override
public List<KafkaCommittable> prepareCommit(boolean flush) {
    if (deliveryGuarantee != DeliveryGuarantee.NONE || flush) {
        currentProducer.flush();
    }
    if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
        final List<KafkaCommittable> committables =
                Collections.singletonList(KafkaCommittable.of(currentProducer, producerPool::add));
        LOG.debug("Committing {} committables, final commit={}.", committables, flush);
        return committables;
    }
    return Collections.emptyList();
}

// snapshotState opens the transaction
@Override
public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
    if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
        currentProducer = getTransactionalProducer(checkpointId + 1);
        currentProducer.beginTransaction();
    }
    return ImmutableList.of(kafkaWriterState);
}
In 1.14.5, TwoPhaseCommitSinkFunction is an abstract class; in 1.18 the counterpart, TwoPhaseCommittingSink, is an interface:
/**
 * Flink Sink to produce data into a Kafka topic. By default producer will use {@link
 * FlinkKafkaProducer.Semantic#AT_LEAST_ONCE} semantic. Before using {@link
 * FlinkKafkaProducer.Semantic#EXACTLY_ONCE} please refer to Flink's Kafka connector documentation.
 *
 * @deprecated Please use {@link org.apache.flink.connector.kafka.sink.KafkaSink}.
 */
@Deprecated
@PublicEvolving
public class FlinkKafkaProducer<IN>
        extends TwoPhaseCommitSinkFunction<
                IN,
                FlinkKafkaProducer.KafkaTransactionState,
                FlinkKafkaProducer.KafkaTransactionContext> {
    // ...
}

// In 1.14, TwoPhaseCommitSinkFunction is an abstract class
@PublicEvolving
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {
}

// In 1.18, TwoPhaseCommittingSink is an interface
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {

    PrecommittingSinkWriter<InputT, CommT> createWriter(Sink.InitContext var1) throws IOException;

    Committer<CommT> createCommitter() throws IOException;

    SimpleVersionedSerializer<CommT> getCommittableSerializer();

    @PublicEvolving
    public interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}
FlinkKafkaProducer extends TwoPhaseCommitSinkFunction and overrides its methods. The overridden method that begins a transaction:
// FlinkKafkaProducer overrides beginTransaction()
@Override
protected FlinkKafkaProducer.KafkaTransactionState beginTransaction() throws FlinkKafkaException {
    switch (semantic) {
        case EXACTLY_ONCE:
            FlinkKafkaInternalProducer<byte[], byte[]> producer = createTransactionalProducer();
            // Opens the Kafka producer's transaction
            producer.beginTransaction();
            return new FlinkKafkaProducer.KafkaTransactionState(producer.getTransactionalId(), producer);
        case AT_LEAST_ONCE:
        case NONE:
            // Do not create new producer on each beginTransaction() if it is not necessary
            final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null && currentTransaction.producer != null) {
                return new FlinkKafkaProducer.KafkaTransactionState(currentTransaction.producer);
            }
            return new FlinkKafkaProducer.KafkaTransactionState(initNonTransactionalProducer(true));
        default:
            throw new UnsupportedOperationException("Not implemented semantic");
    }
}
A transaction is opened only when FlinkKafkaProducer.Semantic is EXACTLY_ONCE. Its constructor:
public FlinkKafkaProducer(
        String topicId,
        SerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
        FlinkKafkaProducer.Semantic semantic,
        int kafkaProducersPoolSize) {
    this(
            topicId,
            null,
            null,
            new KafkaSerializationSchemaWrapper<>(topicId, customPartitioner, false, serializationSchema),
            producerConfig,
            semantic,
            kafkaProducersPoolSize);
}
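For comparison with the new KafkaSink, a hedged usage sketch of this deprecated constructor with EXACTLY_ONCE; stream is assumed to be a DataStream<String> as in job 1, and the values are illustrative:

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "127.0.0.1:9092");
// Same constraint as with KafkaSink: well above checkpoint interval + restart time
producerProps.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
        "test002",                                  // target topic
        new SimpleStringSchema(),                   // value serialization
        producerProps,
        null,                                       // no custom partitioner
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,   // this is what makes beginTransaction() above start a Kafka transaction
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);

stream.addSink(kafkaProducer);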