背景
最近在做 Flink 任務數據源切換時遇到 offset 消費問題,遂寫篇文章記錄下來。
切換時只修改了 source 算子的 topic,uid 等其他信息保持不變:
- 發布時,發現算子的消費者點位重置為earliest,導致消息積壓。
- 消息積壓后,打算通過時間戳重置點位到發布前,但是發現點位重置失效。
原因分析
source算子點位初始化模式
source算子點位初始化有兩種方式:1)消費者組偏移量:setStartFromGroupOffsets;2)時間戳:setStartFromTimestamp。
消費組偏移量(FromGroupOffsets)
該方式會將 startupMode 初始化為 StartupMode.GROUP_OFFSETS:
startupMode枚舉:
時間戳(FromTimestamp)
該方式會將 startupMode 初始化為 StartupMode.TIMESTAMP:
source 算子初始化
示例代碼:
public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();// configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");// configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxx");configuration.setString("execution.savepoint.path", "xxx");configuration.setBoolean("execution.savepoint.ignore-unclaimed-state", true);// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);// 啟用checkpointenv.enableCheckpointing(5000);env.setParallelism(1);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);ParameterTool argTools = ParameterTool.fromArgs(args);env.getConfig().setGlobalJobParameters(argTools);// 添加數據源// "old_topic", "new_topic"FlinkKafkaConsumer consumer = KafkaConfig.getConsumer();consumer.setStartFromGroupOffsets();DataStream<String> stream = env.addSource(consumer).uid("kafka-source").name("kafka-source");SingleOutputStreamOperator<HeartEntity> heart = stream.map(new MapFunction<String, HeartEntity>() {@Overridepublic HeartEntity map(String value) throws Exception {HeartEntity heartEntity = JSON.parseObject(value, HeartEntity.class);return heartEntity;}}).uid("map-heart").name("map-heart");// 使用狀態計數DataStream<Long> countStream = heart.keyBy(HeartEntity::getCommandNo).map(new RichMapFunction<HeartEntity, Long>() {private transient ValueState<Long> countState;private long count = 0;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化狀態ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("count-state", TypeInformation.of(Long.class));countState = getRuntimeContext().getState(descriptor);}@Overridepublic Long map(HeartEntity value) throws Exception {count++;countState.update(count);return countState.value();}}).uid("count-map").name("count-map");// 打印計數結果countStream.print().uid("print").name("print");// 啟動Flink任務env.execute("Flink Kafka State Count Example");}
從狀態啟動
initializeState
狀態初始化時,FlinkKafkaConsumerBase 執行 initializeState 方法中:當 source topic 從 sg_lock_heart_msg_topic 切換為 sg_tw_common_com_lock_heart_report_topic 時,可以看到新 topic 綁定的 source 算子仍然是從老 topic 的算子狀態啟動的,因為 uid 沒變。
initializeState 往下走可以看到,restoreState 的是老 topic 分區的狀態;
open
算子初始化時,如果狀態不為空且 topic 分區不在狀態中,那么就會把新的 topic 分區加入到狀態中,并設置算子消費新分區的 startupMode 為 EARLIEST_OFFSET,即從最早的消息開始消費。
老的 topic 分區不會再消費,會被移除訂閱。
訂閱的 topic 分區
從指定時間戳啟動
setStartFromTimestamp 設置啟動模式為時間戳
然而在算子初始化時,由于從狀態啟動,新 topic分區 仍然會從 earliest 消費:
也就是說,checkpoint/savepoint 中存儲的 source 點位狀態在恢復時大于設置的時間戳。
解決方案
嘗試一(修改 uid)
從 source 算子初始化的 open 過程可知,既然從狀態啟動時會將已存在 source 算子(uid在狀態中)的新 topic 點位設置為最早,那么如果將新 topic 的 uid 改成與老 topic 的 uid 不一致,是否就能避免從 earliest 恢復:因為從狀態恢復時新的 uid 并不在狀態中,那么就不會走 open 中將新 topic 點位置為 earliest 的流程。
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
可以看到在狀態初始化階段(initializeState),source 算子的狀態 (restoreState)被置為空集合,而不是 null。為什么?
當在算子初始化時,因為 restoreState 不為 null,仍然會進入點位重置的流程:
可以看到這里將新 topic 分區放入了 restoreState 中,且點位置為 earliest(StarupMode 枚舉中,EARLIEST_OFFSET = -915623761775L)。
再往下走,restoreState 會將其中的新 topic 分區放入訂閱的分區中
從此,新 topic 又從最早開始消費😓。那么方案嘗試一是失敗的!
在線上實際操作時,消費點位確實被重置到了 earliest,又導致積壓了😦。
嘗試二(修改消費者組)
有沒有辦法讓 restoreState 置為 null 呢,那就真的不會走到點位重置的流程了🎊
突然看到 restoreState 的注釋:
如果消費者從狀態恢復,就會設置 restoreState。那怎么讓消費者不從狀態恢復?無狀態啟動肯定是不行的,不能讓其他算子的狀態丟了。那我直接換個消費組名!試一試呢
Properties props = new Properties();props.put("bootstrap.servers", "uat-kafka1.ttbike.com.cn:9092,uat-kafka2.ttbike.com.cn:9092,uat-kafka3.ttbike.com.cn:9092");props.put("group.id", "flink-label-engine-new");
還是不行,直到目前發現只要從狀態啟動,context 上下文會讓代碼走進給 restoreState 賦值的位置。
isRestored分析
isRestore分析
嘗試三(新增拓撲圖)
根據算子狀態恢復可知,只要新增的 source 算子跟其他已有算子形成了算子鏈,如果以狀態啟動,那么 source 的點位就會被置為 earliest。
- 新增一個新 topic 的 source 算子和 sink 算子(要保證新增的算子與已有算子隔離,不會形成算子鏈),然后修改老 source 算子的 uid 和 topic 與新的一致。
// old: sg_lock_heart_msg_topic
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("sg_lock_heart_msg_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// new: sg_tw_common_com_lock_heart_report_topic
FlinkKafkaConsumer consumer_new = KafkaConfig.getConsumer("sg_tw_common_com_lock_heart_report_topic");
consumer_new.setStartFromGroupOffsets();
DataStream<String> stream_old = env.addSource(consumer_new).uid("kafka-source-new").name("kafka-source-new");
stream_old.print().uid("print-new").name("print-new");
由于從狀態啟動,且新加入的算子與其他算子隔離,老 source 算子的點位從狀態啟動;新 source 算子的點位被置為 GROUP_OFFSET。
1. 暫停并保存狀態;
2. 修改老 source 算子的 uid 和 topic 與 新算子保持一致,同時刪除新算子;
3. 然后從狀態啟動(/061c986d19612ae413ba794f68ff7727/chk-9),修改后的 source 算子點位從狀態恢復:
4. 下游 “count-map”的狀態是否正常:發送測試消息,可以看出狀態沒丟失