Flink Task線程處理模型:Mailbox

Task的線程 和?MailboxProcessor?的綁定

executingThread?是?Task?類(StreamTask?的父類)在構造時創建的物理線程。MailboxProcessor?是?StreamTask?用來處理異步事件和驅動其主要處理邏輯(processInput)的核心組件。它們之間的綁定關系如下:

  • Task?作為?Runnable:

    • Task?類實現了?Runnable?接口,其?run()?方法是?executingThread?的入口點。
      // ...
      public class Task implements Runnable, TaskSlotPayload {// ...private final Thread executingThread; // 在構造函數中創建public Task(/*...*/) {// ...this.executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); // this 指向 Task 實例}@Overridepublic void run() { // 這是 executingThread 執行的入口try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {doRun(); // 調用實際的工作方法} finally {terminationFuture.complete(executionState);}}private void doRun() {// ...// 對于 StreamTask,這里會調用到 StreamTask 的 invoke() 方法invokable.invoke(); // invokable 就是 StreamTask 實例// ...}// ...
      }
      

StreamTask創建了TaskMailboxImpl,傳遞給MailboxProcessor,因此是MailboxProcessor的執行線程。

 protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));}this.mailboxProcessor =new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);

StreamTask.invoke()?和?MailboxProcessor:

  • 當?executingThread?啟動并執行到?StreamTask.invoke()?時,StreamTask?會使用其內部的?MailboxProcessor?來驅動其核心事件循環。

    StreamTask.java

    // ...
    @Override
    public final void invoke() throws Exception {// ... (初始化,如 restoreInternal()) ...// let the task do its workgetEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();runMailboxLoop(); // <--- 關鍵調用// ... (清理工作,如 afterInvoke()) ...
    }public void runMailboxLoop() throws Exception {mailboxProcessor.runMailboxLoop(); // 將控制權交給 MailboxProcessor
    }
    // ...
    
  • mailboxProcessor.runMailboxLoop()?是一個阻塞調用(從?executingThread?的視角看)。這個方法會在?executingThread?上運行一個循環,不斷地從郵箱 (Mailbox) 中取出郵件 (Mail) 并執行它們,或者在沒有郵件時執行默認操作 (通常是?StreamTask.processInput(),用于處理輸入數據和調用算子)。

Mailbox 的線程模型:

  • MailboxProcessor?被設計為在其“擁有者”線程(即?executingThread)上執行其核心循環和郵件處理。
  • TaskMailbox?(被?MailboxProcessor?使用) 內部有檢查,確保其關鍵方法(如?take,?put?的某些變體,以及郵件的執行)是在預期的郵箱線程(即?executingThread)上調用的。

    MailboxProcessor.java

    public void runMailboxLoop() throws Exception {// ...final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(), // 確保當前線程是郵箱線程"Method must be executed by declared mailbox thread!");// ...while (isNextLoopPossible()) {processMail(localMailbox, false); // 處理郵件,在 executingThread 上執行if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // 執行默認動作,在 executingThread 上執行}}
    }
    
  • MailboxDefaultAction?通常包裝了?StreamTask.processInput(),所以數據處理和算子調用也是在?executingThread?上發生的。
  • 其他線程(例如網絡線程接收到數據后,或者定時器線程觸發定時器)想要與?StreamTask?交互時,它們不會直接調用?StreamTask?的方法,而是向其?Mailbox?中放入一個“郵件”(一個?Runnable?或?Callable)。MailboxProcessor?會在?executingThread?上從郵箱中取出這個郵件并執行它。

總結線程與 Mailbox 的綁定:

  1. Task?構造時創建?executingThread,并將?Task?自身作為?Runnable?傳遞給該線程。
  2. executingThread?啟動后,執行?Task.run()?->?Task.doRun()?->?StreamTask.invoke()
  3. 在?StreamTask.invoke()?中,調用?mailboxProcessor.runMailboxLoop()
  4. mailboxProcessor.runMailboxLoop()?在?executingThread?上運行,它負責從郵箱中拉取任務并執行,或者執行默認的數據處理邏輯 (processInput)。
  5. 所有提交到該?StreamTask?郵箱的異步操作最終都會在?executingThread?上被?MailboxProcessor?串行化執行。

因此,executingThread?成為了?MailboxProcessor?的“工作線程”。MailboxProcessor?確保了?StreamTask?的核心邏輯(包括狀態訪問、算子調用等)都在這個單一的?executingThread?上順序執行,從而簡化了并發控制。

MailboxProcessor的功能

MailboxProcessor?是 Flink 中任務(Task)執行模型的核心組件,它實現了基于郵箱(Mailbox)的單線程執行模式。其主要能力包括:

管理郵箱 (TaskMailbox):

  • 持有一個?TaskMailbox?實例,用于存儲需要串行執行的各種動作(Mail)。這些動作可以是來自外部的請求(如 Checkpoint 觸發、Timer 回調)或內部控制命令。
// ... existing code ...
public class MailboxProcessor implements Closeable {// ... existing code .../*** The mailbox data-structure that manages request for special actions, like timers,* checkpoints, ...*/protected final TaskMailbox mailbox;
// ... existing code ...

執行默認動作 (MailboxDefaultAction):

  • 在郵箱為空時,會循環執行一個預定義的“默認動作”。在?StreamTask?的上下文中,這個默認動作通常是處理輸入數據(processInput)。
this.mailboxProcessor =new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);// ... existing code .../*** Action that is repeatedly executed if no action request is in the mailbox. Typically record* processing.*/protected final MailboxDefaultAction mailboxDefaultAction;
// ... existing code ...

單線程執行循環 (runMailboxLoop):

  • 核心方法?runMailboxLoop()?驅動整個執行邏輯。它會不斷檢查郵箱中是否有新的?Mail,如果有則執行它們;如果沒有,則執行默認動作。
  • 這種機制保證了默認動作(如數據處理)和郵箱中的其他動作(如 Checkpoint、Timer 事件)之間是單線程順序執行的,避免了并發沖突。
// ... existing code ...public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController mailboxController = new MailboxController(this);while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.processMail(localMailbox, false);if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // lock is acquired inside default action as needed}}}
// ... existing code ...

提供郵箱執行器 (MailboxExecutor):

  • 通過?getMainMailboxExecutor()?和?getMailboxExecutor(int priority)?方法,向外部提供?MailboxExecutor。這使得其他組件(如 TimerService、CheckpointCoordinator)可以將它們的動作提交到郵箱中,由?MailboxProcessor?在其單線程循環中統一調度執行。
// ... existing code ...public MailboxExecutor getMainMailboxExecutor() {return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);}/*** Returns an executor service facade to submit actions to the mailbox.** @param priority the priority of the {@link MailboxExecutor}.*/public MailboxExecutor getMailboxExecutor(int priority) {return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);}
// ... existing code ...

生命周期管理:

  • 實現了?Closeable?接口,并有?prepareClose()?和?close()?方法,對應?TaskMailbox?的?quiesce()?和?close()。這確保了在任務結束時,郵箱能被正確關閉,并處理(如取消)剩余的?Mail
// ... existing code .../** Lifecycle method to close the mailbox for action submission. */public void prepareClose() {mailbox.quiesce();}/*** Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the* mailbox.*/@Overridepublic void close() {List<Mail> droppedMails = mailbox.close();
// ... existing code ...}

掛起與恢復:

  • MailboxProcessor?的執行循環可以被掛起 (suspend()) 和恢復(通過再次調用?runMailboxLoop()?或相關控制邏輯)。默認動作也可以通過?MailboxController?暫時掛起。
// ... existing code .../** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */public void suspend() {sendPoisonMail(() -> suspended = true);}
// ... existing code ...

以及?MailboxController?中的?suspendDefaultAction()

異常處理:

  • reportThrowable(Throwable throwable)?方法允許將其他線程中發生的異常報告給郵箱線程,并在郵箱線程中重新拋出,從而中斷任務執行。
// ... existing code ...public void reportThrowable(Throwable throwable) {sendControlMail(() -> {if (throwable instanceof Exception) {throw (Exception) throwable;} else if (throwable instanceof Error) {throw (Error) throwable;} else {throw WrappingRuntimeException.wrapIfNecessary(throwable);}},"Report throwable %s",throwable);}
// ... existing code ...

度量指標控制 (MailboxMetricsController):

  • 包含一個?MailboxMetricsController?用于控制和訪問郵箱相關的度量指標,如郵箱延遲、處理的郵件數量等。
