1、JobGraph (JobManager)
JobGraph 生成時,通過 ChainingStrategy
連接算子,最終在 Task 中生成 ChainedDriver
鏈表。
StreamingJobGraphGeneratorcreateJobGraph() 構建jobGrapch 包含 JobVertex setChaining() 構建算子鏈isChainable() 是否可以合并算子
-- 算子鏈沒有禁用
-- 下游算子非head
-- 并行度一致
-- 相同slot
-- partitioner ForwardcreateChain 遞歸構建JobVertexJobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);if (jobVertex == null) {jobVertex =createJobVertex(chainInfo, serializationExecutor, jobVertexBuildContext);}
2、ExecutionGraph(JobManager)
ExecutionVertex
:代表一個并行子任務(即一個算子鏈的實例)
- 作用:將
JobVertex
拆解為并行子任務,每個子任務對應一個ExecutionVertex
。 - 核心對象:
ExecutionJobVertex
:與JobVertex
一一對應,管理并行實例。ExecutionVertex
:代表一個并行子任務(即一個算子鏈的實例)
3、物理執行計劃(TaskManager)
? 作用:將 ExecutionVertex
調度到 TaskManager 的 Slot 中運行。
OperatorChain
管理算子鏈的結構,負責算子的初始化、狀態管理和數據傳遞。
<pre>
first\main (multi-input) -> ... -> tail/
second
</pre>@Nullable protected final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;
@Nullable protected final StreamOperatorWrapper<?, ?> firstOperatorWrapper;
@Nullable protected final StreamOperatorWrapper<?, ?> tailOperatorWrapper;
StreamOperatorWrapper
包裝單個算子,維護算子間的鏈式關系,處理數據在算子間的流轉。
private StreamOperatorWrapper<?, ?> previous;private StreamOperatorWrapper<?, ?> next;
*StreamIterationHead/StreamIterationTail
* 迭代場景下的特殊任務,分別處理迭代的頭部(反饋輸入)和尾部(反饋輸出),通過阻塞隊列實現迭代數據循環
4、鏈化策略類型
ALWAYS
:默認策略,盡可能與上下游算子鏈化(如map
、filter
)。HEAD
:僅與下游鏈化,不與上游鏈化(如Source
算子)。NEVER
:獨立成 Task,不與任何算子鏈化
5、算子鏈執行流程與優化技術
- 鏈化過程
- 編譯階段:將滿足條件的算子合并為
OperatorChain
,生成一個Task
而非多個獨立 Task。 - 運行時:
ChainedDriver
在單線程內按拓撲順序執行鏈內算子,數據通過CopyingChainingOutput
(默認深拷貝)或BroadcastingOutput
(對象重用)傳遞。
- 編譯階段:將滿足條件的算子合并為
- 性能權衡
- 優點:減少線程數、網絡 IO,提升吞吐并降低延遲。
- 缺點:長鏈路可能阻塞,需通過
startNewChain()
或disableChaining()
手動拆分