?一.消息分發Dispatcher實現
Dispatcher 就是“消息分發中樞”:根據消息類型 MType,把消息派發給對應的處理函數(Handler)執行。
初版:
#pragma once
#include "net.hpp"
#include "message.hpp"namespace wws
{class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;void registerHandler(MType mtype,const MessageCallback &handler){std::unique_lock<std::mutex> lock(_mutex);_handlers.insert(std::make_pair(mtype,handler));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息類型對應的業務處理函數,進行調用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second(conn,msg);return;}//沒有找到指定類型的處理回調 但我們客戶端和服務端都是我們自己設計的 因此不可能出現這種情況ELOG("收到未知類型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,MessageCallback> _handlers;};
}
使用方法 :服務端為例
#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"void onRpcRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Rpc請求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::RpcResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);rpc_rsp->setResult(33);conn->send(rpc_rsp);
}
void onTopicRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Topic請求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::TopicResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);conn->send(rpc_rsp);
}int main()
{//server建立收到rpc topic請求時對應的調用函數auto dispatcher=std::make_shared<wws::Dispatcher>();dispatcher->registerHandler(wws::MType::REQ_RPC,onRpcRequest);//注冊映射關系dispatcher->registerHandler(wws::MType::REQ_TOPIC,onTopicRequest);//注冊映射關系auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);server->setMessageCallback(message_cb);server->start();return 0;
}
回調函數調用過程:
1.服務端定義了兩個函數onRpcRequest收到rpc請求的回調函數,onTopicRequest收到topic請求的回調函數(兩個函數的參數部分都是一樣的)。
2.創建Dispatcher類對象,調用registerHandler函數把請求類型(mtype)和對應的回調函數建立映射(因為上面回調函數的類型都是一樣的,所以可以用map進行同一管理)。
3.把Dispatcher::onMessage函數設置成消息回調函數。在Dispatcher::onMessage函數內部會根據不同的消息類型,找到對應的回調函數并進行調用。
但是設置的兩個回調函數中onRpcRequest,onTopicRequest。它們第二個參數類型都是基類BaseMessage,雖然傳入的對象是子類對象RpcResponse TopicResponse,但仍無法訪問它們對應子類的成員函數。
如果把第二個參數基類Base換成它們對應的子類,能訪問了,但這就會導致函數的類型不一樣了,就不能用map進行統一管理。
有沒有什么辦法即能用map進行統一管理,還能在回調函數中調用到子類的函數。
方法一:直接在回調函數中通過
dynamic_cast
?/ std::dynamic_pointer_cast將父類->子類可以通過
dynamic_cast
將基類指針(或引用)轉換為子類類型,以便訪問子類特有的成員函數。前提是你使用的是多態(polymorphic)類,也就是基類至少要有一個虛函數//Base基類 Derived子類 void test(Base* basePtr) {// 裸指針寫法Derived* derivedPtr = dynamic_cast<Derived*>(basePtr);if (derivedPtr) {derivedPtr->specialFunction(); // 成功轉換,調用子類函數} else {cout << "dynamic_cast failed!" << endl;} } //智能指針寫法 std::shared_ptr<Base> basePtr = std::make_shared<Derived>(); // 用智能指針創建對象 std::shared_ptr<Derived> derivedPtr = std::dynamic_pointer_cast<Derived>(basePtr); // 類型安全轉換
項目 裸指針寫法 智能指針寫法 創建方式 new Derived()
std::make_shared<Derived>()
類型轉換 dynamic_cast<Derived*>(Base*)
std::dynamic_pointer_cast<Derived>(shared_ptr<Base>)
返回值類型 Derived*
std::shared_ptr<Derived>
生命周期管理 手動 delete 自動釋放,無內存泄漏風險 安全檢查 ? 運行時類型檢查,失敗返回 nullptr
? 同樣運行時類型檢查,失敗返回空智能指針 是否影響引用計數 ? 不涉及引用計數 ? 新建了一個共享控制塊引用
缺點 說明 ? 調用方需要知道消息類型 調用者必須手動 dynamic_cast
到對應子類,否則不能訪問子類內容? 有一定運行時開銷 dynamic_cast
需要在運行時檢查類型,會略有性能損失(但一般能接受)? 容易出錯 如果類型錯了,就返回 nullptr
,還要額外判斷、處理錯誤? 易破壞封裝 上層代碼要知道并顯式轉換為子類,增加了耦合度和類型暴露
方法二:模板 + 繼承 + 多態
先看代碼實現:
?#pragma once #include "net.hpp" #include "message.hpp"namespace wws {//讓統一的父類指針指向不同的子類對象//通過調用父類虛函數,調用不同子類onMessage類型轉換(dynamic<>)完成后的函數調用class Callback{public:using ptr=std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)=0;};template<typename T>class CallbackT:public Callback{public:using ptr=std::shared_ptr<CallbackT<T>>;//根據消息類型重新定義出函數類型using MessageCallback =std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>;CallbackT(const MessageCallback &handler):_handler(handler){}void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)override{auto type_msg=std::dynamic_pointer_cast<T>(msg);_handler(conn,type_msg);}private:MessageCallback _handler;};class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;template<typename T> //加typename表示這是一個類型void registerHandler(MType mtype,const typename CallbackT<T>::MessageCallback &handler)//傳的是類對象 T是消息類型(BaseMessage子類){std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<CallbackT<T>>(handler);_handlers.insert(std::make_pair(mtype,cb));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息類型對應的業務處理函數,進行調用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second->onMessage(conn,msg);//調用類中的回調函數return;}//沒有找到指定類型的處理回調 但我們客戶端和服務端都是我們自己設計的 因此不可能出現這種情況ELOG("收到未知類型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,Callback::ptr> _handlers;//second是一個基類指針}; }
使用方法:
不用傳BaseMessage基類,直接傳子類。但在設置時指明Typed消息類型。
因為回調函數hadler類型不同,我們就用模板類根據T(消息類型)生成不同的類,里面是保存的是不同類型的回調函數。但根據模板類生成的類,類型不一樣還是不能統一放入map中。
所以再定義一個父類 里面的回調函數設置為虛函數,這些不同的模板類作為子類繼承父類并實現各自的回調函數。map中的handler存的不再是回調函數,而是一個父類指針,通過父類指針調用子類的回調函數。
具體過程:
使用模板類
CallbackT<T>
根據消息類型T
生成不同的子類,它們內部包裝了各自類型的回調函數。(因為子類傳入的是一個函數類型,保存的一個函數類型,而T是一個消息類型。還需要MessageCallback將T消息類型和回調函數類型綁定在一起)這些模板類繼承自統一的基類
Callback
,并重寫了其onMessage()
虛函數。當我們在調用registerHandler注冊回調函數時,會創建一個
CallbackT<T>
的對象并獲取其指針,最后設置到map中。在
Dispatcher
中,map<MType, Callback::ptr>
存儲的是基類指針,但實際指向的是不同子類CallbackT<T>
的對象。當調用
onMessage()
時,基類指針會通過虛函數機制調用對應子類的實現,再通過dynamic_pointer_cast
將消息轉換為正確的類型,最終調用具體的業務回調函數。
方法一和方法二對比:
相比直接在 handler 里 dynamic_cast,現在的設計通過模板和多態封裝了類型轉換邏輯,使回調函數 更簡潔、更安全、更可維護,Dispatcher 也更具通用性和擴展性。
對比點 ? 當前封裝方式<br>(模板 + 多態) ? 直接在每個 handler 里手動 dynamic_cast 💡 代碼復用性 回調邏輯封裝在模板中,避免重復寫類型轉換代碼 每個回調都要重復寫一次 dynamic_pointer_cast ? 類型安全 編譯器自動根據模板類型 T
限定傳入的函數簽名由開發者自己保證類型正確,容易寫錯 🎯 接口統一 Dispatcher 接收統一的 BaseMessage::ptr
,自動調用類型對應的 handler接口統一,但每次回調前都得“手動猜”消息類型 🧼 代碼整潔性 Handler 業務函數專注于處理業務,不摻雜類型轉換代碼 handler 代碼里混入了類型轉換、錯誤判斷等雜項 🔄 可擴展性 新增消息類型只需 registerHandler<T>
一行,無需修改 dispatcher 邏輯每新增一種類型,都要寫新回調 + 自己處理類型轉換 🔒 類型封裝 類型轉換封裝在 CallbackT<T>::onMessage
內,調用者無感知顯式暴露類型細節,破壞封裝性 🧠 可維護性 Dispatcher 管理邏輯集中、結構清晰 回調函數多時,容易混亂、出錯
二.服務端-RpcRouter實現
組織和處理客戶端發來的 RPC 請求,并調用對應的業務邏輯進行響應。
這個模塊主要由 4 個類構成:
類名 作用 VType
參數類型的枚舉,例如整數、字符串、對象等 ServiceDescribe
描述一個服務方法的參數、返回值、回調函數等 ServiceManager
管理多個服務(增刪查) RpcRouter
處理客戶端發來的 RPC 請求,協調調用服務
#include "../common/net.hpp"
#include "../common/message.hpp"namespace wws
{
namespace server
{//枚舉類 VType 定義參數與返回值的類型enum class VType{BOOL = 0,INTEGRAL,NUMERIC,STRING,ARRAY,OBJECT,};// 服務描述類class ServiceDescribe{public:using ptr=std::shared_ptr<ServiceDescribe>;using ServiceCallback=std::function<void(const Json::Value&,Json::Value&)>;using ParamDescribe=std::pair<std::string,VType>;//參數名稱 類型ServiceDescribe(std::string &&method_name,ServiceCallback &&callback,std::vector<ParamDescribe>&& params_desc,VType return_type):_method_name(std::move(method_name)),_callback(std::move(callback)),_params_desc(std::move(params_desc)),_return_type(return_type){}//返回名稱const std::string & method(){return _method_name;}//校驗傳入參數是否符合要求(1.字段完整 + 2.類型匹配)bool paramCheck(const Json::Value¶ms)//{"nums1",11}{for(auto&desc:_params_desc){//1.判斷是否有該字段if(params.isMember(desc.first)==false){ELOG("沒有 %s 參數字段",desc.first.c_str());return false;}//2.判斷該字段類型是否正確if(check(desc.second,params[desc.first])==false){ELOG("%s參數字段類型錯誤",desc.first.c_str());return false;}}return true;}bool call(const Json::Value& params,Json::Value&result){_callback(params,result);if(rtypeCheck(result)==false){ELOG("回調處理函數中響應信息類型錯誤");return false;}return true;}private:// 判斷return類型是否正確bool rtypeCheck(const Json::Value &val){return check(_return_type, val);}//判斷val對象的類型是否和vtype一致 Json::Value兼容任何JSON類型(int、string、array、object 等)bool check(VType vtype,const Json::Value &val){switch(vtype){case VType::BOOL :return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::STRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name;//方法名稱ServiceCallback _callback;//實際的業務回調函數std::vector<ParamDescribe> _params_desc;//參數字段格式描述vector<參數名稱,對應的類型>VType _return_type;//結果類型描述 };//對比 直接在ServiceDescribe中set各參數 的優點:構造完后ServiceDescribe 的成員就不再修改,僅讀取 天然線程安全。//若多個線程同時調用 setXxx() 方法 會出現線程安全的問題 需要在每個set函數中加鎖class SDescribeFactory{public:void setMethodName(const std::string&name){_method_name=name;}void setReturnType(VType vtype){_return_type=vtype;}void setParamDesc(const std::string &pname,VType vtype){_params_desc.push_back(ServiceDescribe::ParamDescribe(pname,vtype));}void setCallback(const ServiceDescribe::ServiceCallback&cb){_callback=cb;}ServiceDescribe::ptr build(){return std::make_shared<ServiceDescribe>(_method_name,_callback,_params_desc,_return_type);}private:std::string _method_name;//方法名稱ServiceDescribe::ServiceCallback _callback; // 實際的業務回調函數std::vector<ServiceDescribe::ParamDescribe> _params_desc; // 參數字段格式描述vector<參數名稱,對應的類型>VType _return_type; // 結果類型描述};//服務管理類 增刪查class ServiceManager{public:using ptr=std::shared_ptr<ServiceManager>;void insert(const ServiceDescribe::ptr&desc)//增{std::unique_lock<std::mutex> lock(_mutex);_service.insert(std::make_pair(desc->method(),desc));}ServiceDescribe::ptr select(const std::string &method_name)//查{std::unique_lock<std::mutex> lock(_mutex);auto it=_service.find(method_name);if(it==_service.end()){return ServiceDescribe::ptr();}return it->second;}void remove(const std::string &method_name)//刪{_service.erase(method_name);}private:std::mutex _mutex;std::unordered_map<std::string,ServiceDescribe::ptr> _service;//函數名稱 對應服務};class RpcRouter{public:using ptr=std::shared_ptr<ServiceDescribe>;//對注冊到Dispatcher模塊針對rpc請求進行回調處理的業務函數void onRpcRequest(const BaseConnection::ptr&conn,RpcRequest::ptr &request){//1.根據用戶請求的方法描述 判斷當前服務端能否提供對應的服務auto service=_service_manager->select(request->method());if(service.get()==nullptr){ELOG("未找到%s服務",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_NOT_FOUND_SERVICE);}//2.進行參數校驗 確定能否提供服務if(service->paramCheck(request->params())==false){ELOG("%s服務參數校驗失敗",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INVALID_PARAMS);}//3.調用業務回調接口進行處理Json::Value result;bool ret=service->call(request->params(),result);if(ret==false){ELOG("%s服務參調用出錯",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INTERNAL_ERROR);}//4.向客戶端發送結果return response(conn,request,result,RCode::RCODE_OK);}//提供服務注冊接口void registerMethod(const ServiceDescribe::ptr &service ){_service_manager->insert(service);}private://響應對象void response(const BaseConnection::ptr&conn,RpcRequest::ptr&req,const Json::Value&res,RCode rcode){auto msg=MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(wws::MType::RSP_RPC);msg->setRcode(rcode);msg->setResult(res);conn->send(msg);}private:ServiceManager::ptr _service_manager;};
}
}
ServiceDescribe
:服務描述類每一個服務方法(如
Add
、Translate
)都有一個ServiceDescribe
對象,它記錄:
方法名
_method_name
參數信息
_params_desc
(字段名 + 類型)返回值類型
_return_type
實際處理邏輯
_callback
?功能:
paramCheck()
:檢查客戶端傳來的參數是否完整且類型匹配。
call()
:調用業務函數,處理請求,并校驗響應類型,輸出型參數Json::Value&result獲取結果。
RpcRouter
:RPC 請求核心調度器處理流程(onRpcRequest):
1. 獲取客戶端請求的方法名:request->method() 2. 查找是否有對應的服務:_service_manager->select() 3. 參數檢查:service->paramCheck() 4. 調用回調函數處理:service->call() 5. 構建響應消息:RpcResponse 6. 通過 conn->send(msg) 返回結果
注冊流程(registerMethod()):
RpcRouter router; router.registerMethod(service); // 將服務注冊到服務管理器
為什么要用 SDescribeFactory 工廠模式而不是在 ServiceDescribe 中直接使用 setXxx() 方法進行設置ServiceDescribe的各個參數?
通過DescribeFactory 工廠模式,造完后ServiceDescribe 的成員就不再修改,僅讀取 天然線程安全。
如果在ServiceDescribe 設置set(),若多個線程同時調用 setXxx() 方法 會出現線程安全的問題 需要在每個set函數中加鎖。
客戶端發送 RpcRequest↓RpcRouter::onRpcRequest()//進行處理↓_service_manager->select(method)//查找方法↓ServiceDescribe::paramCheck(params)//校驗參數↓ServiceDescribe::call() → 執行業務邏輯//執行回調↓RpcRouter::response() → 發送響應
三.客戶端-RpcRouter實現
🔹 1.
describe
(請求描述體)
封裝一個請求的基本信息:
request
: 包含 RID(請求 ID)、MType(消息類型)、Body(請求體)
std::promise<response>
:用于 future 異步
callback
:用于 callback 異步
RType
: 標識異步類型(ASYNC / CALLBACK)🔹 2.
Requestor::send(...)
send(connection, request, callback)
→ 異步 callback 模式
send(connection, request, std::future<response>)
→ future 模式將
describe
存入map<rid, describe>
中,等待響應回調🔹 3.
onResponse(connection, response)
收到響應后,根據 RID 從
map<rid, describe>
查找對應的describe
按照
RType
分發:
如果是
CALLBACK
,調用 callback如果是
ASYNC
,通過promise.set_value(...)
實現 future 結果🔹 4.
Dispatcher<mt, handler>
對
MType
進行派發處理(主要針對訂閱/通知類型的消息,不含 RID)
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>namespace wws
{
namespace client
{class Requestor{public:using ptr=std::shared_ptr<Requestor>;using RequestCallback=std::function<void(const BaseMessage::ptr&)>;using AsyncResponse=std::future<BaseMessage::ptr>;//請求描述的結構體struct RequestDescribe{using ptr=std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;//請求消息指針RType rtype; //請求類型 異步/回調std::promise<BaseMessage::ptr> response;//用于 async 模式,設置結果set_value 給 future 返回值RequestCallback callback;//用于 callback 模式,響應到來時觸發用戶邏輯};//收到應答 根據rid找到對應的請求設置結果 或者 調用回調 void onReponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg){std::string rid=msg->rid();RequestDescribe::ptr rdp=getDescribe(rid);//根據id進行查找if(rdp.get()==nullptr){ELOG("收到響應%s,但是未找到對應的請求描述!",rid.c_str());return;}//異步請求把應答msg作為結果設置到promise中,讓future就緒if(rdp->rtype==RType::REQ_ASYNC){rdp->response.set_value(msg);//promise.set_value(value); 手動設置值 讓std::future<BaseMessage::ptr>變為就緒。}//回調請求 有回調函數就進行調用else if(rdp->rtype==RType::REQ_CALLBACK){if(rdp->callback) rdp->callback(msg);}elseELOG("請求類型未知");//收到應答 刪除rid對應的請求描述delDescribe(rid);}//1.異步請求發送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,AsyncResponse&async_rsp){//創建請求描述對象(newDescribe內部完成插入map)RequestDescribe::ptr rdp=newDescribe(req,RType::REQ_ASYNC);if(rdp.get()==nullptr){ELOG("構建請描述對象失敗");return false;}//get_future()關聯std::future<>async_rsp 和 std::promise<>response//promise.set_value(value) 被調用,就能async_rsp.get()獲取值async_rsp=rdp->response.get_future();return true;}//2.同步請求發送(發送完請求后,立刻調用get()阻塞等待set_value()設置后獲取結果)//可以在上層進行get()阻塞等待,也是同樣效果bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,BaseMessage::ptr&rsp){AsyncResponse rsp_future;bool ret=send(conn,req,rsp_future);if(ret==false) return false;rsp=rsp_future.get();//阻塞等待值就緒return true;}//3.回調請求發送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&rep,RequestCallback&cb){//創建請求描述對象(newDescribe內部完成插入map)RequestDescribe::ptr rdp=newDescribe(rep,RType::REQ_CALLBACK,cb);if(rdp.get()==nullptr){ELOG("構建請描述對象失敗");return false;}conn->send(rep);return true;}private://1.新增RequestDescribe::ptr newDescribe(const BaseMessage::ptr&req,RType rtype,const RequestCallback&cb=RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);//構建請求描述對象 并插入到mapRequestDescribe::ptr rd=std::make_shared<RequestDescribe>();rd->request=req;rd->rtype=rtype;if(rtype==RType::REQ_CALLBACK&&cb)rd->callback=cb;_request_desc.insert(std::make_pair(req->rid(),rd));//插入到mapreturn rd;}//2.查找RequestDescribe::ptr getDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);auto it=_request_desc.find(rid);if(it==_request_desc.end()){return RequestDescribe::ptr();}return it->second;}//3.刪除void delDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string,RequestDescribe::ptr> _request_desc;//id->請求消息};
}
}
用戶發起請求│▼
Requestor::send(...)│▼
創建 RequestDescribe (含回調 or promise)│▼
加入 map<rid, describe>│▼
發送消息給服務器?(服務器響應)Requestor::onResponse(...)│▼找回 describe -> 判斷 rtype├── CALLBACK → 執行 callback└── ASYNC → set promise▼清除 map<rid, describe>
四.客戶端RpcCaller實現
1. 構造函數
RpcCaller(const Requestor::ptr&)
初始化傳入一個
Requestor
實例;
Requestor
負責發送消息、注冊回調、接收服務端響應等;
RpcCaller
是調用層,Requestor
是通信層。2. 同步調用接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,Json::Value& result)
1.先構建RpcRequest請求
2.調用requestor->同步send(),同步阻塞發送請求,并拿到rsp_msg。
3.將rsp_msg的Basemessage類型轉換為 RpcResponse,取出正文結果 result();
3. 異步 Future 調用接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,std::future<Json::Value>& result)
創建
promise
對象,用于異步回填響應;通過
shared_ptr
管理promise
生命周期,綁定到回調函數中;通過
_requestor->send(...)
注冊異步回調;回調觸發后由
Callback()
設置promise.set_value()
;調用方使用返回的
future
進行.get()
即可拿到結果。4. 異步回調接口
bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,JsonResponseCallback& cb)
直接注冊用戶定義的回調
cb
到請求;內部通過
Callback1()
做包裝處理:
類型轉換為
RpcResponse
錯誤處理
最后調用用戶傳入的
cb(result)
傳回結果。
#include "requestor.hpp"namespace wws
{
namespace client
{class RpcCaller{public:using ptr=std::shared_ptr<RpcCaller>;using JsonAsyncResponse=std::future<Json::Value>;using JsonResponseCallback=std::function<void(const Json::Value&)>;//requestor中的處理是針對BaseMessage進行處理的(因為要對所有請求進行處理,不單單對Rpc請求處理)//用于在rpc caller中針對結果的處理是對RpcResponse里邊的result進行的RpcCaller(const Requestor::ptr &requestor):_requestor(requestor){}//1.同步調用 1.連接conn 2.方法名method 3.方法參數params 4.結果resultbool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,Json::Value&result){//1.組織請求auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2.發送請求 因為send()是重載函數 參數的類型必須保持一致(req_msg子類RpcRequest->基類BaseMessage)bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg);if(ret==false){ELOG("同步Rpc請求失敗");return false;}//3.等待響應 響應信息存放在rsp_msg此時是Base基類,需要轉成RpcResponse應答auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if(!rpc_rsp_msg){ELOG("Rpc響應 向下類型轉換失敗");return false;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc同步請求出錯:%s",errReason(rpc_rsp_msg->rcode()));return false;}result=rpc_rsp_msg->result();//返回響應消息里面的正文return true;}//2.異步 Future 調用 向服務器發送異步回調請求 設置回調函數 //回調函數中傳入一個promise對象 在回調函數中對promise設置數據//異步請求返回的是BaseMessage對象,用戶想要的是message里面正文的結果Valuebool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,std::future<Json::Value>&result){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// //這個json_promise對象是一個局部變量,等出了call作用域就會消失// //與它關聯的future result在外部get()獲取結果時,會異常// std::promise<Json::Value> json_promise;// result=json_promise.get_future();//future和promise建立關聯 以后future.get()獲取結果auto json_promise=std::make_shared<std::promise<Json::Value>>();result=json_promise->get_future();//std::bind() 對json_promise傳值傳參,shared_ptr引用計數 +1 此時引用計數==2//退出call作用域 引用計數-- 再等callback被觸發完畢并釋放后引用計數--,才會析構//shared_ptr引用計數是否加1,只和bind對json_promise指針的捕獲方式有關,與函數的參數聲明是否引用json_promise指針無關Requestor::RequestCallback cb=std::bind(&RpcCaller::Callback,this,json_promise,std::placeholders::_1);bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),cb);if(ret==false){ELOG("異步Rpc請求失敗");return false;}return true;}//3.異步回調bool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value¶ms,JsonResponseCallback&cb){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false){ELOG("異步Rpc請求失敗");return false;}return true;}private:void Callback1(const JsonResponseCallback&cb,const BaseMessage::ptr&msg){//先判斷結果對不對auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc響應 向下類型轉換失敗");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc回調請求出錯:%s",errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}//const BaseMessage::ptr&msg Request參數是拿到響應后傳遞的void Callback(std::shared_ptr<std::promise<Json::Value>>result,const BaseMessage::ptr&msg){auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc響應 向下類型轉換失敗");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc異步請求出錯:%s",errReason(rpc_rsp_msg->rcode()));return ;}//promise.set_value()設置正文結果 result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};
}
}
為什么不讓 Requestor::send() 直接處理 RpcRequest,而是讓它只處理 BaseMessage,由 RpcCaller 來封裝具體業務(如 RpcRequest/RpcResponse)?
1.因為要對所有請求進行處理,不單單對Rpc請求處理。還可以處理主題 服務等消息類型
2.解耦業務協議與通信通道,只做消息傳遞與回調處理。
模塊 職責 Requestor
發送消息、注冊和觸發回調,處理響應派發 RpcCaller
構造 RPC 請求、解析 RPC 響應,組織業務邏輯
為什么 bind 捕獲 shared_ptr 能延長 promise 的生命周期?
shared_ptr+bind(值捕獲)+外部拷貝保存
本質就是:
只要某個 shared_ptr 指向的對象,引用計數 不為 0,那它就不會被銷毀;
所以我們通過創建另一個生命周期更長的 shared_ptr(比如通過 bind() 值捕獲并在外部保存),來延長這段資源的生命周期。
(不建議std::ref()引用捕獲?如果外部shared_ptr_obj的被銷毀,cb 里的引用就變成懸垂指針)
auto cb = std::bind(&func, std::ref(shared_ptr_obj));
如果std::promise<Json::Value> json_promise;直接創建一個promise對象,這是一個局部對象等出了call函數(離開作用域),就會析構。
這就會導致與它關聯的future result在外部get()獲取結果時,會異常。
auto json_promise=std::make_shared<std::promise<Json::Value>>();創建promise對象并用shared指針來管理它,雖然出了call函數 引用計數-- 為0還是會析構。
但我們在bind綁定的時候,對json_promise進行了傳值,拷貝了一份
shared_ptr
對象 進入bind
內部的閉包,引用計數+1.auto cb = std::bind(&RpcCaller::Callback,this,json_promise, // 👈 這里復制了一份 shared_ptr,引用計數 +1std::placeholders::_1);
寫成void Callback(const std::shared_ptr<std::promise<Json::Value>>&result,const BaseMessage::ptr&msg) 接收引用(不增加引用計數)。對bind傳值引用計數+1無影響
如果是下面這種拷貝引用計數會再+1,但僅限Callback運行時,調用結束后立刻-1,只是暫時的。
所以說現在,在call函數中有被shared指針指向的promise對象引用計數為2,一個make_shared返回的,一個bind閉包值捕獲的。
bind閉包對象cb(auto cb=std::bind(...)),它本質不也是一個局部對象嗎?call函數結束不還是和另一個局部變量一樣會銷毀,promise的生命周期怎么會延長呢?
call函數中的cb是否銷毀已經無關,因為在函數的外部已經拷貝保存了一份了。
它通常會被傳出
call()
函數,交給Requestor::send()
并被保存在異步響應回調表里!send()會調用newDescibe,把閉包對象存入rd請求描述對象中,再插入到map進行統一管理。
(注意newDescribe雖然是const&獲取的cb 不增加引用計數,但這根本無關緊要,因為rd->callback=cb;會把它拷貝到了 RequestDescribe::callback 中,這是才生命周期延長的關鍵)
等到onResponse收到應答并處理完,就delDescribe()刪除對應請求描述信息,保存的回調函數cb也析構,里面保存的json_promise也會析構,所以說ptr+bind值捕獲+外部保存
保證了 json_promise 會存活到 callback 被調用。
總結:
1.Callback(param) 中的 shared_ptr 是按值傳遞,因此會導致引用計數 +1,
但這個 +1 的生命周期僅限于函數執行期間,Callback() 結束后 param 被銷毀,引用計數立即 -1。
2.而 bind(...) 捕獲 shared_ptr 時(通過值捕獲),會將其拷貝一份保存在閉包對象中(如 auto cb),
這會導致引用計數 +1,無論是否調用 Callback,引用計數都存在,直到閉包對象銷毀為止。
那閉包對象什么時候銷毀呢?delDescribe()
3.需要注意的是,cb 本身是 call() 函數內的局部變量,但它并不會隨著 call() 結束而失效。
雖然 call() 返回后 cb 變量本身會被銷毀,但在此之前 cb 已經被作為參數傳入 _requestor->send(),
并被內部保存到了 _request_desc 表中,作為 RequestDescribe 的一部分長期持有。
4.也就是說:call() 中定義的 cb 雖然是局部變量,但它在作用域結束前被拷貝并傳出,生命周期已經被延長。
所以 cb 中捕獲的 json_promise(shared_ptr)依然活著,call 函數中的 cb 就算析構,也不影響閉包內部捕獲的 promise 的生命周期。
5.只有當服務端響應到達,回調被觸發后,執行 cb(msg),再通過 delDescribe(rid) 刪除請求描述對象,rd保存的回調函數cb閉包進行析構,值捕獲的json_promise被釋放,引用計數==0 promise 被析構.
6.相比之下,如果只是創建了一個局部的 shared_ptr(比如 json_promise),沒有通過 bind、lambda、線程等傳出去,
它就會在函數結束時被立即析構,future 將失效,調用 .get() 會拋出異常(broken promise)。
? 因此,雖然 cb 是局部變量,但它在 call() 結束前被拷貝傳出,生命周期被框架托管,保證了回調所需的 promise 安全存活。
這是異步通信框架中延長資源生命周期的關鍵機制,確保異步流程完整閉環。
局部變量當作參數傳參,并不會延長它的生命周期; 它的生命周期仍然只屬于它原來的作用域; 如果你想延長生命周期,必須使用 shared_ptr,并在外部再持有一份拷貝!
舉個例子:
std::shared_ptr<int> create() {auto p = std::make_shared<int>(100); // p 是局部變量return p; // ? 返回值是“拷貝一份”,原來的 p 會被銷毀,但返回值還持有控制塊 }auto x = create(); // 返回值拷貝給 x,資源不會被銷毀
情況 局部變量會不會被銷毀 解釋 普通局部變量(無指針托管) ? 會 作用域結束立即銷毀 傳值作為函數參數 ? 會 參數是拷貝,原變量不延長 返回 shared_ptr ? 原變量銷毀,但返回值持有副本,資源不析構 shared_ptr 被 bind 捕獲 ? 不會(+1) 延長生命周期直到閉包結束
服務端測試代碼
相較于之前實現的客戶端,我們不再是直接給dispatcher傳業務層函數,而是傳RpcRouter的處理對應請求的回調函數,由該函數再從注冊到RpcRouter中的具體實現函數中找用戶需要的函數,并進行回調返回響應結果。
#include "../common/message.hpp"
#include "../common/net.hpp"#include "../common/dispatcher.hpp"
#include "../server/rpc_router.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{auto router=std::make_shared<wws::server::RpcRouter>();std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//1.注冊Add函數到RpcRouterrouter->registerMethod(desc_factory->build());//bind綁定RpcRouter收到消息的回調函數->cbauto cb=std::bind(&wws::server::RpcRouter::onRpcRequest,router.get(),std::placeholders::_1,std::placeholders::_2);auto dispatcher=std::make_shared<wws::Dispatcher>();//2.把RpcRouter回調函數onRpcRequest設置到dispatcher中dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC,cb);auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);//3.把Dispatcher的回調函數onMessage設置到server中server->setMessageCallback(message_cb);server->start();return 0;
}
從圖示上可以看到,整個鏈路依次是:
server 收到消息
調用 messageCallback
觸發 onMessage 進入 dispatcher
Dispatcher 發現消息類型是
REQ_RPC
,調用 RpcRouter::onRpcRequestRouter 找到 Add 方法并調用其回調函數
Add 函數執行計算并返回結果
server->setMessageCallback(message_cb);
這里的
message_cb
是通過std::bind(&wws::Dispatcher::onMessage, dispatcher.get(), ...)
綁定的。當服務器收到任何消息時,就會調用
dispatcher->onMessage(...)
。
dispatcher->onMessage(...)
Dispatcher
根據消息的類型(這里為REQ_RPC
)找到先前注冊的回調處理函數。這一步對應代碼
dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, cb);
。所以當消息類型匹配到
REQ_RPC
時,就執行cb
。
cb
->std::bind(&wws::server::RpcRouter::onRpcRequest, router.get(), ...)
這個
cb
本質上就是對RpcRouter::onRpcRequest
的一次包裝。當調用
cb
時,實際上就是執行router->onRpcRequest(...)
。
RpcRouter::onRpcRequest(...)
在路由器里,根據請求中的“方法名稱”(比如
"Add"
)找到對應的回調函數。此處就是在之前
router->registerMethod(...)
時注冊的Add
方法回調。調用
Add(const Json::Value& req, Json::Value& rsp)
最終執行我們自定義的邏輯(如取
nums1
、nums2
,相加后存入rsp
)。
客戶端測試代碼
#include "../common/dispatcher.hpp"
#include "../client/requestor.hpp"
#include "../client/rpc_caller.hpp"
#include <thread>
#include <chrono>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{auto requestor=std::make_shared<wws::client::Requestor>();auto caller=std::make_shared<wws::client::RpcCaller>(requestor);auto dispatcher=std::make_shared<wws::Dispatcher>();auto rsp_cb=std::bind(&wws::client::Requestor::onResponse,requestor.get(),std::placeholders::_1,std::placeholders::_2);//wws::RpcResponse->wws::BaseMessage //rsp_cb綁定的函數參數為Requestor::onResponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg)//而registerHandler注冊需要的函數類型 std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>; 第二個參數必須也為BaseMessage::ptr,所以T傳BaseMessagedispatcher->registerHandler<wws::BaseMessage>(wws::MType::RSP_RPC,rsp_cb);auto client=wws::ClientFactory::create("127.0.0.1",9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);client->setMessageCallback(message_cb);client->connect();auto conn=client->connection();//1.同步調用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=caller->call(conn,"Add",params,result);if(ret!=false){DLOG("result: %d", result.asInt());}//2.異步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=caller->call(conn,"Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回調params["nums1"]=55;params["nums2"]=66;ret = caller->call(conn,"Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));client->shutdown();return 0;
}
RpcCaller 調用
call("Add", params, rsp)
用戶在業務代碼里直接寫
caller->call(...)
,想要調用服務端的 “Add” 方法并等待結果。RpcCaller 內部執行 “AddDesc(...)”
call(...)
方法內部會先構造一個帶唯一請求ID的RpcRequest
,然后調用 Requestor 的相關方法(示意中稱作AddDesc
)來“登記”該請求:
存儲該請求ID
記錄調用類型(同步/異步/回調)
如果是回調式,還會記錄用戶傳入的回調函數
發送請求到服務端
Requestor 記完請求描述后,就會通過網絡連接將
RpcRequest
發送給遠程服務端。服務端處理并返回
RpcResponse
當服務端收到 “Add” 請求后,進行實際的加法運算或其他業務邏輯,然后打包
RpcResponse
返回給客戶端。客戶端接收響應 -> 分發到 Requestor::onResponse()
客戶端網絡層讀到響應后,先通過 Dispatcher 分發,根據消息類型(如
RSP_RPC
)找到之前綁定的回調,即Requestor::onResponse(...)
。
Requestor::onResponse()
根據響應里的 “請求ID=111” 查到對應的“請求描述 (desc)”,確定是哪個請求、用什么方式處理(同步阻塞喚醒或執行用戶回調等),并把結果交給調用方。RpcCaller 最終返回結果給用戶
對于異步調用
call(...)
,當響應到來時,onResponse() 會調用? ? ? ? ? ? ? ? ? ? ? ? rdp->response.set_value(msg),把響應 msg 設置到 promise 中。用戶future.get()獲取結果。若是回調式調用,則
Requestor::onResponse()
會直接執行用戶的回調函數,把結果帶給用戶。
Requestor 和 RpcCaller 的關系:
Requestor 負責管理請求–響應映射。它用一個請求描述(包含唯一 ID、請求類型和處理機制)來記錄每次發送的請求。當響應到來時,根據請求 ID 查找描述,
如果是異步請求(REQ_ASYNC),則調用 promise.set_value(msg) 使關聯 future 就緒;
如果是回調請求(REQ_CALLBACK),則直接調用用戶注冊的回調函數。
RpcCaller 則對外提供 RPC 調用接口。它負責構造 RPC 請求(設置方法名、參數、生成請求 ID),并調用 Requestor 的 send() 方法來登記請求并發送消息。用戶可以選擇同步(阻塞等待 future.get())、異步(返回 future)或回調式調用。
二者協同工作:RpcCaller 構造并發送請求,而 Requestor 負責匹配響應并將結果傳遞給上層。
RpcCaller用戶調用的接口,用戶傳入要調用的對象 參數 方式,根據方式(同步 異步 回調)的不同選擇不同的call進行調用,1.先根據參數構建請求消息2.調用Requester里面對應的send()。
Requester中send()會先構建請求描述對象(call傳入的請求消息 請求類型...) 并建立請求-id間的映射(等收到應答時根據id找到對應的請求描述對象),再完成發送。
等服務端返回應答,Dispatcher根據消息的類型,找到client的Requestor中onResponse處理應答的回調函數,它根據id找到對應的請求描述,再根據請求描述中的類型,進行set_value設置結果或者callback()調用回調。最后刪除該請求描述
set_value后,get()獲取到結果,阻塞結束返回上層
再返回到call進行檢查并返回結果
RpcCaller::call()
的三種調用方式流程
同步調用 (
call(conn, method, params, result)
)客戶端調用call,把連接 函數名 參數 用于獲取結果的Value對象
進入同步調用call. 先根據傳入的參數組織請求(里面設置了請求類型REQ_RPC)
? ? ? ? 調用Requester中send()發送請求
?using AsyncResponse=std::future<BaseMessage::ptr>;先創建一個future對象用于后面get()阻塞獲取結果。再調用異步send(),因為異步和同步的區別只是在于用戶什么時候get()阻塞獲取結果,異步+立刻get()==同步。
再看看異步send()
先調用newDescribe,里面會創建請求描述對象(傳入回調函數會設置回調函數)和UUID建立映射關系用map管理起來。
conn->send()發送請求。
之后服務端進行處理,返回應答,server::messageCallback->Disoatcher::onMessage->根據請求類型找到對應的回調函數,RSP_RPC對應的就是requestor::onResponse()處理應答的回調函數。
?進入onResponse,先根據UUID找到對應的請求描述,根據請求描述的類型,看是異步(同步里面調用的異步),還是回調,進行相對的處理。
是同步,就set_value設置結果。
設置完結果,futrue就緒get()獲取結果
上層的result就獲取到了結果,進行輸出。
RpcCaller::call(conn, method, params, result) // 同步調用└──> Requestor::send(conn, req, rsp) // 調用同步版本 send()└──> send(conn, req, rsp_future) // 調用異步 Future 版本 send()└──> 創建 RequestDescribe 并存儲└──> 阻塞等待 future.get()└──> Dispatcher::onMessage(conn, msg)└──> Requestor::onResponse(conn, msg)└──> rdp->response.set_value(msg) // future.set_value() 解除阻塞└──> 解析 RpcResponse 并返回 result // get() 獲取結果并返回
經過的關鍵函數:
RpcCaller::call(conn, method, params, result)
Requestor::send(conn, req, rsp)
Requestor::send(conn, req, rsp_future)
(異步版本)
Dispatcher::onMessage(conn, msg)
Requestor::onResponse(conn, msg)
rdp->response.set_value(msg)
future.get()
解除阻塞
異步 Future 調用 (
call(conn, method, params, future)
)不傳Json::Value result直接獲取結果,std::future<Json::Value> res_futre,讓用戶自己get()獲取結果。
result=json_promise->get_future();管理promise,set_value后用戶就可以get()獲取結果。
還綁定了Callback,傳入回調函數cb,創建請求描述對象時設置回調函數cb
此時設置的類型為REQ_CALLBACK,收到應答,找到對應請求描述,會調用設置的回調函數cb
cb bind綁定的是Callbcak函數 它會set_value設置結果,讓用戶的future就緒,get()獲取結果
RpcCaller::call(conn, method, params, future) // 異步 Future 調用└──> 創建 RpcRequest 并設置參數└──> 創建 std::promise<Json::Value> 和 future 關聯└──> 綁定 Callback (關聯 promise 和 result)└──> Requestor::send(conn, req, cb) // 傳入回調函數 cb,onResponse() 解析后觸發└──> 創建 RequestDescribe 并存儲└──> 發送請求,不阻塞└──> Dispatcher::onMessage(conn, msg) // 收到服務端響應└──> Requestor::onResponse(conn, msg)└──> 通過 rid 查找 RequestDescribe└──> Callback 觸發 set_value(msg) // 解析結果并設置 promise└──> future.get() 解除阻塞,獲取結果 // 用戶上層調用 future.get() 阻塞獲取結果
經過的關鍵函數:
RpcCaller::call(conn, method, params, res_future)
Requestor::send(conn, req, cb)
Requestor::onResponse(conn, msg)
rdp->callback(msg)
Callback
解析msg
并set_value()
res_future.get()
解除阻塞
異步回調調用 (
call(conn, method, params, cb)
)回調,不直接設置結果,而是調用用戶傳來的函數(callback)。
call異步回調,和異步futrue一樣設置回調函數Callback1.(但回調函數不同)
?也是調用的這個send()
?接下來也是 調用請求描述對象中設置的回調函數即Callback1
?而Callback1不和Callback一樣set_value設置結果,而是調用上層傳來的函數,它返回結果給用戶。
RpcCaller::call(conn, method, params, cb) // 異步回調調用└──> 創建 RpcRequest 并設置參數└──> 綁定 Callback1└──> Requestor::send(conn, req, cb) // 調用異步回調版本 send()└──> 創建 RequestDescribe 并存儲└──> 發送請求,不阻塞└──> Dispatcher::onMessage(conn, msg) // 收到服務端返回消息└──> Requestor::onResponse(conn, msg)└──> 通過 rid 查找 RequestDescribe└──> Callback1 觸發用戶自定義的 cb(msg)└──> 用戶自定義的回調函數解析結果
RpcCaller::call(conn, method, params, cb)
Requestor::send(conn, req, cb)
Requestor::onResponse(conn, msg)
rdp->callback(msg)
觸發Callback1
用戶自定義的
callback(msg)
解析結果
五.注冊中心---服務端rpc_registry
服務端如何實現服務信息的管理:
服務端需要1.提供注冊服務2.發現的請求業務處理
1.需要將 哪個服務 能夠由 哪個主機提供 管理起來 hash<method,vector<provide>>
? ? ? ? 進行服務發現時,能返回誰能提供指定服務
2.需要將 哪個主機 發現過 哪個服務 管理起來
? ? ? ? 當進行服務通知的時候,能通知給對應發現者 <method,vector<discoverer>>
3.需要 哪個連接 對應 哪個服務提供者 管理起來 hash<conn,provider>
? ? ? ? 當一個連接斷開時 能知道哪個主機的哪些服務下線了,然后才能給發現者通知xxx的xxx服務下線了。
4.需要 哪個連接 對應 哪個服務發現者 管理起來 hash<conn.discoverer>
? ? ? ? 當一個連接斷開時 如果有服務上下線 就不需要給它進行通知了
1??
ProviderManager
(服務提供者管理)維護服務提供者信息,進行服務注冊、刪除和查詢。
提供
addProvider()
、delProvider()
、getProvider()
和methodHosts()
等方法。
2??
DiscovererManager
(服務發現者管理)維護服務發現者信息,進行服務發現、刪除和通知。
提供
addDisecoverer()
、delDisoverer()
、onlineNotify()
和offlineNotify()
等方法。
3??
PDManager
(核心管理器)處理服務請求、注冊、發現、上線/下線通知以及連接斷開后的清理邏輯。
提供
onServiceRequest()
、onConnShutdown()
等核心邏輯。處理服務的響應,包括:
registryResponse()
:服務注冊應答
discoverResponse()
:服務發現應答
errorResponse()
:錯誤處理
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set>
namespace wws
{
namespace server
{//服務提供者class ProviderManager{public:using ptr=std::shared_ptr<ProviderManager>;struct Provider{using ptr=std::shared_ptr<Provider>;std::mutex _mutex; BaseConnection::ptr conn;Address host; //主機信息ip+portstd::vector<std::string> methods; //所有提供的方法Provider(const BaseConnection::ptr&c,const Address&h):conn(c),host(h){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};//新的服務提供者進行服務注冊void addProvider(const BaseConnection::ptr&c,const Address &h,const std::string&method){Provider::ptr provider;{std::unique_lock<std::mutex> lock(_mutex);auto it=_conns.find(c);//找連接對應的提供者if(it!=_conns.end()){provider=it->second;}//找不到就創建 并新增連接->提供者else{provider=std::make_shared<Provider>(c,h);_conns.insert(std::make_pair(c,provider));}//method方法被哪些提供者提供 增加提供者auto &providers=_providers[method];providers.insert(provider);}//提供者內更新記錄 能提供的方法provider->appendMethod(method);}//服務提供者斷開連接時 獲取它的信息 用于服務下線通知Provider::ptr getProvider(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return Provider::ptr();return it->second;}//服務提供者斷開連接時 刪除它的關聯信息void delProvider(const BaseConnection::ptr&c)//連接->提供者->所有提供方法{std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;for(auto&method:it->second->methods)//找到該服務提供者的所有方法{auto&providers=_providers[method];//根據方法 從提供該方法的提供者中再進行刪除//1.providers容器是vectro版本// //手動查找并按迭代器刪除 providers.erase(it->second)// //但是 erase(it->second) 并不會起到按值刪除的作用,因為 erase() 按值刪除時,只有 std::vector 在 C++20 之后才引入 erase 和 erase_if// auto provider_it = std::find(providers.begin(), providers.end(), it->second);// if (provider_it != providers.end())// {// providers.erase(provider_it);// }//set 直接支持 erase(it->second)providers.erase(it->second);}//刪除連接與服務提供者的關系_conns.erase(it);}//返回 method對應的提供者std::vector<Address> methodHosts(const std::string &method){std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end())return std::vector<Address>();std::vector<Address> result(it->second.begin(), it->second.end());return result;}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Provider::ptr>> _providers;//方法->提供者主機std::unordered_map<BaseConnection::ptr,Provider::ptr> _conns;//連接->提供者}; //服務發現者class DiscovererManager{public:using ptr=std::shared_ptr<DiscovererManager>;struct Discoverer{using ptr=std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn;//發現者關聯的客戶端std::vector<std::string> methods;//發現過的服務Discoverer(const BaseConnection::ptr&c):conn(c){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.push_back(method);}};//當客戶端進行服務發現的時候新增發現者 新增服務名稱?Discoverer::ptr addDisecoverer(const BaseConnection::ptr&c,const std::string &method){Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);//找連接對應的服務發現者auto it=_conns.find(c);if(it!=_conns.end()){discoverer=it->second;}else{discoverer=std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c,discoverer));}//method方法被哪些發現者發現了 增加發現者auto &discoverers=_discoverers[method];discoverers.insert(discoverer);}//在發現者中 增加已經發現的方法discoverer->appendMethod(method);return discoverer;}//發現者不需要被get() 發現者下線不需要通知 所以不需要進行get()獲取對象后進行下線通知//發現者客戶端斷開連接時 找到發現者信息 刪除關聯信息void delDisoverer(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;//找到發現過的方法for(auto&method:it->second->methods){//從發現過method的所有發現者 中找要進行刪除的發現者auto&discoverers=_discoverers[method];discoverers.erase(it->second);}//刪除conn->discoverer_conns.erase(it);}//新的服務提供者上線 上線通知void onlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_ONLINE);}//服務提供者斷開連接 下線通知void offlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string &method,const Address&host,wws::ServiceOptype optype){std::unique_lock<std::mutex> _mutex;//先判斷該方法有沒有被人發現過 auto it=_discoverers.find(method);//沒有就不用進行任何處理if(it==_discoverers.end())return;//對發現過該方法的發現者一個個進行通知auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_req->setMethod(method);msg_req->setOptype(optype);//服務操作類型 for(auto&discoverers:it->second){discoverers->conn->send(msg_req);}}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Discoverer::ptr>> _discoverers;//該方法被哪些發現者發現了std::unordered_map<BaseConnection::ptr,Discoverer::ptr> _conns;//連接->發現者(連接斷開->對應發現者->刪除vector中的發現者)};class PDManager{public:using ptr=std::shared_ptr<PDManager>;//處理服務請求 并返回應答void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr &msg) {//先判斷服務操作請求:服務注冊/服務發現auto optype=msg->optype();if(optype==ServiceOptype::SERVICE_REGISTRY)//服務注冊{//1.新增服務提供者 _providers->addProvider(conn,msg->host(),msg->method());//2.對該方法的發現者進行服務上線通知_discoverers->onlineNotify(msg->method(),msg->host());//3.返回應答return registryResponse(conn,msg);}else if(optype==ServiceOptype::SERVICE_DISCOVERY)//服務發現{//新增服務發現者 _discoverers->addDisecoverer(conn,msg->method());return discoverResponse(conn,msg);}else{ELOG("收到服務操作請求,但操作類型錯誤"); return errorResponse(conn,msg);}}//連接斷開void onConnShutdown(const BaseConnection::ptr&conn)//{//這個要斷開的連接1.提供者下線 2.發現者下線//1.獲取提供者信息 為空說明不是提供者auto provider=_providers->getProvider(conn);if(provider.get()!=nullptr){//提供者下線//1.提供者的每個方法都要下線 通知對應發現者for(auto&method:provider->methods){_discoverers->offlineNotify(method,provider->host);}//2.刪除對該提供者的管理_providers->delProvider(conn);}//2.到這 可能是發現者 就算不是會直接返回空//直接刪除對該發現者的管理_discoverers->delDisoverer(conn);}private://錯誤響應void errorResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_rsp->setRcode(RCode::RCODE_INVALID_OPTYPE); //無效 msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);//服務操作類型 未知 conn->send(msg_rsp);}//注冊應答void registryResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);//服務操作類型 注冊conn->send(msg_rsp);}//發現應答 method方法有哪些主機可以提供 void discoverResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid()); msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);//服務操作類型 發現msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服務請求(注冊、發現、上線、下線)std::vector<Address> hosts=_providers->methodHosts(msg->method());if(hosts.empty()){msg_rsp->setRcode(RCode::RCODE_NOT_FOUND_SERVICE);return conn->send(msg_rsp);}msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setMethod(msg->method());msg_rsp->setHost(hosts);return conn->send(msg_rsp);}private:ProviderManager::ptr _providers;DiscovererManager::ptr _discoverers;};
}
}
📡 服務注冊流程
1?? 客戶端(Provider)通過 `registryMethod()` 發送 `SERVICE_REGISTRY` 請求 2?? `PDManager::onServiceRequest()` 處理注冊請求 3?? `ProviderManager::addProvider()` 注冊服務 4?? `DiscovererManager::onlineNotify()` 通知發現者服務上線 5?? `PDManager::registryResponse()` 發送注冊結果
🔍 服務發現流程
1?? 客戶端(Discoverer)通過 `addDisecoverer()` 發送 `SERVICE_DISCOVERY` 請求 2?? `PDManager::onServiceRequest()` 處理發現請求 3?? `DiscovererManager::addDisecoverer()` 記錄發現者 4?? `ProviderManager::methodHosts()` 獲取 `method` 對應的主機 5?? `PDManager::discoverResponse()` 發送發現結果
🔥 服務下線流程
1?? 連接斷開時觸發 `PDManager::onConnShutdown()` 2?? 通過 `ProviderManager::getProvider()` 獲取 `Provider` 3?? `DiscovererManager::offlineNotify()` 通知發現者服務下線 4?? `ProviderManager::delProvider()` 刪除 `Provider`
🕵??♂? 發現者下線流程
1?? 連接斷開時觸發 `PDManager::onConnShutdown()` 2?? 通過 `DiscovererManager::delDisoverer()` 刪除 `Discoverer` 3?? 清理 `_conns` 和 `_discoverers` 的映射
六.注冊中心---客戶端rpc_registry
客戶端的功能比較分離,注冊端和發現端根本就不在同一個主機上。
因此客戶端的注冊和發現功能是完全分開的。
1.作為服務提供者 需要一個能進行服務注冊的接口
? ? ? ? 連接注冊中心 進行服務注冊
2.作為服務發現者 需要一個能進行服務發現的接口,需要將獲取到的提供對應服務的主機信息管理起來 hash<method,vector<host>> 一次發現,多次使用,沒有的話再次進行發現。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ?需要進行服務上線/下線通知請求的處理(需要向dispatcher提供一個請求處理的回調函數)
#pragma
#include"requestor.hpp"namespace wws
{
namespace client
{// 服務提供者類:負責將服務注冊到服務注冊中心class Provider{public:using ptr=std::shared_ptr<Provider>;Provider(const Requestor::ptr&requestor):_requestor(requestor){}//進行服務注冊的接口bool registryMethod(const BaseConnection::ptr&conn,const std::string &method,const Address&host){// 1. 創建 ServiceRequest 請求消息auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);//服務操作類型 注冊// 2. 發送請求并同步等待響應BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);//同步請求if(ret==false){ELOG("%s服務注冊失敗",method.c_str());return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(service_rsp.get()==nullptr){ELOG("響應類型向下轉換失敗");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服務注冊失敗 原因%s",errReason(service_rsp->rcode()));return false;}return true;}private:Requestor::ptr _requestor;//負責發送請求、接收響應};class MethodHost{public:using ptr=std::shared_ptr<MethodHost>;MethodHost(const std::vector<Address>&hosts):_hosts(hosts.begin(),hosts.end()),_idx(0){}void appendHost(const Address&host){//新的服務上線后進行調用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address&host){//服務下線進行調用std::unique_lock<std::mutex> lock(_mutex);//vector刪除效率O(n)效率低,但更多的操作還是 隨機訪問[] 進行RR輪轉,所以vector是最合適的for(auto it=_hosts.begin();it!=_hosts.end();it++){if(*it!=host){_hosts.erase(it);break;}}}Address chooseHost(){std::unique_lock<std::mutex> lock(_mutex);size_t pos=_idx++ %_hosts.size();//1.pos=_idx%size 2._idx+=1return _hosts[pos];}bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty();}private:std::mutex _mutex;size_t _idx;//當前 RR 輪轉索引//vector 提供了 O(1) 的索引訪問,可以快速實現RR輪轉機制。std::vector<Address> _hosts;};class Discoverer{public:Discoverer(const Requestor::ptr &requestor){}//服務發現的接口bool serviceDiscovery(const BaseConnection::ptr&conn,const std::string&method,Address&host){//當前有method方法對應的提供服務者 直接返回host地址{std::unique_lock<std::mutex> lock(_mutex);auto it=_method_hosts.find(method);if(it!=_method_hosts.end()){if (it->second->empty() == false){host = it->second->chooseHost();return true;}}}//當前沒有對應的服務者//1.構建服務發現請求auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);//消息類型msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("服務發現失敗!");return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(!service_rsp.get()){ELOG("服務發現失敗! 響應類型轉換失敗");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服務發現失敗! 錯誤原因%s",errReason(service_rsp->rcode()).c_str());return false;}//2.看服務發現完后有沒有新提供者增加std::unique_lock<std::mutex> _mutex;auto method_host=std::make_shared<MethodHost>(service_rsp->hosts());if(method_host->empty()){ELOG("%s服務發現失敗!沒有能提供服務的主機",method.c_str());return false;}host=method_host->chooseHost();_method_hosts[method]=method_host;//更新method方法->提供者address(可能method方法已經在map中所以=賦值)return true;}//給Dispathcer模塊進行服務上線下線請求處理的回調函數void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){//1.先判斷是上線/下線 都不是就不處理auto optype=msg->optype();std::string method=msg->method();std::unique_lock<std::mutex> lock(_mutex);//2.上線通知if(optype==ServiceOptype::SERVICE_ONLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//該method方法不存在 創建MethodHost初始化并添加入map中auto method_host=std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method]=method_host;}else{//存在直接加it->second->appendHost(msg->host());}}//3.下線通知else if(optype==ServiceOptype::SERVICE_OFFLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//該method方法不存在 直接return//不需要把method方法從map中移除 前面已經判斷過map中method方法為空的情況return;}else{//存在直接刪除it->second->removeHost(msg->host());}}}private:std::mutex _mutex;std::unordered_map<std::string,MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;};
}
}
1.客戶端中的provider服務提供者,有服務注冊接口registryMethod,構建服務注冊請求借助Requestor send()到服務端的的注冊接口。收到應答后判斷應用是否正確。
2.客戶端的discoverer服務發現者,有服務發現接口serviceDiscovery,先開當前有沒有method方法對應的提供者,有就返回一個提供者。沒有就構建服務發現請求,并檢查是否有誤,無錯誤后 再判斷是否有新增的提供者,有就返回提供者并更新method->提供者的映射,沒有就直接返回.
3.MethodHost類 管理一個method方法對應的所以提供者的主機信息,并提供RR輪轉功能,按順序返回method方法對應的提供者。
1??
Provider
類:服務提供者
作用:
負責將服務注冊到服務注冊中心。
主要功能:
registryMethod()
:
發送
ServiceRequest
注冊請求到服務中心。檢查
msg_rsp
響應是否成功注冊。內部成員:
_requestor
:
Requestor::ptr
類型,負責請求發送和接收響應。
2??
MethodHost
類:服務主機管理
作用:
維護多個
Address
主機地址,并提供負載均衡(RR 輪轉)功能。主要功能:
appendHost()
:新增一個服務主機地址。
removeHost()
:移除一個服務主機地址。
chooseHost()
:
通過 RR 輪轉選擇一個主機地址。
empty()
:判斷是否為空。內部成員:
_mutex
:互斥鎖保護_hosts
訪問安全。
_idx
:當前 RR 輪轉索引。
_hosts
:std::vector<Address>
存儲主機地址列表。
3??
Discoverer
類:服務發現者
作用:
發現服務并維護
MethodHost
對象,進行服務上線/下線通知的管理。主要功能:
serviceDiscovery()
:
發現服務并更新
_method_hosts
緩存。選擇主機地址
host
并返回。
onServiceRequest()
:
處理服務上線/下線的請求。
動態添加/刪除
MethodHost
及其地址信息。內部成員:
_mutex
:互斥鎖保護_method_hosts
。
_method_hosts
:
std::unordered_map<std::string, MethodHost::ptr>
,映射method -> host
。
_requestor
:
Requestor::ptr
發送發現請求。
4??
Requestor
類:請求器
作用:
負責向服務中心發送請求并獲取響應。
核心功能:
send()
:
向服務中心同步發送
msg_req
請求。接收
msg_rsp
響應并返回bool
標志位。
🔥 類之間的關系
Provider
通過_requestor
發送ServiceRequest
進行服務注冊。
Discoverer
通過_requestor
發送ServiceRequest
進行服務發現。
MethodHost
由Discoverer
維護,并提供 RR 輪轉機制選擇主機。
Discoverer
監聽onServiceRequest()
來處理服務上線/下線。
RR輪詢(Round-Robin)
一個method方法可以對應多個提供者,用戶請求method方法,我們應該返回哪一個提供者,才能實現最大資源利用呢?
我們可以通過RR輪詢按照固定順序輪流將請求分配到不同的主機。
1.維護一個遞增索引 _idx。
2.每次請求時,選擇 _hosts[_idx % _hosts.size()] 作為當前主機。
3._idx 自增,當 _idx >= _hosts.size() 時自動重置。
RR輪詢需要隨機訪問[ ],所以管理提供者的容器最好選擇vector< >。
RR 輪詢機制的價值
? 1. 負載均衡: 均勻分配請求,防止主機過載,提高系統吞吐量。
? 2. 實現簡單: 只需維護一個_idx
遞增索引,選擇主機只需O(1)
時間復雜度。
? 3. 故障規避: 結合removeHost()
機制,可以自動剔除故障主機,提升可靠性。
? 4. 提升系統吞吐量: 通過多個主機并行處理請求,提升系統的整體性能。
? 5. 維護成本低: 邏輯簡單,維護成本極低,不需要監控主機狀態。
為什么選擇
vector<Address>
作為主機管理容器
? 1. 訪問速度快
vector
提供 O(1) 的隨機訪問能力,可以通過pos
索引直接獲取provider
。輪詢核心邏輯依賴于
vector[pos]
進行主機選擇,比map
的 O(log n) 查找速度更快,適合高頻訪問場景。
? 2. 內存布局緊湊
vector
采用 連續內存存儲,有利于 CPU 緩存命中,提升訪問效率。在輪詢過程中,只需訪問固定內存位置,避免了內存跳轉帶來的性能損耗。
? 3. 刪除主機速度適中
刪除主機時雖然
removeHost()
的復雜度是 O(n),但主機上下線事件發生頻率遠低于請求頻率。即使主機上下線處理稍慢,但
chooseHost()
仍然保持 O(1) 的快速訪問。
?? 注意:
vector
刪除主機時會觸發內存移動,導致性能下降,因此不適合頻繁上下線的場景。但在主機變更較少的場景下,
vector
的整體性能優于list
或map
。
七.對服務發現與注冊的封裝
一.客戶端 rpc_client
封裝客戶端:三大功能模塊
一、業務功能:
基礎 RPC 功能
服務注冊功能
服務發現功能
二、基礎底層模塊:
網絡通信客戶端模塊(由
BaseClient
封裝)
🧱 類結構封裝解析
1.
RegistryClient
:服務注冊客戶端
構造時連接注冊中心地址
提供
registryMethod()
方法:業務提供者向注冊中心注冊服務成員模塊包含:
_provider
:服務提供者
_requestor
:發送請求組件
_dispatcher
:調度器
_client
:基礎通信客戶端2.
DiscoveryClient
:服務發現客戶端
構造時連接注冊中心地址
提供
registryDiscovery()
方法:業務調用方向注冊中心發現服務成員模塊包含:
_discoverer
:服務發現器
_requestor
、_dispatcher
、_client
:同上3.
RpcClient
:RPC 核心客戶端
構造參數
enableDiscovery
決定是否開啟服務發現模式:
若為
true
:連接的是注冊中心若為
false
:連接的是具體的服務提供者提供多種調用方式:
同步調用(返回
result
)異步 future 調用(返回
std::future
)異步 callback 調用(傳入回調函數)
內部組合:
_discovery_client
:可選服務發現客戶端
_caller
:RPC 調用管理器
_requestor
、_dispatcher
、_client
:同樣是基礎通信組件
在構建rpc客戶端時,我們用長連接還是短連接?
1.當客戶端調用call()請求對應方法時,RpcClient 內部調用 DiscoveryClient 進行服務發現?向 注冊中心 Registry Server 發送服務發現請求
2.注冊中心再返回對應方法的提供者主機地址。
3.RpcClient 用RR輪詢從中選出來一個地址創建rpc client客戶端并連接對應方法的提供者主機
4.服務提供者 Provider 接收到請求處理完返回結果給客戶端
5.客戶端回調觸發,返回響應給用戶。
短鏈接:創建一個rpc client客戶端對象,連接服務提供者,進行rpc調用,調用結束后就銷毀關閉客戶端。
? 優點:
實現簡單,按需連接、按需釋放;
沒有資源長期占用問題。
? 缺點:
性能差:每次調用都要建立和銷毀 TCP 連接,連接成本高;
不利于高頻 RPC 場景;
異步處理時管理麻煩:可能連接剛斷,回調結果還沒處理。
長連接:調用完后并不會銷毀關閉客戶端,而是將客戶端放入連接池。后續還需要訪問該主機的該方法,就會從連接池中找到原本的客戶端對象,進行rpc調用。若該主機的該服務下線,需要從池中刪除對應客戶端連接。
? 優點:
高性能:避免頻繁連接/斷開,尤其是重復調用同一服務時;
適合高并發、低延遲系統。
? 缺點:
管理復雜,需要處理:
服務下線、連接失效的自動剔除;
異步/并發安全;
池容量、連接空閑策略等。
在我們這個項目中我們選擇長連接。
主要還是短連接異步處理時管理麻煩。
短鏈接,客戶端進行完rpc調用就會關閉,后面服務提供者返回結果給客戶端,客戶端沒了收到收到應答的回調函數onResponse???也就不能把結果設置道promise中,上層futrue就不能就緒。
回調不觸發,業務邏輯“卡住”
尤其是你用std::future
或promise
等異步等待對象,結果永遠收不到。觸發回調時訪問已被釋放的連接對象,導致崩潰
比如回調中引用了RpcClient
或connection
,但連接已經析構。異步響應結果丟失,日志無記錄,bug 難排查
你會覺得“調用失敗了但程序沒報錯”,其實是 TCP 在你沒注意時被關掉了。異步+短連接的問題在于連接的生命周期和異步結果不一致,導致回調無法安全執行。
解決方法:在rpc調用結束后不關閉客戶端,而是設置一個回調函數,確保收到收到響應處理完再關閉客戶端。
#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"namespace wws
{
namespace client
{//服務注冊客戶端class RegistryClient{public:using ptr=std::shared_ptr<RegistryClient>;//構造函數傳入注冊中心的地址信息 用于連接注冊中心RegistryClient(const std::string&ip,int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()){//注冊中心返回響應消息時觸發的回調函數auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服務注冊接口bool registryMethod(const std::string&method, Address&host){return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//服務發現客戶端class DiscoveryClient{public:using ptr=std::shared_ptr<DiscoveryClient>;//構造函數傳入注冊中心的地址信息 用于連接注冊中心DiscoveryClient(const std::string&ip,int port,const Discoverer::OfflineCallback &cb):_requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor,cb)),_dispatcher(std::make_shared<Dispatcher>()){//注冊中心返回響應消息時觸發的回調函數auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//當注冊中心向客戶端進行上線/下線通知時觸發的回調函數auto req_cb=std::bind(&client::Discoverer::onServiceRequest,_discoverer.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE,req_cb);//消息回調函數 所有收到的消息,統一交由 Dispatcher::onMessage 分發給對應 handlerauto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服務發現接口bool registryDiscovery(const std::string&method, Address&host){return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//rpc客戶端class RpcClient{public:using ptr=std::shared_ptr<RpcClient>;// enableDiscovery--是否啟用服務發現功能 也決定了傳入地址信息是注冊中心地址 還是提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<wws::client::RpcCaller>(_requestor)){//注冊中心返回響應消息時觸發的回調函數auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//1.如果啟用的服務發現 地址信息是注冊中心的地址 是服務發現客戶端需要連接的地址 則通過地址信息實例化discover_client//需要經過服務發現獲取提供者address再獲取對應的clientif(_enableDiscovery){//設置服務下線回調auto offline_cb=std::bind(&RpcClient::delClient,this,std::placeholders::_1);_discovery_client=std::make_shared<DiscoveryClient>(ip,port,offline_cb);}//2.如果沒有啟用服務發現 則地址信息是服務提供者的地址 則直接創建客戶端實例化好rpc_client//直接根據提供的ip+port創建對應的clientelse{auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_rpc_client=ClientFactory::create(ip,port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}//1.同步bool call( const std::string &method,const Json::Value ¶ms, Json::Value &result){//獲取clientBaseClient::ptr client=getClient(method);if(client.get()==nullptr)return false;//通過客戶端連接 發送rpc請求return _caller->call(client->connection(),method,params,result);}//2.異步futurebool call( const std::string &method,const Json::Value ¶ms, std::future<Json::Value> &result){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, result); }//3.異步回調bool call(const std::string &method,const Json::Value ¶ms, const RpcCaller::JsonResponseCallback &cb){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, cb); }private://創建clientBaseClient::ptr newClient(const Address &host){auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();//添加到連接池putClient(host,client);return client;}//根據address從連接池查找client 沒有返回空BaseClient::ptr getClient(const Address&host){std::unique_lock<std::mutex> lock(_mutex);auto it=_rpc_clients.find(host);if(it==_rpc_clients.end()){return BaseClient::ptr();}return it->second;}//根據method獲取client://1.method->服務發現->獲取目標Address->從連接池獲取/沒有直接創建//2.用戶傳入的ip+port->直接獲取已經創建的clientBaseClient::ptr getClient(const std::string method){//1.服務發現獲取的ip+port BaseClient::ptr client;if(_enableDiscovery){//1.通過服務發現 獲取服務提供者地址信息Address host;bool ret=_discovery_client->registryDiscovery(method,host);if(ret==false){ELOG("當前%s服務 沒找到服務提供者",method.c_str());return BaseClient::ptr();}//2.查看連接池中是否有對應的客戶端 有就直接用 沒有就創建client=getClient(host);if(client.get()==nullptr){client=newClient(host);}}//2.用戶提供的ip+port創建的clientelse{client=_rpc_client;}return client;}void putClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host,client));}void delClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash{size_t operator()(const Address&host){std::string addr=host.first+std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;//網絡服務發現 用戶傳方法名method client用方法名找提供者地址進行連接RpcCaller::ptr _caller;Requestor::ptr _requestor;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//未啟用服務發現// RpcClient rpc(false, "127.0.0.1", 8080);// rpc.call("Add", ...); // 直接用 _rpc_client 調用std::mutex _mutex;//<"127.0.0.1",client1>std::unordered_map<Address,BaseClient::ptr,AddressHash>_rpc_clients;//服務發現的客戶端連接池// RpcClient rpc(true, registry_ip, port);// rpc.call("Add", ...); // 自動發現、自動連接、自動發請求};
}
}
unordered_map<>自定義類型作key
我們用哈希表來管理客戶端連接池時:
我們知道在哈希表中是通過key值找到對應的val值的,但并不是直接用我們傳過去的數當key,需要進行哈希值計算。(1.int size_t bool直接強轉 2.char* string?
h = h * 131 + static_cast<unsigned char>(c); ?// 類似 BKDR hash)
而庫中實現了string、int、float 等基本類型的哈希值計算,但這個Address pair<string,int>是個自定義類型,需要我們自己重載哈希值計算。其實就算把string+int融合成string,再套用庫中對string類型計算的哈希函數。
STL中 pair 的哈希組合方法
template <typename T1, typename T2> struct hash<std::pair<T1, T2>> {size_t operator()(const std::pair<T1, T2>& p) const {size_t h1 = std::hash<T1>()(p.first);size_t h2 = std::hash<T2>()(p.second);return h1 ^ (h2 << 1); } };
操作 目的 std::hash<X>()
獲取單個字段的 hash 值 << 1
左移擾動哈希位,使兩個字段 hash 分布更開 ^
異或混合兩個 hash,避免簡單疊加導致沖突 效果 更均勻、穩定、不易沖突的哈希組合值
既然選擇長連接+連接池的做法,那就要處理當服務下線時 在連接池中刪除對應的client
怎么做?設置回調函數,在服務下線時進行調用。
1.RpcClinet初始化DiscoveryClient時傳入回調函數
2.DiscoveryClient 傳給->?Discoverer
3.Discoverer再把回調函數cb設置到成員變量中
4.Client::onServiceRequest處理服務下線時,除了刪除該方法中下線的主機地址Address,還要刪除連接池中連接它的client
二.服務端rpc_server (包含注冊中心服務端 rpc服務端)
🌐 服務端實現業務功能:
提供 RPC 服務
服務注冊與發現機制中提供者的管理(服務注冊)和消費者的管理(服務發現)
📦 封裝的三類服務端組件:
RPC 服務端
負責接收并處理 RPC 請求。
注冊中心服務端
負責管理服務提供者與消費者的注冊信息。
發布訂閱服務端(后續實現)
用于實現基于事件的通信機制(如 pub-sub 模式),暫未實現。
🛠 實現細節說明:
1. 注冊中心服務端
是一個純粹的服務端,用于管理提供者和消費者信息。
核心功能是處理服務注冊與發現的請求。
2. RPC 服務端
實際由兩部分組成:
RPC 服務端:用于接收和響應 RPC 請求。
服務注冊客戶端:啟動后自動連接注冊中心,并將自己能提供的服務注冊上去。
#pragma once
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_router.hpp"
#include "rpc_registry.hpp"#include <set>
namespace wws
{
namespace server
{//服務注冊服務端class RegistryServer{public:using ptr=std::shared_ptr<RegistryServer>;RegistryServer(int port):_pd_manager(std::make_shared<PDManager>()),_dispatcher(std::make_shared<Dispatcher>()){auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);_server = wws::ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb=std::bind(&RegistryServer::onConnShutdown,this,std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}private:void onConnShutdown(const BaseConnection::ptr&conn){_pd_manager->onConnShutdown(conn);}private:PDManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};//rpc服務端class RpcServer{public:using ptr=std::shared_ptr<RpcServer>;RpcServer(const Address&access_addr,bool enableRegistry=false,const Address®istry_server_addr=Address()):_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<wws::server::RpcRouter>()),_dispatcher(std::make_shared<wws::Dispatcher>()){//啟用服務注冊if(_enableRegistry){_reg_client=std::make_shared<client::RegistryClient>(registry_server_addr.first,registry_server_addr.second);}//成員server是一個rpcserver 用于提供rpc服務auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, rpc_cb);//創建一個監聽指定端口(如 8080)的 RPC 服務器對象_server = wws::ServerFactory::create(access_addr.second);//默認監聽 0.0.0.0:port 我監聽所有可用 IPauto message_cb = std::bind(&wws::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}void registerMethod(const ServiceDescribe::ptr &service)//服務描述類{if(_enableRegistry){_reg_client->registryMethod(service->method(),_access_addr);//把 method->本主機地址 發給注冊中心 表示自己可以提供method方法}_router->registerMethod(service);//本地注冊 把服務描述service注冊到RpcRouter中的服務管理類 onRpcRequest接收到客戶端請求時進行路由分發調用}void start(){_server->start();}private:bool _enableRegistry;Address _access_addr;//自己的對外服務地址(客戶端要連接我就來這)client::RegistryClient::ptr _reg_client;//注冊客戶端,用于連接注冊中心并注冊本地服務RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}
注冊中心 服務端 客戶端代碼測試
1.先啟動注冊中心服務端 并設置port. 處理服務注冊請求 服務發現請求 以及服務上下線通知
2.再啟動rpc服務端 ,rpc服務端可以通過_reg_client注冊中心客戶端進行服務注冊,但需要啟動服務注冊_enableRegistry=ture,提供注冊中心客戶端的Address。
還可以注冊本地服務方法,內部注冊到
RpcRouter
路由器中,用于請求到來時快速查找并調用對應的回調函數。3.啟動rpc客戶端,rpc客戶端有兩種連接方式 啟動服務發現 根據方法名向注冊中心查詢服務地址,然后連接對應的服務端。不啟動 直接連用戶傳入的服務提供者的地址
用戶再調用call 通過客戶端連接 發送rpc請求
rpc_server.cpp
#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{//build生產一個 服務描述類 函數名 + 參數類型+結果類型 + 函數地址(進行回調)std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//wws::Address("127.0.0.1",9090) 監聽9090 在9090端口提供服務 true 表示啟動服務注冊 "127.0.0.1",8080 注冊中心的地址信息,創建client用于連接注冊中心wws::server::RpcServer server(wws::Address("127.0.0.1",9090),true,wws::Address("127.0.0.1",8080));//server.registerMethod(desc_factory->build());server.start();return 0;
}
registry_sever.cpp
#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"int main()
{//實例化服務端wws::server::RegistryServer reg_server(8080);reg_server.start();return 0;
}
rpc_client.cpp
#include "../../client/rpc_client.hpp"
#include "../../common/detail.hpp"
#include <thread>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{wws::client::RpcClient client(true,"127.0.0.1",8080);//1.同步調用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=client.call("Add",params,result);//client內找對應Add方法對應提供者的連接if(ret!=false){DLOG("result: %d", result.asInt());}//2.異步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=client.call("Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回調params["nums1"]=55;params["nums2"]=66;ret = client.call("Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}
八.發布訂閱服務端實現rpc_topic
1. Dispatcher 模塊(右上角)
作用:
JSON-RPC 框架中的請求分發核心
判斷 RPC 消息類型 → 調用對應業務模塊(如 PubSubManager)
Dispatcher::registerMethod("topic", &PubSubManager::onTopicRequest)
2. onTopicRequest 回調函數
位置:
PubSubManager
中的統一入口函數
作用:
接收來自 Dispatcher 的請求
根據操作類型調用對應處理函數:
類型 功能函數調用 創建主題 topicCreate()
刪除主題 topicRemove()
訂閱主題 topicSubscribe()
取消訂閱 topicCancel()
發布消息 topicPublish()
3. PubSubManager 核心模塊(圖中心)
職責:
管理兩個核心 map
操作對應的
Topic
和Subscriber
數據結構std::unordered_map<std::string, Topic::ptr> _topics; std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
4.Subscriber 結構(圖左下)
struct Subscriber {BaseConnection::ptr conn;std::unordered_set<std::string> topics; };
圖示結構:
每個訂閱者關聯一個連接
還記錄了自己訂閱的所有主題名
5. topic 結構(圖左上)
struct Topic {std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; };
圖示結構:
每個主題有一個名稱
內部維護一組訂閱它的訂閱者指針
主要用途:
當消息發布到該主題時: → 遍歷
set<subscriber>
并調用conn->send(msg)
6. 兩張 map 映射關系(圖中左)
map<topic_name, topic> map<Connection, Subscriber>
構成了典型的雙向映射系統:
topic_name -> topic -> subscribers
conn -> subscriber -> topics
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>
namespace wws
{
namespace server
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;TopicManager();// 給Dispathcer模塊進行服務上線下線請求處理的回調函數void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {TopicOptype topic_optype=msg->optype();bool ret=true;switch(topic_optype){//主題創建case TopicOptype::TOPIC_CREATE: topicCreate(conn,msg);break;//主題刪除case TopicOptype::TOPIC_REMOVE: topicRemove(conn,msg);break;//主題訂閱case TopicOptype::TOPIC_SUBSCRIBE: topicSubscribe(conn,msg);break;//取消主題訂閱case TopicOptype::TOPIC_CANCEL: topicCancel(conn,msg);break;//主題消息發布case TopicOptype::TOPIC_PUBLISH: topicPublish(conn,msg);break;//返回應答 無效操作類型default: return errorResponse(conn,msg,RCode::RCODE_INVALID_OPTYPE);}if(!ret) return errorResponse(conn,msg,RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn,msg);}//一個訂閱者在連接斷開時的處理 刪除其關聯的數據void onShutdown(const BaseConnection::ptr&conn){//消息發布者斷開連接 不處理; 消息訂閱者斷開連接需要刪除管理數據//1.判斷斷開連接的是否是訂閱者 不是直接返回Subscriber::ptr subscriber;//斷開連接的訂閱者對象std::vector<Topic::ptr> topics;//受影響的主題對象{auto it = _subscribers.find(conn);if (it == _subscribers.end())return;// 2.獲取受影響的主題對象subscriber=it->second;for(auto&topic_name:subscriber->topics){auto topic_it=_topics.find(topic_name);if(topic_it==_topics.end())continue;topics.push_back(topic_it->second);}//3.從訂閱者映射map中刪除 訂閱者_subscribers.erase(it);}//4.從對應主題對象中刪除訂閱者for(auto&topic:topics){topic->removeSubscriber(subscriber);}}private://錯誤響應void errorResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg,RCode rcode){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);//主題響應 msg_rsp->setRcode(rcode);conn->send(msg_rsp);}//注冊應答void topicResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);msg_rsp->setRcode(RCode::RCODE_OK);conn->send(msg_rsp);}//創建主題void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {std::unique_lock<std::mutex> lock(_mutex);//構建一個主題對象 添加映射關系的管理std::string topic_name=msg->topicKey();//主題名稱auto topic=std::make_shared<Topic>(topic_name);_topics.insert(std::make_pair(topic_name,topic));}//刪除主題void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1.查看當前主題 有哪些訂閱者 再從訂閱者中刪除主題信息//_topics[topic_name]->subscribers subscribers->topics訂閱的所有主題名稱//2.刪除主題數據 _topics[topic_name]->Subscriber 主題名稱和主題對象的映射關系std::string topic_name=msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers;{std::unique_lock<std::mutex> lock(_mutex);//刪除主題前 先找出訂閱該主題的訂閱者auto it=_topics.find(topic_name);if(it==_topics.end())return;subscribers=it->second->subscribers;_topics.erase(it);//刪除主題名稱->topic} for(auto&subscriber:subscribers)subscriber->removeSTopic(topic_name);}// 主題訂閱bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1.先找出主題對象topic 訂閱者對象subscriber// 沒有主題對象就報錯 沒有訂閱者對象就構建Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}// 2.在主題對象中 新增一個訂閱者對象管理的連接; 在訂閱者對象中新增一個訂閱的主題topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}}//取消主題訂閱void topicCancel(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){// 1.先找出主題對象topic 訂閱者對象subscriberTopic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end())topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end())subscriber = sub_it->second;// 2.在主題對象中 刪除當前訂閱者對象管理的連接; 在訂閱者對象中刪除對應訂閱的主題if(subscriber) subscriber->removeSTopic(msg->topicKey());if(subscriber && topic) topic->removeSubscriber(subscriber);}}// 主題發布bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic=topic_it->second;}topic->pushMessage(msg);return true;}private:// 每個客戶端連接會對應一個訂閱者對象,記錄它當前訂閱了哪些主題struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr conn;std::unordered_set<std::string> topics; // 訂閱者訂閱的主題名稱Subscriber(const BaseConnection::ptr &c): conn(c){}// 增加主題void appendTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.insert(topic_name);}// 刪除主題void removeSTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.erase(topic_name);}};// 每個主題包含一個主題名 + 當前所有的訂閱者struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; // 當前主題訂閱者Topic(const std::string &name): topic_name(name){}// 增加訂閱者void appendSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.insert(subscriber);}// 刪除訂閱者void removeSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.erase(subscriber);}// 給該主題的所有訂閱者發消息void pushMessage(const BaseMessage::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : subscribers){subscriber->conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};
}
}
九.發布訂閱客戶端實現rpc_topic
一、發布訂閱客戶端的角色劃分
消息發布客戶端
創建主題
刪除主題
發布消息(向某個主題發布)
消息訂閱客戶端
創建主題
刪除主題
訂閱某主題的消息
取消訂閱某主題
二、整體模塊設計思路
對外的五個操作接口
針對“主題”的操作,包括:
創建
刪除
訂閱
取消訂閱
發布
對外的一個消息處理接口
提供給 dispatcher 模塊,進行消息分發處理
相當于 dispatcher 收到消息發布請求后,查找有哪些訂閱者,并調用對應的回調函數將消息推送過去
內部的數據管理
管理“主題名稱”與“消息處理回調函數”的映射關系
#pragma once
#include"requestor.hpp"namespace wws
{
namespace client
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;using SubCallback=std::function<void(const std::string &key,const std::string&msg)>;//主題創建bool create(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_CREATE);}//刪除bool remove(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_REMOVE);}//訂閱主題 SubCallback收到主題新消息的進行的回調bool subscribe(const BaseConnection::ptr&conn,const std::string &key,const SubCallback&cb){//當我們訂閱了主題 可能發布者會馬上發布該主題的內容 //這時候如果cb還沒有設置到map中就無法執行回調函數 所以先設置回調函數到map中addSubscribe(key,cb);bool ret=commonRequest(conn,key,TopicOptype::TOPIC_SUBSCRIBE);if(ret==false){//請求發送失敗 刪除map中對應的cbdelSubscribe(key);return false;}return true;}//取消訂閱bool cancel(const BaseConnection::ptr&conn,const std::string &key){delSubscribe(key);return commonRequest(conn,key,TopicOptype::TOPIC_CANCEL);}//發布消息(向某個主題發布)bool publish(const BaseConnection::ptr&conn,const std::string &key,const std::string &msg){return commonRequest(conn,key,TopicOptype::TOPIC_PUBLISH,msg);}// 當收到服務端推送的消息時調用,觸發對應訂閱者的回調處理邏輯 (設置給dispatcher收到對應主題消息 進行回調處理)void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){//1.先判斷該消息的操作類型是否為 發布消息auto type=msg->optype();if(type!=TopicOptype::TOPIC_PUBLISH){ELOG("收到了錯誤類型的主題操作");return;}//2.取出主題名稱 以及消息內容 后面調用cbstd::string topic_key=msg->topicKey();std::string topic_msg=msg->topicMsg();//3.調用cbauto callback=getSubscribe(topic_key);if(!callback){ELOG("收到了%s主題信息 但該主題無對應回調",topic_key.c_str());return;}callback(topic_key,topic_msg);}private:void addSubscribe(const std::string &key,const SubCallback&cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key,cb));}void delSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback& getSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);auto it=_topic_callbacks.find(key);if(it==_topic_callbacks.end())return SubCallback();return it->second;}bool commonRequest(const BaseConnection::ptr&conn,const std::string &key,TopicOptype type,const std::string &msg=""){//1.構造請求對象 并填充數據auto msg_req=MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setTopicKey(key);msg_req->setOptype(type);if(type==TopicOptype::TOPIC_PUBLISH)msg_req->setTopicMsg(msg);//2.向服務端發送請求 等待響應BaseMessage::ptr msg_rsp;//發請求 + 等響應 + 反序列化響應 + 返回響應對象msg_rspbool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("主題創建請求失敗");return false;}//3.判斷請求處理是否成功auto topic_rsp_msg=std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if(!topic_rsp_msg){ELOG("主題響應 向下類型轉換失敗");return false;}if(topic_rsp_msg->rcode()!=RCode::RCODE_OK);{ELOG("主題創建請求出錯:%s",errReason(topic_rsp_msg->rcode()).c_str());return false;}return true;}private:std::mutex _mutex;//根據主題查找對應的回調函數執行std::unordered_map<std::string,SubCallback> _topic_callbacks;Requestor::ptr _requestor;};
}
}
十.topicServer topicClient封裝