// ... existing code ...private final MailboxMetricsController mailboxMetricsControl;// ... existing code ...@VisibleForTestingpublic MailboxMetricsController getMailboxMetricsControl() {return this.mailboxMetricsControl;}
// ... existing code ...

MailboxProcessor 與 StreamTask 的互動

MailboxProcessor?為?StreamTask?提供了一個強大的、基于郵箱的單線程執行引擎。StreamTask?委托?MailboxProcessor?來驅動其核心的數據處理循環,并通過?MailboxExecutor?將所有需要與任務主線程同步的異步操作(如 Timer、Checkpoint 事件)統一提交到郵箱中進行調度。這種設計確保了任務內部操作的串行化,簡化了并發控制,并提高了系統的穩定性和可維護性。

StreamTask?是 Flink 流處理任務的基類,它使用?MailboxProcessor?來管理其核心執行邏輯。

  1. 創建和持有?MailboxProcessor:

    • StreamTask?在其構造函數中創建并持有一個?MailboxProcessor?實例。
    • MailboxDefaultAction?通常被設置為?StreamTask::processInput,這意味著當郵箱為空時,StreamTask?會執行其數據處理邏輯。
    • StreamTaskActionExecutor?也被傳遞給?MailboxProcessor
  2. 驅動執行循環:

    • StreamTask?的?invoke()?方法是任務的執行入口。在其核心邏輯中,它會調用?mailboxProcessor.runMailboxLoop()?來啟動郵箱處理循環。這個循環會一直運行,直到任務完成或被取消。
    • 代碼見?StreamTask.invoke():

      StreamTask.java

      // ... existing code ...
      public final void invoke() throws Exception {// ... initialization ...try {// ...// Run mailbox until all gates will be recovered.mailboxProcessor.runMailboxLoop(); // 啟動郵箱循環// ...} finally {// ... cleanup ...// let mailbox execution reject all new letters from this pointmailboxProcessor.prepareClose();// ...mailboxProcessor.close();}
      }
      // ... existing code ...
      
  3. 提交異步動作:

    • StreamTask?及其相關的組件(如?TimerServiceSubtaskCheckpointCoordinator)需要執行一些異步操作,例如觸發 Timer、執行 Checkpoint、響應外部事件等。這些操作需要確保在任務的主線程中執行,以避免并發問題。
    • StreamTask?通過從?mailboxProcessor?獲取的?MailboxExecutor?來提交這些異步操作。這些操作會被封裝成?Mail?放入郵箱,由?MailboxProcessor?在其循環中按順序執行。
    • 例如,ProcessingTimeService?的實現會使用?MailboxExecutor?來調度 Timer 的觸發:

      StreamOperatorFactoryUtil.java

      // ... existing code ...public static <OUT, OP extends StreamOperator<OUT>> OP createOperator(// ...MailboxExecutor mailboxExecutor = // Obtained via containingTask.getMailboxExecutorFactory()containingTask.getMailboxExecutorFactory().createExecutor(configuration.getChainIndex());// ...final ProcessingTimeService processingTimeService;if (operatorFactory instanceof ProcessingTimeServiceAware) {processingTimeService =((ProcessingTimeServiceAware) operatorFactory).createProcessingTimeService(mailboxExecutor);} else {processingTimeService = processingTimeServiceFactory.get();}
      // ... existing code ...
      
      ProcessingTimeServiceImpl?內部會使用這個?mailboxExecutor?來?execute?或?schedule?定時任務。
  4. 控制流程與狀態:

    • StreamTask?的?processInput?方法(作為?MailboxDefaultAction)可以通過?MailboxDefaultAction.Controller?與?MailboxProcessor?交互。例如,當輸入數據處理完畢或遇到反壓時,它可以調用?controller.suspendDefaultAction()?來暫時掛起默認動作的執行,讓?MailboxProcessor?優先處理郵箱中的其他?Mail
    • 代碼見?StreamTask.processInput():
      // ... existing code ...
      protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {DataInputStatus status = inputProcessor.processInput();switch (status) {// ...case END_OF_INPUT:// Suspend the mailbox processor, it would be resumed in afterInvoke and finished// after all records processed by the downstream tasks. We also suspend the default// actions to avoid repeat executing the empty default operation (namely process// records).controller.suspendDefaultAction(); // 通過 Controller 控制 MailboxProcessormailboxProcessor.suspend();return;}// ...
      }
      // ... existing code ...
      
  5. 生命周期同步:

    • StreamTask?在其生命周期的不同階段(如?cancelTask,?afterInvoke)會調用?MailboxProcessor?的相應方法(如?prepareClose,?close,?allActionsCompleted)來同步狀態和清理資源。
    • 例如,在任務正常結束或需要最終 Checkpoint 完成后,會調用?mailboxProcessor.allActionsCompleted():

      StreamTask.java

      // ... existing code ...FutureUtils.waitForAll(terminationConditions).thenRun(mailboxProcessor::allActionsCompleted);// Resumes the mailbox processor. The mailbox processor would be completed// after all records are processed by the downstream tasks.mailboxProcessor.runMailboxLoop();
      // ... existing code ...
      

TaskMailboxImpl?

雖然這個類的核心結構是“一個鎖(ReentrantLock)加一個隊列(Deque<Mail>)”,但它的實現中包含了一些針對 Flink Task 執行模型的特定優化和設計,使其不僅僅是一個簡單的線程安全隊列。

@ThreadSafe
public class TaskMailboxImpl implements TaskMailbox {/** Lock for all concurrent ops. */private final ReentrantLock lock = new ReentrantLock();/** Internal queue of mails. */@GuardedBy("lock")private final Deque<Mail> queue = new ArrayDeque<>();/** Condition that is triggered when the mailbox is no longer empty. */@GuardedBy("lock")private final Condition notEmpty = lock.newCondition();/** The state of the mailbox in the lifecycle of open, quiesced, and closed. */@GuardedBy("lock")private State state = OPEN;/** Reference to the thread that executes the mailbox mails. */@Nonnull private final Thread taskMailboxThread;/*** The current batch of mails. A new batch can be created with {@link #tryBuildBatch()} and* consumed with {@link #tryTakeFromBatch()}.*/private final Deque<Mail> batch = new ArrayDeque<>();/*** Performance optimization where hasNewMail == !queue.isEmpty(). Will not reflect the state of* {@link #batch}.*/private volatile boolean hasNewMail = false;/*** Performance optimization where there is new urgent mail. When there is no urgent mail in the* batch, it should be checked every time mail is taken, including taking mail from batch queue.*/private volatile boolean hasNewUrgentMail = false;public TaskMailboxImpl(@Nonnull final Thread taskMailboxThread) {this.taskMailboxThread = taskMailboxThread;}@VisibleForTestingpublic TaskMailboxImpl() {this(Thread.currentThread());}

核心結構:

  1. lock: ReentrantLock: 這是實現線程安全的核心。所有對內部隊列?queue?和狀態?state?的并發訪問都由此鎖保護。

  2. queue: Deque<Mail>:

    • 實際存儲待處理?Mail?對象的雙端隊列。Mail?對象封裝了需要執行的動作(通常是一個?Runnable)及其優先級。
    • 使用?ArrayDeque?作為底層實現。
  3. notEmpty: Condition:

    • 與?lock?關聯的條件變量。當郵箱從空變為非空時(即有新的?Mail?被放入),會通過?notEmpty.signal()?或?notEmpty.signalAll()?來喚醒可能正在等待獲取?Mail?的線程(主要是郵箱處理線程)。
    • 在?take()?方法中,如果隊列為空,線程會調用?notEmpty.await()?等待。
  4. state: State?(enum:?OPEN,?QUIESCED,?CLOSED):

    • 表示郵箱的生命周期狀態:
      • OPEN: 郵箱正常工作,可以接收和發送郵件。
      • QUIESCED: 郵箱處于靜默狀態,不再接受新的郵件(put?操作會失敗),但仍然可以取出已有的郵件。通常在任務準備關閉時進入此狀態。
      • CLOSED: 郵箱已關閉,不能進行任何操作。所有未處理的郵件會被清空。
    • 狀態轉換由?quiesce()?和?close()?方法控制,并且這些操作也受?lock?保護。
  5. taskMailboxThread: Thread:

    • 一個非常重要的字段,它存儲了被指定為“郵箱線程”的線程引用。
    • 很多操作(如?take,?tryTake,?hasMail,?createBatch,?tryTakeFromBatch,?quiesce,?close)都強制要求調用者必須是這個?taskMailboxThread,通過?checkIsMailboxThread()?進行檢查。這是因為 Flink 的 Task 執行模型是單線程的,MailboxProcessor?會在其專用的線程中處理郵箱中的郵件和默認動作。
  6. batch: Deque<Mail>:

    • 這是一個性能優化的設計。MailboxProcessor?在其主循環中,會先調用?createBatch()?將主隊列?queue?中的所有郵件一次性轉移到這個?batch?隊列中。然后,MailboxProcessor?會優先從?batch?中通過?tryTakeFromBatch()?獲取郵件進行處理。
    • 目的: 減少鎖的競爭。createBatch()?在持有鎖的情況下將一批郵件轉移出來,之后?MailboxProcessor?處理?batch?中的郵件時就不再需要頻繁獲取鎖去訪問主隊列?queue。這對于高吞吐量的場景非常重要。
    • batch?的操作也僅限于?taskMailboxThread
  7. hasNewMail: volatile boolean:

    • 這是另一個性能優化。它大致反映了主隊列?queue?是否為空 (!queue.isEmpty())。
    • volatile?關鍵字確保了不同線程對它的可見性。
    • 目的: 允許郵箱線程在不獲取鎖的情況下快速檢查是否有新郵件。例如,在?hasMail()?和?tryTake()?方法中,會先檢查?batch,然后檢查?hasNewMail,只有當?hasNewMail?為?true?時,才嘗試獲取鎖并檢查主隊列?queue
    • 當有新郵件通過?put()?或?putFirst()(從非郵箱線程調用時)添加到?queue?時,hasNewMail?會被設置為?true。當郵件從?queue?中被取出或通過?createBatch()?轉移到?batch?時,hasNewMail?會被更新。

特別需要注意的點:

  1. 單消費者(郵箱線程)設計:

    • 盡管?put()?和?putFirst()?方法允許從任何線程添加郵件(是線程安全的),但所有取郵件的操作(take,?tryTake,?createBatch,?tryTakeFromBatch)以及生命周期管理方法(quiesce,?close)都必須由?taskMailboxThread?調用。這是 Flink Mailbox 模型的核心設計,確保了任務邏輯的單線程執行。
  2. 批處理優化 (batch?隊列):

    • 理解?batch?隊列的作用對于分析性能至關重要。它不是一個獨立的郵箱,而是主隊列?queue?的一個臨時緩存,用于減少鎖爭用。MailboxProcessor?會周期性地將?queue?中的內容“批發”到?batch?中。
  3. hasNewMail?優化:?hasNewMail?變量提供了一種輕量級的檢查機制,避免了郵箱線程在主隊列可能為空時仍頻繁獲取鎖。

  4. 優先級處理 (takeOrNull?方法):

    • takeOrNull(Deque<Mail> queue, int priority)?方法實現了從隊列中根據優先級取出郵件的邏輯。它會遍歷隊列,找到第一個優先級大于或等于指定?priority?的郵件并返回。這意味著高優先級的郵件(如控制命令、Checkpoint barrier)可以被優先處理。
  5. putFirst()?的特殊行為:

    • putFirst(@Nonnull Mail mail)?方法很有意思:
      • 如果調用者是?taskMailboxThread,郵件會直接被添加到?batch?隊列的頭部。這是因為郵箱線程是當前批次郵件的消費者,將郵件直接放入批處理隊列的頭部可以使其被更快處理,而無需等待下一輪?createBatch
      • 如果調用者不是?taskMailboxThread,郵件會被添加到主隊列?queue?的頭部,并通過?notEmpty.signal()?喚醒郵箱線程。
  6. 生命周期管理 (state,?quiesce(),?close()):

    • 郵箱的生命周期狀態轉換是嚴格控制的,并且與任務的生命周期緊密相關。
    • quiesce(): 使郵箱不再接受新郵件,但允許處理完已有的郵件。
    • close(): 徹底關閉郵箱,清空所有郵件,并喚醒所有可能在等待的線程(通過?notEmpty.signalAll()),通常是為了讓它們感知到關閉狀態并退出。
  7. 鎖的粒度和使用:

    • ReentrantLock?用于保護對共享數據(queue,?state)的訪問。
    • Condition?(notEmpty) 用于實現生產者-消費者模式中的等待和通知機制。
    • lock.lockInterruptibly()?在?take()?方法中使用,允許等待的郵箱線程響應中斷。
  8. runExclusively(Runnable runnable):

    • 提供了一種機制,允許以獨占方式在郵箱的鎖保護下執行一段代碼。這對于需要原子地執行多個郵箱操作(例如,檢查狀態然后根據狀態放入郵件)的場景非常有用,可以避免競態條件。

總而言之,TaskMailboxImpl?雖然基于簡單的鎖和隊列,但通過引入批處理、hasNewMail?標志、嚴格的線程模型以及精細的生命周期管理,為 Flink 的?MailboxProcessor?提供了一個高效且功能完備的郵件調度機制。這些設計都是為了在保證單線程執行模型的前提下,最大化吞吐量并減少不必要的同步開銷。

Mail 類分析

Mail?類是 Apache Flink 流處理運行時任務郵箱機制中的一個核心組件。它代表一個可執行的任務單元,綁定到特定的操作符鏈中,可以被下游郵箱處理器選擇執行。

主要屬性

  • mailOptions: 郵件選項,用于配置郵件的行為,如是否可延遲執行。
  • runnable: 要執行的操作,是一個?ThrowingRunnable?類型的實例,可以拋出異常。
  • priority: 郵件的優先級。優先級并不直接決定執行順序,而是用于避免上下游操作符之間的活鎖或死鎖問題。
  • descriptionFormat?和?descriptionArgs: 用于調試和錯誤報告的郵件描述信息。
  • actionExecutor: 用于執行?runnable?的執行器。

Mail?類提供了三個構造函數,允許靈活地創建郵件對象:

  1. 最簡單的構造函數只需要?runnablepriority?和描述信息。
  2. 可以指定?MailboxExecutor.MailOptions?來配置郵件選項。
  3. 可以指定?StreamTaskActionExecutor?來控制操作的執行方式。

核心方法

  • getMailOptions(): 獲取郵件選項。
  • getPriority(): 獲取郵件的優先級。如果郵件是可延遲的,則返回最小優先級。
  • tryCancel(): 嘗試取消郵件的執行。
  • toString(): 返回郵件的描述信息。
  • run(): 執行郵件中的操作。

Mail?類在 Flink 的流處理任務中扮演著重要角色。它允許將任務分解為小的、可執行的單元,并通過郵箱機制進行調度和執行。這種設計有助于提高任務的并發性和響應性,同時避免復雜的同步問題。

在實際使用中,可以通過創建?Mail?對象來封裝需要執行的操作,并將其提交到郵箱中等待執行。通過設置不同的優先級和選項,可以控制操作的執行順序和行為。

MailboxExecutorImpl

MailboxExecutorImpl?實現了?flink.api.common.operators.MailboxExecutor?接口,它充當了向 Flink Task 的郵箱(TaskMailbox)提交執行單元(Runnable?或?Callable)的一個入口或門面。它的核心目標是允許其他組件將代碼片段(封裝為?Mail?對象)放入郵箱,這些代碼片段最終會由?MailboxProcessor?在其專用的單線程中執行。

核心成員變量:

  • mailbox: TaskMailbox: 這是實際存儲待執行郵件的郵箱實例。MailboxExecutorImpl?將通過它來提交新的郵件。

    MailboxExecutorImpl.java

    // ... existing code .../** The mailbox that manages the submitted runnable objects. */@Nonnull private final TaskMailbox mailbox;
    // ... existing code ...
    
  • priority: int: 與此?MailboxExecutorImpl?實例關聯的郵件的默認優先級。當通過這個執行器提交任務時,任務會帶上這個優先級。
    // ... existing code ...private final int priority;
    // ... existing code ...
    
  • actionExecutor: StreamTaskActionExecutor: 這是一個執行器,用于實際運行封裝在?Mail?對象中的命令。Mail?對象在被?MailboxProcessor?取出后,其?run()?方法會使用這個?actionExecutor?來執行具體的邏輯。
    // ... existing code ...private final StreamTaskActionExecutor actionExecutor;
    // ... existing code ...
    
  • mailboxProcessor: MailboxProcessor?(可能為?null): 指向驅動郵箱循環的?MailboxProcessor。主要用于?isIdle()?方法的判斷。
    // ... existing code ...private final MailboxProcessor mailboxProcessor;
    // ... existing code ...
    

構造函數:

  • 提供了兩個構造函數,主要的區別在于是否傳入?MailboxProcessor
    // ... existing code ...public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {this(mailbox, priority, actionExecutor, null);}public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox,int priority,StreamTaskActionExecutor actionExecutor,MailboxProcessor mailboxProcessor) {this.mailbox = mailbox;this.priority = priority;this.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = mailboxProcessor;}
    // ... existing code ...
    
    它們初始化了執行器的核心組件。

主要方法分析:

execute

// ... existing code ...@Overridepublic void execute(MailOptions mailOptions,final ThrowingRunnable<? extends Exception> command,final String descriptionFormat,final Object... descriptionArgs) {try {mailbox.put(new Mail(mailOptions,command,priority,actionExecutor,descriptionFormat,descriptionArgs));} catch (MailboxClosedException mbex) {throw new RejectedExecutionException(mbex);}}
// ... existing code ...
  • 這是向郵箱提交任務的核心方法。
  • 它接收一個?ThrowingRunnable?作為要執行的命令,以及?MailOptions(用于配置郵件行為,例如是否可延遲)、描述信息等。
  • 內部會創建一個新的?Mail?對象,該對象封裝了傳入的?command、此執行器實例的?priorityactionExecutor?以及描述信息。
  • 然后調用?mailbox.put(new Mail(...))?將這個新創建的?Mail?對象放入?TaskMailbox?中。
  • 如果郵箱已經關閉(MailboxClosedException),則會拋出?RejectedExecutionException,這是 Executor 服務在無法接受新任務時的標準行為。

yield?

此方法設計為由郵箱線程自身調用

  • 它嘗試從?mailbox?中獲取一個至少具有 此執行器?priority?的郵件 (mailbox.take(priority))。這是一個阻塞操作,如果當前沒有符合條件的郵件,它會等待。
  • 一旦獲取到?Mail?對象,它會立即在當前線程(即郵箱線程)中執行?mail.run()
  • 目的: 允許當前正在郵箱線程中執行的某個可能耗時較長的操作(例如用戶函數)主動暫停,讓郵箱中其他待處理的郵件(特別是具有相同或更高優先級的郵件,如 Checkpoint Barrier)有機會執行。這是一種協作式多任務處理機制,對于保證郵箱系統的響應性至關重要。
    @Overridepublic void yield() throws InterruptedException {Mail mail = mailbox.take(priority);try {mail.run();} catch (Exception ex) {throw WrappingRuntimeException.wrapIfNecessary(ex);}}

  1. tryYield():

    • 與?yield()?類似,但是一個非阻塞版本。
    • 它調用?mailbox.tryTake(priority)?嘗試獲取郵件。
    • 如果成功獲取到郵件,則執行它并返回?true
    • 如果沒有符合條件的郵件,則立即返回?false,不會阻塞。
    • 需要注意的是,根據?MailboxExecutor?接口的約定和?MailOptions.deferrable()?的設計,yield()?和?tryYield()?通常不會執行被標記為 "deferrable"(可延遲)的郵件。這是為了在需要快速讓出執行權(例如為了處理 Checkpoint)時,避免執行那些可以稍后處理的低優先級或非緊急任務。
  2. shouldInterrupt():

    • 此方法用于指示當前正在郵箱線程中執行的操作是否應該被中斷(例如,一個長時間運行的用戶函數)。
    • 目前的實現是簡單地檢查?mailbox.hasMail(),即只要郵箱中還有任何待處理的郵件,就建議中斷。
    • 代碼中的?TODO: FLINK-35051?注釋表明,這是一個待優化的點。理想情況下,只有當郵箱中有時間敏感的郵件(例如與 Checkpoint 相關的郵件)時,才應該建議中斷,以避免不必要的性能開銷。
  3. isIdle():

    // ... existing code ...public boolean isIdle() {return !mailboxProcessor.isDefaultActionAvailable()&& !mailbox.hasMail()&& mailbox.getState().isAcceptingMails();}
    // ... existing code ...
    
    • 檢查關聯的?MailboxProcessor?是否處于空閑狀態。
    • 判斷條件為:
      • MailboxProcessor?的默認操作(通常是?processInput)當前不可用(即被掛起)。
      • TaskMailbox?中沒有待處理的郵件。
      • TaskMailbox?的狀態仍然是接受郵件的狀態(即不是?QUIESCED?或?CLOSED)。
    • 這個方法需要?mailboxProcessor?成員不為?null

總結與作用

MailboxExecutorImpl?為 Flink 的異步操作和事件驅動模型提供了一個關鍵的接口。它使得系統中的不同部分(例如 Timer Service、Checkpoint Coordinator,甚至是算子自身)能夠安全地將需要在 Task 主執行線程(即郵箱線程)中執行的邏輯提交到郵箱隊列。

  • 封裝提交邏輯: 它將創建?Mail?對象并將其放入?TaskMailbox?的細節封裝起來,提供了一個更簡潔的?Executor?風格的 API。
  • 支持優先級: 允許為通過特定執行器實例提交的任務指定一個默認優先級。
  • 協作式調度 (yield/tryYield): 這是 Mailbox 模型單線程執行模式下實現并發感和響應性的核心機制。它允許長時間運行的任務主動讓出控制權,確保高優先級任務(如系統事件)能夠及時處理。
  • 中斷提示 (shouldInterrupt): 為長時間運行的用戶代碼提供了一個檢查點,以便在需要時(例如為了執行 Checkpoint)能夠優雅地中斷。

通過?MailboxExecutorImpl,Flink 能夠確保所有關鍵的 Task 級別操作(數據處理、狀態訪問、Checkpoint、Timer 回調等)都在同一個線程中有序執行,從而避免了復雜的并發控制問題,簡化了狀態管理和一致性保證。

MailboxProcessor?細節分析

MailboxProcessor?封裝了基于 Mailbox 的執行模型的完整邏輯。它的核心是一個事件循環?(runMailboxLoop),該循環持續執行兩個主要任務:

  1. 處理郵箱中的郵件 (Mail): 檢查?TaskMailbox?中是否有待處理的郵件(例如 Checkpoint 觸發、Timer 事件、用戶通過?MailboxExecutor?提交的自定義邏輯等),并按優先級順序執行它們。
  2. 執行默認動作 (MailboxDefaultAction): 如果郵箱中沒有郵件,或者郵件處理完畢后,它會執行一個“默認動作”。在?StreamTask?的上下文中,這個默認動作通常是?processInput(),即處理來自上游的數據。

這種設計確保了 Task 內部所有操作(數據處理、Checkpoint、Timer 等)的單線程執行,從而極大地簡化了并發控制和狀態管理。

主要結構組件:

  1. mailbox: TaskMailbox: 這是實際存儲和管理?Mail?對象的組件。MailboxProcessor?從它那里獲取郵件。

  2. mailboxDefaultAction: MailboxDefaultAction: 代表在郵箱空閑時重復執行的默認操作。它通過?MailboxDefaultAction.Controller?與?MailboxProcessor?交互,例如在沒有輸入數據時通知?MailboxProcessor?暫停調用默認動作。

  3. actionExecutor: StreamTaskActionExecutor: 用于實際執行?Mail?中封裝的?RunnableMail?對象本身不直接執行邏輯,而是委托給這個執行器。

  4. 控制標志 (Control Flags)?- 這些標志必須只能從郵箱線程訪問,以避免競態條件:

    • mailboxLoopRunning: boolean: 控制主事件循環是否應該繼續運行。當設置為?false?時,循環會在當前迭代完成后終止。
    • suspended: boolean: 控制郵箱處理器是否被臨時掛起。如果為?truerunMailboxLoop?會退出,但之后可以被重新調用以恢復。
    • suspendedDefaultAction: DefaultActionSuspension: 記錄當前默認動作是否被掛起。如果非?null,表示默認動作已掛起,MailboxProcessor?不會調用它。
    // ... existing code .../*** Control flag to terminate the mailbox processor. Once it was terminated could not be* restarted again. Must only be accessed from mailbox thread.*/private boolean mailboxLoopRunning;/*** Control flag to temporary suspend the mailbox loop/processor. After suspending the mailbox* processor can be still later resumed. Must only be accessed from mailbox thread.*/private boolean suspended;/*** Remembers a currently active suspension of the default action. Serves as flag to indicate a* suspended default action (suspended if not-null) and to reuse the object as return value in* consecutive suspend attempts. Must only be accessed from mailbox thread.*/private DefaultActionSuspension suspendedDefaultAction;
    // ... existing code ...
    
  5. mailboxMetricsControl: MailboxMetricsController: 用于管理和暴露與郵箱相關的度量指標。

MailboxProcessor?提供了多個構造函數,允許不同程度的定制。核心的構造函數接收?MailboxDefaultActionTaskMailboxStreamTaskActionExecutor?和?MailboxMetricsController

一個常見的用法是傳入一個?MailboxDefaultAction,然后?MailboxProcessor?會使用默認的?TaskMailboxImpl(與當前線程綁定)和?StreamTaskActionExecutor.IMMEDIATE

// ... existing code ...public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction,TaskMailbox mailbox,StreamTaskActionExecutor actionExecutor,MailboxMetricsController mailboxMetricsControl) {this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);this.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailbox = Preconditions.checkNotNull(mailbox);this.mailboxLoopRunning = true;this.suspendedDefaultAction = null;this.mailboxMetricsControl = mailboxMetricsControl;}
// ... existing code ...

runMailboxLoop()

// ... existing code ...public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController mailboxController = new MailboxController(this);while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.processMail(localMailbox, false);if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // lock is acquired inside default action as needed}}}private boolean isNextLoopPossible() {// 'Suspended' can be false only when 'mailboxLoopRunning' is true.return !suspended;}
// ... existing code ...
  • 這是?MailboxProcessor?的心臟。它在一個?while (isNextLoopPossible())?循環中運行。
  • 前置檢查: 確保該方法由指定的郵箱線程執行,并且郵箱處于?OPEN?狀態。
  • 創建?MailboxController:?MailboxController?是?MailboxDefaultAction?與?MailboxProcessor?交互的橋梁。
  • 循環體:
    • processMail(localMailbox, false): 調用此方法處理郵箱中的郵件。這是一個關鍵步驟,它會嘗試非阻塞地處理一批郵件。如果默認操作被掛起,它可能會阻塞地等待郵件或默認操作變為可用。false?表示不是單步執行。
    • if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); }: 如果循環仍然可以繼續(例如,沒有被掛起或關閉),并且默認動作是可用的,則執行默認動作。
  • 設計理念: 注釋中提到,runMailboxLoop?的設計目標是保持熱路徑(默認動作,郵箱中沒有郵件)盡可能快。因此,對控制標志(如?mailboxLoopRunning,?suspendedDefaultAction)的檢查通常與?mailbox.hasMail()?為?true?相關聯。這意味著,如果要在郵箱線程內部更改這些標志,必須確保郵箱中至少有一個郵件,以便更改能被及時感知。

