在上一節userservice.cc
的主函數中,我們初始化以后實例化了一個RpcProvider
對象provider
。接著調用了它的NotifyService(new UserService)
方法,將UserService服務對象及其提供的方法進行預備發布。發布完服務對象后再調用Run()就將預備發布的服務對象及方法注冊到ZooKeeper上并開啟了對遠端調用的網絡監聽。接下來我們看看RpcProvider
的具體實現。
rpcprovider
該類是Rpc框架提供的專門發布RPC服務方法的網絡對象類。
重要成員變量
muduo::net::EventLoop m_eventLoop;struct ServiceInfo
{google::protobuf::Service *m_service; std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;
};std::unordered_map<std::string,ServiceInfo> m_serviceMap;
- EventLoop大家都陌生
- ServiceInfo類,組織了一個service服務類型信息,里面包含了服務對象m_service,以及服務對象方法m_methodMap,在user.proto中注冊的rpc遠端調用方法Login和Register都是用google::protobuf::MethodDescriptor類來描述的。
- m_serviceMap 存儲注冊成功的服務對象和其服務方法的所有信,一臺服務器上可能會提供多個Service服務對象,m_serviceMap存儲了多個Service_Info結構體。
重要成員函數
void NotifyService();
這里是框架提供給外部使用的,可以發布rpc方法的函數接口,它的參數是google::protobuf::Service *service
決定了也可以接受任意的service。
為什么要使用google::protobuf::Service *service
呢?
在userservice.cc
中我們知道UserService是繼承自UserServiceRpc,而UserServiceRpc又是繼承自google::protobuf::Service類。
這就是C++的多態設計,rpcprovider作為Rpc通信框架的一部分,是服務于業務層的,我們不能讓其只服務與某一個業務,即void NotifyService(UserService *service);
,對于不同的業務我們再去定義其他的類。所以protobuf就提供了google::protobuf::Service
基類來描述服務對象。傳遞對象的時候傳遞基類指針指向派生類實例,使Rpc框架中定義的類方法解耦于業務層,這樣就可以接受任意類型的service。
void RpcProvider::NotifyService(google::protobuf::Service *service)
{ServiceInfo service_info;// 獲取了服務對象的描述信息const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();// 獲取服務的名字std::string service_name = pserviceDesc->name();// 獲取服務對象service對象的方法的數量int methodCnt = pserviceDesc->method_count();// std::cout << "service_name:" << service_name << std::endl;LOG_INFO("service_name:%s",service_name.c_str());for (int i = 0; i < methodCnt; i++){// 獲取了服務對象指定下標的服務方法的描述(抽象描述)const google::protobuf::MethodDescriptor *pmethodDesc = pserviceDesc->method(i);std::string method_name = pmethodDesc->name();service_info.m_methodMap.insert({method_name, pmethodDesc});//std::cout << "method_name:" << method_name << std::endl;LOG_INFO("method_name:%s",method_name.c_str());}service_info.m_service = service;m_serviceMap.insert({service_name, service_info});
}
- 定義了一個ServiceInfo對象
service_info
,用來保存service服務類型信息。 - ServiceDescriptor對象pserviceDesc通過底層的GetDescriptor()函數來獲取給定的消息對象的描述符,通過pserviceDesc,調用底層的方法我們可以獲得服務的名字以及對應的方法數量。
- 通過循環,得到方法對應的名字和方法的描述放入結構體service_info的m_methodMap中。
- 最后將service_name, service_info一起放入m_serviceMap中。這樣我們就獲得了服務對應的方法以及方法對應的描述。
void Run();
負責啟動rpc服務節點,開始提供rpc遠程網絡調用服務
void RpcProvider::Run()
{std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());muduo::net::InetAddress address(ip, port);// 創建TcpServer對象muduo::net::TcpServer server(&m_eventLoop, address, "RpcProvider");// 綁定連接回調和消息讀寫回調的方法 分離了網絡代碼和業務代碼server.setConnectionCallback(std::bind(&RpcProvider::OnConnection,this, std::placeholders::_1));server.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 設置muduo庫的線程數量server.setThreadNum(4);ZkClient zkCli;// 連接zk服務zkCli.Start();for(auto &sp:m_serviceMap){// /service_name /UserServiceRpcstd::string service_path="/"+sp.first;zkCli.Create(service_path.c_str(),nullptr,0);for(auto &mp:sp.second.m_methodMap){std::string method_path = service_path+"/"+mp.first;char method_path_data[128]={0};sprintf(method_path_data,"%s:%d",ip.c_str(),port);zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);}}std::cout << "RpcProvider statrt service at ip: " << ip<< " port: " << port << std::endl;// 啟動網絡服務server.start();m_eventLoop.loop();
}
- 調用MprpcApplication的方法獲取了響應的ip和port,接下來就是我們在muduo庫中剖析的網絡通訊過程,得到ip和port組裝了address,創建tcpserver對象,注冊連接回調和消息讀寫回調的方法,分離網絡代碼和業務代碼,設置muduo庫的線程數量。
- 把當前rpc節點上要發布的服務全部注冊到zk上面,讓rpc client可以在zk上發現服務,關于zk之后會分析到。
- 啟動網絡服務
void OnConnection();
連接回調
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn)
{if (!conn->connected()){// rpc client的連接斷開了conn->shutdown();}
}
void OnMessage( );
已建立連接用戶的讀寫事件回調,如果遠程有一個rpc服務的調用請求,那么OnMessage方法就會響應
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp)
{std::string revc_buf = buffer->retrieveAllAsString();uint32_t header_size = 0;revc_buf.copy((char *)&header_size, 4, 0);std::string rpc_header_str = revc_buf.substr(4, header_size);mprpc::RpcHeader rpcHeader;std::string service_name;std::string method_name;uint32_t args_size;if (rpcHeader.ParseFromString(rpc_header_str)){// 數據頭反序列化成功service_name = rpcHeader.service_name();method_name = rpcHeader.method_name();args_size = rpcHeader.args_size();}else{// 數據頭反序列化失敗std::cout << "rpc_header_str:" << rpc_header_str<< " parse error!" << std::endl;return;}// 獲取rpc方法參數的字符流數據std::string args_str = revc_buf.substr(4 + header_size, args_size);// 獲取service對象和method對象auto it = m_serviceMap.find(service_name);if (it == m_serviceMap.end()){// 沒有對應的服務對象std::cout << service_name << " is not exist!" << std::endl;return;}auto mit = it->second.m_methodMap.find(method_name);if (mit == it->second.m_methodMap.end()){// 沒有對應的服務對象std::cout << service_name << ": "<< method_name << " is not exist!" << std::endl;return;}// 獲取service對象 new UserServicegoogle::protobuf::Service *service = it->second.m_service;// 獲取method對象 Loginconst google::protobuf::MethodDescriptor *method = mit->second;// 生成rpc方法調用的請求request和響應response參數google::protobuf::Message *request = service->GetRequestPrototype(method).New();if (!request->ParseFromString(args_str)){std::cout << " request parse error, content: " << args_str << std::endl;return;}google::protobuf::Message *response = service->GetResponsePrototype(method).New();google::protobuf::Closure *done=google::protobuf::NewCallback<RpcProvider,const muduo::net::TcpConnectionPtr&,google::protobuf::Message*>(this, &RpcProvider::SendRpcResponse, conn, response);service->CallMethod(method, nullptr, request, response, done);
}
- 網絡上接受的遠程rpc調用請求的字符流 ,并從中讀取前4個字節的內容,這里我們按照header_size(4個字節)+hear_str+args_str進行存放,前四個字節是服務的名字和方法的名字一起的長度,通過這四個字節,我們可以分辨出來名字和參數。
- 通過從字符流中讀取前4個字節的內容,得到header_size,并根據其讀取數據頭的原始字符流,反序列化數據。
- 在定義RpcHeader時我們按照以下結構進行定義,這樣通過反序列化,我們就得到了相應的方法以及參數長度
syntax="proto3";
package mprpc;message RpcHeader
{bytes service_name=1; //類名bytes method_name=2; //方法名uint32 args_size=3; //參數長度(參數序列化后的長度)
}
- 通過service_name以及method_name在之前定義的m_serviceMap中,找到相應的service對象(UserService)和method對象(Login);
- 生成rpc方法調用的請求request和響應response參數;
- CallMethod函數中最后一個參數為
google::protobuf::Closure *done
,這里我們綁定一個Closure的回調函數SendRpcResponse
,通過網絡把rpc方法執行的結果發送會rpc的調用方。Closure類其實相當于一個閉包。這個閉包捕獲了一個成員對象的成員函數例如login函數,以及這個成員函數需要的參數。然后閉包類提供了一個方法Run()
,當執行這個閉包對象的Run()函數時,他就會執行捕獲到的成員對象的成員函數,也就是相當于執行void RpcProvider::SendRpcResponse(conn, response);,
這個函數可以將reponse消息體發送給Tcp連接的另一端,即caller。 - 也就是在
userservice.cc
中的Login()
函數中,最后調用done->Run()
,實際上就是調用了RpcProvider::SendRpcResponse(conn, response)
;將response消息體作為Login處理結果發送回caller。
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr &conn, google::protobuf::Message *response)
{std::string response_str;//response 進行序列化if(response->SerializeToString(&response_str)){//序列化成功后,通過網絡把rpc方法執行的結果發送會rpc的調用方conn->send(response_str);}else{std::cout<<"Serialize response_str error!"<<std::endl;}//模擬http的短鏈接服務,由rpcprovider主動斷開連接conn->shutdown();
}
- 在框架上根據遠端rpc請求,調用當前rpc節點上發布的方法,也就是
service->CallMethod(method, nullptr, request, response, done);
;