20. TaskExecutor與ResourceManager心跳
- 現在,需要回過頭看
ResourceManager
是如何產生心跳管理服務的。 - cluster.initializeServices 方法的 heartbeatServices = createHeartbeatServices(configuration);產生一個 HeartbeatServicesImpl
- jobmanager的 resourceManager啟動的時候,會啟動 startHeartbeatServices();
startHeartbeatServices
方法
- 該方法產生2個對象。1個taskManagerHeartbeatManager和1個jobManagerHeartbeatManager。這次主要看前面1個。
private void startHeartbeatServices() {taskManagerHeartbeatManager =heartbeatServices.createHeartbeatManagerSender(resourceId,new TaskManagerHeartbeatListener(),getMainThreadExecutor(),log);jobManagerHeartbeatManager =heartbeatServices.createHeartbeatManagerSender(resourceId,new JobManagerHeartbeatListener(),getMainThreadExecutor(),log);}
- taskManagerHeartbeatManager:負責監控所有已注冊的 TaskExecutor,是 ResourceManager 與 TaskExecutor 間心跳通信的核心組件。
- jobManagerHeartbeatManager:與 JobManager 保持心跳(此處暫不關注)。
HeartbeatManagerSenderImpl
HeartbeatManagerSenderImpl
是 ResourceManager 端的心跳發送管理器,具備以下特點:- 繼承 HeartbeatManagerImpl,是具體的心跳機制實現;
- 實現 Runnable 接口,自身是一個周期性執行的任務。
- 核心點:
- 周期性遍歷
getHeartbeatTargets()
(即所有注冊的 TaskExecutor); - 逐個向它們發送心跳請求(
requestHeartbeat
方法)。
- 周期性遍歷
HeartbeatManagerSenderImpl(long heartbeatPeriod,long heartbeatTimeout,int failedRpcRequestsUntilUnreachable,ResourceID ownResourceID,HeartbeatListener<I, O> heartbeatListener,ScheduledExecutor mainThreadExecutor,Logger log,HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {super(heartbeatTimeout,failedRpcRequestsUntilUnreachable,ownResourceID,heartbeatListener,mainThreadExecutor,log,heartbeatMonitorFactory);this.heartbeatPeriod = heartbeatPeriod;//這里表明周期調度mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);}@Overridepublic void run() {if (!stopped) {log.debug("Trigger heartbeat request.");for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {//這里就是循環將前面taskExecutor注冊心跳取出來,進行心跳requestHeartbeat(heartbeatMonitor);}//這里單線程周期調度getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);}}
requestHeartbeat 方法
//heartbeatMonitor封裝了一個taskexecutor網關。說白了就是heartbeatTarget就是調用 taskexecutor方法進行交互。
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();heartbeatTarget//TaskExecutorHeartbeatSender 就是前面講的rpc調用.requestHeartbeat(getOwnResourceID(), payload).whenCompleteAsync(handleHeartbeatRpc(heartbeatMonitor.getHeartbeatTargetId()),getMainThreadExecutor());}
-
heartbeatTarget:
-
實際上是封裝的
TaskExecutorHeartbeatSender
; -
通過 RPC 接口向對應 TaskExecutor 發出心跳請求;
-
本質上是對 TaskExecutor 方法的遠程調用。
-
-
TaskExecutorHeartbeatSender 實質上是一個封裝了 TaskExecutor RPC 網關的對象,負責通過 RPC 調用向 TaskExecutor 發送心跳請求。推薦配合 Debug 調試理解整個心跳交互過程,有助于深入掌握 ResourceManager 與 TaskExecutor 之間的通信機制。
小結
-
ResourceManager 在啟動階段就為所有 TaskExecutor 準備好了心跳監控;
-
依靠單線程周期調度器,實現對所有 TaskExecutor 的心跳請求發送;
-
心跳本質是對 TaskExecutor RPC 方法的遠程調用;
-
如果某個 TaskExecutor 心跳超時或失敗,會觸發資源回收與故障恢復機制。