processMail

// ... existing code ...private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {// Doing this check is an optimization to only have a volatile read in the expected hot// path, locks are only// acquired after this point.boolean isBatchAvailable = mailbox.createBatch();// Take mails in a non-blockingly and execute them.boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);if (singleStep) {return processed;}// If the default action is currently not available, we can run a blocking mailbox execution// until the default action becomes available again.processed |= processMailsWhenDefaultActionUnavailable();return processed;}private boolean processMailsNonBlocking(boolean singleStep) throws Exception {long processedMails = 0;Optional<Mail> maybeMail;while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {if (processedMails++ == 0) {maybePauseIdleTimer();}runMail(maybeMail.get());if (singleStep) {break;}}if (processedMails > 0) {maybeRestartIdleTimer();return true;} else {return false;}}
// ... existing code ...

其中?processMailsNonBlocking?和?processMailsWhenDefaultActionUnavailable?內部會調用?runMail(Mail mail)?來實際執行郵件:

// ... existing code ...private void runMail(Mail mail) throws Exception {mailboxMetricsControl.getMailCounter().inc();mail.run();
// ... existing code ...
  • 此方法負責處理郵箱中的郵件。
  • mailbox.createBatch(): 首先嘗試從主隊列創建一批郵件到?TaskMailbox?的內部批處理隊列。這是一個優化,減少鎖競爭。
  • processMailsNonBlocking(singleStep): 非阻塞地處理批處理隊列中的郵件。如果?singleStep?為?true,則只處理一個郵件(用于測試或調試)。
  • processMailsWhenDefaultActionUnavailable(): 如果默認動作當前不可用(例如,由于反壓或沒有輸入),此方法會嘗試從郵箱中獲取并處理郵件。它可能會阻塞地等待新郵件的到來,直到默認動作再次可用或循環終止。
  • 返回?true?如果至少處理了一封郵件。

