問題描述
一個常見的開窗邏輯(12H 或者 500條):
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;public class UIDWindowWithProcessFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假設輸入數據流包含uid字段和其他數據DataStream<Event> inputStream = env.addSource(...); inputStream.keyBy(event -> event.uid) // 按UID分組.process(new CustomProcessFunction()).print();env.execute("UID-based Window Processing");}public static class CustomProcessFunction extends KeyedProcessFunction<String, Event, OutputEvent> {// 狀態用于計數private transient ValueState<Integer> countState;// 狀態用于記錄最后更新時間private transient ValueState<Long> lastTimerState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("count", Types.INT);countState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);lastTimerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Event event, Context ctx, Collector<OutputEvent> out) throws Exception {// 獲取當前計數Integer currentCount = countState.value();if (currentCount == null) {currentCount = 0;}// 更新計數currentCount += 1;countState.update(currentCount);// 獲取當前定時器時間戳Long currentTimer = lastTimerState.value();// 如果是第一條記錄,注冊12小時后的定時器if (currentCount == 1) {long timerTime = ctx.timestamp() + Duration.ofHours(12).toMillis();ctx.timerService().registerProcessingTimeTimer(timerTime);lastTimerState.update(timerTime);}// 如果達到500條,立即觸發并重置if (currentCount >= 500) {// 觸發處理OutputEvent output = new OutputEvent(ctx.getCurrentKey(), currentCount,System.currentTimeMillis());out.collect(output);// 清除狀態countState.clear();// 取消之前的定時器if (currentTimer != null) {ctx.timerService().deleteProcessingTimeTimer(currentTimer);}lastTimerState.clear();}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定時器觸發時處理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {OutputEvent output = new OutputEvent(ctx.getCurrentKey(),currentCount,timestamp);out.collect(output);// 清除狀態countState.clear();lastTimerState.clear();}}}// 定義輸入輸出事件類public static class Event {public String uid;// 其他字段...}public static class OutputEvent {public String uid;public int count;public long timestamp;public OutputEvent(String uid, int count, long timestamp) {this.uid = uid;this.count = count;this.timestamp = timestamp;}}
}
雖然?通過uid進行shuffle,即 keyBy(event -> event.uid)。
但因為Flink的并行度,也就是subtask數量 遠少于 uid數量,導致每個subtask會處理多個用戶的數據。而實際上每個subtask只有一個?CustomProcessFunction。那狀態計數是否會沖突?
// 獲取當前計數
Integer currentCount = countState.value();
觸發的Timer又是否是只屬于一個用戶?
@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定時器觸發時處理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {
實際上這兩個問題的答案都是肯定的,實現機制在于:
getRuntimeContext().getState()怎么實現對于key綁定狀態
Timer怎么綁定key?
為什么getRuntimeContext().getState()能夠獲得和key綁定的state?
Subtask會根據是不是keyedProcessFunction 在處理每條數據時,設置currentKey
OneInputStreamTask 通過 StreamTaskNetworkOutput 處理每一條輸入數據。StreamTaskNetworkOutput則創建了recordProcessor 。
private StreamTaskNetworkOutput(Input<IN> operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) {this.operator = checkNotNull(operator);this.watermarkGauge = checkNotNull(watermarkGauge);this.numRecordsIn = checkNotNull(numRecordsIn);this.recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);}
RecordProcessorUtils.getRecordProcessor 根據是不是KeyStream會增加setKeyContextElement操作,這個process會設置Key再調用OP的 processElement。
public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(Input<T> input) {boolean canOmitSetKeyContext;if (input instanceof AbstractStreamOperator) {canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0);} else {canOmitSetKeyContext =input instanceof KeyContextHandler&& !((KeyContextHandler) input).hasKeyContext();}if (canOmitSetKeyContext) {return input::processElement;} else if (input instanceof AsyncKeyOrderedProcessing&& ((AsyncKeyOrderedProcessing) input).isAsyncKeyOrderedProcessingEnabled()) {return ((AsyncKeyOrderedProcessing) input).getRecordProcessor(1);} else {return record -> {input.setKeyContextElement(record);input.processElement(record);};}}
AbstractStreamOperator setKey的實現
@Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement1(StreamRecord record) throws Exception {setKeyContextElement(record, stateKeySelector1);}private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)throws Exception {if (selector != null) {Object key = selector.getKey(record.getValue());setCurrentKey(key);}}
RuntimeContext創建
AbstractStreamOperator 會創建?runtime
this.runtimeContext =new StreamingRuntimeContext(environment,environment.getAccumulatorRegistry().getUserMap(),getMetricGroup(),getOperatorID(),getProcessingTimeService(),null,environment.getExternalResourceInfoProvider());
?AbstractUdfStreamOperator 會向udf注入runtime
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT>{@Overrideprotected void setup(StreamTask<?, ?> containingTask,StreamConfig config,Output<StreamRecord<OUT>> output) {super.setup(containingTask, config, output);FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());}//FunctionUtils
public static void setFunctionRuntimeContext(Function function, RuntimeContext context) {if (function instanceof RichFunction) {RichFunction richFunction = (RichFunction) function;richFunction.setRuntimeContext(context);}}
StreamingRuntimeContext?獲取狀態,這就是getRuntimeContext().getState()調用的。
@Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);stateProperties.initializeSerializerUnlessSet(this::createSerializer);return keyedStateStore.getState(stateProperties);}// 返回成員對象private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {checkNotNull(stateDescriptor, "The state properties must not be null");checkNotNull(keyedStateStore,String.format("Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",stateDescriptor.getName(), stateDescriptor.getType()));return keyedStateStore;}
注意這個 keyedStateStore 在StreamingRuntimeContext?剛new出來時 是null,在AbstractStreamOperator 的以下函數進行初始化
@Overridepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)throws Exception {final TypeSerializer<?> keySerializer =config.getStateKeySerializer(getUserCodeClassloader());final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());final CloseableRegistry streamTaskCloseableRegistry =Preconditions.checkNotNull(containingTask.getCancelables());final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);timeServiceManager =isAsyncKeyOrderedProcessingEnabled()? context.asyncInternalTimerServiceManager(): context.internalTimerServiceManager();beforeInitializeStateHandler();stateHandler.initializeOperatorState(this);runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));}
StreamOperatorStateHandler 會根據有沒有keyedStateBackend 來判斷是不是要產生DefaultKeyedStateStore。
public StreamOperatorStateHandler(StreamOperatorStateContext context,ExecutionConfig executionConfig,CloseableRegistry closeableRegistry) {this.context = context;this.keySerializer = context.keySerializer();this.operatorStateBackend = context.operatorStateBackend();this.keyedStateBackend = context.keyedStateBackend();this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();this.closeableRegistry = closeableRegistry;if (keyedStateBackend != null || asyncKeyedStateBackend != null) {keyedStateStore =new DefaultKeyedStateStore(keyedStateBackend,asyncKeyedStateBackend,new SerializerFactory() {@Overridepublic <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {return typeInformation.createSerializer(executionConfig.getSerializerConfig());}});} else {keyedStateStore = null;}}
getState方法如下,最終調用?keyedStateBackend相關方法。
@Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {requireNonNull(stateProperties, "The state properties must not be null");try {stateProperties.initializeSerializerUnlessSet(serializerFactory);return getPartitionedState(stateProperties);} catch (Exception e) {throw new RuntimeException("Error while getting state", e);}}protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor)throws Exception {checkState(keyedStateBackend != null&& supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,"Current operator does not integrate the async processing logic, "+ "thus only supports state v1 APIs. Please use StateDescriptor under "+ "'org.apache.flink.runtime.state'.");return keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);}
那context中的keyedStateBackend是怎么注入的?AbstractStreamOperator初始化產生了StreamOperatorStateContext。
StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());
這里創建StreamOperatorStateContext實際使用?StreamTaskStateInitializerImpl?,該對象包含了操作符執行所需的各種狀態后端和時間服務管理器。
主要初始化內容
1. 狀態后端初始化
-
Keyed State Backend(鍵控狀態后端):
- 根據?keySerializer?是否存在決定是否創建鍵控狀態后端
- 支持同步和異步兩種鍵控狀態后端
- 通過?
StateBackend.createKeyedStateBackend()
?或?StateBackend.createAsyncKeyedStateBackend()
?創建
-
Operator State Backend(操作符狀態后端):
- 創建?
DefaultOperatorStateBackend
?來管理操作符狀態 - 處理操作符級別的狀態恢復
- 創建?
2. 原始狀態輸入流初始化
-
Raw Keyed State Inputs(原始鍵控狀態輸入):
- 為自定義鍵控狀態提供輸入流
- 處理從檢查點或保存點恢復的原始鍵控狀態數據
-
Raw Operator State Inputs(原始操作符狀態輸入):
- 為自定義操作符狀態提供輸入流
- 處理從檢查點或保存點恢復的原始操作符狀態數據
3. 時間服務管理器初始化
- Internal Timer Service Manager(內部定時器服務管理器):
- 創建和管理內部定時器服務
- 支持同步和異步狀態后端的定時器管理
-
當 keyedStatedBackend != null 創建 timeServiceManager
初始化依據
1. 任務環境信息
- 通過?Environment?獲取任務的基本信息,包括:
- 任務信息(TaskInfo)
- 任務狀態管理器(
TaskStateManager
) - 作業ID和任務索引等
2. 操作符標識
- 根據?OperatorID?從?
TaskStateManager
?中獲取特定操作符的優先級狀態信息(PrioritizedOperatorSubtaskState) - 這包含了從檢查點或保存點恢復的狀態數據
3. 狀態恢復信息
- 從?PrioritizedOperatorSubtaskState?獲取各種狀態:
- 管理的鍵控狀態(getPrioritizedManagedKeyedState())
- 管理的操作符狀態(getPrioritizedManagedOperatorState())
- 原始鍵控狀態(getPrioritizedRawKeyedState())
- 原始操作符狀態(getPrioritizedRawOperatorState())
4. 配置參數
- managedMemoryFraction:管理內存的分配比例
- isUsingCustomRawKeyedState:是否使用自定義原始鍵控狀態
- isAsyncState:是否使用異步狀態后端
Timer怎么和key綁定?
Timer 詳細分析見:
揭秘Fliuk Timer機制:是否多線程觸發?
調用鏈:
-
用戶在
KeyedProcessFunction
中調用ctx.timerService().registerProcessingTimeTimer(...)
。 -
KeyedProcessOperator 將 context 注入?
KeyedProcessFunction
,KeyedProcessFunction
調用ctx.timerService()
實際轉發 KeyedProcessOperator 注入的 SimpleTimerService -
SimpleTimerService
將調用轉發給internalTimerService.registerProcessingTimeTimer(...)
。 -
InternalTimerService (內部使用一個支持刪除的索引堆,懶判斷到期后)
向StreamTask
的ProcessingTimeService
注冊一個回調。
KeyedProcessOperator 的 open方法 創建時間服務和Context。
public void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);InternalTimerService<VoidNamespace> internalTimerService =getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);TimerService timerService = new SimpleTimerService(internalTimerService);context = new ContextImpl(userFunction, timerService);onTimerContext = new OnTimerContextImpl(userFunction, timerService);}
調用了 AbstractStreamOperator的方法 獲取時間服務
public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {if (timeServiceManager == null) {throw new RuntimeException("The timer service has not been initialized.");}@SuppressWarnings("unchecked")InternalTimeServiceManager<K> keyedTimeServiceHandler =(InternalTimeServiceManager<K>) timeServiceManager;TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();checkState(keySerializer != null, "Timers can only be used on keyed operators.");return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);}
Triggerable 接口有兩個方法:onEventTime(InternalTimer<K, N> timer) 和 onProcessingTime(InternalTimer<K, N> timer)。當 InternalTimerService 檢測到有定時器到期時,就會調用實現了這個接口的對象的相應方法。
這個方法根據? InternalTimeServiceManagerImpl 獲取 TimerService
public <N> InternalTimerService<N> getInternalTimerService(String name,TypeSerializer<K> keySerializer,TypeSerializer<N> namespaceSerializer,Triggerable<K, N> triggerable) {checkNotNull(keySerializer, "Timers can only be used on keyed operators.");// the following casting is to overcome type restrictions.TimerSerializer<K, N> timerSerializer =new TimerSerializer<>(keySerializer, namespaceSerializer);InternalTimerServiceImpl<K, N> timerService =registerOrGetTimerService(name, timerSerializer);timerService.startTimerService(timerSerializer.getKeySerializer(),timerSerializer.getNamespaceSerializer(),triggerable);return timerService;}
register中保證每個名字只有一個?TimerService
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {InternalTimerServiceImpl<K, N> timerService =(InternalTimerServiceImpl<K, N>) timerServices.get(name);if (timerService == null) {if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {timerService =new InternalTimerServiceAsyncImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);} else {timerService =new InternalTimerServiceImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);}timerServices.put(name, timerService);}return timerService;}
startTimerService
方法是?InternalTimerServiceImpl
?的初始化入口。它負責設置必要的序列化器、觸發目標(通常是算子自身),并且在從故障恢復時處理已保存的定時器。
與處理時間定時器的聯系點:
// ... existing code ...this.triggerTarget = Preconditions.checkNotNull(triggerTarget);// re-register the restored timers (if any)// 關鍵點:檢查處理時間定時器隊列 (processingTimeTimersQueue) 的頭部是否有定時器final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();if (headTimer != null) {// 如果存在(通常意味著是從快照恢復的),// 則調用 processingTimeService.registerTimer 來重新注冊這個最早到期的處理時間定時器。// this::onProcessingTime 是回調方法,當定時器觸發時,會調用 InternalTimerServiceImpl 的 onProcessingTime 方法。nextTimer =processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime);}this.isInitialized = true;} else {
// ... existing code ...
- 恢復處理時間定時器:
- 在?
if (restoredTimersSnapshot != null)
?的邏輯塊之后(或者如果?restoredTimersSnapshot
?為?null
),代碼會檢查?processingTimeTimersQueue
。這個隊列存儲了當前算子實例負責的所有處理時間定時器。 - 如果?
processingTimeTimersQueue.peek()
?返回一個非?null
?的?headTimer
,這通常意味著在任務啟動時,狀態后端已經恢復了之前保存的定時器到這個隊列中。 - 此時,
InternalTimerServiceImpl
?需要告訴底層的?ProcessingTimeService
?(由?StreamTask
?提供,通常是基于 JVM 的?ScheduledExecutorService
):“嘿,我這里最早有一個處理時間定時器需要在?headTimer.getTimestamp()
?這個時間點觸發,到時請調用我的?onProcessingTime
?方法。” processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime)
?就是在執行這個注冊操作。this::onProcessingTime
?是一個方法引用,指向?InternalTimerServiceImpl
?自己的?onProcessingTime
?方法。當?ProcessingTimeService
?確定時間到達后,會通過 Mailbox 機制回調這個方法。nextTimer
?字段保存了?ProcessingTimeService
?返回的?ScheduledFuture<?>
,允許后續取消或管理這個已注冊的系統級定時器。
- 在?
所以,startTimerService
?在初始化階段確保了從狀態恢復的處理時間定時器能夠被正確地重新調度。
registerProcessingTimeTimer?
方法是用戶(通過?KeyedProcessFunction
?->?SimpleTimerService
)實際注冊一個新的處理時間定時器時調用的核心邏輯。
注意這里向Timer隊列添加的時候,Timer 包含?keyContext.getCurrentKey()
// ... existing code ...@Overridepublic void registerProcessingTimeTimer(N namespace, long time) {// 獲取當前處理時間定時器隊列中最早的定時器 (如果存在)InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();// 將新的定時器添加到處理時間定時器隊列中// TimerHeapInternalTimer 包含了時間戳、key 和 namespace// keyContext.getCurrentKey() 獲取當前正在處理的元素的 keyif (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {// 如果添加成功 (通常意味著隊列狀態改變了,比如新定時器成了新的頭部,或者隊列之前是空的)// 獲取之前隊列頭部的觸發時間,如果隊列之前為空,則認為是 Long.MAX_VALUElong nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;// 檢查新注冊的定時器是否比當前已調度的系統級定時器更早if (time < nextTriggerTime) {// 如果新定時器更早,說明需要重新調度if (nextTimer != null) {// 取消之前已注冊的系統級定時器 (nextTimer)// false 表示不中斷正在執行的任務 (如果回調已經在執行中)nextTimer.cancel(false);}// 使用 processingTimeService 注冊新的、更早的定時器// 當這個新的時間點到達時,會回調 this::onProcessingTimenextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);}}}
// ... existing code ...
- 添加定時器到內部隊列:
- 首先,
new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)
?創建了一個新的處理時間定時器對象。 processingTimeTimersQueue.add(...)
?將這個新定時器添加到內部的優先隊列中。這個隊列會根據時間戳對定時器進行排序。
- 首先,
- 與?
ProcessingTimeService
?交互以優化調度:InternalTimerServiceImpl
?只會向底層的?ProcessingTimeService
?注冊一個系統級的定時器,即其內部隊列中最早到期的那個處理時間定時器。這樣做是為了避免向系統注冊過多的定時器回調,提高效率。InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
?獲取在添加新定時器之前隊列中最早的定時器。long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
?獲取之前需要觸發的時間。if (time < nextTriggerTime)
: 這個判斷至關重要。它檢查新注冊的定時器?time
?是否比當前已在?ProcessingTimeService
?中注冊的下一個觸發時間?nextTriggerTime
?更早。- 如果新定時器確實更早,那么之前向?
ProcessingTimeService
?注冊的那個?nextTimer
?就作廢了(因為它不再是最早的了)。 nextTimer.cancel(false);
?取消舊的系統級定時器。nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
?然后向?ProcessingTimeService
?注冊這個新的、更早的定時器。
- 如果新定時器確實更早,那么之前向?
- 如果新注冊的定時器并不比當前已調度的?
nextTimer
?更早,那么就不需要做任何操作,因為當前的?nextTimer
?仍然是有效的,它會在其預定時間觸發,屆時?onProcessingTime
?方法會處理所有到期的定時器(包括這個新加入但不是最早的定時器)。
Timer觸發的時候怎么綁定key
KeyedProcessOperator 的 onProcessingTime 函數 調用觸發 udf 的?onTimer
@Overridepublic void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {collector.eraseTimestamp();invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);}private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer)throws Exception {onTimerContext.timeDomain = timeDomain;onTimerContext.timer = timer;userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);onTimerContext.timeDomain = null;onTimerContext.timer = null;}
而這個函數通過?InternalTimerServiceImpl 調用,這里通過timer.getKey()設置了key。
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {void onProcessingTime(long time) throws Exception {// null out the timer in case the Triggerable calls registerProcessingTimeTimer()// inside the callback.nextTimer = null;InternalTimer<K, N> timer;while ((timer = processingTimeTimersQueue.peek()) != null&& timer.getTimestamp() <= time&& !cancellationContext.isCancelled()) {keyContext.setCurrentKey(timer.getKey());processingTimeTimersQueue.poll();triggerTarget.onProcessingTime(timer);taskIOMetricGroup.getNumFiredTimers().inc();}if (timer != null && nextTimer == null) {nextTimer =processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);}}