18.TaskExecutor獲取ResourceManagerGateway

TaskExecutor獲取ResourceManagerGateway

  • TaskExecutorResourceManager 之間的交互機制較為復雜,核心可以拆分為三個階段:

    • 首次發現與注冊
    • 連接建立
    • 心跳維持

    本文聚焦連接建立階段,詳細分析底層 RPC 連接的實現原理。

回顧:startRegistration方法

在注冊過程中,TaskExecutor 會調用如下邏輯,通過 rpcService.connectResourceManager 建立遠程通信連接:

//其中,targetType 是 ResourceManagerGateway.class。重點關注 rpcService.connect 方法。
if (FencedRpcGateway.class.isAssignableFrom(targetType)) {rpcGatewayFuture = (CompletableFuture<G>)rpcService.connect(targetAddress,fencingToken,targetType.asSubclass(FencedRpcGateway.class));
}

PekkoRpcService

  • Flink 內部的 RPC 框架由 Pekko(Akka) 支撐,PekkoRpcService 就是基于 Pekko 實現的通信組件,負責不同組件之間的遠程通信。
@Override
public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {return connectInternal(address,clazz,(ActorRef actorRef) -> {Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);return new FencedPekkoInvocationHandler<>(   // 關鍵:創建 InvocationHandleraddressHostname.f0,addressHostname.f1,actorRef,configuration.getTimeout(),configuration.getMaximumFramesize(),configuration.isForceRpcInvocationSerialization(),null,() -> fencingToken,  // 支持 fencingToken 防止舊節點通信captureAskCallstacks,flinkClassLoader);});
}

核心方法:connectInternal

connectInternal 方法的任務是:

  • 通過目標組件的 RPC 地址,獲取 ActorRef(類似 NIO 中的 selector-channel);
  • 與目標 Actor(如 ResourceManager)完成一次握手校驗;
  • 基于 InvocationHandler 生成遠程組件的代理對象(Gateway),用于后續透明 RPC 調用。
private <C extends RpcGateway> CompletableFuture<C> connectInternal(String address,Class<C> clazz,Function<ActorRef, InvocationHandler> invocationHandlerFactory) {final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =actorRefFuture.thenCompose((ActorRef actorRef) -> Patterns.ask(actorRef,new RemoteHandshakeMessage(clazz, getVersion()),   // 發送握手請求configuration.getTimeout().toMillis()).mapTo(ClassTag$.MODULE$.apply(HandshakeSuccessMessage.class)));final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);@SuppressWarnings("unchecked")C proxy = (C) Proxy.newProxyInstance(getClass().getClassLoader(),new Class<?>[] {clazz},invocationHandler);return proxy;   // 返回 ResourceManagerGateway 動態代理},actorSystem.dispatcher());return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}

獲取 ActorRef:resolveActorAddress

private CompletableFuture<ActorRef> resolveActorAddress(String address) {final ActorSelection actorSel = actorSystem.actorSelection(address);return actorSel.resolveOne(configuration.getTimeout()).toCompletableFuture().exceptionally(error -> {throw new CompletionException(new RpcConnectionException(String.format("Could not connect to rpc endpoint under address %s.", address),error));});
}
  • 根據 address 定位到目標組件的 Actor(類似根據地址尋找遠程服務端點)。
  • 異步獲取目標組件的 ActorRef,這是后續所有遠程消息傳遞的核心通信對象。
  • 如果解析失敗,立即包裝為 RpcConnectionException 拋出,阻斷注冊鏈路。
  • 特別注意
    該方法返回的 CompletableFuture<ActorRef> 是由 Akka 內部線程異步完成的(即依賴 ActorSystem 自身的調度機制)。
    因此,無需顯式調用 executor 來管理異步邏輯,整個鏈式流程天然是異步的,并由 Akka 自身的事件機制驅動完成。
    這也是 AkkaPekko)模型的設計優勢:
    ? 組件間通信與任務調度完全解耦,基于 ActorRef 的消息傳遞自動異步非阻塞。

RemoteHandshakeMessage:初次握手階段

  • 作用:
    在建立正式通信前,TaskExecutor 必須先與 ResourceManager 進行協議層握手,確保:

    • 版本一致;
    • 所請求的網關類型(如 ResourceManagerGateway)是對方支持的。
  • 源碼

 private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {if (!isCompatibleVersion(handshakeMessage.getVersion())) {sendErrorIfSender(new HandshakeException(String.format("Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.",handshakeMessage.getVersion(), getVersion())));} else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {sendErrorIfSender(new HandshakeException(String.format("The rpc endpoint does not support the gateway %s.",handshakeMessage.getRpcGateway().getSimpleName())));} else {//告訴taskExecutor,可以連接getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());}}
握手處理邏輯:PekkoRpcActor.handleHandshakeMessage()
  1. 版本校驗

    • 如果通信雙方的 Flink 版本不一致(可能是跨版本集群或配置錯誤),直接拒絕握手并返回錯誤信息。
  2. 接口類型校驗

    • 檢查請求方希望通信的 Gateway 接口(即 ResourceManagerGateway)是否被當前端點支持。
    • 不支持的網關說明連接請求本質錯誤,拒絕握手。
  3. 握手成功

    • 前兩步校驗都通過,表明可以安全建立通信。

    • 此時向對方返回:

      java復制編輯
      getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
      

      即告訴發起方(如 TaskExecutor):可以正式建立連接。

actorRefFuture.thenCombineAsync:構建 ResourceManagerGateWay代理

核心目的:
  • 根據 ResourceManager 的 ActorRef 構建其 RPC 代理(即 ResourceManagerGateway 的動態代理對象)
源碼解析
final CompletableFuture<C> gatewayFuture =actorRefFuture.thenCombineAsync(handshakeFuture,(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {InvocationHandler invocationHandler =invocationHandlerFactory.apply(actorRef);ClassLoader classLoader = getClass().getClassLoader();@SuppressWarnings("unchecked")C proxy = (C)Proxy.newProxyInstance(classLoader,new Class<?>[] {clazz},invocationHandler);return proxy;},actorSystem.dispatcher());
整體過程
  1. 等待 actorRefFuture 和 handshakeFuture 都完成:
    • actorRefFuture:已經獲取了 ResourceManager 的 ActorRef。
    • handshakeFuture:握手成功,確認可以通信。
  2. 生成 InvocationHandler:
    • 實際是封裝了 actorRef 和通信參數的一個代理調用處理器。
    • 后續所有發往 ResourceManager 的方法調用,都會被轉化成消息,通過這個 handler 發送到 actorRef 對應的遠程組件。
  3. 構建代理對象:
    • 使用 JDK 動態代理(Proxy.newProxyInstance)創建了一個ResourceManagerGateway 的動態代理
    • 對用戶代碼來說,這個代理就是一個普通的 ResourceManagerGateway,只是內部通過 actorRef 做遠程消息發送。
  4. 返回代理對象(proxy):
    • proxy 就是一個“可直接遠程調用 ResourceManager”的接口對象。

總結

  • TaskExecutor 已獲取 ResourceManager 的代理網關(即 ResourceManagerGateway 代理對象);
  • 該代理對象封裝了與 ResourceManager 通信所需的 actorRef 和 RPC 協議細節;
  • TaskExecutor 接下來只需要通過該網關對象,正式發起注冊請求

這一階段的核心工作是:

  • 建立連接(即通過 rpcService.connect 拿到 ResourceManager 的 actorRef 并創建代理網關);
  • 完成握手(確保版本兼容和接口匹配);
  • 生成代理(通過動態代理對外提供 ResourceManagerGateway 接口)。

下一步就是:

  • TaskExecutor 通過該網關對象向 ResourceManager 發起注冊;
  • ResourceManager 受理注冊請求;
  • 建立心跳與 slot 匯報等穩定的會話機制。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/90286.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/90286.shtml
英文地址,請注明出處:http://en.pswp.cn/web/90286.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

kafka查看消息的具體內容 kafka-dump-log.sh

目錄kafka 消息查看1. 直接查看日志文件內容步驟&#xff1a;2. 使用 Kafka 工具查看日志主要參數說明常用命令&#xff1a;輸出說明&#xff1a;3. 注意事項kafka 消息日志文件詳解我們有時候遇到這樣的需求&#xff0c;需要查看下kafka消息的內容。 kafka 消息查看 查看 Ka…

Spring Cloud OpenFeign 常用注解_筆記

Spring Cloud OpenFeign 提供了一種聲明式、模板化的HTTP客戶端&#xff0c;可以通過簡單的接口描述遠程調用&#xff0c;而不必手動編寫低級的 HTTP 客戶端代碼。FeignClient用法參考&#xff1a;FeignClient用法-筆記-CSDN博客。這里梳理Spring Cloud OpenFeign 常用注解。 1…

移動端自動化Appium框架

文章目錄環境搭建JAVAAndroid SDKGenymotion模擬器環境搭建 JAVA 1、安裝JDK 從官網下載所需安裝包&#xff0c;默認安裝即可。 https://www.oracle.com/cn/java/technologies/downloads/ 2、配置環境變量 設置 - 編輯系統環境變量 - 環境變量。 系統變量下新建JAVA_HOME&a…

算法第26天|貪心算法:用最少數量的箭引爆氣球、無重疊區間、劃分字母區間

今日總結 用最少數量的箭引爆氣球 題目鏈接&#xff1a;452. 用最少數量的箭引爆氣球 - 力扣&#xff08;LeetCode&#xff09; 代碼隨想錄 整體思路&#xff1a; 1、統一度量 &#xff1a; 將所有區間按照左端點進行排序&#xff1a; 用到了二維的sort&#xff0c;在類中需…

最新版的electron通信規則

介紹: 以前electron require(electron/remote).fs 就能調用node中的各種api,最新版可能為了安全考慮,除了主main.js入口文件以外,其他的地方都不能調用node中的api,比如里面的各種函數,如fs,path等。這節課來教大家最新版本的electron如何進行通信。 結構: 了解通信之前…

Python爬蟲實戰:研究PyPLN庫相關技術

1. 引言 隨著全球化的發展,葡萄牙語作為世界第六大語言,其在互聯網上的文本數據量不斷增長。如何從海量的葡萄牙語文本中提取有價值的信息,成為自然語言處理領域的重要研究方向。 PyPLN (Python Natural Language Processing Toolkit) 是一個專門針對葡萄牙語設計的自然語言…

層次分析法代碼筆記

層次分析法 一、核心 在層次分析法中&#xff0c;通過 算術平均法、幾何平均法、特征值法 計算指標權重&#xff0c;再通過 一致性檢驗 確保判斷矩陣邏輯合理&#xff0c;為多準則決策提供量化依據。 二、代碼 &#xff08;一&#xff09;一致性檢驗&#xff08;判斷矩陣合理性…

[精選] 2025最新生成 SSH 密鑰和 SSL 證書的標準流程(Linux/macOS/Windows系統服務器通用方案)

[精選] 2025最新生成 SSH 密鑰和 SSL 證書的標準流程&#xff08;Linux/macOS/Windows系統服務器通用方案&#xff09; 在現代網絡中&#xff0c;SSH&#xff08;安全外殼協議&#xff09;和 SSL&#xff08;安全套接層協議&#xff09;是保證數據傳輸安全和身份驗證的重要技術…

開發框架安全ThinkPHPLaravelSpringBootStruts2SpringCloud復現

PHP-ThinkphpLaravelThinkPHP是一套開源的、基于PHP的輕量級Web應用開發框架綜合工具&#xff1a;武器庫-Thinkphp專檢&#xff08;3-6版本&#xff09;如何判斷是TP6框架開發的web程序&#xff0c;基于源碼、路徑、圖標、基于報錯可發現dex.php?xxx 在其6.0.13版本及以前/?c…

uniapp+vue3小程序點擊保存圖片、保存二維碼

介紹 步驟1:引入必要的API 在script部分,確保引入了uni的相關API,如uni.downloadFile和uni.saveImageToPhotosAlbum。 步驟2:下載圖片到本地 在toInvite函數中,使用uni.downloadFile將圖片下載到本地,并獲取本地路徑。 步驟3:處理權限和保存邏輯 在saveToAlbum函數…

Golang中GROM多表關聯跟原生SQL多表關聯區別

文章目錄前言一、GROM多表關聯二、原生Sql多表關聯前言 對比GROM多表關聯和原生Sql多表關聯 一、GROM多表關聯 適用于返回全部數據需要邏輯外鍵&#xff08;不會在數據庫創建任何約束&#xff09;適合三個表以下的關聯有幾張表就會查詢幾次 type Product struct {gorm.Model …

設計模式六:工廠模式(Factory Pattern)

概念定義一個創建對象的接口&#xff0c;但讓子類決定實例化哪個類。實現示例#include <iostream> #include <memory>// 產品基類 class Product { public:virtual void use() 0;virtual ~Product() default; };// 具體產品A class ConcreteProductA : public Pr…

應用層自定義協議【序列化+反序列化】

文章目錄再談 “協議”重新理解read、write、recv、send和tcp為什么支持全雙工Server.cc網絡版計算機實現Socket封裝&#xff08;模板方法類&#xff09;socket.hpp定制協議JsonJson安裝定義一個期望的報文格式Protocol.hppParser.hppCalculator.hpp完整的處理過程Client.cc三層…

dify創建OCR工作流

實現ocr識別文件內容&#xff0c;引用dify的一個插件&#xff0c;插件名稱&#xff1a;mineru 引用在線版本mineru 具體操作說明&#xff0c;參見視頻&#xff1a; 第六篇&#xff1a;DifyOCR&#xff0c;掃描件最優解_嗶哩嗶哩_bilibili 引用本地部署mineru 上面的這種使用…

備受關注的“Facebook Email Scraper”如何操作?

Facebook Email Scraper&#xff08;臉書郵箱提取工具&#xff09;是一類用于從Facebook平臺提取公開郵箱信息的工具&#xff0c;其核心功能是通過解析用戶主頁、群組、頁面等公開內容&#xff0c;識別并提取其中包含的郵箱地址&#xff0c;為用戶提供結構化的聯系方式數據。這…

【網絡原理】萬字長文解密UDP/TCP——手把手教你理解網絡通信

目錄 1.前言 2.正文 2.1UDP協議 2.1.1UDP協議端格式 2.1.2UDP的特點 2.1.3理解UDP的“不可靠” 2.1.4面向數據報 2.1.5基于UDP的應用層協議 2.2TCP協議 2.2.1TCP協議端格式 2.2.2TCP十個核心機制 2.2.2.1確認應答 2.2.2.2超時重傳 確認應答超時重傳 vs 三次握手 …

MATLAB軟件使用頻繁,企業如何做到“少買多用”?

在制造企業的工程計算、算法研發、系統建模等場景中&#xff0c;MATLAB 已成為不可或缺的核心工具。 無論是動力學建模、控制算法開發&#xff0c;還是信號處理和數據可視化&#xff0c;MATLAB 的高頻使用場景覆蓋了從研發部門到測試部門的多個崗位。然而&#xff0c;企業 IT 負…

數據結構自學Day13 -- 快速排序--“分而治之”

&#x1f536; 一、快速排序&#xff08;Quick Sort&#xff09;&#x1f4cc; 基本思想&#xff1a;分而治之&#xff1a;每次從數組中選一個“基準”&#xff08;pivot&#xff09;&#xff0c;把比它小的放左邊&#xff0c;大的放右邊。對左右子數組遞歸排序。&#x1f9e0;…

Linux 進程與服務管理~進程基礎、進程查看、進程控制、服務管理、開機啟動??

在 Linux 系統中,進程與服務管理是運維和開發的核心技能之一。進程是程序運行的實例,服務是長期運行的后臺進程(守護進程)。掌握進程與服務的管理方法,能有效排查系統問題、優化資源使用。以下從 ??進程基礎、進程查看、進程控制、服務管理、開機啟動?? 五大模塊詳細講…

論文筆記 | Beyond Pick-and-Place: Tackling Robotic Stacking of Diverse Shapes

論文地址&#xff1a;Beyond Pick-and-Place: Tackling Robotic Stacking of Diverse Shapes 概述&#xff1a;本文提出 RGB-Stacking 基準測試&#xff0c;研究如何僅憑 RGB 攝像頭視覺和本體感知&#xff0c;實現機器人對 復雜幾何物體的高效堆疊。通過結合仿真專家訓練、交互…