suspend()

// ... existing code .../** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */public void suspend() {sendPoisonMail(() -> suspended = true);}/** Send mail in first priority for internal needs. */private void sendPoisonMail(RunnableWithException mail) {mailbox.runExclusively(() -> {// keep state check and poison mail enqueuing atomic, such that no intermediate// #close may cause a// MailboxStateException in #sendPriorityMail.if (mailbox.getState() == TaskMailbox.State.OPEN) {sendControlMail(mail, "poison mail");}});public void runExclusively(Runnable runnable) {lock.lock();try {runnable.run();} finally {lock.unlock();}}
// ... existing code ...
  • 用于從外部(非郵箱線程)請求掛起郵箱循環。
  • 它通過?sendPoisonMail()?向郵箱頭部插入一個高優先級的“毒丸”郵件。當這個郵件被處理時,它會將?suspended?標志設置為?true,從而導致?runMailboxLoop?在下一次檢查?isNextLoopPossible()?時退出。
  • Poison Mail: 是一種特殊控制郵件,用于改變?MailboxProcessor?的內部狀態。

allActionsCompleted()

// ... existing code .../*** This method must be called to end the stream task when all actions for the tasks have been* performed.*/public void allActionsCompleted() {sendPoisonMail(() -> {mailboxLoopRunning = false;suspended = true;});}
// ... existing code ...
  • 當 Task 的所有動作都已完成,需要終止郵箱循環時調用此方法。
  • 與?suspend()?類似,它也通過?sendPoisonMail()?發送一個毒丸郵件。該郵件會將?mailboxLoopRunning?設置為?false?并將?suspended?設置為?true,從而徹底停止事件循環。

