Flink KeyedProcessFunction為什么能為每個key定義State和Timer?

問題描述

一個常見的開窗邏輯(12H 或者 500條):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import java.time.Duration;public class UIDWindowWithProcessFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 假設輸入數據流包含uid字段和其他數據DataStream<Event> inputStream = env.addSource(...); inputStream.keyBy(event -> event.uid)  // 按UID分組.process(new CustomProcessFunction()).print();env.execute("UID-based Window Processing");}public static class CustomProcessFunction extends KeyedProcessFunction<String, Event, OutputEvent> {// 狀態用于計數private transient ValueState<Integer> countState;// 狀態用于記錄最后更新時間private transient ValueState<Long> lastTimerState;@Overridepublic void open(Configuration parameters) {ValueStateDescriptor<Integer> countDescriptor = new ValueStateDescriptor<>("count", Types.INT);countState = getRuntimeContext().getState(countDescriptor);ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>("timerState", Types.LONG);lastTimerState = getRuntimeContext().getState(timerDescriptor);}@Overridepublic void processElement(Event event, Context ctx, Collector<OutputEvent> out) throws Exception {// 獲取當前計數Integer currentCount = countState.value();if (currentCount == null) {currentCount = 0;}// 更新計數currentCount += 1;countState.update(currentCount);// 獲取當前定時器時間戳Long currentTimer = lastTimerState.value();// 如果是第一條記錄,注冊12小時后的定時器if (currentCount == 1) {long timerTime = ctx.timestamp() + Duration.ofHours(12).toMillis();ctx.timerService().registerProcessingTimeTimer(timerTime);lastTimerState.update(timerTime);}// 如果達到500條,立即觸發并重置if (currentCount >= 500) {// 觸發處理OutputEvent output = new OutputEvent(ctx.getCurrentKey(), currentCount,System.currentTimeMillis());out.collect(output);// 清除狀態countState.clear();// 取消之前的定時器if (currentTimer != null) {ctx.timerService().deleteProcessingTimeTimer(currentTimer);}lastTimerState.clear();}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定時器觸發時處理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {OutputEvent output = new OutputEvent(ctx.getCurrentKey(),currentCount,timestamp);out.collect(output);// 清除狀態countState.clear();lastTimerState.clear();}}}// 定義輸入輸出事件類public static class Event {public String uid;// 其他字段...}public static class OutputEvent {public String uid;public int count;public long timestamp;public OutputEvent(String uid, int count, long timestamp) {this.uid = uid;this.count = count;this.timestamp = timestamp;}}
}

雖然?通過uid進行shuffle,即 keyBy(event -> event.uid)。

但因為Flink的并行度,也就是subtask數量 遠少于 uid數量,導致每個subtask會處理多個用戶的數據。而實際上每個subtask只有一個?CustomProcessFunction。那狀態計數是否會沖突?

// 獲取當前計數 
Integer currentCount = countState.value();

觸發的Timer又是否是只屬于一個用戶?

@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<OutputEvent> out) throws Exception {// 定時器觸發時處理Integer currentCount = countState.value();if (currentCount != null && currentCount > 0) {

實際上這兩個問題的答案都是肯定的,實現機制在于:

  • getRuntimeContext().getState()怎么實現對于key綁定狀態
  • Timer怎么綁定key?

為什么getRuntimeContext().getState()能夠獲得和key綁定的state?

Subtask會根據是不是keyedProcessFunction 在處理每條數據時,設置currentKey

OneInputStreamTask 通過 StreamTaskNetworkOutput 處理每一條輸入數據。StreamTaskNetworkOutput則創建了recordProcessor 。

private StreamTaskNetworkOutput(Input<IN> operator, WatermarkGauge watermarkGauge, Counter numRecordsIn) {this.operator = checkNotNull(operator);this.watermarkGauge = checkNotNull(watermarkGauge);this.numRecordsIn = checkNotNull(numRecordsIn);this.recordProcessor = RecordProcessorUtils.getRecordProcessor(operator);}

RecordProcessorUtils.getRecordProcessor 根據是不是KeyStream會增加setKeyContextElement操作,這個process會設置Key再調用OP的 processElement。

    public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(Input<T> input) {boolean canOmitSetKeyContext;if (input instanceof AbstractStreamOperator) {canOmitSetKeyContext = canOmitSetKeyContext((AbstractStreamOperator<?>) input, 0);} else {canOmitSetKeyContext =input instanceof KeyContextHandler&& !((KeyContextHandler) input).hasKeyContext();}if (canOmitSetKeyContext) {return input::processElement;} else if (input instanceof AsyncKeyOrderedProcessing&& ((AsyncKeyOrderedProcessing) input).isAsyncKeyOrderedProcessingEnabled()) {return ((AsyncKeyOrderedProcessing) input).getRecordProcessor(1);} else {return record -> {input.setKeyContextElement(record);input.processElement(record);};}}

AbstractStreamOperator setKey的實現

    @Override@SuppressWarnings({"unchecked", "rawtypes"})public void setKeyContextElement1(StreamRecord record) throws Exception {setKeyContextElement(record, stateKeySelector1);}private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector)throws Exception {if (selector != null) {Object key = selector.getKey(record.getValue());setCurrentKey(key);}}

