【Flink集群RPC通訊機制(三)】AkkaRpcActor設計與實現:接收RPC消息以及處理邏輯

文章目錄

    • 1. 創建Receiver
    • 2. 進行消息處理

RPC請求發送后接收方的處理邏輯

在RpcEndpoint中創建的RemoteRpcInvocation消息,最終會通過Akka系統傳遞到被調用方。例如TaskExecutor向ResourceManager發送SlotReport請求的時候,會在TaskExecutor中將ResourceManagerGateway的方法名稱和參數打包成RemoteRpcInvocation對象。然后經過網絡發送到ResourceManager中的AkkaRpcActor,處理請求。

接下來深入了解AkkaRpcActor的設計與實現,了解在AkkaRpcActor中如何接收RemoteRpcInvocation消息并執行后續的操作。

?

1. 創建Receiver

如代碼所示,首先在AkkaRpcActor中創建Receive對象,用于處理Akka系統接收的其他Actor發送過來的消息。

Receiver相關能力

在AkkaRpcActor中主要創建了RemoteHandshakeMessage、ControlMessages等消息對應的處理器,

  • 其中RemoteHandshakeMessage主要用于進行正式RPC通信之前的網絡連接檢測,保障RPC通信正常。
  • ControlMessages用于控制Akka系統,例如啟動和停止Akka Actor等控制消息。這里我們重點關注第三種類型的消息,即在集群運行時中RPC組件通信使用的Message類型,此時會調用handleMessage()方法對這類消息進行處理。
public Receive createReceive() {return ReceiveBuilder.create().match(RemoteHandshakeMessage.class, this::handleHandshakeMessage).match(ControlMessages.class, this::handleControlMessage).matchAny(this::handleMessage).build();
}

?

2. 進行消息處理

在AkkaRpcActor.handleMessage()方法中,最終會調用handleRpcMessage()方法繼續對RPC消息進行處理。

如下代碼:


//根據RPC消息類型,進行不同方式處理
protected void handleRpcMessage(Object message) {if (message instanceof RunAsync) {//將代碼塊提交到本地線程池中執行handleRunAsync((RunAsync) message);} else if (message instanceof CallAsync) {handleCallAsync((CallAsync) message);} else if (message instanceof RpcInvocation) {handleRpcInvocation((RpcInvocation) message);} else {// 省略部分代碼sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message +  " of type " +  message.getClass().getSimpleName() + '.'));}
}

接著看AkkaRpcActor.handleRpcInvocation()方法邏輯:

  1. 判斷當前RpcEndpoint是否實現了指定rpcMethod。

例如JobMaster調用ResourceManagerGateway.requestSlot()方法,會在lookupRpcMethod()方法中判斷當前ResourceManager實現的Endpoint是否提供了該方法的實現。

  1. 當rpcMethod不為空時,rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs())
  2. 調用sendAsyncResponse()、sendSyncResponse()方法通過Akka系統將RpcMethod返回值返回給調用方。