sendPoisonMail?和?sendControlMail(...):

// ... existing code .../** Send mail in first priority for internal needs. */private void sendPoisonMail(RunnableWithException mail) {mailbox.runExclusively(() -> {// keep state check and poison mail enqueuing atomic, such that no intermediate// #close may cause a// MailboxStateException in #sendPriorityMail.if (mailbox.getState() == TaskMailbox.State.OPEN) {sendControlMail(mail, "poison mail");}});}/*** Sends the given <code>mail</code> using {@link TaskMailbox#putFirst(Mail)} . Intended use is* to control this <code>MailboxProcessor</code>; no interaction with tasks should be performed;*/private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {mailbox.putFirst(new Mail(mail,Integer.MAX_VALUE /*not used with putFirst*/,descriptionFormat,descriptionArgs));}
// ... existing code ...
  • sendPoisonMail: 確保在郵箱?OPEN?狀態下,通過?sendControlMail?發送一個控制郵件。它使用?mailbox.runExclusively?來原子地檢查狀態和入隊。
  • sendControlMail: 將一個具有最高優先級的?Mail?對象(通過?mailbox.putFirst())放入郵箱。這些郵件用于內部控制,如掛起、終止、報告錯誤等。

生命周期方法 (prepareClose(),?close()):

// ... existing code .../** Lifecycle method to close the mailbox for action submission. */public void prepareClose() {mailbox.quiesce();}/*** Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the* mailbox.*/@Overridepublic void close() {List<Mail> droppedMails = mailbox.close();
// ... existing code ...
  • prepareClose(): 調用?mailbox.quiesce()。這會使郵箱進入靜默狀態,不再接受新的郵件,但允許處理已有的郵件。這是關閉過程的第一步。
  • close(): 調用?mailbox.close()。這會徹底關閉郵箱,清空所有未處理的郵件,并嘗試取消仍在郵箱中的?RunnableFuture?實例。

與?MailboxDefaultAction?的交互 (通過?MailboxController):