RuntimeContext創建

AbstractStreamOperator 會創建?runtime

        this.runtimeContext =new StreamingRuntimeContext(environment,environment.getAccumulatorRegistry().getUserMap(),getMetricGroup(),getOperatorID(),getProcessingTimeService(),null,environment.getExternalResourceInfoProvider());

?AbstractUdfStreamOperator 會向udf注入runtime

      
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT>{@Overrideprotected void setup(StreamTask<?, ?> containingTask,StreamConfig config,Output<StreamRecord<OUT>> output) {super.setup(containingTask, config, output);FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());}//FunctionUtils
public static void setFunctionRuntimeContext(Function function, RuntimeContext context) {if (function instanceof RichFunction) {RichFunction richFunction = (RichFunction) function;richFunction.setRuntimeContext(context);}}

StreamingRuntimeContext?獲取狀態,這就是getRuntimeContext().getState()調用的。

    @Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);stateProperties.initializeSerializerUnlessSet(this::createSerializer);return keyedStateStore.getState(stateProperties);}// 返回成員對象private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {checkNotNull(stateDescriptor, "The state properties must not be null");checkNotNull(keyedStateStore,String.format("Keyed state '%s' with type %s can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.",stateDescriptor.getName(), stateDescriptor.getType()));return keyedStateStore;}

注意這個 keyedStateStore 在StreamingRuntimeContext?剛new出來時 是null,在AbstractStreamOperator 的以下函數進行初始化

    @Overridepublic final void initializeState(StreamTaskStateInitializer streamTaskStateManager)throws Exception {final TypeSerializer<?> keySerializer =config.getStateKeySerializer(getUserCodeClassloader());final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask());final CloseableRegistry streamTaskCloseableRegistry =Preconditions.checkNotNull(containingTask.getCancelables());final StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());stateHandler =new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);timeServiceManager =isAsyncKeyOrderedProcessingEnabled()? context.asyncInternalTimerServiceManager(): context.internalTimerServiceManager();beforeInitializeStateHandler();stateHandler.initializeOperatorState(this);runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));}

StreamOperatorStateHandler 會根據有沒有keyedStateBackend 來判斷是不是要產生DefaultKeyedStateStore。

    public StreamOperatorStateHandler(StreamOperatorStateContext context,ExecutionConfig executionConfig,CloseableRegistry closeableRegistry) {this.context = context;this.keySerializer = context.keySerializer();this.operatorStateBackend = context.operatorStateBackend();this.keyedStateBackend = context.keyedStateBackend();this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();this.closeableRegistry = closeableRegistry;if (keyedStateBackend != null || asyncKeyedStateBackend != null) {keyedStateStore =new DefaultKeyedStateStore(keyedStateBackend,asyncKeyedStateBackend,new SerializerFactory() {@Overridepublic <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {return typeInformation.createSerializer(executionConfig.getSerializerConfig());}});} else {keyedStateStore = null;}}

getState方法如下,最終調用?keyedStateBackend相關方法。

    @Overridepublic <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {requireNonNull(stateProperties, "The state properties must not be null");try {stateProperties.initializeSerializerUnlessSet(serializerFactory);return getPartitionedState(stateProperties);} catch (Exception e) {throw new RuntimeException("Error while getting state", e);}}protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor)throws Exception {checkState(keyedStateBackend != null&& supportKeyedStateApiSet == SupportKeyedStateApiSet.STATE_V1,"Current operator does not integrate the async processing logic, "+ "thus only supports state v1 APIs. Please use StateDescriptor under "+ "'org.apache.flink.runtime.state'.");return keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);}

