要真正駕馭 Flink 并構建出高效、穩定、可擴展的流處理應用,僅僅停留在 API 的表面使用是遠遠不夠的。深入理解其內部的運行機制,洞悉數據從代碼到分布式執行的完整生命周期,以及明晰各個核心組件之間錯綜復雜而又協同工作的關系,對于我們進行性能調優、故障排查以及設計更優的應用程序架構至關重要。
本文將帶領大家一起揭開 Flink 的神秘面紗,我們將首先詳細梳理一個 Flink 作業從客戶端提交到在 TaskManager 上具體執行的完整啟動流程,理解?StreamGraph
、JobGraph
?到?ExecutionGraph
?的演變。
緊接著,我們將深入剖析 Flink 中那些我們既熟悉又可能感到困惑的核心概念,如?DataStream
?如何通過?Transformation
?承載用戶的?UDF
,最終又是如何在?StreamOperator
?和?StreamTask
?中煥發生機,以及它們之間是如何相互關聯、協同工作的。希望通過這次探索,能幫助構建起對 Flink 內部原理更為清晰和系統的認識。
Flink 任務啟動流程
-
客戶端(Client)準備與提交作業:
- 用戶通過 Flink 客戶端(例如,執行?
flink run
?命令的 CLI,或者通過 REST API 提交的程序,或者在 IDE 中直接運行)提交一個 Flink 應用程序(例如,用 DataStream API 編寫的程序)。 - 在客戶端,用戶的程序(例如?
StreamExecutionEnvironment.execute()
)首先會將用戶定義的?DataStream
?操作轉換為一個?StreamGraph
。StreamGraph
?是作業的最初的、面向流的邏輯表示,它包含了所有的算子、UDF、并行度設置、數據流向等信息。 - 客戶端隨后將這個?
StreamGraph
?提交給?JobManager(具體來說是 JobManager 中的?Dispatcher?組件)。
- 用戶通過 Flink 客戶端(例如,執行?
-
JobManager 接收與處理作業:【stream Graph轉化Job Graph 版本有變動,但是通信鏈路是一致的】
- Dispatcher: JobManager 中的 Dispatcher 接收到?
StreamGraph
?后,會為這個作業啟動一個新的?JobMaster。Dispatcher 負責作業的提交、JobMaster 的生命周期管理,并提供 REST 接口。 - JobMaster: 每個作業都有其專屬的 JobMaster。JobMaster 負責該作業的整個生命周期管理。
StreamGraph
?->?JobGraph
: JobMaster 首先將接收到的?StreamGraph
?轉換為?JobGraph
。JobGraph
?是一個更通用的、并行的作業表示,它將?StreamGraph
?中的算子鏈(Operator Chains)優化考慮在內,并確定了?JobVertex
(邏輯上的并行算子)。每個?JobVertex
?對應?StreamGraph
?中的一個或多個(鏈式)算子。- 如?
docs/content.zh/docs/internals/job_scheduling.md
?中提到:“JobManager 會接收到一個 JobGraph,用來描述由多個算子頂點 (JobVertex) 組成的數據流圖”。
- 如?
JobGraph
?->?ExecutionGraph
: JobMaster 接著將?JobGraph
?轉換為?ExecutionGraph
。ExecutionGraph
?是作業的物理執行計劃,是?JobGraph
?的并行化版本。- 它將每個?
JobVertex
?根據其并行度展開為多個并行的?ExecutionVertex
。 - 每個?
ExecutionVertex
?代表了一個邏輯算子(或算子鏈)的一個并行實例。 ExecutionGraph
?中的每個?ExecutionVertex
?會有一個或多個?Execution
?對象來跟蹤其執行嘗試(例如,初次執行、故障恢復后的重試)。ExecutionGraph
?是 JobMaster 調度和監控作業執行的核心數據結構。
- 它將每個?
- 調度器 (Scheduler): JobMaster 內部的調度器負責將?
ExecutionGraph
?中的任務(具體來說是?Execution
?對象代表的執行嘗試)部署到可用的 TaskManager Slot 上。- 調度器會向?ResourceManager?(如果使用了如 YARN, Kubernetes 等資源管理器,或者是 Flink 自身的 Standalone ResourceManager) 請求所需的 Task Slot。
- 一旦 Slot 分配成功,調度器就會將任務部署到相應的 TaskManager。
- Dispatcher: JobManager 中的 Dispatcher 接收到?
-
TaskManager 執行任務:
- TaskManager 接收到 JobManager (JobMaster) 分配的任務部署指令后,會在其管理的某個空閑的?Task Slot?中為該任務啟動執行。
- Slot 與線程: 一個 Task Slot 代表了 TaskManager 提供的一份固定的計算資源(通常與 CPU核心數相關)。一個 Task Slot 會運行一個或多個(如果啟用了 Slot Sharing Group 且任務屬于同一共享組)任務,每個任務(Task)都在其自己的獨立線程中執行。?這個線程本身不屬于槽而屬于task,Slot 是資源的劃分,線程是執行的載體。
StreamTask
: 在 TaskManager 內部,每個被部署的流處理任務的實際體現就是一個?StreamTask
(或其特定子類,如?SourceTask
,?OneInputStreamTask
,?TwoInputStreamTask
?等)的實例。- 如?
docs/content/docs/internals/task_lifecycle.md
?所述: "The?StreamTask
?is the base for all different task sub-types in Flink's streaming engine." StreamTask
?負責初始化和運行其內部的?StreamOperator
?鏈(OperatorChain
),處理輸入數據,執行用戶定義的函數 (UDF),并產生輸出數據。StreamTask
?的生命周期包括創建、部署、運行、取消、完成或失敗等狀態。- 如?
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
?文件所示,StreamTask
?包含了大量的邏輯來管理算子鏈 (operatorChain
)、配置 (configuration
)、狀態后端 (stateBackend
)、檢查點協調 (subtaskCheckpointCoordinator
) 等。
- 如?
關鍵補充點總結:
StreamGraph
: 在客戶端生成,是最初的邏輯圖。- Dispatcher 與 JobMaster: JobManager 內部組件,Dispatcher 負責接收作業并為每個作業啟動一個 JobMaster。JobMaster 負責單個作業的完整生命周期。
JobGraph
: 由?StreamGraph
?轉換而來,是并行的邏輯圖。ExecutionGraph
: 由??JobGraph
?轉換而來,是物理執行圖,包含?ExecutionVertex
?和?Execution
。- ResourceManager: 負責 Task Slot 的分配。
- Task Slot 與線程: Slot 是資源單位,Task 在 Slot 內的獨立線程中運行。
OperatorChain
:?StreamTask
?內部可以運行一個算子鏈,這是 Flink 的一項重要優化。
Flink 核心概念關系:從 API 到執行
-
DataStream
?API 與?UDF
?(User-Defined Function - 用戶定義函數):- 起點: 用戶通過?
DataStream
?API (例如?dataStream.map(myMapFunction).filter(myFilterFunction)
) 來聲明式地構建數據處理流程。 - 業務邏輯:?
UDF
?(例如?MyMapFunction
,?MyFilterFunction
,?KeyedProcessFunction
) 是用戶編寫的包含具體業務處理邏輯的 Java/Scala 函數或類。它們是數據轉換的核心。
- 起點: 用戶通過?
-
Transformation
?(轉換):- 邏輯藍圖: 每當在?
DataStream
?上調用一個操作(如?map
,?filter
,?keyBy
),就會創建一個或多個?Transformation
?對象。 Transformation
?樹是 Flink 作業的邏輯表示或藍圖。它詳細描述了數據如何從一個操作流向下一個操作,包括操作類型、應用的?UDF
、輸入/輸出數據類型、并行度設置等。它本身不執行計算。
- 邏輯藍圖: 每當在?
-
邏輯?
Operator
?(算子) 與邏輯?Subtask
?(子任務):- 邏輯處理單元: 在?
Transformation
?層面,我們可以認為每個轉換操作對應一個邏輯?Operator
?(例如 Map Operator, Filter Operator)。 - 并行實例 (邏輯): 如果一個邏輯?
Operator
?的并行度(parallelism)被設置為?N
,那么在邏輯規劃中,這個?Operator
?就擁有?N
?個并行的邏輯實例,這些邏輯實例通常被稱為?Subtask
。每個邏輯?Subtask
?代表了該?Operator
?的一個獨立、并行的處理單元。
- 邏輯處理單元: 在?
-
StreamOperator
?(運行時算子/流算子):- 物理執行體:?
StreamOperator
?是 Flink 運行時的核心組件,是?Transformation
?中定義的邏輯算子在物理執行時的具體實現和承載體。例如,StreamMap
、KeyedProcessOperator
?都是?StreamOperator
?的具體實現。 - 封裝 UDF:?
StreamOperator
?負責封裝用戶提供的?UDF
。它管理?UDF
?的生命周期(如調用?open()
,?close()
?方法)并調用?UDF
?的核心處理方法(如?map()
,?filter()
,?processElement()
)來處理數據。 AbstractUdfStreamOperator
?是許多包含?UDF
?的?StreamOperator
?的通用基類,它簡化了?UDF
?的管理。
- 物理執行體:?
-
StreamTask
?(流任務/物理任務):- 執行單元:?
StreamTask
?是 Flink 在 TaskManager 上進行物理執行的基本單元。它是一個實現了?java.lang.Runnable
?的對象,在 TaskManager 的一個 Slot 中的一個獨立線程內運行。 - 核心職責:?
StreamTask
?負責:- 管理其內部一個或多個?
StreamOperator
?的完整生命周期(初始化、打開、運行數據處理循環、響應 Checkpoint、關閉、清理)。 - 處理數據的輸入(從網絡或上游 Task)和輸出(到網絡或下游 Task)。
- 協調 Checkpoint 過程。
- 管理其內部一個或多個?
- 執行入口:?
StreamTask
?的?invoke()
?方法是其執行邏輯的入口點,它啟動了數據處理的主循環。
- 執行單元:?
-
算子鏈 (Operator Chaining) 的影響:
- 優化: Flink 會盡可能地將滿足條件的多個邏輯?
Operator
?(及其對應的邏輯?Subtask
) 鏈接(chain)在一起。例如,連續的?map -> filter -> map
?操作,如果并行度相同且數據傳輸直接(非重分區),它們通常會被鏈接。 - 結果:
- 被鏈接起來的一系列?
StreamOperator
?實例會運行在同一個?StreamTask
?內部,由一個?OperatorChain
?對象管理。 - 這意味著一個?
StreamTask
?可能只包含一個單獨的?StreamOperator
?(如果沒有發生鏈接),或者包含一串鏈式連接的?StreamOperator
。 - 這樣做能顯著減少線程切換、數據序列化/反序列化以及網絡傳輸的開銷,提升性能。
- 被鏈接起來的一系列?
- 優化: Flink 會盡可能地將滿足條件的多個邏輯?
總結關系流程:
- 用戶使用?
DataStream
?API?編寫代碼,并提供?UDF
?來定義業務邏輯。 - 這些 API 調用會構建一個?
Transformation
?樹,這是作業的邏輯藍圖。 - Flink 編譯器將?
Transformation
?樹轉換為物理執行圖 (JobGraph -> ExecutionGraph):- 每個邏輯?
Operator
?(在?Transformation
?中定義) 根據其并行度被實例化為多個邏輯?Subtask
。 - 滿足條件的邏輯?
Subtask
?(來自不同的邏輯?Operator
) 會被優化策略鏈接(chain)起來。
- 每個邏輯?
- 在運行時 (TaskManager):
- 每個(可能經過鏈接的)
Subtask
?序列,作為一個整體,被調度為一個?StreamTask
?實例,并在一個獨立的線程中執行。 StreamTask
?內部運行著一個或多個(如果發生鏈接,則形成?OperatorChain
)StreamOperator
?實例。- 每個?
StreamOperator
?實例則封裝并調用用戶編寫的?UDF
?來對流經它的數據執行具體的業務邏輯處理。
- 每個(可能經過鏈接的)
結合源碼說明
我們來看一些關鍵的源碼片段來理解這個流程:
-
StreamTask 的構造與初始化: 當 TaskManager 接收到部署任務的指令后,會創建?
StreamTask
?實例。// ... existing code ... public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>implements TaskInvokable,CheckpointableTask,CoordinatedTask,AsyncExceptionHandler,ContainingTaskDetails {// ... existing code ...protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox taskMailbox)throws Exception {this.environment = Preconditions.checkNotNull(environment);this.configuration = new StreamConfig(environment.getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);this.resourceCloser = new AutoCloseableRegistry();this.mailboxProcessor =new MailboxProcessor(this::processInput,taskMailbox,actionExecutor,resourceCloser,this::shouldBeTerminated,this::handleAsyncExceptionDuringNormalExecution); // ... existing code ...this.asyncOperationsThreadPool =MdcUtils.scopeToJob(getEnvironment().getJobID(),new ThreadPoolExecutor(0,configuration.getMaxConcurrentCheckpoints() + 1,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler))); // ... existing code ...this.stateBackend = createStateBackend();this.checkpointStorage = createCheckpointStorage(stateBackend); // ... existing code ...this.subtaskCheckpointCoordinator =new SubtaskCheckpointCoordinatorImpl(checkpointStorageAccess,getName(),actionExecutor,getAsyncOperationsThreadPool(),environment,this,this::prepareInputSnapshot,configuration.getMaxConcurrentCheckpoints(),channelStateWriter,configuration.getConfiguration().get(CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),BarrierAlignmentUtil.createRegisterTimerCallback(mainMailboxExecutor, systemTimerService),environment.getTaskStateManager().getFileMergingSnapshotManager()); // ... existing code ... }
在構造函數中,
StreamTask
?會初始化運行環境、配置、狀態后端、Checkpoint 存儲和協調器等。 -
StreamTask 的執行入口?
invoke()
: 這是 TaskManager 啟動 Task 后調用的核心方法。// ... existing code ... @Override public final void invoke() throws Exception {SubTaskInitializationMetricsBuilder initializationMetrics =SubTaskInitializationMetricsBuilder.create(getEnvironment().getMetricGroup());final long initializationStarted = SystemClock.getInstance().absoluteTimeMillis();initializationMetrics.addTimestampMetric(INITIALIZATION_START_TIMESTAMP, initializationStarted);// 初始化 OperatorChain,這里會創建 StreamOperator 實例// RegularOperatorChain 或 FinishedOperatorChaintry {operatorChain =getEnvironment().getTaskStateManager().isTaskDeployedAsFinished()? new FinishedOperatorChain<>(this, recordWriter): new RegularOperatorChain<>(this, recordWriter);mainOperator = operatorChain.getMainOperator();getEnvironment().getTaskStateManager().getRestoreCheckpointId().ifPresent(restoreId -> latestReportCheckpointId = restoreId);// task specific initialization,調用子類實現的 init() 方法init();configuration.clearInitialConfigs();// save the work of reloading state, etc, if the task is already canceledensureNotCanceled();// -------- Invoke --------LOG.debug("Invoking {}", getName());// we need to make sure that any triggers scheduled in open() cannot be// executed before all operators are opened// 恢復狀態并打開算子CompletableFuture<Void> allGatesRecoveredFuture =actionExecutor.call(() -> restoreStateAndGates(initializationMetrics));// Run mailbox until all gates will be recovered.// 啟動郵箱處理循環,這是任務處理數據的主要邏輯mailboxProcessor.runMailboxLoop();// ... existing code ...// make sure this is executed in any case!LOG.debug("Finished task {}", getName());} finally {// ... cleanup ...actionExecutor.runThrowing(() -> {// only set the StreamTask to not running after all operators have been// finished! // ... existing code ...disableInterruptOnCancel();// ... existing code ...// clean up everything we initializedisRunning = false;// ... existing code ...try {resourceCloser.close();} catch (Throwable t) {Exception e = t instanceof Exception ? (Exception) t : new Exception(t);throw firstOrSuppressed(e, cancelException);}} }
在?
invoke()
?方法中:- 創建?
OperatorChain
?(RegularOperatorChain
?或?FinishedOperatorChain
),它包含了這個?StreamTask
?要執行的一個或多個?StreamOperator
。 - 調用?
init()
?方法進行特定于任務類型的初始化(例如,SourceOperatorStreamTask
?會在這里啟動 SourceReader)。 - 調用?
restoreStateAndGates()
,其中會調用?operatorChain.initializeStateAndOpenOperators()
。
- 創建?
-
Operator 的初始化和開:?
OperatorChain
?的?initializeStateAndOpenOperators
?方法會遍歷鏈上的所有算子,調用它們的?initializeState()
?和?open()
?方法。// ... existing code ...operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer(initializationMetrics));initializeStateEndTs = SystemClock.getInstance().absoluteTimeMillis(); // ... existing code ...
而?
createStreamTaskStateInitializer
?會創建一個?StreamTaskStateInitializerImpl
?實例,用于初始化算子的狀態。// ... existing code ... public StreamTaskStateInitializer createStreamTaskStateInitializer(SubTaskInitializationMetricsBuilder initializationMetrics) {InternalTimeServiceManager.Provider timerServiceProvider =configuration.getTimerServiceProvider(getUserCodeClassLoader());return new StreamTaskStateInitializerImpl(getEnvironment(),stateBackend, // ... existing code ...
-
UDF 的生命周期調用: 以?
AbstractUdfStreamOperator
?為例,它的?initializeState()
?和?open()
?方法會進一步調用 UDF 的相應方法。// ... existing code ... @Override public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);StreamingFunctionUtils.restoreFunctionState(context, userFunction); }@Override public void open() throws Exception {super.open();FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE); }@Override public void finish() throws Exception {super.finish();if (userFunction instanceof SinkFunction) {((SinkFunction<?>) userFunction).finish();} }@Override public void close() throws Exception {super.close();FunctionUtils.closeFunction(userFunction); } // ... existing code ...
這里?
userFunction
?就是用戶定義的 UDF。FunctionUtils.openFunction
?會調用 UDF 的?open()
?方法,并傳入?RuntimeContext
。 -
數據處理循環:?
StreamTask
?的?mailboxProcessor.runMailboxLoop()
?啟動后,會不斷調用?processInput()
?方法(如果郵箱中有待處理的郵件或默認操作可以執行)。// ... existing code ... protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {DataInputStatus status = inputProcessor.processInput();switch (status) {case MORE_AVAILABLE:if (taskIsAvailable()) {return;}break;case NOTHING_AVAILABLE:break;case END_OF_RECOVERY:throw new IllegalStateException("We should not receive this event here."); // ... existing code ...
inputProcessor.processInput()
?會從輸入源讀取數據,并通過?OperatorChain
?將數據傳遞給第一個?StreamOperator
,然后數據會在算子鏈中依次處理,每個算子會調用其內部 UDF 的處理邏輯(如?map()
,?filter()
,?processElement()
)。
文檔中的相關描述
-
Task Lifecycle:?
/flink/docs/content/docs/internals/task_lifecycle.md
?描述了?StreamTask
?和?Operator
?的生命周期。The?
StreamTask
?is the base for all different task sub-types in Flink's streaming engine. ... OPERATOR::setup -> UDF::setRuntimeContext OPERATOR::initializeState OPERATOR::open -> UDF::open OPERATOR::processElement -> UDF::run -
Flink Architecture - Tasks and Operator Chains:?
flink/docs/content/docs/concepts/flink-architecture.md
For distributed execution, Flink?chains?operator subtasks together into?tasks. Each task is executed by one thread. Chaining operators together into tasks is a useful optimization...
通過以上分析,應該對 Flink 任務的啟動流程以及?Subtask
、StreamTask
、Operator
?和?UDF
?之間的關系有了更清晰的理解。
StreamTask
?是核心的執行單元,它承載了?Operator
,而?Operator
?又驅動著?UDF
?的執行。整個過程由 JobManager 調度,并在 TaskManager 上實際運行。