// ... existing code ...
protected static final class MailboxController implements MailboxDefaultAction.Controller {private final MailboxProcessor mailboxProcessor;protected MailboxController(MailboxProcessor mailboxProcessor) {this.mailboxProcessor = mailboxProcessor;}@Overridepublic void allActionsCompleted() {mailboxProcessor.allActionsCompleted();}@Overridepublic MailboxDefaultAction.Suspension suspendDefaultAction(PeriodTimer suspensionPeriodTimer) {return mailboxProcessor.suspendDefaultAction(suspensionPeriodTimer);}
// ... existing code ...
}// ... existing code ...
private final class DefaultActionSuspension implements MailboxDefaultAction.Suspension {@Nullable private final PeriodTimer suspensionTimer;public DefaultActionSuspension(@Nullable PeriodTimer suspensionTimer) {this.suspensionTimer = suspensionTimer;}@Overridepublic void resume() {if (mailbox.isMailboxThread()) {resumeInternal();} else {try {sendControlMail(this::resumeInternal, "resume default action");} catch (MailboxClosedException ex) {// Ignored}}}private void resumeInternal() {// This method must be called from the mailbox thread.if (mailboxProcessor.suspendedDefaultAction == this) {mailboxProcessor.suspendedDefaultAction = null;if (suspensionTimer != null) {suspensionTimer.markEnd();}}}}
// ... existing code ...
  • MailboxController?是一個內部類,實現了?MailboxDefaultAction.Controller?接口。
  • MailboxDefaultAction?通過這個?Controller?來與?MailboxProcessor?通信。
  • suspendDefaultAction(): 當默認動作(如?processInput)發現當前沒有工作可做時(例如,沒有輸入數據或下游反壓),它會調用?controller.suspendDefaultAction()
  • MailboxProcessor.suspendDefaultAction(@Nullable PeriodTimer suspensionTimer):
    • 此方法(只能由郵箱線程調用)將?suspendedDefaultAction?設置為一個新的?DefaultActionSuspension?實例。
    • DefaultActionSuspension?實現了?MailboxDefaultAction.Suspension?接口,其?resume()?方法用于恢復默認動作的執行。resume()?可以從任何線程調用,如果不是郵箱線程,它會發送一個控制郵件來確保恢復邏輯在郵箱線程中執行。

獲取?MailboxExecutor:

// ... existing code ...public MailboxExecutor getMainMailboxExecutor() {return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);}/*** Returns an executor service facade to submit actions to the mailbox.** @param priority the priority of the {@link MailboxExecutor}.*/public MailboxExecutor getMailboxExecutor(int priority) {return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);}
// ... existing code ...
  • getMainMailboxExecutor(): 返回一個具有最低優先級的?MailboxExecutor
  • getMailboxExecutor(int priority): 返回一個指定優先級的?MailboxExecutor。這些?MailboxExecutor?實例允許其他組件向此?MailboxProcessor?的郵箱提交任務。

總結

MailboxProcessor?是 Flink Task 單線程執行模型的核心。它通過一個事件循環來協調處理高優先級的控制/事件郵件和低優先級的默認數據處理動作。這種機制確保了:

  • 單線程執行: 所有關鍵邏輯都在同一個線程中執行,避免了復雜的并發同步。
  • 響應性: 高優先級郵件(如 Checkpoint barriers)可以搶占默認動作,保證系統事件的及時處理。
  • 可控性: 提供了掛起、恢復、終止事件循環的機制。
  • 可擴展性: 通過?MailboxExecutor?允許外部組件向郵箱提交自定義任務。

processInput

processInput 方法是 StreamTask 執行其核心數據處理邏輯的地方。它是作為 MailboxProcessor默認動作 (MailboxDefaultAction) 來執行的。這意味著,當 MailboxProcessor 的郵箱中沒有更高優先級的“郵件”(如 Checkpoint 觸發、Timer 事件等)需要處理時,它就會循環調用這個 processInput 方法。

下面是對 processInput 方法的詳細分析:

  1. 方法職責與設計理念:

    1. 處理輸入事件: 其核心職責是從輸入源(由 inputProcessor 代表)獲取一個事件(通常是一條記錄或一組記錄),并將其傳遞給后續的算子鏈進行處理。

    2. 非阻塞性: 注釋中強調“Implementations should (in general) be non-blocking”。這是非常關鍵的一點。因為 MailboxProcessor 是單線程執行其郵箱中的郵件和默認動作的,如果 processInput 長時間阻塞,將會導致 Checkpoint barriers、Timer 等重要事件無法及時處理,影響任務的正確性和性能。

    3. 與 MailboxProcessor 協作: 通過 MailboxDefaultAction.Controller controller 參數,processInput 可以與 MailboxProcessor 進行交互,例如在沒有數據或遇到反壓時,通知 MailboxProcessor 暫停調用默認動作。

  2. 處理輸入 (inputProcessor.processInput()):

  • 方法首先調用 inputProcessor.processInput()InputProcessor 負責從上游讀取數據、反序列化,并將數據喂給當前 Task 的第一個 Operator。
  • processInput() 的返回值 DataInputStatus 描述了本次輸入處理的結果。

根據 DataInputStatus 進行分支處理:

DataInputStatus status = inputProcessor.processInput();switch (status) {case MORE_AVAILABLE:if (taskIsAvailable()) {return;}break;case NOTHING_AVAILABLE:break;case END_OF_RECOVERY:throw new IllegalStateException("We should not receive this event here.");case STOPPED:endData(StopMode.NO_DRAIN);return;case END_OF_DATA:endData(StopMode.DRAIN);notifyEndOfData();return;case END_OF_INPUT:// Suspend the mailbox processor, it would be resumed in afterInvoke and finished// after all records processed by the downstream tasks. We also suspend the default// actions to avoid repeat executing the empty default operation (namely process// records).controller.suspendDefaultAction();mailboxProcessor.suspend();return;}

MORE_AVAILABLE: 表示 inputProcessor 中還有更多數據可以立即處理。

  1. if (taskIsAvailable()) { return; }: 如果當前任務本身也是可用的(例如,下游沒有反壓),則直接返回。MailboxProcessor 會很快再次調用 processInput 來處理更多數據。

  2. NOTHING_AVAILABLE: 表示 inputProcessor 當前沒有可用的數據。此時,方法不會立即返回,而是會繼續檢查是否存在反壓等情況,可能需要暫停默認動作的執行。

  3. END_OF_RECOVERY: 這是一個不期望在此處出現的狀態,表示任務恢復邏輯可能存在問題,因此拋出 IllegalStateException

  4. STOPPED: 表示輸入流被強制停止(例如任務被取消,且不需要流干數據)。

    • endData(StopMode.NO_DRAIN): 通知算子鏈以非排空模式結束處理。

    • return;: 結束當前 processInput 調用。

  5. END_OF_DATA: 表示當前輸入流的所有數據都已到達(例如,有限流Source結束)。

    • endData(StopMode.DRAIN): 通知算子鏈以排空模式結束處理(處理完所有已緩沖的數據)。

    • notifyEndOfData(): 通知 TaskManager 當前任務的數據已結束。

    • return;: 結束當前 processInput 調用。

  6. END_OF_INPUT: 表示該 Task 的所有輸入都已經結束。這是一個更強的結束信號。

    • controller.suspendDefaultAction(): 通知 MailboxProcessor 暫停調用 processInput。因為已經沒有新的輸入了,再繼續調用也沒有意義。

    • mailboxProcessor.suspend(): 暫停整個 MailboxProcessor 的事件循環。任務此時會等待下游處理完所有數據,并完成最終的 Checkpoint 等操作。

    • return;: 結束當前 processInput 調用。

處理反壓和等待邏輯 (當 NOTHING_AVAILABLE 或其他需要等待的情況): 如果 inputProcessor.processInput() 返回 NOTHING_AVAILABLE,或者雖然有數據但任務本身不可用(例如下游反壓),代碼會進入等待邏輯:

// 如果前面沒有returnTaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();PeriodTimer timer;CompletableFuture<?> resumeFuture;if (!recordWriter.isAvailable()) {timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());resumeFuture = recordWriter.getAvailableFuture();} else if (!inputProcessor.isAvailable()) {timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());resumeFuture = inputProcessor.getAvailableFuture();} else if (changelogWriterAvailabilityProvider != null&& !changelogWriterAvailabilityProvider.isAvailable()) {// waiting for changelog availability is reported as busytimer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();} else {// data availability has changed in the meantime; retry immediatelyreturn;}assertNoException(resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
  1. TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();: 獲取IO相關的度量指標組。

  2. PeriodTimer timer; CompletableFuture<?> resumeFuture;: 聲明計時器和用于恢復的 Future。

  3. 檢查輸出是否可用 (!recordWriter.isAvailable()):

    • 如果 recordWriter(負責將處理結果寫到下游)不可用,說明下游存在反壓。

    • timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());: 啟動一個計時器,用于度量由于下游反壓導致的等待時間。

    • resumeFuture = recordWriter.getAvailableFuture();: 獲取一個 Future,當 recordWriter 再次可用時,該 Future 會完成。

  4. 檢查輸入處理器是否可用 (!inputProcessor.isAvailable()):

    • 如果 inputProcessor 本身不可用(例如,等待網絡緩沖區的到來)。

    • timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());: 啟動一個計時器,用于度量由于上游輸入不可用導致的空閑時間。

