Task的線程 和?MailboxProcessor
?的綁定
executingThread
?是?Task
?類(StreamTask
?的父類)在構造時創建的物理線程。MailboxProcessor
?是?StreamTask
?用來處理異步事件和驅動其主要處理邏輯(processInput
)的核心組件。它們之間的綁定關系如下:
Task
?作為?Runnable
:Task
?類實現了?Runnable
?接口,其?run()
?方法是?executingThread
?的入口點。// ... public class Task implements Runnable, TaskSlotPayload {// ...private final Thread executingThread; // 在構造函數中創建public Task(/*...*/) {// ...this.executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); // this 指向 Task 實例}@Overridepublic void run() { // 這是 executingThread 執行的入口try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {doRun(); // 調用實際的工作方法} finally {terminationFuture.complete(executionState);}}private void doRun() {// ...// 對于 StreamTask,這里會調用到 StreamTask 的 invoke() 方法invokable.invoke(); // invokable 就是 StreamTask 實例// ...}// ... }
StreamTask創建了TaskMailboxImpl,傳遞給MailboxProcessor,因此是MailboxProcessor的執行線程。
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));}this.mailboxProcessor =new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);
StreamTask.invoke()
?和?MailboxProcessor
:
- 當?
executingThread
?啟動并執行到?StreamTask.invoke()
?時,StreamTask
?會使用其內部的?MailboxProcessor
?來驅動其核心事件循環。StreamTask.java
// ... @Override public final void invoke() throws Exception {// ... (初始化,如 restoreInternal()) ...// let the task do its workgetEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();runMailboxLoop(); // <--- 關鍵調用// ... (清理工作,如 afterInvoke()) ... }public void runMailboxLoop() throws Exception {mailboxProcessor.runMailboxLoop(); // 將控制權交給 MailboxProcessor } // ...
mailboxProcessor.runMailboxLoop()
?是一個阻塞調用(從?executingThread
?的視角看)。這個方法會在?executingThread
?上運行一個循環,不斷地從郵箱 (Mailbox) 中取出郵件 (Mail) 并執行它們,或者在沒有郵件時執行默認操作 (通常是?StreamTask.processInput()
,用于處理輸入數據和調用算子)。
Mailbox 的線程模型:
MailboxProcessor
?被設計為在其“擁有者”線程(即?executingThread
)上執行其核心循環和郵件處理。TaskMailbox
?(被?MailboxProcessor
?使用) 內部有檢查,確保其關鍵方法(如?take
,?put
?的某些變體,以及郵件的執行)是在預期的郵箱線程(即?executingThread
)上調用的。MailboxProcessor.java
public void runMailboxLoop() throws Exception {// ...final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(), // 確保當前線程是郵箱線程"Method must be executed by declared mailbox thread!");// ...while (isNextLoopPossible()) {processMail(localMailbox, false); // 處理郵件,在 executingThread 上執行if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // 執行默認動作,在 executingThread 上執行}} }
MailboxDefaultAction
?通常包裝了?StreamTask.processInput()
,所以數據處理和算子調用也是在?executingThread
?上發生的。- 其他線程(例如網絡線程接收到數據后,或者定時器線程觸發定時器)想要與?
StreamTask
?交互時,它們不會直接調用?StreamTask
?的方法,而是向其?Mailbox
?中放入一個“郵件”(一個?Runnable
?或?Callable
)。MailboxProcessor
?會在?executingThread
?上從郵箱中取出這個郵件并執行它。
總結線程與 Mailbox 的綁定:
Task
?構造時創建?executingThread
,并將?Task
?自身作為?Runnable
?傳遞給該線程。executingThread
?啟動后,執行?Task.run()
?->?Task.doRun()
?->?StreamTask.invoke()
。- 在?
StreamTask.invoke()
?中,調用?mailboxProcessor.runMailboxLoop()
。 mailboxProcessor.runMailboxLoop()
?在?executingThread
?上運行,它負責從郵箱中拉取任務并執行,或者執行默認的數據處理邏輯 (processInput
)。- 所有提交到該?
StreamTask
?郵箱的異步操作最終都會在?executingThread
?上被?MailboxProcessor
?串行化執行。
因此,executingThread
?成為了?MailboxProcessor
?的“工作線程”。MailboxProcessor
?確保了?StreamTask
?的核心邏輯(包括狀態訪問、算子調用等)都在這個單一的?executingThread
?上順序執行,從而簡化了并發控制。
MailboxProcessor的功能
MailboxProcessor
?是 Flink 中任務(Task)執行模型的核心組件,它實現了基于郵箱(Mailbox)的單線程執行模式。其主要能力包括:
管理郵箱 (TaskMailbox
):
- 持有一個?
TaskMailbox
?實例,用于存儲需要串行執行的各種動作(Mail
)。這些動作可以是來自外部的請求(如 Checkpoint 觸發、Timer 回調)或內部控制命令。
// ... existing code ...
public class MailboxProcessor implements Closeable {// ... existing code .../*** The mailbox data-structure that manages request for special actions, like timers,* checkpoints, ...*/protected final TaskMailbox mailbox;
// ... existing code ...
執行默認動作 (MailboxDefaultAction
):
- 在郵箱為空時,會循環執行一個預定義的“默認動作”。在?
StreamTask
?的上下文中,這個默認動作通常是處理輸入數據(processInput
)。
this.mailboxProcessor =new MailboxProcessor(this::processInput, mailbox, actionExecutor, mailboxMetricsControl);// ... existing code .../*** Action that is repeatedly executed if no action request is in the mailbox. Typically record* processing.*/protected final MailboxDefaultAction mailboxDefaultAction;
// ... existing code ...
單線程執行循環 (runMailboxLoop
):
- 核心方法?
runMailboxLoop()
?驅動整個執行邏輯。它會不斷檢查郵箱中是否有新的?Mail
,如果有則執行它們;如果沒有,則執行默認動作。 - 這種機制保證了默認動作(如數據處理)和郵箱中的其他動作(如 Checkpoint、Timer 事件)之間是單線程順序執行的,避免了并發沖突。
// ... existing code ...public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController mailboxController = new MailboxController(this);while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.processMail(localMailbox, false);if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // lock is acquired inside default action as needed}}}
// ... existing code ...
提供郵箱執行器 (MailboxExecutor
):
- 通過?
getMainMailboxExecutor()
?和?getMailboxExecutor(int priority)
?方法,向外部提供?MailboxExecutor
。這使得其他組件(如 TimerService、CheckpointCoordinator)可以將它們的動作提交到郵箱中,由?MailboxProcessor
?在其單線程循環中統一調度執行。
// ... existing code ...public MailboxExecutor getMainMailboxExecutor() {return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);}/*** Returns an executor service facade to submit actions to the mailbox.** @param priority the priority of the {@link MailboxExecutor}.*/public MailboxExecutor getMailboxExecutor(int priority) {return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);}
// ... existing code ...
生命周期管理:
- 實現了?
Closeable
?接口,并有?prepareClose()
?和?close()
?方法,對應?TaskMailbox
?的?quiesce()
?和?close()
。這確保了在任務結束時,郵箱能被正確關閉,并處理(如取消)剩余的?Mail
。
// ... existing code .../** Lifecycle method to close the mailbox for action submission. */public void prepareClose() {mailbox.quiesce();}/*** Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the* mailbox.*/@Overridepublic void close() {List<Mail> droppedMails = mailbox.close();
// ... existing code ...}
掛起與恢復:
MailboxProcessor
?的執行循環可以被掛起 (suspend()
) 和恢復(通過再次調用?runMailboxLoop()
?或相關控制邏輯)。默認動作也可以通過?MailboxController
?暫時掛起。
// ... existing code .../** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */public void suspend() {sendPoisonMail(() -> suspended = true);}
// ... existing code ...
以及?MailboxController
?中的?suspendDefaultAction()
。
異常處理:
reportThrowable(Throwable throwable)
?方法允許將其他線程中發生的異常報告給郵箱線程,并在郵箱線程中重新拋出,從而中斷任務執行。
// ... existing code ...public void reportThrowable(Throwable throwable) {sendControlMail(() -> {if (throwable instanceof Exception) {throw (Exception) throwable;} else if (throwable instanceof Error) {throw (Error) throwable;} else {throw WrappingRuntimeException.wrapIfNecessary(throwable);}},"Report throwable %s",throwable);}
// ... existing code ...
度量指標控制 (MailboxMetricsController
):
- 包含一個?
MailboxMetricsController
?用于控制和訪問郵箱相關的度量指標,如郵箱延遲、處理的郵件數量等。
// ... existing code ...private final MailboxMetricsController mailboxMetricsControl;// ... existing code ...@VisibleForTestingpublic MailboxMetricsController getMailboxMetricsControl() {return this.mailboxMetricsControl;}
// ... existing code ...
MailboxProcessor 與 StreamTask 的互動
MailboxProcessor
?為?StreamTask
?提供了一個強大的、基于郵箱的單線程執行引擎。StreamTask
?委托?MailboxProcessor
?來驅動其核心的數據處理循環,并通過?MailboxExecutor
?將所有需要與任務主線程同步的異步操作(如 Timer、Checkpoint 事件)統一提交到郵箱中進行調度。這種設計確保了任務內部操作的串行化,簡化了并發控制,并提高了系統的穩定性和可維護性。
StreamTask
?是 Flink 流處理任務的基類,它使用?MailboxProcessor
?來管理其核心執行邏輯。
創建和持有?
MailboxProcessor
:StreamTask
?在其構造函數中創建并持有一個?MailboxProcessor
?實例。MailboxDefaultAction
?通常被設置為?StreamTask::processInput
,這意味著當郵箱為空時,StreamTask
?會執行其數據處理邏輯。StreamTaskActionExecutor
?也被傳遞給?MailboxProcessor
。
驅動執行循環:
StreamTask
?的?invoke()
?方法是任務的執行入口。在其核心邏輯中,它會調用?mailboxProcessor.runMailboxLoop()
?來啟動郵箱處理循環。這個循環會一直運行,直到任務完成或被取消。- 代碼見?
StreamTask.invoke()
:StreamTask.java
// ... existing code ... public final void invoke() throws Exception {// ... initialization ...try {// ...// Run mailbox until all gates will be recovered.mailboxProcessor.runMailboxLoop(); // 啟動郵箱循環// ...} finally {// ... cleanup ...// let mailbox execution reject all new letters from this pointmailboxProcessor.prepareClose();// ...mailboxProcessor.close();} } // ... existing code ...
提交異步動作:
StreamTask
?及其相關的組件(如?TimerService
、SubtaskCheckpointCoordinator
)需要執行一些異步操作,例如觸發 Timer、執行 Checkpoint、響應外部事件等。這些操作需要確保在任務的主線程中執行,以避免并發問題。StreamTask
?通過從?mailboxProcessor
?獲取的?MailboxExecutor
?來提交這些異步操作。這些操作會被封裝成?Mail
?放入郵箱,由?MailboxProcessor
?在其循環中按順序執行。- 例如,
ProcessingTimeService
?的實現會使用?MailboxExecutor
?來調度 Timer 的觸發:StreamOperatorFactoryUtil.java
// ... existing code ...public static <OUT, OP extends StreamOperator<OUT>> OP createOperator(// ...MailboxExecutor mailboxExecutor = // Obtained via containingTask.getMailboxExecutorFactory()containingTask.getMailboxExecutorFactory().createExecutor(configuration.getChainIndex());// ...final ProcessingTimeService processingTimeService;if (operatorFactory instanceof ProcessingTimeServiceAware) {processingTimeService =((ProcessingTimeServiceAware) operatorFactory).createProcessingTimeService(mailboxExecutor);} else {processingTimeService = processingTimeServiceFactory.get();} // ... existing code ...
ProcessingTimeServiceImpl
?內部會使用這個?mailboxExecutor
?來?execute
?或?schedule
?定時任務。
控制流程與狀態:
StreamTask
?的?processInput
?方法(作為?MailboxDefaultAction
)可以通過?MailboxDefaultAction.Controller
?與?MailboxProcessor
?交互。例如,當輸入數據處理完畢或遇到反壓時,它可以調用?controller.suspendDefaultAction()
?來暫時掛起默認動作的執行,讓?MailboxProcessor
?優先處理郵箱中的其他?Mail
。- 代碼見?
StreamTask.processInput()
:// ... existing code ... protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {DataInputStatus status = inputProcessor.processInput();switch (status) {// ...case END_OF_INPUT:// Suspend the mailbox processor, it would be resumed in afterInvoke and finished// after all records processed by the downstream tasks. We also suspend the default// actions to avoid repeat executing the empty default operation (namely process// records).controller.suspendDefaultAction(); // 通過 Controller 控制 MailboxProcessormailboxProcessor.suspend();return;}// ... } // ... existing code ...
生命周期同步:
StreamTask
?在其生命周期的不同階段(如?cancelTask
,?afterInvoke
)會調用?MailboxProcessor
?的相應方法(如?prepareClose
,?close
,?allActionsCompleted
)來同步狀態和清理資源。- 例如,在任務正常結束或需要最終 Checkpoint 完成后,會調用?
mailboxProcessor.allActionsCompleted()
:StreamTask.java
// ... existing code ...FutureUtils.waitForAll(terminationConditions).thenRun(mailboxProcessor::allActionsCompleted);// Resumes the mailbox processor. The mailbox processor would be completed// after all records are processed by the downstream tasks.mailboxProcessor.runMailboxLoop(); // ... existing code ...
TaskMailboxImpl
?
雖然這個類的核心結構是“一個鎖(ReentrantLock
)加一個隊列(Deque<Mail>
)”,但它的實現中包含了一些針對 Flink Task 執行模型的特定優化和設計,使其不僅僅是一個簡單的線程安全隊列。
@ThreadSafe
public class TaskMailboxImpl implements TaskMailbox {/** Lock for all concurrent ops. */private final ReentrantLock lock = new ReentrantLock();/** Internal queue of mails. */@GuardedBy("lock")private final Deque<Mail> queue = new ArrayDeque<>();/** Condition that is triggered when the mailbox is no longer empty. */@GuardedBy("lock")private final Condition notEmpty = lock.newCondition();/** The state of the mailbox in the lifecycle of open, quiesced, and closed. */@GuardedBy("lock")private State state = OPEN;/** Reference to the thread that executes the mailbox mails. */@Nonnull private final Thread taskMailboxThread;/*** The current batch of mails. A new batch can be created with {@link #tryBuildBatch()} and* consumed with {@link #tryTakeFromBatch()}.*/private final Deque<Mail> batch = new ArrayDeque<>();/*** Performance optimization where hasNewMail == !queue.isEmpty(). Will not reflect the state of* {@link #batch}.*/private volatile boolean hasNewMail = false;/*** Performance optimization where there is new urgent mail. When there is no urgent mail in the* batch, it should be checked every time mail is taken, including taking mail from batch queue.*/private volatile boolean hasNewUrgentMail = false;public TaskMailboxImpl(@Nonnull final Thread taskMailboxThread) {this.taskMailboxThread = taskMailboxThread;}@VisibleForTestingpublic TaskMailboxImpl() {this(Thread.currentThread());}
核心結構:
lock: ReentrantLock
: 這是實現線程安全的核心。所有對內部隊列?queue
?和狀態?state
?的并發訪問都由此鎖保護。queue: Deque<Mail>
:- 實際存儲待處理?
Mail
?對象的雙端隊列。Mail
?對象封裝了需要執行的動作(通常是一個?Runnable
)及其優先級。 - 使用?
ArrayDeque
?作為底層實現。
- 實際存儲待處理?
notEmpty: Condition
:- 與?
lock
?關聯的條件變量。當郵箱從空變為非空時(即有新的?Mail
?被放入),會通過?notEmpty.signal()
?或?notEmpty.signalAll()
?來喚醒可能正在等待獲取?Mail
?的線程(主要是郵箱處理線程)。 - 在?
take()
?方法中,如果隊列為空,線程會調用?notEmpty.await()
?等待。
- 與?
state: State
?(enum:?OPEN
,?QUIESCED
,?CLOSED
):- 表示郵箱的生命周期狀態:
OPEN
: 郵箱正常工作,可以接收和發送郵件。QUIESCED
: 郵箱處于靜默狀態,不再接受新的郵件(put
?操作會失敗),但仍然可以取出已有的郵件。通常在任務準備關閉時進入此狀態。CLOSED
: 郵箱已關閉,不能進行任何操作。所有未處理的郵件會被清空。
- 狀態轉換由?
quiesce()
?和?close()
?方法控制,并且這些操作也受?lock
?保護。
- 表示郵箱的生命周期狀態:
taskMailboxThread: Thread
:- 一個非常重要的字段,它存儲了被指定為“郵箱線程”的線程引用。
- 很多操作(如?
take
,?tryTake
,?hasMail
,?createBatch
,?tryTakeFromBatch
,?quiesce
,?close
)都強制要求調用者必須是這個?taskMailboxThread
,通過?checkIsMailboxThread()
?進行檢查。這是因為 Flink 的 Task 執行模型是單線程的,MailboxProcessor
?會在其專用的線程中處理郵箱中的郵件和默認動作。
batch: Deque<Mail>
:- 這是一個性能優化的設計。
MailboxProcessor
?在其主循環中,會先調用?createBatch()
?將主隊列?queue
?中的所有郵件一次性轉移到這個?batch
?隊列中。然后,MailboxProcessor
?會優先從?batch
?中通過?tryTakeFromBatch()
?獲取郵件進行處理。 - 目的: 減少鎖的競爭。
createBatch()
?在持有鎖的情況下將一批郵件轉移出來,之后?MailboxProcessor
?處理?batch
?中的郵件時就不再需要頻繁獲取鎖去訪問主隊列?queue
。這對于高吞吐量的場景非常重要。 batch
?的操作也僅限于?taskMailboxThread
。
- 這是一個性能優化的設計。
hasNewMail: volatile boolean
:- 這是另一個性能優化。它大致反映了主隊列?
queue
?是否為空 (!queue.isEmpty()
)。 volatile
?關鍵字確保了不同線程對它的可見性。- 目的: 允許郵箱線程在不獲取鎖的情況下快速檢查是否有新郵件。例如,在?
hasMail()
?和?tryTake()
?方法中,會先檢查?batch
,然后檢查?hasNewMail
,只有當?hasNewMail
?為?true
?時,才嘗試獲取鎖并檢查主隊列?queue
。 - 當有新郵件通過?
put()
?或?putFirst()
(從非郵箱線程調用時)添加到?queue
?時,hasNewMail
?會被設置為?true
。當郵件從?queue
?中被取出或通過?createBatch()
?轉移到?batch
?時,hasNewMail
?會被更新。
- 這是另一個性能優化。它大致反映了主隊列?
特別需要注意的點:
單消費者(郵箱線程)設計:
- 盡管?
put()
?和?putFirst()
?方法允許從任何線程添加郵件(是線程安全的),但所有取郵件的操作(take
,?tryTake
,?createBatch
,?tryTakeFromBatch
)以及生命周期管理方法(quiesce
,?close
)都必須由?taskMailboxThread
?調用。這是 Flink Mailbox 模型的核心設計,確保了任務邏輯的單線程執行。
- 盡管?
批處理優化 (
batch
?隊列):- 理解?
batch
?隊列的作用對于分析性能至關重要。它不是一個獨立的郵箱,而是主隊列?queue
?的一個臨時緩存,用于減少鎖爭用。MailboxProcessor
?會周期性地將?queue
?中的內容“批發”到?batch
?中。
- 理解?
hasNewMail
?優化:?hasNewMail
?變量提供了一種輕量級的檢查機制,避免了郵箱線程在主隊列可能為空時仍頻繁獲取鎖。優先級處理 (
takeOrNull
?方法):takeOrNull(Deque<Mail> queue, int priority)
?方法實現了從隊列中根據優先級取出郵件的邏輯。它會遍歷隊列,找到第一個優先級大于或等于指定?priority
?的郵件并返回。這意味著高優先級的郵件(如控制命令、Checkpoint barrier)可以被優先處理。
putFirst()
?的特殊行為:putFirst(@Nonnull Mail mail)
?方法很有意思:- 如果調用者是?
taskMailboxThread
,郵件會直接被添加到?batch
?隊列的頭部。這是因為郵箱線程是當前批次郵件的消費者,將郵件直接放入批處理隊列的頭部可以使其被更快處理,而無需等待下一輪?createBatch
。 - 如果調用者不是?
taskMailboxThread
,郵件會被添加到主隊列?queue
?的頭部,并通過?notEmpty.signal()
?喚醒郵箱線程。
- 如果調用者是?
生命周期管理 (
state
,?quiesce()
,?close()
):- 郵箱的生命周期狀態轉換是嚴格控制的,并且與任務的生命周期緊密相關。
quiesce()
: 使郵箱不再接受新郵件,但允許處理完已有的郵件。close()
: 徹底關閉郵箱,清空所有郵件,并喚醒所有可能在等待的線程(通過?notEmpty.signalAll()
),通常是為了讓它們感知到關閉狀態并退出。
鎖的粒度和使用:
ReentrantLock
?用于保護對共享數據(queue
,?state
)的訪問。Condition
?(notEmpty
) 用于實現生產者-消費者模式中的等待和通知機制。lock.lockInterruptibly()
?在?take()
?方法中使用,允許等待的郵箱線程響應中斷。
runExclusively(Runnable runnable)
:- 提供了一種機制,允許以獨占方式在郵箱的鎖保護下執行一段代碼。這對于需要原子地執行多個郵箱操作(例如,檢查狀態然后根據狀態放入郵件)的場景非常有用,可以避免競態條件。
總而言之,TaskMailboxImpl
?雖然基于簡單的鎖和隊列,但通過引入批處理、hasNewMail
?標志、嚴格的線程模型以及精細的生命周期管理,為 Flink 的?MailboxProcessor
?提供了一個高效且功能完備的郵件調度機制。這些設計都是為了在保證單線程執行模型的前提下,最大化吞吐量并減少不必要的同步開銷。
Mail 類分析
Mail
?類是 Apache Flink 流處理運行時任務郵箱機制中的一個核心組件。它代表一個可執行的任務單元,綁定到特定的操作符鏈中,可以被下游郵箱處理器選擇執行。
主要屬性
mailOptions
: 郵件選項,用于配置郵件的行為,如是否可延遲執行。runnable
: 要執行的操作,是一個?ThrowingRunnable
?類型的實例,可以拋出異常。priority
: 郵件的優先級。優先級并不直接決定執行順序,而是用于避免上下游操作符之間的活鎖或死鎖問題。descriptionFormat
?和?descriptionArgs
: 用于調試和錯誤報告的郵件描述信息。actionExecutor
: 用于執行?runnable
?的執行器。
Mail
?類提供了三個構造函數,允許靈活地創建郵件對象:
- 最簡單的構造函數只需要?
runnable
、priority
?和描述信息。 - 可以指定?
MailboxExecutor.MailOptions
?來配置郵件選項。 - 可以指定?
StreamTaskActionExecutor
?來控制操作的執行方式。
核心方法
getMailOptions()
: 獲取郵件選項。getPriority()
: 獲取郵件的優先級。如果郵件是可延遲的,則返回最小優先級。tryCancel()
: 嘗試取消郵件的執行。toString()
: 返回郵件的描述信息。run()
: 執行郵件中的操作。
Mail
?類在 Flink 的流處理任務中扮演著重要角色。它允許將任務分解為小的、可執行的單元,并通過郵箱機制進行調度和執行。這種設計有助于提高任務的并發性和響應性,同時避免復雜的同步問題。
在實際使用中,可以通過創建?Mail
?對象來封裝需要執行的操作,并將其提交到郵箱中等待執行。通過設置不同的優先級和選項,可以控制操作的執行順序和行為。
MailboxExecutorImpl
MailboxExecutorImpl
?實現了?flink.api.common.operators.MailboxExecutor
?接口,它充當了向 Flink Task 的郵箱(TaskMailbox
)提交執行單元(Runnable
?或?Callable
)的一個入口或門面。它的核心目標是允許其他組件將代碼片段(封裝為?Mail
?對象)放入郵箱,這些代碼片段最終會由?MailboxProcessor
?在其專用的單線程中執行。
核心成員變量:
mailbox: TaskMailbox
: 這是實際存儲待執行郵件的郵箱實例。MailboxExecutorImpl
?將通過它來提交新的郵件。MailboxExecutorImpl.java
// ... existing code .../** The mailbox that manages the submitted runnable objects. */@Nonnull private final TaskMailbox mailbox; // ... existing code ...
priority: int
: 與此?MailboxExecutorImpl
?實例關聯的郵件的默認優先級。當通過這個執行器提交任務時,任務會帶上這個優先級。// ... existing code ...private final int priority; // ... existing code ...
actionExecutor: StreamTaskActionExecutor
: 這是一個執行器,用于實際運行封裝在?Mail
?對象中的命令。Mail
?對象在被?MailboxProcessor
?取出后,其?run()
?方法會使用這個?actionExecutor
?來執行具體的邏輯。// ... existing code ...private final StreamTaskActionExecutor actionExecutor; // ... existing code ...
mailboxProcessor: MailboxProcessor
?(可能為?null
): 指向驅動郵箱循環的?MailboxProcessor
。主要用于?isIdle()
?方法的判斷。// ... existing code ...private final MailboxProcessor mailboxProcessor; // ... existing code ...
構造函數:
- 提供了兩個構造函數,主要的區別在于是否傳入?
MailboxProcessor
。// ... existing code ...public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {this(mailbox, priority, actionExecutor, null);}public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox,int priority,StreamTaskActionExecutor actionExecutor,MailboxProcessor mailboxProcessor) {this.mailbox = mailbox;this.priority = priority;this.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = mailboxProcessor;} // ... existing code ...
主要方法分析:
execute
// ... existing code ...@Overridepublic void execute(MailOptions mailOptions,final ThrowingRunnable<? extends Exception> command,final String descriptionFormat,final Object... descriptionArgs) {try {mailbox.put(new Mail(mailOptions,command,priority,actionExecutor,descriptionFormat,descriptionArgs));} catch (MailboxClosedException mbex) {throw new RejectedExecutionException(mbex);}}
// ... existing code ...
- 這是向郵箱提交任務的核心方法。
- 它接收一個?
ThrowingRunnable
?作為要執行的命令,以及?MailOptions
(用于配置郵件行為,例如是否可延遲)、描述信息等。 - 內部會創建一個新的?
Mail
?對象,該對象封裝了傳入的?command
、此執行器實例的?priority
、actionExecutor
?以及描述信息。 - 然后調用?
mailbox.put(new Mail(...))
?將這個新創建的?Mail
?對象放入?TaskMailbox
?中。 - 如果郵箱已經關閉(
MailboxClosedException
),則會拋出?RejectedExecutionException
,這是 Executor 服務在無法接受新任務時的標準行為。
yield?
此方法設計為由郵箱線程自身調用。
- 它嘗試從?
mailbox
?中獲取一個至少具有 此執行器?priority
?的郵件 (mailbox.take(priority)
)。這是一個阻塞操作,如果當前沒有符合條件的郵件,它會等待。 - 一旦獲取到?
Mail
?對象,它會立即在當前線程(即郵箱線程)中執行?mail.run()
。 - 目的: 允許當前正在郵箱線程中執行的某個可能耗時較長的操作(例如用戶函數)主動暫停,讓郵箱中其他待處理的郵件(特別是具有相同或更高優先級的郵件,如 Checkpoint Barrier)有機會執行。這是一種協作式多任務處理機制,對于保證郵箱系統的響應性至關重要。
@Overridepublic void yield() throws InterruptedException {Mail mail = mailbox.take(priority);try {mail.run();} catch (Exception ex) {throw WrappingRuntimeException.wrapIfNecessary(ex);}}
tryYield()
:- 與?
yield()
?類似,但是一個非阻塞版本。 - 它調用?
mailbox.tryTake(priority)
?嘗試獲取郵件。 - 如果成功獲取到郵件,則執行它并返回?
true
。 - 如果沒有符合條件的郵件,則立即返回?
false
,不會阻塞。 - 需要注意的是,根據?
MailboxExecutor
?接口的約定和?MailOptions.deferrable()
?的設計,yield()
?和?tryYield()
?通常不會執行被標記為 "deferrable"(可延遲)的郵件。這是為了在需要快速讓出執行權(例如為了處理 Checkpoint)時,避免執行那些可以稍后處理的低優先級或非緊急任務。
- 與?
shouldInterrupt()
:- 此方法用于指示當前正在郵箱線程中執行的操作是否應該被中斷(例如,一個長時間運行的用戶函數)。
- 目前的實現是簡單地檢查?
mailbox.hasMail()
,即只要郵箱中還有任何待處理的郵件,就建議中斷。 - 代碼中的?
TODO: FLINK-35051
?注釋表明,這是一個待優化的點。理想情況下,只有當郵箱中有時間敏感的郵件(例如與 Checkpoint 相關的郵件)時,才應該建議中斷,以避免不必要的性能開銷。
isIdle()
:// ... existing code ...public boolean isIdle() {return !mailboxProcessor.isDefaultActionAvailable()&& !mailbox.hasMail()&& mailbox.getState().isAcceptingMails();} // ... existing code ...
- 檢查關聯的?
MailboxProcessor
?是否處于空閑狀態。 - 判斷條件為:
MailboxProcessor
?的默認操作(通常是?processInput
)當前不可用(即被掛起)。TaskMailbox
?中沒有待處理的郵件。TaskMailbox
?的狀態仍然是接受郵件的狀態(即不是?QUIESCED
?或?CLOSED
)。
- 這個方法需要?
mailboxProcessor
?成員不為?null
。
- 檢查關聯的?
總結與作用
MailboxExecutorImpl
?為 Flink 的異步操作和事件驅動模型提供了一個關鍵的接口。它使得系統中的不同部分(例如 Timer Service、Checkpoint Coordinator,甚至是算子自身)能夠安全地將需要在 Task 主執行線程(即郵箱線程)中執行的邏輯提交到郵箱隊列。
- 封裝提交邏輯: 它將創建?
Mail
?對象并將其放入?TaskMailbox
?的細節封裝起來,提供了一個更簡潔的?Executor
?風格的 API。 - 支持優先級: 允許為通過特定執行器實例提交的任務指定一個默認優先級。
- 協作式調度 (
yield
/tryYield
): 這是 Mailbox 模型單線程執行模式下實現并發感和響應性的核心機制。它允許長時間運行的任務主動讓出控制權,確保高優先級任務(如系統事件)能夠及時處理。 - 中斷提示 (
shouldInterrupt
): 為長時間運行的用戶代碼提供了一個檢查點,以便在需要時(例如為了執行 Checkpoint)能夠優雅地中斷。
通過?MailboxExecutorImpl
,Flink 能夠確保所有關鍵的 Task 級別操作(數據處理、狀態訪問、Checkpoint、Timer 回調等)都在同一個線程中有序執行,從而避免了復雜的并發控制問題,簡化了狀態管理和一致性保證。
MailboxProcessor
?細節分析
MailboxProcessor
?封裝了基于 Mailbox 的執行模型的完整邏輯。它的核心是一個事件循環?(runMailboxLoop
),該循環持續執行兩個主要任務:
- 處理郵箱中的郵件 (Mail): 檢查?
TaskMailbox
?中是否有待處理的郵件(例如 Checkpoint 觸發、Timer 事件、用戶通過?MailboxExecutor
?提交的自定義邏輯等),并按優先級順序執行它們。 - 執行默認動作 (MailboxDefaultAction): 如果郵箱中沒有郵件,或者郵件處理完畢后,它會執行一個“默認動作”。在?
StreamTask
?的上下文中,這個默認動作通常是?processInput()
,即處理來自上游的數據。
這種設計確保了 Task 內部所有操作(數據處理、Checkpoint、Timer 等)的單線程執行,從而極大地簡化了并發控制和狀態管理。
主要結構組件:
mailbox: TaskMailbox
: 這是實際存儲和管理?Mail
?對象的組件。MailboxProcessor
?從它那里獲取郵件。mailboxDefaultAction: MailboxDefaultAction
: 代表在郵箱空閑時重復執行的默認操作。它通過?MailboxDefaultAction.Controller
?與?MailboxProcessor
?交互,例如在沒有輸入數據時通知?MailboxProcessor
?暫停調用默認動作。actionExecutor: StreamTaskActionExecutor
: 用于實際執行?Mail
?中封裝的?Runnable
。Mail
?對象本身不直接執行邏輯,而是委托給這個執行器。控制標志 (Control Flags)?- 這些標志必須只能從郵箱線程訪問,以避免競態條件:
mailboxLoopRunning: boolean
: 控制主事件循環是否應該繼續運行。當設置為?false
?時,循環會在當前迭代完成后終止。suspended: boolean
: 控制郵箱處理器是否被臨時掛起。如果為?true
,runMailboxLoop
?會退出,但之后可以被重新調用以恢復。suspendedDefaultAction: DefaultActionSuspension
: 記錄當前默認動作是否被掛起。如果非?null
,表示默認動作已掛起,MailboxProcessor
?不會調用它。
// ... existing code .../*** Control flag to terminate the mailbox processor. Once it was terminated could not be* restarted again. Must only be accessed from mailbox thread.*/private boolean mailboxLoopRunning;/*** Control flag to temporary suspend the mailbox loop/processor. After suspending the mailbox* processor can be still later resumed. Must only be accessed from mailbox thread.*/private boolean suspended;/*** Remembers a currently active suspension of the default action. Serves as flag to indicate a* suspended default action (suspended if not-null) and to reuse the object as return value in* consecutive suspend attempts. Must only be accessed from mailbox thread.*/private DefaultActionSuspension suspendedDefaultAction; // ... existing code ...
mailboxMetricsControl: MailboxMetricsController
: 用于管理和暴露與郵箱相關的度量指標。
MailboxProcessor
?提供了多個構造函數,允許不同程度的定制。核心的構造函數接收?MailboxDefaultAction
、TaskMailbox
、StreamTaskActionExecutor
?和?MailboxMetricsController
。
一個常見的用法是傳入一個?MailboxDefaultAction
,然后?MailboxProcessor
?會使用默認的?TaskMailboxImpl
(與當前線程綁定)和?StreamTaskActionExecutor.IMMEDIATE
。
// ... existing code ...public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction,TaskMailbox mailbox,StreamTaskActionExecutor actionExecutor,MailboxMetricsController mailboxMetricsControl) {this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction);this.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailbox = Preconditions.checkNotNull(mailbox);this.mailboxLoopRunning = true;this.suspendedDefaultAction = null;this.mailboxMetricsControl = mailboxMetricsControl;}
// ... existing code ...
runMailboxLoop()
// ... existing code ...public void runMailboxLoop() throws Exception {suspended = !mailboxLoopRunning;final TaskMailbox localMailbox = mailbox;checkState(localMailbox.isMailboxThread(),"Method must be executed by declared mailbox thread!");assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";final MailboxController mailboxController = new MailboxController(this);while (isNextLoopPossible()) {// The blocking `processMail` call will not return until default action is available.processMail(localMailbox, false);if (isNextLoopPossible()) {mailboxDefaultAction.runDefaultAction(mailboxController); // lock is acquired inside default action as needed}}}private boolean isNextLoopPossible() {// 'Suspended' can be false only when 'mailboxLoopRunning' is true.return !suspended;}
// ... existing code ...
- 這是?
MailboxProcessor
?的心臟。它在一個?while (isNextLoopPossible())
?循環中運行。 - 前置檢查: 確保該方法由指定的郵箱線程執行,并且郵箱處于?
OPEN
?狀態。 - 創建?
MailboxController
:?MailboxController
?是?MailboxDefaultAction
?與?MailboxProcessor
?交互的橋梁。 - 循環體:
processMail(localMailbox, false)
: 調用此方法處理郵箱中的郵件。這是一個關鍵步驟,它會嘗試非阻塞地處理一批郵件。如果默認操作被掛起,它可能會阻塞地等待郵件或默認操作變為可用。false
?表示不是單步執行。if (isNextLoopPossible()) { mailboxDefaultAction.runDefaultAction(mailboxController); }
: 如果循環仍然可以繼續(例如,沒有被掛起或關閉),并且默認動作是可用的,則執行默認動作。
- 設計理念: 注釋中提到,
runMailboxLoop
?的設計目標是保持熱路徑(默認動作,郵箱中沒有郵件)盡可能快。因此,對控制標志(如?mailboxLoopRunning
,?suspendedDefaultAction
)的檢查通常與?mailbox.hasMail()
?為?true
?相關聯。這意味著,如果要在郵箱線程內部更改這些標志,必須確保郵箱中至少有一個郵件,以便更改能被及時感知。
processMail
// ... existing code ...private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {// Doing this check is an optimization to only have a volatile read in the expected hot// path, locks are only// acquired after this point.boolean isBatchAvailable = mailbox.createBatch();// Take mails in a non-blockingly and execute them.boolean processed = isBatchAvailable && processMailsNonBlocking(singleStep);if (singleStep) {return processed;}// If the default action is currently not available, we can run a blocking mailbox execution// until the default action becomes available again.processed |= processMailsWhenDefaultActionUnavailable();return processed;}private boolean processMailsNonBlocking(boolean singleStep) throws Exception {long processedMails = 0;Optional<Mail> maybeMail;while (isNextLoopPossible() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {if (processedMails++ == 0) {maybePauseIdleTimer();}runMail(maybeMail.get());if (singleStep) {break;}}if (processedMails > 0) {maybeRestartIdleTimer();return true;} else {return false;}}
// ... existing code ...
其中?processMailsNonBlocking
?和?processMailsWhenDefaultActionUnavailable
?內部會調用?runMail(Mail mail)
?來實際執行郵件:
// ... existing code ...private void runMail(Mail mail) throws Exception {mailboxMetricsControl.getMailCounter().inc();mail.run();
// ... existing code ...
- 此方法負責處理郵箱中的郵件。
mailbox.createBatch()
: 首先嘗試從主隊列創建一批郵件到?TaskMailbox
?的內部批處理隊列。這是一個優化,減少鎖競爭。processMailsNonBlocking(singleStep)
: 非阻塞地處理批處理隊列中的郵件。如果?singleStep
?為?true
,則只處理一個郵件(用于測試或調試)。processMailsWhenDefaultActionUnavailable()
: 如果默認動作當前不可用(例如,由于反壓或沒有輸入),此方法會嘗試從郵箱中獲取并處理郵件。它可能會阻塞地等待新郵件的到來,直到默認動作再次可用或循環終止。- 返回?
true
?如果至少處理了一封郵件。
suspend()
// ... existing code .../** Suspend the running of the loop which was started by {@link #runMailboxLoop()}}. */public void suspend() {sendPoisonMail(() -> suspended = true);}/** Send mail in first priority for internal needs. */private void sendPoisonMail(RunnableWithException mail) {mailbox.runExclusively(() -> {// keep state check and poison mail enqueuing atomic, such that no intermediate// #close may cause a// MailboxStateException in #sendPriorityMail.if (mailbox.getState() == TaskMailbox.State.OPEN) {sendControlMail(mail, "poison mail");}});public void runExclusively(Runnable runnable) {lock.lock();try {runnable.run();} finally {lock.unlock();}}
// ... existing code ...
- 用于從外部(非郵箱線程)請求掛起郵箱循環。
- 它通過?
sendPoisonMail()
?向郵箱頭部插入一個高優先級的“毒丸”郵件。當這個郵件被處理時,它會將?suspended
?標志設置為?true
,從而導致?runMailboxLoop
?在下一次檢查?isNextLoopPossible()
?時退出。 - Poison Mail: 是一種特殊控制郵件,用于改變?
MailboxProcessor
?的內部狀態。
allActionsCompleted()
// ... existing code .../*** This method must be called to end the stream task when all actions for the tasks have been* performed.*/public void allActionsCompleted() {sendPoisonMail(() -> {mailboxLoopRunning = false;suspended = true;});}
// ... existing code ...
- 當 Task 的所有動作都已完成,需要終止郵箱循環時調用此方法。
- 與?
suspend()
?類似,它也通過?sendPoisonMail()
?發送一個毒丸郵件。該郵件會將?mailboxLoopRunning
?設置為?false
?并將?suspended
?設置為?true
,從而徹底停止事件循環。
sendPoisonMail
?和?sendControlMail(...)
:
// ... existing code .../** Send mail in first priority for internal needs. */private void sendPoisonMail(RunnableWithException mail) {mailbox.runExclusively(() -> {// keep state check and poison mail enqueuing atomic, such that no intermediate// #close may cause a// MailboxStateException in #sendPriorityMail.if (mailbox.getState() == TaskMailbox.State.OPEN) {sendControlMail(mail, "poison mail");}});}/*** Sends the given <code>mail</code> using {@link TaskMailbox#putFirst(Mail)} . Intended use is* to control this <code>MailboxProcessor</code>; no interaction with tasks should be performed;*/private void sendControlMail(RunnableWithException mail, String descriptionFormat, Object... descriptionArgs) {mailbox.putFirst(new Mail(mail,Integer.MAX_VALUE /*not used with putFirst*/,descriptionFormat,descriptionArgs));}
// ... existing code ...
sendPoisonMail
: 確保在郵箱?OPEN
?狀態下,通過?sendControlMail
?發送一個控制郵件。它使用?mailbox.runExclusively
?來原子地檢查狀態和入隊。sendControlMail
: 將一個具有最高優先級的?Mail
?對象(通過?mailbox.putFirst()
)放入郵箱。這些郵件用于內部控制,如掛起、終止、報告錯誤等。
生命周期方法 (prepareClose()
,?close()
):
// ... existing code .../** Lifecycle method to close the mailbox for action submission. */public void prepareClose() {mailbox.quiesce();}/*** Lifecycle method to close the mailbox for action submission/retrieval. This will cancel all* instances of {@link java.util.concurrent.RunnableFuture} that are still contained in the* mailbox.*/@Overridepublic void close() {List<Mail> droppedMails = mailbox.close();
// ... existing code ...
prepareClose()
: 調用?mailbox.quiesce()
。這會使郵箱進入靜默狀態,不再接受新的郵件,但允許處理已有的郵件。這是關閉過程的第一步。close()
: 調用?mailbox.close()
。這會徹底關閉郵箱,清空所有未處理的郵件,并嘗試取消仍在郵箱中的?RunnableFuture
?實例。
與?MailboxDefaultAction
?的交互 (通過?MailboxController
):
// ... existing code ...
protected static final class MailboxController implements MailboxDefaultAction.Controller {private final MailboxProcessor mailboxProcessor;protected MailboxController(MailboxProcessor mailboxProcessor) {this.mailboxProcessor = mailboxProcessor;}@Overridepublic void allActionsCompleted() {mailboxProcessor.allActionsCompleted();}@Overridepublic MailboxDefaultAction.Suspension suspendDefaultAction(PeriodTimer suspensionPeriodTimer) {return mailboxProcessor.suspendDefaultAction(suspensionPeriodTimer);}
// ... existing code ...
}// ... existing code ...
private final class DefaultActionSuspension implements MailboxDefaultAction.Suspension {@Nullable private final PeriodTimer suspensionTimer;public DefaultActionSuspension(@Nullable PeriodTimer suspensionTimer) {this.suspensionTimer = suspensionTimer;}@Overridepublic void resume() {if (mailbox.isMailboxThread()) {resumeInternal();} else {try {sendControlMail(this::resumeInternal, "resume default action");} catch (MailboxClosedException ex) {// Ignored}}}private void resumeInternal() {// This method must be called from the mailbox thread.if (mailboxProcessor.suspendedDefaultAction == this) {mailboxProcessor.suspendedDefaultAction = null;if (suspensionTimer != null) {suspensionTimer.markEnd();}}}}
// ... existing code ...
MailboxController
?是一個內部類,實現了?MailboxDefaultAction.Controller
?接口。MailboxDefaultAction
?通過這個?Controller
?來與?MailboxProcessor
?通信。suspendDefaultAction()
: 當默認動作(如?processInput
)發現當前沒有工作可做時(例如,沒有輸入數據或下游反壓),它會調用?controller.suspendDefaultAction()
。MailboxProcessor.suspendDefaultAction(@Nullable PeriodTimer suspensionTimer)
:- 此方法(只能由郵箱線程調用)將?
suspendedDefaultAction
?設置為一個新的?DefaultActionSuspension
?實例。 DefaultActionSuspension
?實現了?MailboxDefaultAction.Suspension
?接口,其?resume()
?方法用于恢復默認動作的執行。resume()
?可以從任何線程調用,如果不是郵箱線程,它會發送一個控制郵件來確保恢復邏輯在郵箱線程中執行。
- 此方法(只能由郵箱線程調用)將?
獲取?MailboxExecutor
:
// ... existing code ...public MailboxExecutor getMainMailboxExecutor() {return new MailboxExecutorImpl(mailbox, MIN_PRIORITY, actionExecutor);}/*** Returns an executor service facade to submit actions to the mailbox.** @param priority the priority of the {@link MailboxExecutor}.*/public MailboxExecutor getMailboxExecutor(int priority) {return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);}
// ... existing code ...
getMainMailboxExecutor()
: 返回一個具有最低優先級的?MailboxExecutor
。getMailboxExecutor(int priority)
: 返回一個指定優先級的?MailboxExecutor
。這些?MailboxExecutor
?實例允許其他組件向此?MailboxProcessor
?的郵箱提交任務。
總結
MailboxProcessor
?是 Flink Task 單線程執行模型的核心。它通過一個事件循環來協調處理高優先級的控制/事件郵件和低優先級的默認數據處理動作。這種機制確保了:
- 單線程執行: 所有關鍵邏輯都在同一個線程中執行,避免了復雜的并發同步。
- 響應性: 高優先級郵件(如 Checkpoint barriers)可以搶占默認動作,保證系統事件的及時處理。
- 可控性: 提供了掛起、恢復、終止事件循環的機制。
- 可擴展性: 通過?
MailboxExecutor
?允許外部組件向郵箱提交自定義任務。
processInput
processInput
方法是 StreamTask
執行其核心數據處理邏輯的地方。它是作為 MailboxProcessor
的默認動作 (MailboxDefaultAction) 來執行的。這意味著,當 MailboxProcessor
的郵箱中沒有更高優先級的“郵件”(如 Checkpoint 觸發、Timer 事件等)需要處理時,它就會循環調用這個 processInput
方法。
下面是對 processInput
方法的詳細分析:
方法職責與設計理念:
處理輸入事件: 其核心職責是從輸入源(由
inputProcessor
代表)獲取一個事件(通常是一條記錄或一組記錄),并將其傳遞給后續的算子鏈進行處理。非阻塞性: 注釋中強調“Implementations should (in general) be non-blocking”。這是非常關鍵的一點。因為
MailboxProcessor
是單線程執行其郵箱中的郵件和默認動作的,如果processInput
長時間阻塞,將會導致 Checkpoint barriers、Timer 等重要事件無法及時處理,影響任務的正確性和性能。與 MailboxProcessor 協作: 通過
MailboxDefaultAction.Controller controller
參數,processInput
可以與MailboxProcessor
進行交互,例如在沒有數據或遇到反壓時,通知MailboxProcessor
暫停調用默認動作。
處理輸入 (
inputProcessor.processInput()
):
- 方法首先調用
inputProcessor.processInput()
。InputProcessor
負責從上游讀取數據、反序列化,并將數據喂給當前 Task 的第一個 Operator。 processInput()
的返回值DataInputStatus
描述了本次輸入處理的結果。
根據 DataInputStatus
進行分支處理:
DataInputStatus status = inputProcessor.processInput();switch (status) {case MORE_AVAILABLE:if (taskIsAvailable()) {return;}break;case NOTHING_AVAILABLE:break;case END_OF_RECOVERY:throw new IllegalStateException("We should not receive this event here.");case STOPPED:endData(StopMode.NO_DRAIN);return;case END_OF_DATA:endData(StopMode.DRAIN);notifyEndOfData();return;case END_OF_INPUT:// Suspend the mailbox processor, it would be resumed in afterInvoke and finished// after all records processed by the downstream tasks. We also suspend the default// actions to avoid repeat executing the empty default operation (namely process// records).controller.suspendDefaultAction();mailboxProcessor.suspend();return;}
MORE_AVAILABLE
: 表示 inputProcessor
中還有更多數據可以立即處理。
if (taskIsAvailable()) { return; }
: 如果當前任務本身也是可用的(例如,下游沒有反壓),則直接返回。MailboxProcessor
會很快再次調用processInput
來處理更多數據。NOTHING_AVAILABLE
: 表示inputProcessor
當前沒有可用的數據。此時,方法不會立即返回,而是會繼續檢查是否存在反壓等情況,可能需要暫停默認動作的執行。END_OF_RECOVERY
: 這是一個不期望在此處出現的狀態,表示任務恢復邏輯可能存在問題,因此拋出IllegalStateException
。STOPPED
: 表示輸入流被強制停止(例如任務被取消,且不需要流干數據)。endData(StopMode.NO_DRAIN)
: 通知算子鏈以非排空模式結束處理。return;
: 結束當前processInput
調用。
END_OF_DATA
: 表示當前輸入流的所有數據都已到達(例如,有限流Source結束)。endData(StopMode.DRAIN)
: 通知算子鏈以排空模式結束處理(處理完所有已緩沖的數據)。notifyEndOfData()
: 通知 TaskManager 當前任務的數據已結束。return;
: 結束當前processInput
調用。
END_OF_INPUT
: 表示該 Task 的所有輸入都已經結束。這是一個更強的結束信號。controller.suspendDefaultAction()
: 通知MailboxProcessor
暫停調用processInput
。因為已經沒有新的輸入了,再繼續調用也沒有意義。mailboxProcessor.suspend()
: 暫停整個MailboxProcessor
的事件循環。任務此時會等待下游處理完所有數據,并完成最終的 Checkpoint 等操作。return;
: 結束當前processInput
調用。
處理反壓和等待邏輯 (當 NOTHING_AVAILABLE
或其他需要等待的情況): 如果 inputProcessor.processInput()
返回 NOTHING_AVAILABLE
,或者雖然有數據但任務本身不可用(例如下游反壓),代碼會進入等待邏輯:
// 如果前面沒有returnTaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();PeriodTimer timer;CompletableFuture<?> resumeFuture;if (!recordWriter.isAvailable()) {timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());resumeFuture = recordWriter.getAvailableFuture();} else if (!inputProcessor.isAvailable()) {timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());resumeFuture = inputProcessor.getAvailableFuture();} else if (changelogWriterAvailabilityProvider != null&& !changelogWriterAvailabilityProvider.isAvailable()) {// waiting for changelog availability is reported as busytimer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();} else {// data availability has changed in the meantime; retry immediatelyreturn;}assertNoException(resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer)));
TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
: 獲取IO相關的度量指標組。PeriodTimer timer; CompletableFuture<?> resumeFuture;
: 聲明計時器和用于恢復的 Future。檢查輸出是否可用 (
!recordWriter.isAvailable()
):如果
recordWriter
(負責將處理結果寫到下游)不可用,說明下游存在反壓。timer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
: 啟動一個計時器,用于度量由于下游反壓導致的等待時間。resumeFuture = recordWriter.getAvailableFuture();
: 獲取一個 Future,當recordWriter
再次可用時,該 Future 會完成。
檢查輸入處理器是否可用 (
!inputProcessor.isAvailable()
):如果
inputProcessor
本身不可用(例如,等待網絡緩沖區的到來)。timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
: 啟動一個計時器,用于度量由于上游輸入不可用導致的空閑時間。resumeFuture = inputProcessor.getAvailableFuture();
: 獲取一個 Future,當inputProcessor
再次可用時,該 Future 會完成。
檢查 Changelog Writer 是否可用:
如果使用了 Changelog State Backend,并且其
changelogWriterAvailabilityProvider
表示不可用。timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
: 啟動計時器,度量等待 Changelog Writer 的繁忙時間。resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
: 獲取 Future,等待其可用。
數據可用性已改變 (
else { return; }
):如果以上等待條件都不滿足,說明在
inputProcessor.processInput()
調用之后,數據的可用性可能已經發生了變化(例如,新的數據剛剛到達)。此時直接return
,讓MailboxProcessor
立即重試processInput
。
掛起默認動作并等待恢復:
controller.suspendDefaultAction(timer)
: 調用controller
的suspendDefaultAction
方法,并傳入之前啟動的timer
。這會通知MailboxProcessor
暫時停止調用processInput
。MailboxProcessor
會使用這個timer
來記錄掛起的時間(用于監控和度量)。該方法返回一個MailboxDefaultAction.Suspension
對象。resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(timer), timer))
: 當resumeFuture
完成時(即等待的條件解除,例如下游不再反壓或上游數據到達),會執行ResumeWrapper
中的邏輯。ResumeWrapper
會調用Suspension
對象的resume()
方法,這會通知MailboxProcessor
可以重新開始調用processInput
了。同時,timer
也會被停止。assertNoException(...)
: 確保thenRun
中的操作不會拋出未捕獲的異常。
Checkpoint
StreamTask
?通過其內部的?MailboxProcessor
?和相關的?MailboxExecutor
?來發送和處理與 Checkpoint 相關的郵件(即需要在 Task 主線程中執行的 Checkpoint 操作)。
以下是?StreamTask
?如何發送和處理與 Checkpoint 相關郵件的關鍵機制分析:
MailboxProcessor
?和?MailboxExecutor
:- 每個?
StreamTask
?都有一個?MailboxProcessor
?實例 (mailboxProcessor
),它負責驅動 Task 的事件循環。 StreamTask
?可以通過?mailboxProcessor.getMailboxExecutor(priority)
?獲取一個?MailboxExecutor
。這個?MailboxExecutor
?提供了?execute(...)
?方法,可以將一個?Runnable
(封裝了 Checkpoint 相關邏輯)作為?Mail
?提交到郵箱中。- 這些郵件會被?
MailboxProcessor
?在其主循環中按優先級取出并執行。
- 每個?
SubtaskCheckpointCoordinator
:StreamTask
?包含一個?SubtaskCheckpointCoordinator
?實例 (subtaskCheckpointCoordinator
)。這個協調器負責處理 Task 級別的 Checkpoint 邏輯,例如觸發操作符的快照、處理 Barrier 對齊、通知 Checkpoint 完成或中止等。- 很多 Checkpoint 相關的操作會首先由?
SubtaskCheckpointCoordinator
?發起或處理,然后它可能會通過?StreamTask
?的?MailboxExecutor
?將具體的執行步驟提交到郵箱。
actionExecutor
:StreamTask
?還有一個?StreamTaskActionExecutor
?實例 (actionExecutor
)。雖然?MailboxExecutor
?用于將任務?放入?郵箱,但當?Mail
?從郵箱中被取出后,其內部的?Runnable
?通常會通過這個?actionExecutor
?來實際執行。對于 Checkpoint 相關的操作,這確保了它們在正確的 Task 主線程上下文中運行。
發送 Checkpoint 相關郵件的典型場景和方法:
觸發 Checkpoint (
triggerCheckpointAsync
):- 當 JobManager 向 TaskManager 發送觸發 Checkpoint 的 RPC 時,
Task
(通常是?StreamTask
?的父類或其本身)會調用?triggerCheckpointAsync
?方法。 - 這個方法會將實際的 Checkpoint 執行邏輯封裝成一個?
Runnable
,并通過?mainMailboxExecutor
(一個具有默認優先級的?MailboxExecutor
)提交到郵箱。 - 這樣做是為了確保 Checkpoint 的所有階段(例如調用操作符的?
snapshotState
)都在 Task 的主線程中執行,從而避免與正常的數據處理流程發生并發沖突。
StreamTask.java
// ... existing code ... @Override public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {checkForcedFullSnapshotSupport(checkpointOptions);CompletableFuture<Boolean> result = new CompletableFuture<>();mainMailboxExecutor.execute(() -> {try {// Lock the mailbox to ensure that the checkpoint is not concurrent with other// actionssynchronized (mailboxProcessor) {result.complete(triggerUnfinishedChannelsCheckpoint(checkpointMetaData, checkpointOptions));}} catch (Exception ex) {// Report the failure both via the Future result but also to the mailboxresult.completeExceptionally(ex);throw ex;}},"checkpoint %s with %s",checkpointMetaData,checkpointOptions);return result; } // ... existing code ...
在上面的代碼片段中,
mainMailboxExecutor.execute(...)
?就是將 Checkpoint 觸發邏輯(triggerUnfinishedChannelsCheckpoint
)作為郵件發送到郵箱的關鍵步驟。- 當 JobManager 向 TaskManager 發送觸發 Checkpoint 的 RPC 時,
通知 Checkpoint 完成 (
notifyCheckpointCompleteAsync
):- 當 Task 完成一個 Checkpoint 并收到 JobManager 的確認后,會調用此方法。
- 同樣,通知操作符 Checkpoint 完成的邏輯也會被封裝并通過?
MailboxExecutor
?提交到郵箱。
StreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {return notifyCheckpointOperation(() -> notifyCheckpointComplete(checkpointId),String.format("checkpoint %d completed", checkpointId)); } // ... existing code ...
而?
notifyCheckpointOperation
?內部會使用?MailboxExecutor
:StreamTask.java
// ... existing code ... private Future<Void> notifyCheckpointOperation(RunnableWithException runnable, String description) {CompletableFuture<Void> result = new CompletableFuture<>();mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {try {runnable.run();} catch (Exception ex) {result.completeExceptionally(ex);throw ex;}result.complete(null);},description);return result; } // ... existing code ...
這里使用了?
TaskMailbox.MAX_PRIORITY
,表明這是一個高優先級的操作。通知 Checkpoint 中止 (
notifyCheckpointAbortAsync
):- 當一個 Checkpoint 因為各種原因(超時、錯誤、被新的 Checkpoint 取代)需要中止時,會調用此方法。
- 中止邏輯,包括清理操作符可能產生的臨時狀態,也會通過郵件發送到郵箱執行。
StreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {return notifyCheckpointOperation(() -> {if (latestCompletedCheckpointId > 0) {notifyCheckpointComplete(latestCompletedCheckpointId);}if (isCurrentSyncSavepoint(checkpointId)) {throw new FlinkRuntimeException("Stop-with-savepoint failed.");}subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, operatorChain, this::isRunning);},String.format("checkpoint %d aborted", checkpointId)); } // ... existing code ...
同樣,它也使用了?
notifyCheckpointOperation
?方法,將中止邏輯放入郵箱。處理 Barrier 對齊時的 Timer 回調:
- 當使用 Barrier 對齊策略時,如果一個 InputGate 等待某個 Channel 的 Barrier 超時,
SubtaskCheckpointCoordinator
?會注冊一個 Timer。當這個 Timer 觸發時,其回調邏輯(例如取消 Checkpoint 或強制觸發 Checkpoint)也會被封裝成郵件并通過?MailboxExecutor
?提交到郵箱執行。 - 在?
BarrierAlignmentUtil.createRegisterTimerCallback
?中可以看到相關的邏輯,它會返回一個?BiConsumer<Long, Long>
,這個 Consumer 內部會使用?mainMailboxExecutor
?來執行超時處理。
- 當使用 Barrier 對齊策略時,如果一個 InputGate 等待某個 Channel 的 Barrier 超時,
Source Task 的特定行為:
- 例如在?
SourceOperatorStreamTask
?中,notifyCheckpointAbortAsync
?和?notifyCheckpointSubsumedAsync
?方法會直接使用?mainMailboxExecutor
?來執行清理 Checkpoint 的邏輯。
SourceOperatorStreamTask.java
// ... existing code ... @Override public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {mainMailboxExecutor.execute(() -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);return super.notifyCheckpointAbortAsync(checkpointId, latestCompletedCheckpointId); }@Override public Future<Void> notifyCheckpointSubsumedAsync(long checkpointId) {mainMailboxExecutor.execute(() -> cleanupCheckpoint(checkpointId), "Cleanup checkpoint %d", checkpointId);return super.notifyCheckpointSubsumedAsync(checkpointId); // ... existing code ...
- 例如在?
總結:
StreamTask
?依賴其?MailboxProcessor
?和通過它獲取的?MailboxExecutor
?來確保所有與 Checkpoint 相關的關鍵操作(觸發、通知完成/中止、Barrier 處理等)都在 Task 的主事件循環線程中串行執行。這避免了復雜的并發控制,保證了 Checkpoint 過程與正常數據處理流程的一致性和正確性。當需要執行一個 Checkpoint 相關操作時,通常會將其封裝為一個?Runnable
,然后通過?MailboxExecutor.execute()
?方法將其作為一封郵件提交到郵箱隊列中,等待?MailboxProcessor
?的調度執行。