那context中的keyedStateBackend是怎么注入的?AbstractStreamOperator初始化產生了StreamOperatorStateContext。

StreamOperatorStateContext context =streamTaskStateManager.streamOperatorStateContext(getOperatorID(),getClass().getSimpleName(),getProcessingTimeService(),this,keySerializer,streamTaskCloseableRegistry,metrics,config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND,runtimeContext.getJobConfiguration(),runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),runtimeContext.getUserCodeClassLoader()),isUsingCustomRawKeyedState(),isAsyncKeyOrderedProcessingEnabled());

這里創建StreamOperatorStateContext實際使用?StreamTaskStateInitializerImpl?,該對象包含了操作符執行所需的各種狀態后端和時間服務管理器。

主要初始化內容

1. 狀態后端初始化

  • Keyed State Backend(鍵控狀態后端)

    • 根據?keySerializer?是否存在決定是否創建鍵控狀態后端
    • 支持同步和異步兩種鍵控狀態后端
    • 通過?StateBackend.createKeyedStateBackend()?或?StateBackend.createAsyncKeyedStateBackend()?創建
  • Operator State Backend(操作符狀態后端)

    • 創建?DefaultOperatorStateBackend?來管理操作符狀態
    • 處理操作符級別的狀態恢復

2. 原始狀態輸入流初始化

  • Raw Keyed State Inputs(原始鍵控狀態輸入)

    • 為自定義鍵控狀態提供輸入流
    • 處理從檢查點或保存點恢復的原始鍵控狀態數據
  • Raw Operator State Inputs(原始操作符狀態輸入)

    • 為自定義操作符狀態提供輸入流
    • 處理從檢查點或保存點恢復的原始操作符狀態數據

3. 時間服務管理器初始化

  • Internal Timer Service Manager(內部定時器服務管理器)
    • 創建和管理內部定時器服務
    • 支持同步和異步狀態后端的定時器管理
    • 當 keyedStatedBackend != null 創建 timeServiceManager

初始化依據

1. 任務環境信息

  • 通過?Environment?獲取任務的基本信息,包括:
    • 任務信息(TaskInfo)
    • 任務狀態管理器(TaskStateManager
    • 作業ID和任務索引等

2. 操作符標識

  • 根據?OperatorID?從?TaskStateManager?中獲取特定操作符的優先級狀態信息(PrioritizedOperatorSubtaskState)
  • 這包含了從檢查點或保存點恢復的狀態數據

3. 狀態恢復信息

  • 從?PrioritizedOperatorSubtaskState?獲取各種狀態:
    • 管理的鍵控狀態(getPrioritizedManagedKeyedState())
    • 管理的操作符狀態(getPrioritizedManagedOperatorState())
    • 原始鍵控狀態(getPrioritizedRawKeyedState())
    • 原始操作符狀態(getPrioritizedRawOperatorState())

4. 配置參數

  • managedMemoryFraction:管理內存的分配比例
  • isUsingCustomRawKeyedState:是否使用自定義原始鍵控狀態
  • isAsyncState:是否使用異步狀態后端

Timer怎么和key綁定?

Timer 詳細分析見:

揭秘Fliuk Timer機制:是否多線程觸發?

調用鏈:

  • 用戶在 KeyedProcessFunction 中調用 ctx.timerService().registerProcessingTimeTimer(...)

  • KeyedProcessOperator 將 context 注入?KeyedProcessFunctionKeyedProcessFunction 調用 ctx.timerService()實際轉發 KeyedProcessOperator 注入的 SimpleTimerService

  • SimpleTimerService 將調用轉發給 internalTimerService.registerProcessingTimeTimer(...)

  • InternalTimerService (內部使用一個支持刪除的索引堆,懶判斷到期后)StreamTaskProcessingTimeService 注冊一個回調。

KeyedProcessOperator 的 open方法 創建時間服務和Context。

    public void open() throws Exception {super.open();collector = new TimestampedCollector<>(output);InternalTimerService<VoidNamespace> internalTimerService =getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);TimerService timerService = new SimpleTimerService(internalTimerService);context = new ContextImpl(userFunction, timerService);onTimerContext = new OnTimerContextImpl(userFunction, timerService);}