    • resumeFuture = inputProcessor.getAvailableFuture();: 獲取一個 Future,當 inputProcessor 再次可用時,該 Future 會完成。

  5. 檢查 Changelog Writer 是否可用:

    • 如果使用了 Changelog State Backend,并且其 changelogWriterAvailabilityProvider 表示不可用。

    • timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());: 啟動計時器,度量等待 Changelog Writer 的繁忙時間。

    • resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();: 獲取 Future,等待其可用。

  6. 數據可用性已改變 (else { return; }):

    • 如果以上等待條件都不滿足,說明在 inputProcessor.processInput() 調用之后,數據的可用性可能已經發生了變化(例如,新的數據剛剛到達)。此時直接 return,讓 MailboxProcessor 立即重試 processInput

  7. 掛起默認動作并等待恢復:

    • controller.suspendDefaultAction(timer): 調用 controllersuspendDefaultAction 方法,并傳入之前啟動的 timer。這會通知 MailboxProcessor 暫時停止調用 processInputMailboxProcessor 會使用這個 timer 來記錄掛起的時間(用于監控和度量)。該方法返回一個 MailboxDefaultAction.Suspension 對象。

    • resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)): 當 resumeFuture 完成時(即等待的條件解除,例如下游不再反壓或上游數據到達),會執行 ResumeWrapper 中的邏輯。ResumeWrapper 會調用 Suspension 對象的 resume() 方法,這會通知 MailboxProcessor 可以重新開始調用 processInput 了。同時,timer 也會被停止。

    • assertNoException(...): 確保 thenRun 中的操作不會拋出未捕獲的異常。

Checkpoint

StreamTask?通過其內部的?MailboxProcessor?和相關的?MailboxExecutor?來發送和處理與 Checkpoint 相關的郵件(即需要在 Task 主線程中執行的 Checkpoint 操作)。

以下是?StreamTask?如何發送和處理與 Checkpoint 相關郵件的關鍵機制分析:

  1. MailboxProcessor?和?MailboxExecutor:

    • 每個?StreamTask?都有一個?MailboxProcessor?實例 (mailboxProcessor),它負責驅動 Task 的事件循環。
    • StreamTask?可以通過?mailboxProcessor.getMailboxExecutor(priority)?獲取一個?MailboxExecutor。這個?MailboxExecutor?提供了?execute(...)?方法,可以將一個?Runnable(封裝了 Checkpoint 相關邏輯)作為?Mail?提交到郵箱中。
    • 這些郵件會被?MailboxProcessor?在其主循環中按優先級取出并執行。
  2. SubtaskCheckpointCoordinator:

    • StreamTask?包含一個?SubtaskCheckpointCoordinator?實例 (subtaskCheckpointCoordinator)。這個協調器負責處理 Task 級別的 Checkpoint 邏輯,例如觸發操作符的快照、處理 Barrier 對齊、通知 Checkpoint 完成或中止等。
    • 很多 Checkpoint 相關的操作會首先由?SubtaskCheckpointCoordinator?發起或處理,然后它可能會通過?StreamTask?的?MailboxExecutor?將具體的執行步驟提交到郵箱。
  3. actionExecutor:

    • StreamTask?還有一個?StreamTaskActionExecutor?實例 (actionExecutor)。雖然?MailboxExecutor?用于將任務?放入?郵箱,但當?Mail?從郵箱中被取出后,其內部的?Runnable?通常會通過這個?actionExecutor?來實際執行。對于 Checkpoint 相關的操作,這確保了它們在正確的 Task 主線程上下文中運行。

