[分布式網絡通訊框架]----RpcProvider實現

在上一節userservice.cc的主函數中,我們初始化以后實例化了一個RpcProvider對象provider。接著調用了它的NotifyService(new UserService)方法,將UserService服務對象及其提供的方法進行預備發布。發布完服務對象后再調用Run()就將預備發布的服務對象及方法注冊到ZooKeeper上并開啟了對遠端調用的網絡監聽。接下來我們看看RpcProvider的具體實現。

rpcprovider

該類是Rpc框架提供的專門發布RPC服務方法的網絡對象類。

重要成員變量

muduo::net::EventLoop m_eventLoop;struct ServiceInfo
{google::protobuf::Service *m_service; std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;
};std::unordered_map<std::string,ServiceInfo> m_serviceMap;
  • EventLoop大家都陌生
  • ServiceInfo類,組織了一個service服務類型信息,里面包含了服務對象m_service,以及服務對象方法m_methodMap,在user.proto中注冊的rpc遠端調用方法Login和Register都是用google::protobuf::MethodDescriptor類來描述的。
  • m_serviceMap 存儲注冊成功的服務對象和其服務方法的所有信,一臺服務器上可能會提供多個Service服務對象,m_serviceMap存儲了多個Service_Info結構體。

重要成員函數

void NotifyService();

這里是框架提供給外部使用的,可以發布rpc方法的函數接口,它的參數是google::protobuf::Service *service決定了也可以接受任意的service。

為什么要使用google::protobuf::Service *service呢?
userservice.cc中我們知道UserService是繼承自UserServiceRpc,而UserServiceRpc又是繼承自google::protobuf::Service類。
在這里插入圖片描述
這就是C++的多態設計,rpcprovider作為Rpc通信框架的一部分,是服務于業務層的,我們不能讓其只服務與某一個業務,即void NotifyService(UserService *service);,對于不同的業務我們再去定義其他的類。所以protobuf就提供了google::protobuf::Service基類來描述服務對象。傳遞對象的時候傳遞基類指針指向派生類實例,使Rpc框架中定義的類方法解耦于業務層,這樣就可以接受任意類型的service。

void RpcProvider::NotifyService(google::protobuf::Service *service)
{ServiceInfo service_info;// 獲取了服務對象的描述信息const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();// 獲取服務的名字std::string service_name = pserviceDesc->name();// 獲取服務對象service對象的方法的數量int methodCnt = pserviceDesc->method_count();// std::cout << "service_name:" << service_name << std::endl;LOG_INFO("service_name:%s",service_name.c_str());for (int i = 0; i < methodCnt; i++){// 獲取了服務對象指定下標的服務方法的描述(抽象描述)const google::protobuf::MethodDescriptor *pmethodDesc = pserviceDesc->method(i);std::string method_name = pmethodDesc->name();service_info.m_methodMap.insert({method_name, pmethodDesc});//std::cout << "method_name:" << method_name << std::endl;LOG_INFO("method_name:%s",method_name.c_str());}service_info.m_service = service;m_serviceMap.insert({service_name, service_info});
}
  • 定義了一個ServiceInfo對象service_info,用來保存service服務類型信息。
  • ServiceDescriptor對象pserviceDesc通過底層的GetDescriptor()函數來獲取給定的消息對象的描述符,通過pserviceDesc,調用底層的方法我們可以獲得服務的名字以及對應的方法數量。
  • 通過循環,得到方法對應的名字和方法的描述放入結構體service_info的m_methodMap中。
  • 最后將service_name, service_info一起放入m_serviceMap中。這樣我們就獲得了服務對應的方法以及方法對應的描述。
void Run();

負責啟動rpc服務節點,開始提供rpc遠程網絡調用服務

void RpcProvider::Run()
{std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());muduo::net::InetAddress address(ip, port);// 創建TcpServer對象muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");// 綁定連接回調和消息讀寫回調的方法 分離了網絡代碼和業務代碼server.setConnectionCallback(std::bind(&RpcProvider::OnConnection,this, std::placeholders::_1));server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 設置muduo庫的線程數量server.setThreadNum(4);ZkClient zkCli;// 連接zk服務zkCli.Start();for(auto &sp:m_serviceMap){//    /service_name   /UserServiceRpcstd::string service_path="/"+sp.first;zkCli.Create(service_path.c_str(),nullptr,0);for(auto &mp:sp.second.m_methodMap){std::string method_path = service_path+"/"+mp.first;char method_path_data[128]={0};sprintf(method_path_data,"%s:%d",ip.c_str(),port);zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);}}std::cout << "RpcProvider statrt service at ip: " << ip<< " port: " << port << std::endl;// 啟動網絡服務server.start();m_eventLoop.loop();
}
  • 調用MprpcApplication的方法獲取了響應的ip和port,接下來就是我們在muduo庫中剖析的網絡通訊過程,得到ip和port組裝了address,創建tcpserver對象,注冊連接回調和消息讀寫回調的方法,分離網絡代碼和業務代碼,設置muduo庫的線程數量。
  • 把當前rpc節點上要發布的服務全部注冊到zk上面,讓rpc client可以在zk上發現服務,關于zk之后會分析到。
  • 啟動網絡服務