調用了 AbstractStreamOperator的方法 獲取時間服務

    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {if (timeServiceManager == null) {throw new RuntimeException("The timer service has not been initialized.");}@SuppressWarnings("unchecked")InternalTimeServiceManager<K> keyedTimeServiceHandler =(InternalTimeServiceManager<K>) timeServiceManager;TypeSerializer<K> keySerializer = stateHandler.getKeySerializer();checkState(keySerializer != null, "Timers can only be used on keyed operators.");return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);}

Triggerable 接口有兩個方法:onEventTime(InternalTimer<K, N> timer) 和 onProcessingTime(InternalTimer<K, N> timer)。當 InternalTimerService 檢測到有定時器到期時,就會調用實現了這個接口的對象的相應方法。

這個方法根據? InternalTimeServiceManagerImpl 獲取 TimerService

    public <N> InternalTimerService<N> getInternalTimerService(String name,TypeSerializer<K> keySerializer,TypeSerializer<N> namespaceSerializer,Triggerable<K, N> triggerable) {checkNotNull(keySerializer, "Timers can only be used on keyed operators.");// the following casting is to overcome type restrictions.TimerSerializer<K, N> timerSerializer =new TimerSerializer<>(keySerializer, namespaceSerializer);InternalTimerServiceImpl<K, N> timerService =registerOrGetTimerService(name, timerSerializer);timerService.startTimerService(timerSerializer.getKeySerializer(),timerSerializer.getNamespaceSerializer(),triggerable);return timerService;}

register中保證每個名字只有一個?TimerService

<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {InternalTimerServiceImpl<K, N> timerService =(InternalTimerServiceImpl<K, N>) timerServices.get(name);if (timerService == null) {if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {timerService =new InternalTimerServiceAsyncImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);} else {timerService =new InternalTimerServiceImpl<>(taskIOMetricGroup,localKeyGroupRange,keyContext,processingTimeService,createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),cancellationContext);}timerServices.put(name, timerService);}return timerService;}

startTimerService方法是?InternalTimerServiceImpl?的初始化入口。它負責設置必要的序列化器、觸發目標(通常是算子自身),并且在從故障恢復時處理已保存的定時器。

與處理時間定時器的聯系點:

// ... existing code ...this.triggerTarget = Preconditions.checkNotNull(triggerTarget);// re-register the restored timers (if any)// 關鍵點:檢查處理時間定時器隊列 (processingTimeTimersQueue) 的頭部是否有定時器final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();if (headTimer != null) {// 如果存在(通常意味著是從快照恢復的),// 則調用 processingTimeService.registerTimer 來重新注冊這個最早到期的處理時間定時器。// this::onProcessingTime 是回調方法,當定時器觸發時,會調用 InternalTimerServiceImpl 的 onProcessingTime 方法。nextTimer =processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime);}this.isInitialized = true;} else {
// ... existing code ...
  • 恢復處理時間定時器:
    • 在?if (restoredTimersSnapshot != null)?的邏輯塊之后(或者如果?restoredTimersSnapshot?為?null),代碼會檢查?processingTimeTimersQueue。這個隊列存儲了當前算子實例負責的所有處理時間定時器。
    • 如果?processingTimeTimersQueue.peek()?返回一個非?null?的?headTimer,這通常意味著在任務啟動時,狀態后端已經恢復了之前保存的定時器到這個隊列中。
    • 此時,InternalTimerServiceImpl?需要告訴底層的?ProcessingTimeService?(由?StreamTask?提供,通常是基于 JVM 的?ScheduledExecutorService):“嘿,我這里最早有一個處理時間定時器需要在?headTimer.getTimestamp()?這個時間點觸發,到時請調用我的?onProcessingTime?方法。”
    • processingTimeService.registerTimer(headTimer.getTimestamp(), this::onProcessingTime)?就是在執行這個注冊操作。this::onProcessingTime?是一個方法引用,指向?InternalTimerServiceImpl?自己的?onProcessingTime?方法。當?ProcessingTimeService?確定時間到達后,會通過 Mailbox 機制回調這個方法。
    • nextTimer?字段保存了?ProcessingTimeService?返回的?ScheduledFuture<?>,允許后續取消或管理這個已注冊的系統級定時器。

