文章目錄
- 1. 創建Receiver
- 2. 進行消息處理
RPC請求發送后接收方的處理邏輯
在RpcEndpoint中創建的RemoteRpcInvocation消息,最終會通過Akka系統傳遞到被調用方。例如TaskExecutor向ResourceManager發送
SlotReport
請求的時候,會在TaskExecutor中將ResourceManagerGateway的方法名稱和參數打包成RemoteRpcInvocation對象
。然后經過網絡發送到ResourceManager中的AkkaRpcActor,處理請求。
接下來深入了解AkkaRpcActor的設計與實現,了解在AkkaRpcActor中如何接收RemoteRpcInvocation消息并執行后續的操作。
?
1. 創建Receiver
如代碼所示,首先在AkkaRpcActor中創建Receive對象,用于處理Akka系統接收的其他Actor發送過來的消息。
Receiver相關能力
在AkkaRpcActor中主要創建了RemoteHandshakeMessage、ControlMessages等消息對應的處理器,
- 其中RemoteHandshakeMessage主要用于進行正式RPC通信之前的網絡連接檢測,保障RPC通信正常。
- ControlMessages用于控制Akka系統,例如啟動和停止Akka Actor等控制消息。這里我們重點關注第三種類型的消息,即在集群運行時中RPC組件通信使用的Message類型,此時會調用
handleMessage()
方法對這類消息進行處理。
public Receive createReceive() {return ReceiveBuilder.create().match(RemoteHandshakeMessage.class, this::handleHandshakeMessage).match(ControlMessages.class, this::handleControlMessage).matchAny(this::handleMessage).build();
}
?
2. 進行消息處理
在AkkaRpcActor.handleMessage()方法中,最終會調用handleRpcMessage()方法繼續對RPC消息進行處理。
如下代碼:
//根據RPC消息類型,進行不同方式處理
protected void handleRpcMessage(Object message) {if (message instanceof RunAsync) {//將代碼塊提交到本地線程池中執行handleRunAsync((RunAsync) message);} else if (message instanceof CallAsync) {handleCallAsync((CallAsync) message);} else if (message instanceof RpcInvocation) {handleRpcInvocation((RpcInvocation) message);} else {// 省略部分代碼sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message + " of type " + message.getClass().getSimpleName() + '.'));}
}
接著看AkkaRpcActor.handleRpcInvocation()方法邏輯:
- 判斷當前RpcEndpoint是否實現了指定rpcMethod。
例如JobMaster調用ResourceManagerGateway.requestSlot()方法,會在lookupRpcMethod()方法中判斷當前ResourceManager實現的Endpoint是否提供了該方法的實現。
- 當rpcMethod不為空時,rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs())
- 調用sendAsyncResponse()、sendSyncResponse()方法通過Akka系統將RpcMethod返回值返回給調用方。
private void handleRpcInvocation(RpcInvocation rpcInvocation) {Method rpcMethod = null;try {String methodName = rpcInvocation.getMethodName();Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();rpcMethod = lookupRpcMethod(methodName, parameterTypes);} catch (ClassNotFoundException e) {// 省略部分代碼}if (rpcMethod != null) {try {rpcMethod.setAccessible(true);if (rpcMethod.getReturnType().equals(Void.TYPE)) {// 沒有返回值的情況rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());}else {// 有返回值的情況final Object result;try {result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());}catch (InvocationTargetException e) {getSender().tell(new Status.Failure(e.getTargetException()), getSelf());return;}final String methodName = rpcMethod.getName();if (result instanceof CompletableFuture) {final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;sendAsyncResponse(responseFuture, methodName);} else {sendSyncResponse(result, methodName);}}} catch (Throwable e) {log.error("Error while executing remote procedure call {}.", rpcMethod, e);// 通知錯誤信息getSender().tell(new Status.Failure(e), getSelf());}}
}
接下來從更加宏觀的角度了解各組件之間如何基于已經實現的RPC框架進行通信,進一步加深對Flink中RPC框架的了解。
?
?
參考:《Flink設計與實現:核心原理與源碼解析》–張利兵