void OnConnection();

連接回調

void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{if (!conn->connected()){// rpc client的連接斷開了conn->shutdown();}
}
void OnMessage( );

已建立連接用戶的讀寫事件回調,如果遠程有一個rpc服務的調用請求,那么OnMessage方法就會響應

void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp)
{std::string revc_buf = buffer->retrieveAllAsString();uint32_t header_size = 0;revc_buf.copy((char *)&header_size, 4, 0);std::string rpc_header_str = revc_buf.substr(4, header_size);mprpc::RpcHeader rpcHeader;std::string service_name;std::string method_name;uint32_t args_size;if (rpcHeader.ParseFromString(rpc_header_str)){// 數據頭反序列化成功service_name = rpcHeader.service_name();method_name = rpcHeader.method_name();args_size = rpcHeader.args_size();}else{// 數據頭反序列化失敗std::cout << "rpc_header_str:" << rpc_header_str<< " parse error!" << std::endl;return;}// 獲取rpc方法參數的字符流數據std::string args_str = revc_buf.substr(4 + header_size, args_size);// 獲取service對象和method對象auto it = m_serviceMap.find(service_name);if (it == m_serviceMap.end()){// 沒有對應的服務對象std::cout << service_name << " is not exist!" << std::endl;return;}auto mit = it->second.m_methodMap.find(method_name);if (mit == it->second.m_methodMap.end()){// 沒有對應的服務對象std::cout << service_name << ": "<< method_name << " is not exist!" << std::endl;return;}// 獲取service對象 new UserServicegoogle::protobuf::Service *service = it->second.m_service;// 獲取method對象 Loginconst google::protobuf::MethodDescriptor *method = mit->second;// 生成rpc方法調用的請求request和響應response參數google::protobuf::Message *request = service->GetRequestPrototype(method).New();if (!request->ParseFromString(args_str)){std::cout << " request parse error, content: " << args_str << std::endl;return;}google::protobuf::Message *response = service->GetResponsePrototype(method).New();google::protobuf::Closure *done=google::protobuf::NewCallback<RpcProvider,const muduo::net::TcpConnectionPtr&,google::protobuf::Message*>(this, &RpcProvider::SendRpcResponse, conn, response);service->CallMethod(method, nullptr, request, response, done);
}
  • 網絡上接受的遠程rpc調用請求的字符流 ,并從中讀取前4個字節的內容,這里我們按照header_size(4個字節)+hear_str+args_str進行存放,前四個字節是服務的名字和方法的名字一起的長度,通過這四個字節,我們可以分辨出來名字和參數。
  • 通過從字符流中讀取前4個字節的內容,得到header_size,并根據其讀取數據頭的原始字符流,反序列化數據。
  • 在定義RpcHeader時我們按照以下結構進行定義,這樣通過反序列化,我們就得到了相應的方法以及參數長度
syntax="proto3";
package mprpc;message RpcHeader
{bytes service_name=1;   //類名bytes method_name=2;    //方法名uint32 args_size=3;     //參數長度(參數序列化后的長度)
}
  • 通過service_name以及method_name在之前定義的m_serviceMap中,找到相應的service對象(UserService)和method對象(Login);
  • 生成rpc方法調用的請求request和響應response參數;
  • CallMethod函數中最后一個參數為google::protobuf::Closure *done,這里我們綁定一個Closure的回調函數SendRpcResponse,通過網絡把rpc方法執行的結果發送會rpc的調用方。Closure類其實相當于一個閉包。這個閉包捕獲了一個成員對象的成員函數例如login函數,以及這個成員函數需要的參數。然后閉包類提供了一個方法Run(),當執行這個閉包對象的Run()函數時,他就會執行捕獲到的成員對象的成員函數,也就是相當于執行void RpcProvider::SendRpcResponse(conn, response);,這個函數可以將reponse消息體發送給Tcp連接的另一端,即caller。
  • 也就是在userservice.cc中的Login()函數中,最后調用done->Run(),實際上就是調用了RpcProvider::SendRpcResponse(conn, response);將response消息體作為Login處理結果發送回caller。
    在這里插入圖片描述
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response)
{std::string response_str;//response 進行序列化if(response->SerializeToString(&response_str)){//序列化成功后,通過網絡把rpc方法執行的結果發送會rpc的調用方conn->send(response_str);}else{std::cout<<"Serialize response_str error!"<<std::endl;}//模擬http的短鏈接服務,由rpcprovider主動斷開連接conn->shutdown();
}
  • 在框架上根據遠端rpc請求,調用當前rpc節點上發布的方法,也就是service->CallMethod(method, nullptr, request, response, done);

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/36831.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/36831.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/36831.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【Docker】可視化平臺Portainer

