TaskExecutor獲取ResourceManagerGateway
-
TaskExecutor
與ResourceManager
之間的交互機制較為復雜,核心可以拆分為三個階段:- 首次發現與注冊
- 連接建立
- 心跳維持
本文聚焦連接建立階段,詳細分析底層
RPC
連接的實現原理。
回顧:startRegistration
方法
在注冊過程中,TaskExecutor
會調用如下邏輯,通過 rpcService.connect
與 ResourceManager
建立遠程通信連接:
//其中,targetType 是 ResourceManagerGateway.class。重點關注 rpcService.connect 方法。
if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture = (CompletableFuture<G>)rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));
}
PekkoRpcService
- Flink 內部的 RPC 框架由 Pekko(Akka) 支撐,
PekkoRpcService
就是基于 Pekko 實現的通信組件,負責不同組件之間的遠程通信。
@Override
public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {return connectInternal(address,clazz,(ActorRef actorRef) -> {Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);return new FencedPekkoInvocationHandler<>( // 關鍵:創建 InvocationHandleraddressHostname.f0,addressHostname.f1,actorRef,configuration.getTimeout(),configuration.getMaximumFramesize(),configuration.isForceRpcInvocationSerialization(),null,() -> fencingToken, // 支持 fencingToken 防止舊節點通信captureAskCallstacks,flinkClassLoader);});
}
核心方法:connectInternal
connectInternal
方法的任務是:
- 通過目標組件的 RPC 地址,獲取 ActorRef(類似 NIO 中的 selector-channel);
- 與目標 Actor(如 ResourceManager)完成一次握手校驗;
- 基于
InvocationHandler
生成遠程組件的代理對象(Gateway),用于后續透明 RPC 調用。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(String address,Class<C> clazz,Function<ActorRef, InvocationHandler> invocationHandlerFactory) {final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =actorRefFuture.thenCompose((ActorRef actorRef) -> Patterns.ask(actorRef,new RemoteHandshakeMessage(clazz, getVersion()), // 發送握手請求configuration.getTimeout().toMillis()).mapTo(ClassTag$.MODULE$.apply(HandshakeSuccessMessage.class)));final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);@SuppressWarnings("unchecked")C proxy = (C) Proxy.newProxyInstance(getClass().getClassLoader(),new Class<?>[] {clazz},invocationHandler);return proxy; // 返回 ResourceManagerGateway 動態代理},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}
獲取 ActorRef:resolveActorAddress
private CompletableFuture<ActorRef> resolveActorAddress(String address) {final ActorSelection actorSel = actorSystem.actorSelection(address);return actorSel.resolveOne(configuration.getTimeout()).toCompletableFuture().exceptionally(error -> {throw new CompletionException(new RpcConnectionException(String.format("Could not connect to rpc endpoint under address %s.", address),error));});
}
- 根據
address
定位到目標組件的 Actor(類似根據地址尋找遠程服務端點)。 - 異步獲取目標組件的
ActorRef
,這是后續所有遠程消息傳遞的核心通信對象。 - 如果解析失敗,立即包裝為
RpcConnectionException
拋出,阻斷注冊鏈路。 - 特別注意:
該方法返回的CompletableFuture<ActorRef>
是由Akka
內部線程異步完成的(即依賴ActorSystem
自身的調度機制)。
因此,無需顯式調用executor
來管理異步邏輯,整個鏈式流程天然是異步的,并由Akka
自身的事件機制驅動完成。
這也是Akka
(Pekko
)模型的設計優勢:
? 組件間通信與任務調度完全解耦,基于 ActorRef 的消息傳遞自動異步非阻塞。
RemoteHandshakeMessage:初次握手階段
-
作用:
在建立正式通信前,TaskExecutor
必須先與ResourceManager
進行協議層握手,確保:- 版本一致;
- 所請求的網關類型(如 ResourceManagerGateway)是對方支持的。
-
源碼
private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {if (!isCompatibleVersion(handshakeMessage.getVersion())) {sendErrorIfSender(new HandshakeException(String.format("Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",handshakeMessage.getVersion(), getVersion())));} else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {sendErrorIfSender(new HandshakeException(String.format("The rpc endpoint does not support the gateway %s.",handshakeMessage.getRpcGateway().getSimpleName())));} else {//告訴taskExecutor,可以連接getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());}}
握手處理邏輯:PekkoRpcActor.handleHandshakeMessage()
-
版本校驗
- 如果通信雙方的 Flink 版本不一致(可能是跨版本集群或配置錯誤),直接拒絕握手并返回錯誤信息。
-
接口類型校驗
- 檢查請求方希望通信的 Gateway 接口(即 ResourceManagerGateway)是否被當前端點支持。
- 不支持的網關說明連接請求本質錯誤,拒絕握手。
-
握手成功
-
前兩步校驗都通過,表明可以安全建立通信。
-
此時向對方返回:
java復制編輯 getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
即告訴發起方(如 TaskExecutor):可以正式建立連接。
-
actorRefFuture.thenCombineAsync
:構建 ResourceManagerGateWay代理
核心目的:
- 根據 ResourceManager 的 ActorRef 構建其 RPC 代理(即 ResourceManagerGateway 的動態代理對象)
源碼解析
final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler =invocationHandlerFactory.apply(actorRef);ClassLoader classLoader = getClass().getClassLoader();@SuppressWarnings("unchecked")C proxy = (C)Proxy.newProxyInstance(classLoader,new Class<?>[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());
整體過程
- 等待 actorRefFuture 和 handshakeFuture 都完成:
- actorRefFuture:已經獲取了 ResourceManager 的 ActorRef。
- handshakeFuture:握手成功,確認可以通信。
- 生成 InvocationHandler:
- 實際是封裝了 actorRef 和通信參數的一個代理調用處理器。
- 后續所有發往 ResourceManager 的方法調用,都會被轉化成消息,通過這個 handler 發送到 actorRef 對應的遠程組件。
- 構建代理對象:
- 使用 JDK 動態代理(
Proxy.newProxyInstance
)創建了一個ResourceManagerGateway 的動態代理。 - 對用戶代碼來說,這個代理就是一個普通的 ResourceManagerGateway,只是內部通過 actorRef 做遠程消息發送。
- 使用 JDK 動態代理(
- 返回代理對象(proxy):
- proxy 就是一個“可直接遠程調用 ResourceManager”的接口對象。
總結
- TaskExecutor 已獲取 ResourceManager 的代理網關(即
ResourceManagerGateway
代理對象); - 該代理對象封裝了與 ResourceManager 通信所需的 actorRef 和 RPC 協議細節;
- TaskExecutor 接下來只需要通過該網關對象,正式發起注冊請求。
這一階段的核心工作是:
- 建立連接(即通過
rpcService.connect
拿到 ResourceManager 的 actorRef 并創建代理網關); - 完成握手(確保版本兼容和接口匹配);
- 生成代理(通過動態代理對外提供 ResourceManagerGateway 接口)。
下一步就是:
- TaskExecutor 通過該網關對象向 ResourceManager 發起注冊;
- ResourceManager 受理注冊請求;
- 建立心跳與 slot 匯報等穩定的會話機制。