所以,startTimerService?在初始化階段確保了從狀態恢復的處理時間定時器能夠被正確地重新調度。

registerProcessingTimeTimer?方法是用戶(通過?KeyedProcessFunction?->?SimpleTimerService)實際注冊一個新的處理時間定時器時調用的核心邏輯。

注意這里向Timer隊列添加的時候,Timer 包含?keyContext.getCurrentKey()

// ... existing code ...@Overridepublic void registerProcessingTimeTimer(N namespace, long time) {// 獲取當前處理時間定時器隊列中最早的定時器 (如果存在)InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();// 將新的定時器添加到處理時間定時器隊列中// TimerHeapInternalTimer 包含了時間戳、key 和 namespace// keyContext.getCurrentKey() 獲取當前正在處理的元素的 keyif (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {// 如果添加成功 (通常意味著隊列狀態改變了,比如新定時器成了新的頭部,或者隊列之前是空的)// 獲取之前隊列頭部的觸發時間,如果隊列之前為空,則認為是 Long.MAX_VALUElong nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;// 檢查新注冊的定時器是否比當前已調度的系統級定時器更早if (time < nextTriggerTime) {// 如果新定時器更早,說明需要重新調度if (nextTimer != null) {// 取消之前已注冊的系統級定時器 (nextTimer)// false 表示不中斷正在執行的任務 (如果回調已經在執行中)nextTimer.cancel(false);}// 使用 processingTimeService 注冊新的、更早的定時器// 當這個新的時間點到達時,會回調 this::onProcessingTimenextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);}}}
// ... existing code ...
  • 添加定時器到內部隊列:
    • 首先,new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace)?創建了一個新的處理時間定時器對象。
    • processingTimeTimersQueue.add(...)?將這個新定時器添加到內部的優先隊列中。這個隊列會根據時間戳對定時器進行排序。
  • 與?ProcessingTimeService?交互以優化調度:
    • InternalTimerServiceImpl?只會向底層的?ProcessingTimeService?注冊一個系統級的定時器,即其內部隊列中最早到期的那個處理時間定時器。這樣做是為了避免向系統注冊過多的定時器回調,提高效率。
    • InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();?獲取在添加新定時器之前隊列中最早的定時器。
    • long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;?獲取之前需要觸發的時間。
    • if (time < nextTriggerTime): 這個判斷至關重要。它檢查新注冊的定時器?time?是否比當前已在?ProcessingTimeService?中注冊的下一個觸發時間?nextTriggerTime?更早。
      • 如果新定時器確實更早,那么之前向?ProcessingTimeService?注冊的那個?nextTimer?就作廢了(因為它不再是最早的了)。
      • nextTimer.cancel(false);?取消舊的系統級定時器。
      • nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);?然后向?ProcessingTimeService?注冊這個新的、更早的定時器。
    • 如果新注冊的定時器并不比當前已調度的?nextTimer?更早,那么就不需要做任何操作,因為當前的?nextTimer?仍然是有效的,它會在其預定時間觸發,屆時?onProcessingTime?方法會處理所有到期的定時器(包括這個新加入但不是最早的定時器)。

Timer觸發的時候怎么綁定key

KeyedProcessOperator 的 onProcessingTime 函數 調用觸發 udf 的?onTimer

       @Overridepublic void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {collector.eraseTimestamp();invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);}private void invokeUserFunction(TimeDomain timeDomain, InternalTimer<K, VoidNamespace> timer)throws Exception {onTimerContext.timeDomain = timeDomain;onTimerContext.timer = timer;userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);onTimerContext.timeDomain = null;onTimerContext.timer = null;}

而這個函數通過?InternalTimerServiceImpl 調用,這里通過timer.getKey()設置了key

