Flink on YARN 模式啟動流程及核心組件協作詳解
整個過程分為三個主要階段:
??JobManager 啟動??(作業提交與 AM 初始化)
??TaskManager 資源分配與啟動??
??任務部署與執行??
第一階段:作業提交與 JobManager (AM) 啟動
??目標??:在 YARN 集群上啟動 Flink 的"大腦"——JobManager
組件協作流程
1. 客戶端 (Client) -> YARN ResourceManager (RM)
??生成 JobGraph??
開發環境通過 Flink 命令行/代碼提交作業時,客戶端分析代碼生成
JobGraph
數據結構。該靜態邏輯視圖描述算子(Source/Map/Sink等)及其數據流向。??申請 ApplicationMaster (AM)??
客戶端連接 YARN RM 請求啟動 Flink AM。Per-Job 模式下該 AM 是 ??JobManager + Flink ResourceManager?? 的組合體。客戶端需上傳:
JobGraph
Flink 框架 Jar 包
用戶代碼 Jar 包
(資源存儲于 HDFS 等共享存儲供 AM 訪問)
2. YARN RM -> YARN NodeManager (NM)
??分配容器 (Container)??
RM 選擇計算節點(NodeManager)并分配資源單位 Container(獨立 JVM 進程環境)
3. YARN NM -> Flink JobManager (AM)
??啟動 AM??
NodeManager 通過
launch_container.sh
啟動 ApplicationMaster,入口點為YarnJobClusterEntrypoint
,關鍵操作包括:??環境初始化??
打印 Flink 版本/JVM/OS 信息
加載
flink-conf.yaml
配置
??核心服務啟動??
RpcService
:組件間 RPC 通信HAService
:JobManager 高可用BlobServer
:傳輸作業 Jar 等大二進制對象HeartbeatServices
:心跳檢測
??Flink 資源管理器啟動??
YarnResourceManager
:與外部 YARN RM 通信(申請/釋放 Container)SlotManager
:管理 TaskManager 提供的 Slot 資源狀態
??作業執行核心啟動??
Dispatcher
:接收 JobGraph 并啟動對應 JobManagerJobManager
:將 JobGraph 轉換為包含并行度/任務鏈/資源分配細節的ExecutionGraph
第二階段:TaskManager 資源分配與啟動
??目標??:根據作業需求啟動計算"工人" TaskManager
組件協作流程
1. JobManager -> Flink ResourceManager
??申請 Slot??
JobManager 分析 ExecutionGraph 后向 Flink ResourceManager 申請所需計算資源(Slot)
2. Flink ResourceManager -> YARN RM
??資源檢查與申請??
SlotManager
先檢查現有空閑 Slot不足時由
YarnResourceManager
向 YARN RM 申請新 Container
3. YARN RM -> YARN NM -> Flink TaskManager
??容器分配與進程啟動??
YARN RM 在其他 NodeManager 分配新 Container
Flink ResourceManager 指示啟動
TaskManager
(入口類YarnTaskExecutorRunner
)
??TaskManager 初始化??
啟動核心組件
TaskExecutor
(實際任務執行單元)每個 TaskExecutor 擁有與機器 CPU 核數匹配的 Slot(獨立任務線程池)
4. TaskManager -> Flink ResourceManager & JobManager
??資源注冊流程??
RPC 向 Flink ResourceManager 注冊自身
匯報可用 Slot 數量及狀態(SlotManager 掌握新資源)
與 JobManager 建立心跳連接(健康監控)
第三階段:任務部署與執行
??目標??:將計算任務分發至 TaskManager 并啟動持續計算
組件協作流程
1. Flink ResourceManager -> JobManager
??Slot 分配??
SlotManager 將可用 Slot 分配給等待的 JobManager 請求,并通知目標位置(某 TaskManager 的指定 Slot)
2. JobManager -> TaskManager
??任務提交??
JobManager 通過 RPC 調用 TaskManager 的
submitTask
接口,傳輸包含:任務執行邏輯
配置參數
依賴信息
3. TaskManager 內部執行
??任務線程生命周期??
啟動專用線程執行任務
調用核心方法
AbstractInvokable.invoke()
,按順序執行:??初始化 (initialize)??
創建狀態后端
恢復 Checkpoint 狀態
調用
RichFunction.open()
(如建立數據庫連接)
??運行 (run)??
Source Task:循環讀取 Kafka/Pulsar 等數據并下發下游
Stream Task(中間算子):處理上游數據并轉發下游
Sink Task:將數據寫入外部系統(數據庫/文件系統)
??關閉 (close)??
清理資源連接
調用
RichFunction.close()
至此數據開始在 TaskManager 間流動,Flink 作業進入持續計算狀態
TM向JM連接的源碼分析
這個過程由 ResourceManager (RM) 作為中間協調者。是 TaskManager (TM) 在收到 ResourceManager 的指令后,主動向 JobManager (JM) 發起連接和注冊。?而不是 JM 主動去發現并連接 TM。
下面我們結合源碼和 Flink 的工作機制來詳細分解這個流程:
連接建立的詳細步驟
TM 啟動并向 RM 注冊?當一個 TM 節點啟動時,它的首要任務是找到并向 ResourceManager (RM) 注冊自己。這相當于 TM 在向集群的資源管理者報告:“我上線了,我有這些資源(Slots)可供使用”。
在?
TaskExecutor.java
?的?onStart()
?方法中,我們可以看到這個過程的起點:// ... existing code ... @Override public void onStart() throws Exception {try {startTaskExecutorServices();} catch (Throwable t) { // ... existing code ...}startRegistrationTimeout(); }private void startTaskExecutorServices() throws Exception {try {// start by connecting to the ResourceManagerresourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());// tell the task slot table who's responsible for the task slot actionstaskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());// start the job leader servicejobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); // ... existing code ...
這里的?
resourceManagerLeaderRetriever.start(...)
?就是 TM 尋找 RM 的過程。一旦找到 RM,TM 就會將自己的位置信息、資源ID和可用的 Slot 信息注冊到 RM。JM 向 RM 請求資源?當用戶提交一個 Job 到 JM 后,JM 會為這個 Job 的所有 Task 向 RM 請求計算資源(Slots)。
RM 分配資源并通知 TM?RM 收到 JM 的請求后,會從它管理的、已經注冊的 TM 中尋找符合要求的空閑 Slot。找到后,RM 會將這個 Slot 分配給 JM。同時,RM 會向持有該 Slot 的 TM 發送一個指令(例如?
offerSlots
?RPC 調用),告訴它:“你的某個 Slot 現在被分配給了 JobID 為 X 的 Job,這個 Job 的 JobManager 在 Y 地址”。TM 主動連接并注冊到 JM?這是最關鍵的一步。TM 在收到 RM 的指令后,才得知它需要為哪個 JM 工作以及 JM 的地址。這時,TM 內部的?
JobLeaderService
?會為這個特定的?jobId
?啟動一個到目標 JM 的連接和注冊過程。DefaultJobLeaderService.java
?中的代碼片段展示了 TM 是如何準備注冊信息的:// ... existing code ... protected RetryingRegistration<JobMasterId,JobMasterGateway,JMTMRegistrationSuccess,JMTMRegistrationRejection>generateRegistration() {return new DefaultJobLeaderService.JobManagerRetryingRegistration(LOG,rpcService,"JobManager",JobMasterGateway.class,getTargetAddress(),getTargetLeaderId(),retryingRegistrationConfiguration,jobId,TaskManagerRegistrationInformation.create(ownerAddress, ownLocation, taskManagerSession)); } // ... existing code ...
TM 會構建一個?
TaskManagerRegistrationInformation
?對象,然后通過 RPC 調用 JM 的?registerTaskManager
?方法。當注冊成功后,TM 端的?
JobLeaderListenerImpl
?會被回調,執行后續的連接建立工作。// ... existing code ... private final class JobLeaderListenerImpl implements JobLeaderListener {@Overridepublic void jobManagerGainedLeadership(final JobID jobId,final JobMasterGateway jobManagerGateway,final JMTMRegistrationSuccess registrationMessage) {runAsync(() ->jobTable.getJob(jobId).ifPresent(job ->establishJobManagerConnection(job,jobManagerGateway,registrationMessage)));} // ... existing code ...
jobManagerGainedLeadership
?這個名字意味著 TM 已經成功地與 JM 領導者建立了聯系。JMTMRegistrationSuccess
?這個參數就是注冊成功的憑證。之后?establishJobManagerConnection
?方法會建立心跳監控等。JM 向 TM 提交任務?一旦 TM 在 JM 處注冊成功,它們之間的連接就完全建立好了。此時,JM 就可以通過這個連接向 TM 發送?
submitTask
?請求,將具體的計算任務部署到 TM 的 Slot 中去執行了。// ... existing code ... @Override public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Duration timeout) {final JobID jobId = tdd.getJobId();// todo: consider adding task infotry (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {final ExecutionAttemptID executionAttemptID = tdd.getExecutionAttemptId();final JobTable.Connection jobManagerConnection =jobTable.getConnection(jobId).orElseThrow(() -> {final String message ="Could not submit task because there is no JobManager "+ "associated for the job "+ jobId+ '.';log.debug(message); // ... existing code ...
submitTask
?方法首先會檢查與對應?jobId
?的 JM 連接是否存在 (jobTable.getConnection(jobId)
), 這也反向證明了連接必須預先建立好。
總結
- 啟動順序:TM 和 JM 都可以獨立啟動。
- 連接發起方:TM 是主動方。
- 協調者:ResourceManager?在整個過程中扮演了“介紹人”的角色。它告訴 TM 應該去連接哪個 JM。
所以,整個流程可以概括為:TM 向 RM "報到",JM 向 RM "要人",RM "指派" TM 去為 JM 工作,最后 TM 主動去向 JM "報到"(注冊)。
Flink 與 YARN 集成
即使使用了 YARN,Flink 自己的 ResourceManager (RM) 依然是必需的,但它的角色和工作方式會發生根本性的變化。
為什么使用 YARN 還需要 Flink 的 ResourceManager?
這是因為 Flink 的 RM 和 YARN 的 RM 處在不同的管理層級,職責也不同:
YARN ResourceManager:這是整個 Hadoop 集群的全局資源管理器。它管理集群中所有節點(NodeManager)的 CPU、內存等資源。它的任務是為各種應用(如 Flink、Spark、MapReduce)分配資源單元,這個單元叫做“容器”(Container)。YARN RM 并不知道 Flink 的“Slot”是什么,它只負責分配和調度容器。
Flink ResourceManager:這是 Flink 集群專屬的資源管理器。當以 Flink on YARN 模式運行時,它會作為?ApplicationMaster (AM)?在 YARN 分配的第一個容器中啟動。它的核心職責是:
- 作為橋梁:代表 Flink 應用,向 YARN RM?申請/釋放容器。
- 管理 Flink 資源:當從 YARN RM 獲取到容器后,它會在這些容器中啟動 Flink 的 TaskManager (TM) 進程。
- 維護 Slot:管理所有 TM 上的 Slot 狀態,并將可用的 Slot 信息提供給 JobManager (JM),用于執行任務。
所以,你可以這樣理解它們的關系:Flink RM 向 YARN RM “要地”(申請容器),然后在這些“地”上“蓋房子”(啟動 TM),并管理這些“房子”里的“房間”(Slots)。
ResourceManager.java
?文件正是 Flink RM 的核心抽象類定義,這證明了 Flink RM 是 Flink 運行時的一個獨立且關鍵的組件。
// ... existing code ...
public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>extends FencedRpcEndpoint<ResourceManagerId>implements DelegationTokenManager.Listener, ResourceManagerGateway {
// ... existing code ...
引入 YARN 到底發生了什么變化?
引入 YARN 最大的變化是 Flink 實現了資源管理的動態化和自動化。
- Standalone 模式(沒有 YARN):你需要手動啟動固定數量的 TaskManager。資源是靜態的,有多少 TM 和 Slot 在集群啟動時就確定了,不會自動增減。
- YARN 模式:
- 動態資源分配:Flink RM 可以根據作業的實際需求(例如,作業并行度增加了),動態地向 YARN RM 申請更多的容器來啟動新的 TM。當資源空閑時,也可以將容器釋放回 YARN,供其他應用使用。這大大提高了集群的資源利用率。
- 進程管理外包:TM 的生命周期由 Flink RM 和 YARN NodeManager 管理。如果一個 TM 掛了,Flink RM 會向 YARN RM 重新申請一個容器來替換它。如果 Flink RM (ApplicationMaster) 掛了,YARN RM 會負責重啟它,從而實現了一定程度的自動容錯。
在?flink-yarn
?模塊中,YarnResourceManagerFactory
?就是專門用于在 YARN 環境下創建 Flink RM 實例的工廠類,它創建的 RM 具備了與 YARN 交互的能力。
// ... existing code ...
public class YarnResourceManagerFactory extends ActiveResourceManagerFactory<YarnWorkerNode> {private static final YarnResourceManagerFactory INSTANCE = new YarnResourceManagerFactory();private YarnResourceManagerFactory() {}// ... existing code ...
Flink 是如何與 YARN 集成的?
Flink?沒有?復制 YARN 的源碼。它是通過標準的?YARN 客戶端 API?來與 YARN 集群進行通信的,這是一種標準的、解耦的集成方式。
整個集成過程大致如下:
- 你在客戶端執行?
flink run -t yarn-session ...
?或?flink run-application ...
?命令。 - Flink 的 YARN 客戶端會將 Flink 的 Jar 包(包括 ApplicationMaster 的代碼)和作業 Jar 包上傳到 HDFS。
- YARN 客戶端向 YARN RM 提交一個應用創建請求。
- YARN RM 收到請求后,會在某個 NodeManager 上分配一個容器,并在這個容器里啟動 Flink 的 ApplicationMaster(AM)進程(里面就運行著 Flink RM)。
- 啟動后的 Flink AM (Flink RM) 會使用 YARN 提供的?
AMRMClientAsync
?客戶端與 YARN RM 建立心跳和通信,根據需要申請更多的容器來運行 TaskManager。 - 當 YARN RM 批準請求并分配了新的容器后,Flink AM 會使用?
NMClientAsync
?客戶端與對應的 NodeManager 通信,在新的容器中啟動 TaskManager 進程。
YarnResourceManagerClientFactory.java
?這個文件就明確展示了 Flink 是如何創建 YARN RM 客戶端的,其中?AMRMClientAsync
?就是 YARN 提供的標準客戶端接口。
// ... existing code ...
public interface YarnResourceManagerClientFactory {/*** Create YARN ResourceManager client with the given callback handler.** @param yarnHeartbeatIntervalMillis heartbeat interval between the client and the YARN* ResourceManager.* @param callbackHandler which handles the events from YARN ResourceManager.* @return an {@link AMRMClientAsync} instance.*/AMRMClientAsync<AMRMClient.ContainerRequest> createResourceManagerClient(int yarnHeartbeatIntervalMillis,AMRMClientAsync.AbstractCallbackHandler callbackHandler);
}
這種方式使得 Flink 可以很好地兼容不同版本的 Hadoop YARN,而不需要關心 YARN 的內部實現細節。
Standalone和Yarn
TM 進程的來源以及?RM 的角色:
- Standalone 模式(原始模式):TM 是由用戶手動啟動的。RM 在這里扮演一個被動的“登記中心”角色,它只是等待 TM 來注冊并上報資源。
- YARN 模式:TM 是由 Flink RM?通過 YARN 動態申請并創建的。RM 在這里扮演一個主動的“資源調度者”角色,它負責向 YARN 申請資源并拉起 TM。
下面我們深入源碼來看這兩種模式的具體流程。
Standalone 模式:被動等待與注冊
在這種模式下,Flink RM 完全不關心 TM 進程是怎么來的。
TM 的產生:?由運維人員或啟動腳本在集群的各個節點上,手動執行?bin/taskmanager.sh start
?來啟動一個或多個 TaskManager 進程。
RM 如何知道 Slot:
TM 主動注冊:每個手動啟動的 TM,在初始化后會根據?
flink-conf.yaml
?中的?jobmanager.rpc.address
?(在 Flink 1.x 中,現在是?rest.address
) 找到 RM 的地址,然后調用 RM 的?registerTaskExecutor
?RPC 方法來注冊自己。上報 SlotReport:在注冊請求中,TM 會附帶一個?
SlotReport
?對象。這個對象詳細描述了該 TM 擁有多少個 Slot,以及每個 Slot 的資源規格(ResourceProfile)。RM 登記資源:RM 的?
registerTaskExecutor
?方法接收到請求后,會做幾件事:- 驗證這個 TM 是否合法。
- 將這個 TM 的連接信息(
TaskExecutorGateway
)和其 Worker 信息記錄在?taskExecutors
?這個 Map 中。 - 最關鍵的,它會將?
SlotReport
?交給內部的?SlotManager
?組件。 SlotManager
?會解析?SlotReport
,將這些新的 Slot 加入到自己的可用資源池中,等待 JobManager 來申請。
StandaloneResourceManager.java
?是此模式下的具體實現。你會發現它的?initialize
?方法非常簡單,因為它不需要做任何主動申請資源的操作,只是啟動內部服務,然后“坐等”TM 來連接。
// ... existing code ...@Overrideprotected void initialize() throws ResourceManagerException {// 啟動一個啟動周期,在這個周期內,它會等待TMs注冊。// 它不會主動做任何事情來創建TM。startStartupPeriod();}@Overrideprotected void terminate() {// noop}@Overrideprotected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) {}
// ... existing code ...
YARN 模式:主動申請與創建
在 YARN 模式下,情況完全反轉,RM 變得非常“主動”。這里的 RM 是?ActiveResourceManager
?的一個實例。
TM 的產生:
JM 申請 Slot:一個 Job 提交后,JobManager 會根據作業的并行度向 Flink RM 申請所需數量的 Slot。
SlotManager 發現資源不足:Flink RM 內部的?
SlotManager
?收到請求,發現當前已注冊的 TM 上的空閑 Slot 不足以滿足需求。RM 主動申請資源:
SlotManager
?會通過?ResourceEventListener
?回調通知?ActiveResourceManager
:“我需要更多資源!”。ActiveResourceManager
?不會等待,而是會立即行動。它通過一個叫做?ResourceManagerDriver
?的組件(在 YARN 模式下,具體實現是?YarnResourceManagerDriver
)來和外部資源系統(YARN)交互。它會調用?resourceManagerDriver.requestResource(...)
?方法。YARN 分配容器:
YarnResourceManagerDriver
?會使用 YARN 的客戶端 API 向 YARN RM 發送一個容器申請。YARN RM 會在某個 NodeManager 上分配一個容器。RM 在容器中啟動 TM:當?
YarnResourceManagerDriver
?收到 YARN RM 分配容器的通知后,它會連接到對應的 NodeManager,并命令它在剛剛分配的容器中執行一個啟動命令,這個命令會拉起一個?TaskExecutor
?進程。
ActiveResourceManager.java
?是實現這一邏輯的核心。它的?initialize
?方法會啟動?ResourceManagerDriver
,為與外部系統通信做好準備。
// ... existing code ...@Overrideprotected void initialize() throws ResourceManagerException {try {// 初始化 ResourceManagerDriver,它封裝了與YARN/K8s等外部系統交互的邏輯resourceManagerDriver.initialize(this,new GatewayMainThreadExecutor(),ioExecutor,blocklistHandler::getAllBlockedNodeIds);} catch (Exception e) {throw new ResourceManagerException("Cannot initialize resource provider.", e);}}
// ... existing code ...
當需要新 worker 時,ActiveResourceManager
?會調用?startNewWorker
,最終會調用到?resourceManagerDriver.requestResource
。
RM 如何知道 Slot:?這一步和 Standalone 模式完全一樣。 一旦 TM 進程被 YARN 的 NodeManager 在容器中啟動,這個 TM 就會執行和 Standalone 模式下完全相同的初始化邏輯:找到 Flink RM (此時是 ApplicationMaster),然后調用?registerTaskExecutor
?RPC 方法,并上報自己的?SlotReport
。
總結
特性 | Standalone 模式 | YARN 模式 |
---|---|---|
RM 角色 | 被動 (Passive) | 主動 (Active) |
TM 來源 | 用戶手動啟動 | Flink RM 通過 YARN 申請并創建 |
資源彈性 | 靜態,資源固定 | 動態,按需伸縮 |
核心實現 | StandaloneResourceManager | ActiveResourceManager + YarnResourceManagerDriver |
所以,引入 YARN 的本質變化是將資源管理的職責從“用戶手動操作”轉移到了“程序自動協調”。Flink RM 從一個簡單的“登記員”升級為了一個聰明的“項目經理”,能夠主動地向 YARN 這個“資源供應商”申請和釋放資源,實現了整個集群的彈性伸縮。