發送 Checkpoint 相關郵件的典型場景和方法:

  • 觸發 Checkpoint (triggerCheckpointAsync):

    • 當 JobManager 向 TaskManager 發送觸發 Checkpoint 的 RPC 時,Task(通常是?StreamTask?的父類或其本身)會調用?triggerCheckpointAsync?方法。
    • 這個方法會將實際的 Checkpoint 執行邏輯封裝成一個?Runnable,并通過?mainMailboxExecutor(一個具有默認優先級的?MailboxExecutor)提交到郵箱。
    • 這樣做是為了確保 Checkpoint 的所有階段(例如調用操作符的?snapshotState)都在 Task 的主線程中執行,從而避免與正常的數據處理流程發生并發沖突。

    StreamTask.java

    // ... existing code ...
    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {checkForcedFullSnapshotSupport(checkpointOptions);CompletableFuture<Boolean> result = new CompletableFuture<>();mainMailboxExecutor.execute(() -> {try {// Lock the mailbox to ensure that the checkpoint is not concurrent with other// actionssynchronized (mailboxProcessor) {result.complete(triggerUnfinishedChannelsCheckpoint(checkpointMetaData, checkpointOptions));}} catch (Exception ex) {// Report the failure both via the Future result but also to the mailboxresult.completeExceptionally(ex);throw ex;}},"checkpoint %s with %s",checkpointMetaData,checkpointOptions);return result;
    }
    // ... existing code ...
    

    在上面的代碼片段中,mainMailboxExecutor.execute(...)?就是將 Checkpoint 觸發邏輯(triggerUnfinishedChannelsCheckpoint)作為郵件發送到郵箱的關鍵步驟。

  • 通知 Checkpoint 完成 (notifyCheckpointCompleteAsync):

    • 當 Task 完成一個 Checkpoint 并收到 JobManager 的確認后,會調用此方法。
    • 同樣,通知操作符 Checkpoint 完成的邏輯也會被封裝并通過?MailboxExecutor?提交到郵箱。

    StreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {return notifyCheckpointOperation(() -> notifyCheckpointComplete(checkpointId),String.format("checkpoint %d completed", checkpointId));
    }
    // ... existing code ...
    

    而?notifyCheckpointOperation?內部會使用?MailboxExecutor

    StreamTask.java

    // ... existing code ...
    private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {CompletableFuture<Void> result = new CompletableFuture<>();mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {try {runnable.run();} catch (Exception ex) {result.completeExceptionally(ex);throw ex;}result.complete(null);},description);return result;
    }
    // ... existing code ...
    

    這里使用了?TaskMailbox.MAX_PRIORITY,表明這是一個高優先級的操作。

  • 通知 Checkpoint 中止 (notifyCheckpointAbortAsync):

    • 當一個 Checkpoint 因為各種原因(超時、錯誤、被新的 Checkpoint 取代)需要中止時,會調用此方法。
    • 中止邏輯,包括清理操作符可能產生的臨時狀態,也會通過郵件發送到郵箱執行。

    StreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {return notifyCheckpointOperation(() -> {if (latestCompletedCheckpointId > 0) {notifyCheckpointComplete(latestCompletedCheckpointId);}if (isCurrentSyncSavepoint(checkpointId)) {throw new FlinkRuntimeException("Stop-with-savepoint failed.");}subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning);},String.format("checkpoint %d aborted", checkpointId));
    }
    // ... existing code ...
    

    同樣,它也使用了?notifyCheckpointOperation?方法,將中止邏輯放入郵箱。

  • 處理 Barrier 對齊時的 Timer 回調:

    • 當使用 Barrier 對齊策略時,如果一個 InputGate 等待某個 Channel 的 Barrier 超時,SubtaskCheckpointCoordinator?會注冊一個 Timer。當這個 Timer 觸發時,其回調邏輯(例如取消 Checkpoint 或強制觸發 Checkpoint)也會被封裝成郵件并通過?MailboxExecutor?提交到郵箱執行。
    • 在?BarrierAlignmentUtil.createRegisterTimerCallback?中可以看到相關的邏輯,它會返回一個?BiConsumer<Long, Long>,這個 Consumer 內部會使用?mainMailboxExecutor?來執行超時處理。
  • Source Task 的特定行為:

    • 例如在?SourceOperatorStreamTask?中,notifyCheckpointAbortAsync?和?notifyCheckpointSubsumedAsync?方法會直接使用?mainMailboxExecutor?來執行清理 Checkpoint 的邏輯。

    SourceOperatorStreamTask.java

    // ... existing code ...
    @Override
    public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {mainMailboxExecutor.execute(() -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId);
    }@Override
    public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {mainMailboxExecutor.execute(() -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);return super.notifyCheckpointSubsumedAsync(checkpointId);
    // ... existing code ...
    

總結:

StreamTask?依賴其?MailboxProcessor?和通過它獲取的?MailboxExecutor?來確保所有與 Checkpoint 相關的關鍵操作(觸發、通知完成/中止、Barrier 處理等)都在 Task 的主事件循環線程中串行執行。這避免了復雜的并發控制,保證了 Checkpoint 過程與正常數據處理流程的一致性和正確性。當需要執行一個 Checkpoint 相關操作時,通常會將其封裝為一個?Runnable,然后通過?MailboxExecutor.execute()?方法將其作為一封郵件提交到郵箱隊列中,等待?MailboxProcessor?的調度執行。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/96209.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/96209.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/96209.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

OpenCV 銀行卡號識別

目錄 一、項目原理與核心技術 二、環境準備與工具包導入 1. 環境依賴 2. 工具包導入 三、自定義工具類 myutils.py 實現 四、主程序核心流程&#xff08;銀行卡識別.py&#xff09; 1. 命令行參數設置 2. 銀行卡類型映射 3. 輔助函數&#xff1a;圖像展示 五、步驟 1…

計算機二級Python

一.靜態語言和腳本語言高級語言根據計算機執行機制的不同分為兩類&#xff1a;靜態語言和腳本語言靜態語言的核心特征&#xff1a;變量的類型在編譯時&#xff08;寫代碼時&#xff09;就必須確定并固定下來&#xff0c;即在使用一個變量前必須顯式地聲明它地類型一旦聲明&…

Mybatis Log Plugin打印日志,會導致CPU升高卡死

原因 大量日志輸出:MyBatis Log Plugin 會打印大量的 SQL 日志,包括 SQL 語句及其參數。如果項目中 SQL 查詢頻繁且復雜,日志量會非常大,導致 CPU 使用率升高,甚至卡死。 日志級別設置不當:如果將日志級別設置為 DEBUG 或 TRACE,MyBatis 會輸出非常詳細的日志信息,這會…

鴻蒙:深色模式適配和淺色模式的切換

前言&#xff1a; 有些時候我們需要對應用進行深色模式的適配處理&#xff0c;并且在不需要的時候切換到淺色狀態&#xff0c;下面和大家一起照著官方文檔來學習。 下面是官方文檔的鏈接&#xff1a; https://developer.huawei.com/consumer/cn/doc/best-practices/bpta-dark-…

Coze源碼分析-資源庫-刪除插件-后端源碼-數據訪問和基礎設施層

5. 數據訪問層 5.1 倉儲接口定義 插件倉儲接口 文件位置&#xff1a;backend/domain/plugin/repository/plugin.go type PluginRepository interface {// DeleteDraftPlugin 刪除插件草稿DeleteDraftPlugin(ctx context.Context, pluginID int64) error// DeleteAPPAllPlugins …

案例一: 對基礎選擇器的使用【網頁盒子】

【1】樣例&#xff1a;首先&#xff0c;觀察到&#xff0c;幾個元素豎著排列的&#xff0c;所以使用塊級元素&#xff0c;而不是行內元素。【2】代碼演示<head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width,…

爬蟲項目優化:如何用 Redis 實現 “斷點續爬”?避免重復采集電商數據

在電商數據采集場景中&#xff0c;爬蟲常因網絡波動、服務器重啟、IP 封禁等問題中斷。若缺乏斷點續爬機制&#xff0c;重啟后需從頭開始&#xff0c;不僅浪費帶寬與時間&#xff0c;還可能因重復采集導致數據冗余。Redis 憑借其高性能、原子操作、多樣數據結構的特性&#xff…

決策樹概念與原理

決策樹簡介決策樹是一種樹形結構樹中每個內部節點表示一個特征上的判斷&#xff0c;每個分支代表一個判斷結果的輸出&#xff0c;每個葉子節點代表一種分類結果(僅舉例無其他意義或隱喻)就像一個女孩去相親&#xff0c;那么首先詢問是否大于30&#xff0c;大于則不見&#xff0…

SQL面試題及詳細答案150道(116-135) --- 高級查詢與函數篇

《前后端面試題》專欄集合了前后端各個知識模塊的面試題,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs,nodejs,mangoDB,MySQL,Linux… 。 前后端面試題-專欄總目錄 文章目錄 一、本文面試題目錄 116. 如何使用CASE語句實…

VeRL:強化學習與大模型訓練的高效融合框架

本文由「大千AI助手」原創發布&#xff0c;專注用真話講AI&#xff0c;回歸技術本質。拒絕神話或妖魔化。搜索「大千AI助手」關注我&#xff0c;一起撕掉過度包裝&#xff0c;學習真實的AI技術&#xff01; 1 概述&#xff1a;VeRL的起源與核心價值 VeRL&#xff08;Versatile…

2. 計算機系統基礎知識

1 計算機系統概述 計算機系統 (Computer System) 是指用于數據管理的計算機硬件、軟件及網絡組成的系統。 計算機系統可劃分為硬件(子系統)和軟件(子系統)兩部分。硬件由機械、電子元器件、磁介質和光介質等物理實體構成&#xff0c;例如處理器(含運算單元和控制單元)、存儲器、…

國產EtherCAT從站芯片FCE1353與N32G435 MCU功能板測試流程

上期推薦&#xff0c;我們在前期介紹了FCE1353與國民N32G435 MCU開發板的基本情況&#xff0c;本期主要介紹此開發板的測試流程&#xff0c;以便用戶拿到此板做功能驗證、兼容性測試、可靠性測試時更加便捷地提高開發驗證效率。01概述FCE1353_N32G435RBL7_GPIO_V1 開發板主要通…

向日葵亮點16功能解析:被控端“快速解鎖”

向日葵16重磅上線&#xff0c;本次更新新增了諸多實用功能&#xff0c;提升遠控效率&#xff0c;實現應用融合突破設備邊界&#xff0c;同時全面提升遠控性能&#xff0c;操作更順滑、畫質更清晰&#xff01;無論遠程辦公、設計、IT運維、開發還是游戲娛樂&#xff0c;向日葵16…

深度解析:IService 與 ServiceImpl 的區別

在使用 MyBatis-Plus 開發業務邏輯時&#xff0c;IService 和 ServiceImpl 是經常遇到的兩個核心類。很多初學者會疑惑&#xff1a; 為什么要定義 IService&#xff1f;ServiceImpl 又解決了什么問題&#xff1f;它們之間到底有什么區別與聯系&#xff1f; 本文將結合源碼與應用…

YOLO12 改進、魔改|通道自注意力卷積塊CSA-ConvBlock,通過動態建模特征圖通道間的依賴關系,優化通道權重分配,在強化有效特征、抑制冗余信息

在分割的研究中&#xff0c;傳統卷積神經網絡&#xff08;CNN&#xff09;存在兩大關鍵問題&#xff1a;一是池化操作雖能降低計算復雜度&#xff0c;卻會導致特征圖中有效空間信息丟失&#xff0c;尤其太陽暗條這類不規則、精細結構的特征易被削弱&#xff1b;二是傳統 CNN 對…

JuiceFS分布式文件系統

對象存儲雖然具備極高的擴展性和成本優勢&#xff0c;卻缺乏對POSIX語義的支持&#xff0c;導致許多應用無法直接使用。正是在這樣的背景下&#xff0c;JuiceFS 應運而生——它巧妙地融合了對象存儲的彈性與傳統文件系統的易用性&#xff0c;為現代應用提供了一種全新的存儲解決…

nginx配置前端請求轉發到指定的后端ip

nginx conf配置 配置把“前端靜態文件”和“后端接口”統一收在 同一個 server{} 塊 里&#xff0c;通過 兩條 location 做分流&#xff0c;其中 /api 這條 location 用到了一點“小技巧”把路徑裁掉后再轉發。下面按執行順序逐句拆解&#xff0c;告訴你“請求是怎么被轉發到 1…

HTML 各種標簽的使用說明書

HTML 各種標簽的使用說明書 1. HTML 簡介 HTML&#xff08;HyperText Markup Language&#xff0c;超文本標記語言&#xff09;是用于創建網頁的標準標記語言。它使用一系列標簽來描述網頁的結構和內容&#xff0c;這些標簽被瀏覽器解釋并渲染成用戶看到的網頁。HTML是構建We…

從關鍵詞到語義理解:小陌引擎如何重構AI搜索優化邏輯?

引言&#xff1a;AI搜索時代的范式轉變在傳統互聯網時代&#xff0c;SEO&#xff08;搜索引擎優化&#xff09;是企業數字營銷的核心策略&#xff0c;通過關鍵詞密度、外鏈建設等技術手段提升網頁在搜索引擎結果頁&#xff08;SERP&#xff09;中的排名。然而&#xff0c;隨著生…

ADE explorer遇到XVFB服務器的問題

遇到這個報錯&#xff0c;是因為服務器沒有安裝xvfb的原因。yum install Xvfb即可解決問題。