public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {void onProcessingTime(long time) throws Exception {// null out the timer in case the Triggerable calls registerProcessingTimeTimer()// inside the callback.nextTimer = null;InternalTimer<K, N> timer;while ((timer = processingTimeTimersQueue.peek()) != null&& timer.getTimestamp() <= time&& !cancellationContext.isCancelled()) {keyContext.setCurrentKey(timer.getKey());processingTimeTimersQueue.poll();triggerTarget.onProcessingTime(timer);taskIOMetricGroup.getNumFiredTimers().inc();}if (timer != null && nextTimer == null) {nextTimer =processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/98681.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/98681.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/98681.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【C++】模版初階---函數模版、類模版

&#x1f31f;個人主頁&#xff1a;第七序章 &#x1f308;專欄系列&#xff1a;C&#xff0b;&#xff0b; 目錄 ??前言&#xff1a; &#x1f308;1.泛型編程&#xff1a; &#x1f308;2.函數模板 &#x1f36d;2.1函數模板概念 &#x1f36d;2.2函數模板格式 &am…

查找算法(Java)

目錄 一.定義 二.分類 三.線性查找 原理&#xff1a; 思路分析 代碼實現 例題實踐 1.兩數之和 方法一&#xff1a;暴力窮舉法 思路分析 代碼實現 方法二&#xff1a;創建哈希表 思路分析 代碼實現 2.移動零 思路分析 代碼實現 四.二分查找 原理&#xff1a; …

計算機網絡--四層模型,IP地址和MAC地址

四層模型&#xff1a;分別是應用層&#xff0c;傳輸層&#xff0c;網絡層和鏈路層。應用層&#xff1a;提供了應用程序之間相互通信的接口&#xff0c;允許用戶訪問網絡服務。這一層定義了應用程序如何與底層網絡進行交互。例如HTTP協議。傳輸層&#xff1a;它處理數據的分段、…

解析、創建Excel文件的開源庫OpenXLSX介紹

OpenXLSX是一個C庫&#xff0c;用于讀取、寫入、創建和修改.xlsx格式的Microsoft Excel文件&#xff0c;源碼地址&#xff1a;https://github.com/troldal/OpenXLSX &#xff0c;License為BSD-3-Clause&#xff0c;可在Windows、Linux、MaCOS平臺上使用。最新發布版本為v0.3.2&…

【C++】C++11 篇二

【C】C11 篇二前言移動構造函數移動賦值運算符重載類成員變量初始化 &#xff08;缺省值出自C11強制生成默認函數的關鍵字default:禁止生成默認函數的關鍵字delete:繼承和多態中的final與override關鍵字&#xff08;出自C11可變參數模板遞歸函數方式展開參數包逗號表達式展開參…

構建Python環境的幾種工具

本文主要介紹如何構建Python環境來處理不同的工作。 1.常用的構建Python環境的工具 ①venv(內置模塊):Python 3.3 內置標準庫模塊&#xff0c;無需額外安裝。 ②virtualenv:venv的前身&#xff0c;功能更強大且支持舊版Python。 ③conda:來自 Anaconda 或 Miniconda。不僅能…

c#項目編譯時外部依賴文件的同步問題

很多場景因為資源文件太多或太大無法放到資源里面或者是依賴的dll文件&#xff0c;需要編譯時同步到bin\debug或bin\release下的&#xff0c;這里面要修改工程文件代碼實現。 比如&#xff0c;我把這個項目依賴的dll和附加文件放到ref_dll文件夾里面&#xff0c;希望編譯的時候…

數學建模常用算法-模擬退火算法

一、模擬退火算法模擬退火的靈感來源于物理中的 “退火過程”—— 將金屬加熱到高溫后&#xff0c;緩慢冷卻&#xff0c;金屬原子會在熱能作用下自由運動&#xff0c;逐漸形成能量最低的穩定結構。算法將這一過程抽象為數學模型&#xff1a;“溫度 T”&#xff1a;對應物理中的…

架構很簡單:業務架構圖

緣起業務架構是一個復雜的體系&#xff0c;如何更簡單的表達&#xff0c;并能使用起來呢&#xff1f;所謂&#xff1a;大道至簡。基于此&#xff0c;這篇文章就開始了。業務是一切架構的開始&#xff0c;如果沒有業務&#xff0c;架構又有什么作用呢&#xff1f;所以做架構首先…

【前端埋點】純前端實現 A/B Test

“純前端實現 A/B Test”&#xff0c;意思就是 沒有后端分流、也不依賴流量網關&#xff0c;那么只能靠前端邏輯來做“流量切分”。 &#x1f3af; 目標 80% 的用戶 → A 頁面20% 的用戶 → B 頁面且要保證 同一個用戶每次訪問結果一致&#xff08;否則用戶刷新頁面時 A/B 會跳…

Day22_【機器學習—集成學習(3)—Boosting—Adaboost算法】

Adaptive Boosting(自適應提升)是基于 Boosting思想實現的一種集成學習算法&#xff0c;核心思想是通過逐步提高那些被前一步分類錯誤的樣本的權重來訓練一個強分類器。一、Adaboost算法直線相當于一個弱學習器&#xff0c;正確的數據權重減小&#xff0c;錯誤的數據權重增加二…

C#語言入門詳解(18)傳值、輸出、引用、數組、具名、可選參數、擴展方法

C#語言入門詳解&#xff08;18&#xff09;傳值、輸出、引用、數組、具名、可選參數、擴展方法一、傳值參數1. 值類型2. 引用類型&#xff0c;并且創建對象3. 引用類型&#xff0c;不創建對象二、引用參數1. 值類型2. 引用類型&#xff0c;創建新對象3. 引用類型&#xff0c;不…

模型部署:(二)安卓端部署Yolov5-v6.0目標檢測項目全流程記錄

模型部署&#xff1a;&#xff08;二&#xff09;安卓端部署Yolov5-v6.0目標檢測項目全流程記錄1、下載NCNN項目2、下載基于安卓的yolov5項目3、修改相應的配置文件4、將項目打包成apk包5、部署自己訓練的yolov5模型6、相關資料下載&#xff1a;1、下載NCNN項目 # 鏈接地址 ht…

圖形基礎算法:如何將點與帶曲線邊的多邊形位置關系算法做穩定

簡介判斷點與多邊形位置關系算法是幾何算法中最基礎的算法之一&#xff0c;包括布爾運算在內的非常非常多的算法都會用到它。它的穩定是算法庫穩定的關鍵。下面我們從一個邊都是直線的多邊形開始了解射線法的原理。然后看看引入曲線后會帶來哪些問題&#xff0c;以及在實際應用…

react的filber架構

好問題 &#x1f44d; 你問的是 React Fiber 架構。這是 React 16 引入的核心機制&#xff0c;用來解決 React 在大規模更新時的性能問題。下面我給你從 背景 → Fiber 是什么 → 原理 → 優點 → 流程 來系統講。一、為什么需要 Fiber&#xff1f;在 React 15 及以前&#xff…

Lucky STUN穿透結合群暉NAS實現docker下transmission監聽端口動態更新

參考文章 LCUKY系列教程 一 「LUCKY STUN穿透」使用 cURL 自動修改 Transmission 的監聽端口 二 「LUCKY STUN穿透」使用 Webhook 自動修改 qbittorrent 的監聽端口 三 LUCKY STUN穿透在Windows上使用UPnP工具為BT客戶端自動添加內外端口號不同的映射規則 四「LUCKY STUN穿透」…

如何在Ubuntu暢玩鳴潮等游戲

本教程只包括Steam上的游戲。# 更新軟件源 sudo apt update # 安裝Steam sudo apt install steam首先&#xff0c;在Ubuntu的snap商店安裝Steam&#xff0c;啟動&#xff0c;登陸&#xff0c;下載游戲。到這里的操作都比較簡單&#xff0c;對于沒有反作弊的游戲&#xff0c;往往…

機器學習09——聚類(聚類性能度量、K均值聚類、層次聚類)

上一章&#xff1a;機器學習08——集成學習 下一章&#xff1a;機器學習10——降維與度量學習 機器學習實戰項目&#xff1a;【從 0 到 1 落地】機器學習實操項目目錄&#xff1a;覆蓋入門到進階&#xff0c;大學生就業 / 競賽必備 文章目錄一、聚類任務&#xff08;無監督學習…

解決 Docker 構建中 Python 依賴沖突的完整指南

問題背景 在基于 registry.cn-shenzhen.aliyuncs.com/all_dev/dev:invoice-base 鏡像構建 Docker 容器時,我們遇到了一個常見的 Python 依賴管理問題: ERROR: ResolutionImpossible: for help visit https://pip.pypa.io/en/latest/topics/dependency-resolution/#dealing-…

光子計算芯片實戰:Lightmatter Passage互連架構性能評測

點擊 “AladdinEdu&#xff0c;同學們用得起的【H卡】算力平臺”&#xff0c;H卡級別算力&#xff0c;80G大顯存&#xff0c;按量計費&#xff0c;靈活彈性&#xff0c;頂級配置&#xff0c;學生更享專屬優惠。 摘要 隨著人工智能計算需求呈指數級增長&#xff0c;傳統電子計算…