背景
修改 source 算子
kafka_old_topic?消費任務運行一段時間后,暫停狀態并保留。然后將 uid 和 topic 都改了,消費者 offset 會從 earliest 開始。
// before
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// after
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
新增 source 算子
但是只新增一個同樣的 kafka-source-new 算子(old 保留,消費者 offset 卻會從最近開始。
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_old_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-old").name("kafka-source-old");// 新增
FlinkKafkaConsumer consumer = KafkaConfig.getConsumer("kafka_new_topic");
consumer.setStartFromGroupOffsets();
DataStream<String> stream = env.addSource(consumer).uid("kafka-source-new").name("kafka-source-new");
算子(鏈)子任務狀態列表(operatorSubtaskStates)
針對第一種情況,job 的算子狀態(localStates)有三個,分別對應xxx,
當給 Task【Source: kafka-source-new -> map-heart (org.apache.flink.streaming.runtime.tasks.SourceStreamTask) 】(被修改的 source)分配狀態時,該 Task 的每個算子都會綁定一個狀態(OperatorState):“kafka-source-new”、“map-heart”,只不過這兩個 OperatorState 有點差異:
這兩個算子狀態的 operatorSubtaskStates (存儲算子子任務的狀態信息)集合一個為空,一個不為空。原因就是在分配 “kafka-source-new” 算子狀態時,由于其不在 localState,于是走了默認的構造函數創建 OperatorState 對象:
其實關鍵點就在 operatorSubtaskStates 的封裝。
TaskStateAssignment 任務狀態分配
TaskStateAssignment 的構造方法有個核心參數 hasNonFinishedState。
如果當前 Task 的子任務狀態列表(operatorSubtaskStates全集)不為空,該值就為 true。
一旦該值為 true,就會執行 assignTaskStateToExecutionJobVertices:
給當前 Task 的每個 subTask 賦值狀態:
那么每個 subTask 都會有一份狀態(JobManagerTaskRestore,綁定 checkpointId):
JobManagerTaskRestore(JM與TM狀態交互中間站)
一個 Execution 就是一個 subTask:
Task 部署階段(JM 向 TM 提交 Task 任務),TM 會根據 TaskDeploymentDescriptor 來恢復狀態和創建算子(其中 taskRestore 就是 JobManagerTaskRestore,在 setInitialState 中賦值)。
TM 接收到提交任務請求時,解析出 taskRestore 創建任務狀態管理器(TaskStateManager)
TaskStateManager(TM 的任務管理器)
算子子任務狀態獲取
prioritizedOperatorState:傳入算子 ID,即可從 JobMangerTaskRestore 獲取子任務狀態。
- 如果 JobMangerTaskRestore 為 null,那么返回一個空的 PrioritizedOperatorSubtaskState(checkpoint設置為null)
- 如果不為 null,則會從 JobManagerTaskRestore 中根據算子ID封裝 PrioritizedOperatorSubtaskState。
StateInitializationContext(UDF-算子狀態初始化上下文)
KafkaConsumerBase 在初始化狀態階段,會調用context.isRestored()
判斷是否從狀態恢復:
算子狀態句柄(StreamOperatorStateHandler)處理算子狀態的初始化,該階段會調用 UDFKafkaConsumerBase.initializeState
初始化算子的本地狀態并且 checkpointId 就是在這里被寫入狀態上下文 StateInitializationContext(該上下文是可以被用戶訪問的)。
StreamOperatorStateContext(initializeState全局上下文)
以上得知 checkpointId 來自context.getRestoredCheckpointId
,那么 context (該上下文是不可以被用戶訪問的)從何而來?
算子狀態初始化AbstractStreamOperator.initialState
,利用 StreamTaskStateInitializer
封裝 StreamOperatorStateContext:
那么 checkpointId 的封裝肯定在streamTaskStateManger.streamOperatorStateContext
中:
方法中通過 taskStateManger 封裝算子狀態,如果 prioritizedOperatorSubtaskState 為空對象,那么這里的 checkpointId 就為 null。
針對第二種情況,task 的算子狀態(有多個算子,算子鏈)不存在 localOperators 中,則默認使用構造方法封裝 OperatorState,每個OperatorState 的 operatorSubtaskStates 集合都為空。