摘要
通過閱讀flink源碼,了解flink是如何基于Pekko實現遠程RPC調用的
Pekko實現遠程調用
Flink 的 RPC 框架底層是構建在 Pekko 的 actor 模型之上的,了解Pekko如何使用,對后續源碼的閱讀有幫助。
Apache Pekko(原為 Akka 的一個分支)是一個強大的工具包,用于構建并發、分布式和可擴展的系統。它基于經典的 Actor 模型,提供了一種事件驅動、非阻塞的編程范式,使開發者能夠更輕松地構建容錯性強、模塊化清晰的分布式應用。
引入依賴
確保你使用的是 Apache Pekko 的 Maven 依賴:
<dependencies><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-actor_2.13</artifactId><version>1.0.2</version></dependency><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-remote_2.13</artifactId><version>1.0.2</version></dependency>
</dependencies>
定義消息類(RPC 通信協議)
public class HelloRequest implements java.io.Serializable { public final String message; public HelloRequest(String message) { this.message = message; }
}
public class HelloResponse implements java.io.Serializable { public final String reply; public HelloResponse(String reply) { this.reply = reply; } @Override public String toString() { return reply; }
}
HelloRequest
和 HelloResponse
是在使用 Pekko
遠程通信 時的消息協議類,也就是你定義的“請求消息”和“響應消息”。它們是通過網絡在客戶端與服務端之間傳輸的,所以必須滿足可序列化(Serializable)的要求。
服務端代碼(遠程服務)
HelloActor.java
public class HelloActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder() .match(HelloRequest.class, req -> { System.out.println("服務端收到消息: " + req.message); // 回復客戶端 getSender().tell(new HelloResponse("你好,客戶端,我收到了:" + req.message), getSelf()); }) .build(); } public static Props props() { return Props.create(HelloActor.class); }
}
在 Pekko 中,HelloActor
相當于傳統 RPC 框架中的服務實現類,但其處理邏輯是基于 消息驅動模型 而非方法調用。Pekko 的核心設計理念是:Actor 只對接收到的消息做出反應,并保持自身狀態獨立和可并發執行。
以下是關鍵點說明:
createReceive()
方法 定義了該 Actor 支持的消息類型和對應的處理邏輯。使用receiveBuilder().match(...).build()
來設置“消息類型 → 響應處理”的映射。getSender().tell(...)
表示將處理結果異步返回給消息發送者,它等價于傳統 RPC 中的“返回值”機制,只不過是通過消息的方式返回。Props.create(...)
返回一個Props
實例,描述了如何構造該 Actor。這類似于構造函數的封裝工廠。Props
是 Actor 的構造“配方”,用于ActorSystem.actorOf(...)
創建真正的 Actor 實例。
ServerApp.java
public class ServerApp { public static void main(String[] args) { // 使用硬編碼配置啟動遠程 ActorSystem Config config = ConfigFactory.parseString(""" pekko.actor.provider = remote pekko.remote.artery.canonical.hostname = "127.0.0.1" pekko.remote.artery.canonical.port = 25520 pekko.actor.allow-java-serialization = on pekko.actor.serialize-messages = on """); ActorSystem system = ActorSystem.create("ServerSystem", config); // 啟動 HelloActor,名字是 helloActor,供客戶端遠程訪問 ActorRef actorRef = system.actorOf(HelloActor.props(), "helloActor"); System.out.println("服務端已啟動,等待遠程調用..."); }
}
代碼說明
- 用 Java 代碼動態構造 Pekko 配置(替代 application.conf 文件)
pekko.actor.serialize-messages = on
強制所有 Actor 之間發送的消息都走序列化流程(即使是本地通信)ActorSystem.create(...)
創建了一個名為ServerSystem
的遠程 Actor 系統。- 指定 IP 和端口為
127.0.0.1:25520
,就像傳統 RPC 服務綁定地址。 - 啟動一個名為
helloActor
的 actor,客戶端稍后通過這個名字進行訪問。
客戶端代碼
ClientApp.java
public class ClientApp { public static void main(String[] args) throws Exception { // 使用硬編碼配置啟動客戶端 ActorSystem,端口 0 表示隨機 Config config = ConfigFactory.parseString(""" pekko.actor.provider = remote pekko.remote.artery.canonical.hostname = "127.0.0.1" pekko.remote.artery.canonical.port = 0 pekko.actor.allow-java-serialization = on pekko.actor.serialize-messages = on """); ActorSystem system = ActorSystem.create("ClientSystem", config); // 遠程 actor 路徑,相當于 RPC 服務地址 + 接口名 String remotePath = "pekko://ServerSystem@127.0.0.1:25520/user/helloActor"; // 選擇遠程 actor,相當于創建客戶端 stub ActorSelection selection = system.actorSelection(remotePath); // 使用 ask 模式發送消息,并接收響應(模擬同步 RPC 調用) CompletionStage<Object> future = Patterns.ask(selection, new HelloRequest("這是來自客戶端的問候"), Duration.ofSeconds(10)); // 等待響應結果(阻塞) future.thenApply(response -> { if (response instanceof HelloResponse helloResponse) { return "客戶端收到回復: " + helloResponse.reply; } else { return "收到未知回復: " + response; } }) .exceptionally(ex -> "調用失敗: " + ex.getMessage()) .thenAccept(System.out::println).toCompletableFuture().join(); system.terminate(); }
}
代碼說明:
ActorSelection
是一種actor
地址定位方式,它類似于 DNS 查詢,可以根據路徑去“找”一個遠程actor
Patterns.ask(...)
就像傳統 RPC 的同步調用,它封裝了發送、等待響應的過程。Duration.ofSeconds(3)
指定超時時間。.get()
阻塞等待結果,實際底層是異步實現。
在Pekko
(或Akka
)中,如果你不需要請求-響應(ask
),而只是發送消息給Actor
(fire-and-forget
),你可以直接使用ActorRef.tell(...)
方法。
// 從 ActorSystem 中選擇一個路徑為 "/user/helloActor" 的 Actor(可能還沒拿到真實引用)
// 注意:這個路徑必須匹配一個已存在的 Actor,否則會 resolve 失敗
ActorSelection selection = actorSystem.actorSelection("/user/helloActor");// 異步解析 selection,嘗試獲取對應 Actor 的真正引用 ActorRef(帶超時)
CompletionStage<ActorRef> futureRef = selection.resolveOne(Duration.ofSeconds(3));// 當成功獲取 ActorRef 后,使用 tell 發送一條消息,不需要返回(fire-and-forget)
futureRef.thenAccept(ref -> ref.tell("你好", ActorRef.noSender()));
flink的RPC框架如何使用
Flink
基于Pekko
實現了自己RPC
框架。當需要組件間需要使用RPC
服務時,只需要定義接口、編寫服務端接口邏輯即可。Flink
的Rpc
框架自己會完成接收遠程請求、調度線程、安全并發、處理生命周期等工作,讓你像寫本地對象一樣寫分布式服務。
本來想直接使用flink
的rpc
模塊創建一個簡單的demo
項目來說明的,但是由于Flink
使用了自定義的類加載器(如 SubmoduleClassLoader
)來隔離不同模塊(尤其是用戶代碼、插件、RPC 的動態 proxy 等)導致類不可見的問題
org.flink.MyServiceGateway referenced from a method is not visible from class loader: org.apache.flink.core.classloading.SubmoduleClassLoader
所以找了flink其中一個rpc服務來進行說明
Dispatcher組件
Dispatcher
是 集群調度的中樞組件,它的作用相當于一個集群控制器,負責接收作業、分配作業、啟動作業執行組件、以及監控作業生命周期。雖然Dispatcher只是在JobManager內使用,類似
偽分布式一樣,但其創建與使用流程和真正的遠程RPC組件是一樣的。
DIspatcher
在集群啟動的時候,通過DispatcherFactory
創建,StandaloneSession
模式下,工廠實現類為SessionDispatcherFactory
下面以Dispatcher
組件為例進行說明如何基于flink
的rpc
框架實現一個rpc服務。
rpc框架使用流程
使用流程大致如下:
- 定義
RpcGateway
接口作為rpc
協議 - 繼承
RpcEndpoint
或者FencedRpcEndpoint
并實現RpcGateWay接口 - 使用
RpcService
注冊服務(啟動服務端) - 使用
RpcService
連接服務端(獲取client)
步驟1.定義 RpcGateway 接口
Dispatcher的RPC接口類是DispatcherGateway
, FencedRpcGateway
是RpcGateway
的子接口。 rpc
方法的返回值必須是 CompletableFuture<T>
類型,這是 Flink RPC
框架的設計要求
步驟2. 實現服務端
StandaloneSession模式下,Dispatcher的實現類是StandaloneDispatcher,該類是Dispacher的子類。Dispatcher
類繼承FencedRpcEndpoint
類并實現DispatcherGateway
接口
RpcEndpoint是
Flink自研
RPC`框架中用于實現遠程服務端邏輯的抽象類,它幫你處理 RPC 生命周期、消息分發、線程安全調度等問題,其子類只需專注于“我要提供什么服務”即可。
步驟3. 啟動服務
通過工廠創建了Dispatcher對象后,調用其start()
方法啟動服務
步驟4. 遠程調用
提交job的時候,會調用dispatcher
的submitJob
啟動并調度該作業。
該gateway
是一個DispatcherGateway
對象,通過下面的代碼獲得到的,相當于Client。
通過該對象調用接口方法即可發起遠程調用。由于Dispatcher
的客戶端代碼從創建到使用的代碼分的太散了,不方便說明,下面通過一個簡單的示例來描述Client
的創建流程。
CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);MyServiceGateway gateway = gatewayFuture.get();gateway.sayHello("Flink").thenAccept(System.out::println);
MyServiceGateway.class
就是定義的RpcGateway
接口, gateway
是一個遠程代理對象了,調用它就等于遠程 RPC 調用!
Client是如何發送消息的
已知flink底層是利用Pekko來實現rpc調用的,再次回顧flink rpc示例代碼中可以想到
CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);
MyServiceGateway gateway = gatewayFuture.get();
gateway.sayHello("Flink").thenAccept(System.out::println);
該gateway對象發起遠程調用,本質上應該是使用了類似下面的代碼來發送消息的
CompletionStage<Object> future = Patterns.ask(selection, "Flink", Duration.ofSeconds(3));
這個gateway
對象是由rpcService.connect
返回的. rpcService
是一個RpcService
接口對象,其實現就4個,排除掉測試用的就剩一個 PekkoRpcService
了。
connect
方法的源碼
繼續看connect
方法的源碼,首先會先調用resolveActorAddress
解析入參的rpc
地址"akka://flink@127.0.0.1:6123/user/myService"
得到一個ActorRef
對象
private CompletableFuture<ActorRef> resolveActorAddress(String address) { final ActorSelection actorSel = actorSystem.actorSelection(address); return actorSel.resolveOne(configuration.getTimeout()) .toCompletableFuture() .exceptionally( error -> { throw new CompletionException( new RpcConnectionException( String.format( "Could not connect to rpc endpoint under address %s.", address), error)); });
}
獲取到ActorRef
后,使用 Java 的 動態代理機制創建一個實現了MyServiceGateway
接口的代理對象 proxy
既然是動態代理,那就得看Handler方法里面的邏輯了,創建Handler的invocationHandlerFactory
代碼如下:
查看對應的invoke方法會看到實際發消息的是invokeRpc
所以最終actor.tell是在這里被調用的
轉換成rpc參數的邏輯如下,只是將被調用方法所需的參數與信息封裝成MethodInvocation
對象
Server是接收處理消息的
前面的代碼已經知道了client通過Pekko的actor發送了消息,現在要看Server這邊是怎么處理的了(找到Actor
處理RpcInvocation
消息)。
服務端需要繼承RpcEndpoint
類,并在構建的時候傳遞rpcService
對象`
final RpcService rpcService = ...; // 通常通過 PekkoRpcServiceUtils 創建
final String endpointId = "myService";MyServiceEndpoint endpoint = new MyServiceEndpoint(rpcService, endpointId);
endpoint.start(); // 啟動 RPC 服務端
查看RpcEndpoint的
構造函數,可以看到利用rpcService
對象啟動了一個rpcServer
繼續往下看前需要了解Actor是如何處理消息的:
?ActorSystem?? 是Pekko
應用的中央控制樞紐,作為單例容器管理所有Actor
的層級結構和生命周期。當發送消息給遠程Actor
時,ActorSystem
會自動將消息序列化并通過網絡傳輸到目標節點,在遠程節點反序列化后放入目標Actor
的Mailbox
隊列,最終由目標節點的ActorSystem
調度線程執行消息處理,整個過程對開發者完全透明,如同本地調用一般。
可以粗略的認為:一個Actor等同于一個Server端(輕量級),Actor內有一個隊列,當有新的消息從客戶端發送過來就放到該隊列中。然后有一個線程不斷從隊列中取消息,然后調用該 Actor 的 createReceive()
所定義的行為處理消息。
了解的Actor是如何接收信息后,繼續看PekkoRpcService
的startServer
方法,其中調用下面的方法,通知另一個Actor
來創建本RpcEndpoin
對應的Actor
那么就要找出負責創建Actor的這個supervisor(Actor)在哪里,才能繼續往下看了。
很容易就可以看到PekkoRpcService
對象它的構造函數中調用下面的函數找到對應的Actor的具體類型
查看SupervisroActor
類的createReceive()
就可以看到真正創建actor
的邏輯了
@Override
public Receive createReceive() { return receiveBuilder() .match(StartRpcActor.class, this::createStartRpcActorMessage) .matchAny(this::handleUnknownMessage) .build();
}
在flink rpc
框架中,所有RpcEndpoint
對應的Actor
的類型都是PekkoRpcActor
, 只是名字不一樣而已。在PekkoRpcActor
的CreateReceive()
可以看到與Client
發送過來的RPC
消息相對應的處理邏輯。
通過反射調用方法,此處的rpcEndpoint
就是我們繼承了RcpEndpoint的對象
到此,我們就知道了服務端的業務方法是如何被調用了。
RpcService的作用
在前面介紹 Flink 中 Client 與 Server 如何工作的過程中,我們可以看到其底層是通過 Pekko
實現遠程通信的。但在調用流程中,業務代碼中并沒有直接與 ActorSystem
或 ActorRef
等 Pekko 原生類打交道。這是因為 Flink 通過一層抽象 RpcService
,優雅地屏蔽了底層通信實現的細節。
// 1. 創建 RpcService(基于 Pekko 實現)
RpcService rpcService = ...;// 2. 實例化 Dispatcher(繼承自 RpcEndpoint)
StandaloneDispatcher dispatcher = new StandaloneDispatcher(rpcService, ...);// 3. 注冊服務端
DispatcherGateway dispatcherGateway = rpcService.startServer(dispatcher);// 4. 客戶端連接(可在其他進程中執行)
rpcService.connect("pekko://flink@host:6123/user/dispatcher", DispatcherGateway.class);
如果沒有 RpcService
這一層抽象,Flink 的組件(如 Dispatcher、JobMaster)之間想要通信,就必須直接操作 Pekko 的底層 API,比如:
- 使用
ActorSystem
創建ActorRef
; - 使用
tell()
或ask()
發送消息; - 管理消息序列化和遠程地址;
- 處理超時、線程調度等復雜細節。
這會導致:
- Actor 概念侵入業務邏輯,開發就需要學習Actor相關的知識;
- 接口強耦合通信實現,未來若切換通信框架非常困難;
- 本地調用與遠程調用流程不統一,維護復雜。