文章目錄 Portainer的特點Portainer的安裝步驟注意事項 Docker的可視化工具Portainer是一個輕量級的容器管理平臺&#xff0c;它為用戶提供了一個直觀的圖形界面來管理Docker環境。以下是關于Portainer的詳細介紹和安裝步驟&#xff1a; Portainer的特點 輕量級&#xff1a;P…

企業級Windows設備日志采集工具

永久免費: 前往Gitee最新版本 更新內容 進一步提升工程師部署采集客戶端效率. 打開根Url,自動跳轉到部署頁面.(原工程師需輸入很長的Url);新增復制同類客戶端同步任務功能.優化客戶端分組操作;文件同步到服務器后,可配置文件名增加時間戳; 介紹 定時全量或增量采集工控機,…

項目分層--簡單圖書管理系統

分層情況 實體類Book代碼 //實體類 public class Book {private int id;private String name;private int bsum;public Book() {}public Book(int id, String name, int bsum) {this.id id;this.name name;this.bsum bsum;}public int getId() {return id;}public void set…

9.2JavaEE——JDBCTemplate的常用方法(三)query()方法

JdbcTemplate類中常用的查詢方法 方法說明List query(String sql, RowMapper rowMapper)執行String類型參數提供的SQL語句,并通過參數rowMapper返回一個List類型的結果。List query(String sql, PreparedStatementSetter pss, RowMapper rowMapper)根據String類型參數提供的S…

9. Revit API UI: UIView、UIDocument、框選聚焦

9. Revit API UI: UIView、UIDocument、框選聚焦 UI命名空間下的API&#xff0c;到這里差不多就押送講完了&#xff0c;同Application那篇所講的幾個類與接口&#xff0c;都是帶UI的對應了一個不帶UI的&#xff0c;如UIApplication和Application&#xff0c;作用呢&#xff0c…

Jenkins 下使用 Node 和 Npm(借助 nvm-wrapper 插件)構建前端程序

一、前言 搭建完Jenkins后&#xff0c;如何使用node進行構建前端呢&#xff0c;多個項目會使用的node的多個版本。如何動態指定node的版本進行構建呢。 方案一&#xff1a; 安裝多個node版本&#xff0c;然后進行指定。這樣比較麻煩。 方案二&#xff1a; 使用Jenkins的nv…

Spring相關面試題(三)

29 如何在所有的BeanDefinition注冊完成后&#xff0c;進行擴展 Bean工廠的后置處理器&#xff0c;在所有的Bean注冊完成后&#xff0c;就被執行。 public class A implements BeanFactoryPostProcessor {private String name "a class";private B b; ?public St…

ARM芯片架構(RTOS)

前言&#xff1a;筆記韋東山老師的rtos教程&#xff0c;連接放在最后 #ARM介紹 arm芯片屬于精簡指令集risc&#xff0c;所用的指令比較簡單&#xff0c;ARM架構是一種精簡指令集&#xff08;RISC&#xff09;架構&#xff0c;廣泛應用于移動設備、嵌入式系統、物聯網等領域。AR…

Linux:簡單說說分號“;”、單豎線“|”、雙豎線“||”、單“”、雙“”作為多個命令分隔符的用法

以下符號都可以用來連接多個命令&#xff0c;在一行中作為多個命令的分隔符。區別如下&#xff1a; 1、分號“;” 按照先后順序執行命令。每個命令都會在前一個命令執行完畢后立即執行。 如果其中一個命令出現錯誤&#xff0c;后續命令仍然會繼續執行。 2、單豎線“|” 作為管…

2. jenkins發布java項目