private void handleRpcInvocation(RpcInvocation rpcInvocation) {Method rpcMethod = null;try {String methodName = rpcInvocation.getMethodName();Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();rpcMethod = lookupRpcMethod(methodName, parameterTypes);} catch (ClassNotFoundException e) {// 省略部分代碼}if (rpcMethod != null) {try {rpcMethod.setAccessible(true);if (rpcMethod.getReturnType().equals(Void.TYPE)) {// 沒有返回值的情況rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());}else {// 有返回值的情況final Object result;try {result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());}catch (InvocationTargetException e) {getSender().tell(new Status.Failure(e.getTargetException()), getSelf());return;}final String methodName = rpcMethod.getName();if (result instanceof CompletableFuture) {final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;sendAsyncResponse(responseFuture, methodName);} else {sendSyncResponse(result, methodName);}}} catch (Throwable e) {log.error("Error while executing remote procedure call {}.", rpcMethod, e);// 通知錯誤信息getSender().tell(new Status.Failure(e), getSelf());}}
}

接下來從更加宏觀的角度了解各組件之間如何基于已經實現的RPC框架進行通信,進一步加深對Flink中RPC框架的了解。

?
?
參考:《Flink設計與實現:核心原理與源碼解析》–張利兵

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/696975.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/696975.shtml
英文地址,請注明出處:http://en.pswp.cn/news/696975.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

petalinux_zynq7 驅動DAC以及ADC模塊之二:petalinux

petalinux_zynq7 C語言驅動DAC以及ADC模塊之一&#xff1a;建立IPhttps://blog.csdn.net/qq_27158179/article/details/136234296在上一篇&#xff0c;建立了ADC和DAC兩個IP。這里繼續。本文在 petalinux默認配置的基礎上&#xff0c;添加了python和qt。再編譯出sdk可以給x86主…

汽車智能座艙中 顯示屏市場戰略趨勢分析 中篇

今天主要講講主流車廠顯示屏的趨勢。 主流車廠的中控&液晶儀表屏的尺寸及趨勢匯總 奔馳 奔馳A級 10.2510.25 奔馳C級 12.310.25 奔馳GLA 10.2510.25 奔馳E級 12.312.3 奔馳S級 12.312.8 1、奔馳的儀表幾乎都為液晶儀表&#xff0c;幾乎所有車型都有HUD的選配&#xff…

大功率應用中的厚膜電阻散熱器的設計?

在許多大功率應用中&#xff0c;例如電機和電源&#xff0c;電源電阻器位于主電源線中。它們的目的是防止損壞或提供一定程度的控制。 在這些應用中&#xff0c;電阻器承受恒定的、相對較高的電流。當電流流過電阻器時&#xff0c;它會產生熱量。這種熱能必須消散到環境中&…

1、WEB攻防-通用漏洞SQL注入MYSQL跨庫ACCESS偏移

用途&#xff1a;個人學習筆記&#xff0c;歡迎指正&#xff01; 前言&#xff1a; 為了網站和數據庫的安全性&#xff0c;MYSQL 內置有 ROOT 最高用戶&#xff0c;劃分等級&#xff0c;每個用戶對應管理一個數據庫&#xff0c;這樣保證無不關聯&#xff0c;從而不會影響到其他…

Autosar-WdgM配置詳解-3

1.11生成代碼解析 1.11.1MasterSWC代碼解析 在MasterSWC的RE_TestRun這個runnable里會調用兩個檢測點函數,我們可以在兩個檢測點函數之間,加入我們所需要監控的代碼。 ?Rte_Call_RPort_StartCheckPoint_CheckpointReached(); ?Rte_Call_RPort_EndCheckPoint_CheckpointReac…

C#串口 Modbus通訊工具類

一、安裝Modbus包 二、創建modbushelper類 1、打開串口 public bool IfCOMOpend; //用于實例內的COM口的狀態 public SerialPort OpenedCOM;//用于手動輸入的COM轉成SERIAL PORT /// <summary> /// 打開串口 /// </summary> /// <param name="COMname&quo…

unity小工具-非實時的值變化監聽器

項目里有代碼專門監聽網絡環境的變化&#xff0c;特別是在下載中&#xff0c;如果遇到斷網或者切換為移動網絡&#xff0c;可能需要觸發提醒等等。這種需求可能是通用的&#xff0c;于是便寫了一個通用的監聽代碼。是 using System; using System.Collections; using System.C…

c++服務器開源項目Tinywebserver運行

c服務器開源項目Tinywebserver運行 一、Tinywebserver介紹二、環境搭建三、構建數據庫四、編譯Tinywebserver五、查看效果 Tinywebserver是github上一個十分優秀的開源項目&#xff0c;幫助初學者學習如何搭建一個服務器。 本文講述如何在使用mysql跟該項目進行連接并將項目運行…

python 層次分析(AHP)

文章目錄 一、算法原理二、案例分析2.1 構建指標層判斷矩陣2.2 求各指標權重2.2.1 算術平均法&#xff08;和積法&#xff09;2.2.2 幾何平均法&#xff08;方根法&#xff09; 2.3 一致性檢驗2.3.1 求解最大特征根值2.3.2 求解CI、RI、CR值2.3.3 一致性判斷 2.4 分別求解方案層…

利用Ubuntu22.04啟動U盤對電腦磁盤進行格式化

概要&#xff1a; 本篇演示利用Ubuntu22.04啟動U盤的Try Ubuntu模式對電腦磁盤進行格式化 一、說明 1、電腦 筆者的電腦品牌是acer(宏碁/宏基) 開機按F2進入BIOS 開機按F12進入Boot Manager 2、Ubuntu22.04啟動U盤 制作方法參考筆者的文章&#xff1a; Ubuntu制作Ubun…

【OpenAI官方課程】第五課:ChatGPT文本轉換Transforming

歡迎來到ChatGPT 開發人員提示工程課程&#xff08;ChatGPT Prompt Engineering for Developers&#xff09;&#xff01;本課程將教您如何通過OpenAI API有效地利用大型語言模型&#xff08;LLM&#xff09;來創建強大的應用程序。 本課程由OpenAI 的Isa Fulford和 DeepLearn…

緩存篇—緩存雪崩

什么是緩存雪崩 通常我們為了保證緩存中的數據與數據庫中的數據一致性&#xff0c;會給 Redis 里的數據設置過期時間&#xff0c;當緩存數據過期后&#xff0c;用戶訪問的數據如果不在緩存里&#xff0c;業務系統需要重新生成緩存&#xff0c;因此就會訪問數據庫&#xff0c;并…

QEMU源碼全解析 —— virtio(22)

接前一篇文章&#xff1a;QEMU源碼全解析 —— virtio&#xff08;21&#xff09; 前幾回講解了virtio驅動的加載。本回開始講解virtio驅動的初始化。 在講解virtio驅動的初始化之前&#xff0c;先要介紹virtio配置的函數集合變量virtio_pci_config_ops。實際上前文書也有提到…

c# HttpCookie操作,建立cookie工具類

HttpCookie 是一個在.NET Framework中用于管理和操作HTTP Cookie的類。它提供了一種方便的方式來創建、設置、讀取和刪除Cookie。 Cookie是一種在客戶端和服務器之間傳遞數據的機制&#xff0c;用于跟蹤用戶的會話狀態和存儲用戶相關的信息。它通常由服務器發送給客戶端&#…

萬字干貨-京東零售數據資產能力升級與實踐

開篇 京東自營和商家自運營模式&#xff0c;以及伴隨的多種運營視角、多種組合計算、多種銷售屬性等數據維度&#xff0c;相較于行業同等量級&#xff0c;數據處理的難度與復雜度都顯著增加。如何從海量的數據模型與數據指標中提升檢索數據的效率&#xff0c;降低數據存算的成…

parallels配置centos虛擬環境

parallels Desktop M1/M2芯片Parallels Desktop 19虛擬機安裝使用教程&#xff08;超詳細&#xff09;-CSDN博客 下鏡像記得找和mac芯片匹配的 安裝就選第一個centos7不要選第二個 安裝有問題就選回退重啟 parallel desktop 18/19安裝centos7.2009教程_parallels desktop 19…

echarts多y軸樣式重疊問題

1、主要屬性設置 yAxis: [{//y軸1nameTextStyle: {align: "right",padding: 0}},{//y軸2nameTextStyle: {align: "left",padding: 0}},{//y軸3axisLabel: {margin: 50},nameTextStyle: {align: "left",padding: [0, 0, 0, 50]},axisPointer: {l…

Python Web開發記錄 Day2:CSS

名人說&#xff1a;莫道桑榆晚&#xff0c;為霞尚滿天。——劉禹錫&#xff08;劉夢得&#xff0c;詩豪&#xff09; 創作者&#xff1a;Code_流蘇(CSDN)&#xff08;一個喜歡古詩詞和編程的Coder&#x1f60a;&#xff09; 目錄 二、CSS1、CSS-初始入門①快速了解②CSS應用方式…

【C語言】sizeof()函數

前言 sizeof函數用于獲取數據類型或變量在內存中所占的字節數。 sizeof函數返回的是編譯時確定的值&#xff0c;不會計算動態分配的內存大小。 sizeof函數可以用于多種類型的數據&#xff0c;包括數組、指針、結構體、枚舉等。 1.數組 int arr[5];printf("%zu ", siz…

文件上傳與下載

文件上傳與下載 1. 文件上傳 為了能上傳文件&#xff0c;必須將表單的 method 設置為 POST&#xff0c;并將 enctype 設置為 multipart/form-data 。 有兩種實現文件上傳的方式&#xff1a; 底層使用 Apache Commons FileUpload 包 底層使用 Servlet 3.1 內置的文件上傳功能…