jenkins發布java項目 一、環境描述二、部署tomcat業務服務器三、部署git服務器&#xff0c;上傳測試代碼1、部署git服務器2、上傳測試代碼 四、jenkins對接組件1、安裝必要的插件2、對接git客戶端3、對接maven工具4、配置maven需要的jdk5、配置gitlab服務器的連接6、在jenkins上…

1161. 最大層內元素和

1161. 最大層內元素和 題目鏈接&#xff1a;1161. 最大層內元素和 代碼如下&#xff1a; /*** Definition for a binary tree node.* struct TreeNode {* int val;* TreeNode *left;* TreeNode *right;* TreeNode() : val(0), left(nullptr), right(nullptr)…

AI智能體文章索引

1&#xff0c;探索AI世界系列&#xff1a;俗說AI智能體 2&#xff0c;是真的嗎&#xff0c;不會代碼也能開發一款AI應用&#xff1f;GPTs讓人夢想成真 3&#xff0c;來了&#xff0c;你的第一個AI智能體 4&#xff0c;制作一個智能體&#xff1a;抖音熱點話題文案制作助手 5&am…

【Django】網上蛋糕項目商城-首頁

概念 本文在上一文章搭建完數據庫&#xff0c;以及創建好項目之后&#xff0c;以及前端靜態文件后&#xff0c;對項目的首頁功能開發。 后端代碼編寫 這里我們使用pymysql模塊對數據庫進行操作&#xff0c;獲取數據。因此需要在dos窗口使用以下指令下載該庫文件 pip instal…

springboot3搭建WebSocket服務

springboot3搭建WebSocket服務 文章目錄 springboot3搭建WebSocket服務前言一、創建SpringBoot工程二、pom.xml中引入依賴1.引入庫2. application.yml配置 三、主啟動類四、WebSocket配置類五、編寫WebSocket服務類六、編寫測試頁面總結 前言 本文詳細介紹了如何在SpringBoot項…

新型發電系統——光伏行業推動能源轉型

一、發展背景 “十四五”期間&#xff0c;隨著“雙碳”目標提出及逐步落實&#xff0c;本就呈現出較好發展勢頭的分布式光伏發展有望大幅提速。就“十四五”光伏發展規劃&#xff0c;國家發改委能源研究所可再生能源發展中心副主任陶冶表示&#xff0c;“雙碳”目標意味著國家…

Java面試題:比較Maven和Gradle的構建生命周期和依賴管理

Maven和Gradle是兩個流行的構建工具&#xff0c;各自有不同的構建生命周期和依賴管理機制。以下是它們的比較&#xff1a; 構建生命周期 Maven Maven有一個固定的生命周期&#xff0c;由一系列階段&#xff08;phases&#xff09;組成&#xff0c;每個階段代表一個構建步驟。…

ubuntu更改ssh默認端口22

編輯 /etc/ssh/sshd_config&#xff0c;把Port前的#去掉&#xff0c;端口號改成8022&#xff0c;重啟ssh。 // 1. 修改配置 sudo vi /etc/ssh/sshd_config// 2. 重啟 ssh sudo systemctl restart ssh//【不想確認可跳過此步驟】 3. 查看 ssh 監聽端口 sudo lsof -i:22 // 無…

MySQL改密

這里寫目錄標題 更改登錄密碼&#xff1a;有權限賬號能登錄mysql中&#xff1a;有權限賬號不能登錄mysql中&#xff1a;mysql5.6版本命令mysql5.7版本命令修改密碼8.0版本改完后&#xff1a; mysql登錄不上了本機安裝了5.6后&#xff0c;又安裝了mysql8.0 更改登錄密碼&#xf…

QT QVariant 類和 C++ 的 union有什么區別

QVariant 類和 C 的 union&#xff08;共用體&#xff09;在概念、用途和實現上有所不同。以下是對它們的區別和使用的簡要概述&#xff1a; QVariantQT 如何儲存多種數據類型&#xff08;QVariant &#xff09;-CSDN博客 概念&#xff1a;QVariant 是 Qt 框架中的一個類&…

易查分小程序丨查詢開始和截止時間如何設置?

老師在發布查詢時&#xff0c;希望讓學生家長在指定的時間段才能查詢&#xff0c;應該如何實現&#xff1f; 通過查詢時段功能&#xff0c;老師可以自主設置查詢開始和截止時間&#xff0c;下面就來教給大家如何使用吧&#xff01; 設置查詢時段演示效果 &#x1f4cc;使用教程…