C++ Json-Rpc框架-3項目實現(2)

?一.消息分發Dispatcher實現

Dispatcher 就是“消息分發中樞”:根據消息類型 MType,把消息派發給對應的處理函數(Handler)執行。

初版:

#pragma once
#include "net.hpp"
#include "message.hpp"namespace wws
{class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;void registerHandler(MType mtype,const MessageCallback &handler){std::unique_lock<std::mutex> lock(_mutex);_handlers.insert(std::make_pair(mtype,handler));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息類型對應的業務處理函數,進行調用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second(conn,msg);return;}//沒有找到指定類型的處理回調 但我們客戶端和服務端都是我們自己設計的 因此不可能出現這種情況ELOG("收到未知類型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,MessageCallback> _handlers;};
}

使用方法 :服務端為例

#include "message.hpp"
#include "net.hpp"
#include "dispatcher.hpp"void onRpcRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Rpc請求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::RpcResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);rpc_rsp->setResult(33);conn->send(rpc_rsp);
}
void onTopicRequest(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg)
{std::cout<<"收到了Topic請求:";std::string body =msg->serialize();std::cout<<body<<std::endl;auto rpc_rsp=wws::MessageFactory::create<wws::TopicResponse>();rpc_rsp->setId("111");rpc_rsp->setMType(wws::MType::RSP_RPC);rpc_rsp->setRcode(wws::RCode::RCODE_OK);conn->send(rpc_rsp);
}int main()
{//server建立收到rpc topic請求時對應的調用函數auto dispatcher=std::make_shared<wws::Dispatcher>();dispatcher->registerHandler(wws::MType::REQ_RPC,onRpcRequest);//注冊映射關系dispatcher->registerHandler(wws::MType::REQ_TOPIC,onTopicRequest);//注冊映射關系auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);server->setMessageCallback(message_cb);server->start();return 0;
}

回調函數調用過程:

1.服務端定義了兩個函數onRpcRequest收到rpc請求的回調函數,onTopicRequest收到topic請求的回調函數(兩個函數的參數部分都是一樣的)。

2.創建Dispatcher類對象,調用registerHandler函數把請求類型(mtype)和對應的回調函數建立映射(因為上面回調函數的類型都是一樣的,所以可以用map進行同一管理)。

3.把Dispatcher::onMessage函數設置成消息回調函數。在Dispatcher::onMessage函數內部會根據不同的消息類型,找到對應的回調函數并進行調用。

但是設置的兩個回調函數中onRpcRequest,onTopicRequest。它們第二個參數類型都是基類BaseMessage,雖然傳入的對象是子類對象RpcResponse TopicResponse,但仍無法訪問它們對應子類的成員函數。

如果把第二個參數基類Base換成它們對應的子類,能訪問了,但這就會導致函數的類型不一樣了,就不能用map進行統一管理。

有沒有什么辦法即能用map進行統一管理,還能在回調函數中調用到子類的函數。

方法一:直接在回調函數中通過dynamic_cast?/ std::dynamic_pointer_cast將父類->子類

可以通過 dynamic_cast 將基類指針(或引用)轉換為子類類型,以便訪問子類特有的成員函數。前提是你使用的是多態(polymorphic)類,也就是基類至少要有一個虛函數

//Base基類 Derived子類
void test(Base* basePtr) {// 裸指針寫法Derived* derivedPtr = dynamic_cast<Derived*>(basePtr);if (derivedPtr) {derivedPtr->specialFunction(); // 成功轉換,調用子類函數} else {cout << "dynamic_cast failed!" << endl;}
}
//智能指針寫法
std::shared_ptr<Base> basePtr = std::make_shared<Derived>();  // 用智能指針創建對象
std::shared_ptr<Derived> derivedPtr = std::dynamic_pointer_cast<Derived>(basePtr);  // 類型安全轉換
項目裸指針寫法智能指針寫法
創建方式new Derived()std::make_shared<Derived>()
類型轉換dynamic_cast<Derived*>(Base*)std::dynamic_pointer_cast<Derived>(shared_ptr<Base>)
返回值類型Derived*std::shared_ptr<Derived>
生命周期管理手動 delete自動釋放,無內存泄漏風險
安全檢查? 運行時類型檢查,失敗返回 nullptr? 同樣運行時類型檢查,失敗返回空智能指針
是否影響引用計數? 不涉及引用計數? 新建了一個共享控制塊引用
缺點說明
? 調用方需要知道消息類型調用者必須手動 dynamic_cast 到對應子類,否則不能訪問子類內容
? 有一定運行時開銷dynamic_cast 需要在運行時檢查類型,會略有性能損失(但一般能接受)
? 容易出錯如果類型錯了,就返回 nullptr,還要額外判斷、處理錯誤
? 易破壞封裝上層代碼要知道并顯式轉換為子類,增加了耦合度和類型暴露

方法二:模板 + 繼承 + 多態

先看代碼實現:
?

#pragma once
#include "net.hpp"
#include "message.hpp"namespace wws
{//讓統一的父類指針指向不同的子類對象//通過調用父類虛函數,調用不同子類onMessage類型轉換(dynamic<>)完成后的函數調用class Callback{public:using ptr=std::shared_ptr<Callback>;virtual void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)=0;};template<typename T>class CallbackT:public Callback{public:using ptr=std::shared_ptr<CallbackT<T>>;//根據消息類型重新定義出函數類型using MessageCallback =std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>;CallbackT(const MessageCallback &handler):_handler(handler){}void onMessage(const BaseConnection::ptr &conn,BaseMessage::ptr &msg)override{auto type_msg=std::dynamic_pointer_cast<T>(msg);_handler(conn,type_msg);}private:MessageCallback  _handler;};class Dispatcher {public:using ptr=std::shared_ptr<Dispatcher>;template<typename T>                 //加typename表示這是一個類型void registerHandler(MType mtype,const typename CallbackT<T>::MessageCallback &handler)//傳的是類對象 T是消息類型(BaseMessage子類){std::unique_lock<std::mutex> lock(_mutex);auto cb = std::make_shared<CallbackT<T>>(handler);_handlers.insert(std::make_pair(mtype,cb));}void onMessage(const BaseConnection::ptr&conn,BaseMessage::ptr &msg){//找到消息類型對應的業務處理函數,進行調用std::unique_lock<std::mutex> lock(_mutex);auto it=_handlers.find(msg->mtype());if(it!=_handlers.end()){it->second->onMessage(conn,msg);//調用類中的回調函數return;}//沒有找到指定類型的處理回調 但我們客戶端和服務端都是我們自己設計的 因此不可能出現這種情況ELOG("收到未知類型的消息");conn->shutdown();}private:std::mutex _mutex;std::unordered_map<MType,Callback::ptr> _handlers;//second是一個基類指針};
}

使用方法:

不用傳BaseMessage基類,直接傳子類。但在設置時指明Typed消息類型。

因為回調函數hadler類型不同,我們就用模板類根據T(消息類型)生成不同的類,里面是保存的是不同類型的回調函數。但根據模板類生成的類,類型不一樣還是不能統一放入map中。

所以再定義一個父類 里面的回調函數設置為虛函數,這些不同的模板類作為子類繼承父類并實現各自的回調函數。map中的handler存的不再是回調函數,而是一個父類指針,通過父類指針調用子類的回調函數。

具體過程:

  • 使用模板類 CallbackT<T> 根據消息類型 T 生成不同的子類,它們內部包裝了各自類型的回調函數。(因為子類傳入的是一個函數類型,保存的一個函數類型,而T是一個消息類型。還需要MessageCallback將T消息類型和回調函數類型綁定在一起)

  • 這些模板類繼承自統一的基類 Callback,并重寫了其 onMessage() 虛函數。

  • 當我們在調用registerHandler注冊回調函數時,會創建一個CallbackT<T>的對象并獲取其指針,最后設置到map中。

  • Dispatcher 中,map<MType, Callback::ptr> 存儲的是基類指針,但實際指向的是不同子類 CallbackT<T> 的對象。

  • 當調用 onMessage() 時,基類指針會通過虛函數機制調用對應子類的實現,再通過 dynamic_pointer_cast 將消息轉換為正確的類型,最終調用具體的業務回調函數

方法一和方法二對比:

對比點? 當前封裝方式<br>(模板 + 多態)? 直接在每個 handler 里手動 dynamic_cast
💡 代碼復用性回調邏輯封裝在模板中,避免重復寫類型轉換代碼每個回調都要重復寫一次 dynamic_pointer_cast
? 類型安全編譯器自動根據模板類型 T 限定傳入的函數簽名由開發者自己保證類型正確,容易寫錯
🎯 接口統一Dispatcher 接收統一的 BaseMessage::ptr,自動調用類型對應的 handler接口統一,但每次回調前都得“手動猜”消息類型
🧼 代碼整潔性Handler 業務函數專注于處理業務,不摻雜類型轉換代碼handler 代碼里混入了類型轉換、錯誤判斷等雜項
🔄 可擴展性新增消息類型只需 registerHandler<T> 一行,無需修改 dispatcher 邏輯每新增一種類型,都要寫新回調 + 自己處理類型轉換
🔒 類型封裝類型轉換封裝在 CallbackT<T>::onMessage 內,調用者無感知顯式暴露類型細節,破壞封裝性
🧠 可維護性Dispatcher 管理邏輯集中、結構清晰回調函數多時,容易混亂、出錯
相比直接在 handler 里 dynamic_cast,現在的設計通過模板和多態封裝了類型轉換邏輯,使回調函數 更簡潔、更安全、更可維護,Dispatcher 也更具通用性和擴展性。

二.服務端-RpcRouter實現

組織和處理客戶端發來的 RPC 請求,并調用對應的業務邏輯進行響應

這個模塊主要由 4 個類構成:

類名作用
VType參數類型的枚舉,例如整數、字符串、對象等
ServiceDescribe描述一個服務方法的參數、返回值、回調函數等
ServiceManager管理多個服務(增刪查)
RpcRouter處理客戶端發來的 RPC 請求,協調調用服務
#include "../common/net.hpp"
#include "../common/message.hpp"namespace wws
{
namespace server
{//枚舉類 VType 定義參數與返回值的類型enum class VType{BOOL = 0,INTEGRAL,NUMERIC,STRING,ARRAY,OBJECT,};// 服務描述類class ServiceDescribe{public:using ptr=std::shared_ptr<ServiceDescribe>;using ServiceCallback=std::function<void(const Json::Value&,Json::Value&)>;using ParamDescribe=std::pair<std::string,VType>;//參數名稱 類型ServiceDescribe(std::string &&method_name,ServiceCallback &&callback,std::vector<ParamDescribe>&& params_desc,VType return_type):_method_name(std::move(method_name)),_callback(std::move(callback)),_params_desc(std::move(params_desc)),_return_type(return_type){}//返回名稱const std::string & method(){return _method_name;}//校驗傳入參數是否符合要求(1.字段完整 + 2.類型匹配)bool paramCheck(const Json::Value&params)//{"nums1",11}{for(auto&desc:_params_desc){//1.判斷是否有該字段if(params.isMember(desc.first)==false){ELOG("沒有 %s 參數字段",desc.first.c_str());return false;}//2.判斷該字段類型是否正確if(check(desc.second,params[desc.first])==false){ELOG("%s參數字段類型錯誤",desc.first.c_str());return false;}}return true;}bool call(const Json::Value& params,Json::Value&result){_callback(params,result);if(rtypeCheck(result)==false){ELOG("回調處理函數中響應信息類型錯誤");return false;}return true;}private:// 判斷return類型是否正確bool rtypeCheck(const Json::Value &val){return check(_return_type, val);}//判斷val對象的類型是否和vtype一致 Json::Value兼容任何JSON類型(int、string、array、object 等)bool check(VType vtype,const Json::Value &val){switch(vtype){case VType::BOOL :return val.isBool();case VType::INTEGRAL : return val.isIntegral();case VType::NUMERIC : return val.isNumeric();case VType::STRING : return val.isString();case VType::ARRAY : return val.isArray();case VType::OBJECT : return val.isObject();}return false;}private:std::string _method_name;//方法名稱ServiceCallback _callback;//實際的業務回調函數std::vector<ParamDescribe> _params_desc;//參數字段格式描述vector<參數名稱,對應的類型>VType _return_type;//結果類型描述 };//對比 直接在ServiceDescribe中set各參數 的優點:構造完后ServiceDescribe 的成員就不再修改,僅讀取 天然線程安全。//若多個線程同時調用 setXxx() 方法 會出現線程安全的問題 需要在每個set函數中加鎖class SDescribeFactory{public:void setMethodName(const std::string&name){_method_name=name;}void setReturnType(VType vtype){_return_type=vtype;}void setParamDesc(const std::string &pname,VType vtype){_params_desc.push_back(ServiceDescribe::ParamDescribe(pname,vtype));}void setCallback(const ServiceDescribe::ServiceCallback&cb){_callback=cb;}ServiceDescribe::ptr build(){return std::make_shared<ServiceDescribe>(_method_name,_callback,_params_desc,_return_type);}private:std::string _method_name;//方法名稱ServiceDescribe::ServiceCallback _callback;               // 實際的業務回調函數std::vector<ServiceDescribe::ParamDescribe> _params_desc; // 參數字段格式描述vector<參數名稱,對應的類型>VType _return_type;                                       // 結果類型描述};//服務管理類 增刪查class ServiceManager{public:using ptr=std::shared_ptr<ServiceManager>;void insert(const ServiceDescribe::ptr&desc)//增{std::unique_lock<std::mutex> lock(_mutex);_service.insert(std::make_pair(desc->method(),desc));}ServiceDescribe::ptr select(const std::string &method_name)//查{std::unique_lock<std::mutex> lock(_mutex);auto it=_service.find(method_name);if(it==_service.end()){return ServiceDescribe::ptr();}return it->second;}void remove(const std::string &method_name)//刪{_service.erase(method_name);}private:std::mutex _mutex;std::unordered_map<std::string,ServiceDescribe::ptr> _service;//函數名稱 對應服務};class RpcRouter{public:using ptr=std::shared_ptr<ServiceDescribe>;//對注冊到Dispatcher模塊針對rpc請求進行回調處理的業務函數void onRpcRequest(const BaseConnection::ptr&conn,RpcRequest::ptr &request){//1.根據用戶請求的方法描述 判斷當前服務端能否提供對應的服務auto service=_service_manager->select(request->method());if(service.get()==nullptr){ELOG("未找到%s服務",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_NOT_FOUND_SERVICE);}//2.進行參數校驗 確定能否提供服務if(service->paramCheck(request->params())==false){ELOG("%s服務參數校驗失敗",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INVALID_PARAMS);}//3.調用業務回調接口進行處理Json::Value result;bool ret=service->call(request->params(),result);if(ret==false){ELOG("%s服務參調用出錯",request->method().c_str());return response(conn,request,Json::Value(),RCode::RCODE_INTERNAL_ERROR);}//4.向客戶端發送結果return response(conn,request,result,RCode::RCODE_OK);}//提供服務注冊接口void registerMethod(const ServiceDescribe::ptr &service ){_service_manager->insert(service);}private://響應對象void response(const BaseConnection::ptr&conn,RpcRequest::ptr&req,const Json::Value&res,RCode rcode){auto msg=MessageFactory::create<RpcResponse>();msg->setId(req->rid());msg->setMType(wws::MType::RSP_RPC);msg->setRcode(rcode);msg->setResult(res);conn->send(msg);}private:ServiceManager::ptr _service_manager;};
}
}

ServiceDescribe:服務描述類

每一個服務方法(如 AddTranslate)都有一個 ServiceDescribe 對象,它記錄:

  • 方法名 _method_name

  • 參數信息 _params_desc(字段名 + 類型)

  • 返回值類型 _return_type

  • 實際處理邏輯 _callback

?功能:

  • paramCheck():檢查客戶端傳來的參數是否完整且類型匹配。

  • call():調用業務函數,處理請求,并校驗響應類型,輸出型參數Json::Value&result獲取結果。

RpcRouter:RPC 請求核心調度器

處理流程(onRpcRequest):

1. 獲取客戶端請求的方法名:request->method()
2. 查找是否有對應的服務:_service_manager->select()
3. 參數檢查:service->paramCheck()
4. 調用回調函數處理:service->call()
5. 構建響應消息:RpcResponse
6. 通過 conn->send(msg) 返回結果

注冊流程(registerMethod()):

RpcRouter router;
router.registerMethod(service); // 將服務注冊到服務管理器

為什么要用 SDescribeFactory 工廠模式而不是在 ServiceDescribe 中直接使用 setXxx() 方法進行設置ServiceDescribe的各個參數?

通過DescribeFactory 工廠模式,造完后ServiceDescribe 的成員就不再修改,僅讀取 天然線程安全。

如果在ServiceDescribe 設置set(),若多個線程同時調用 setXxx() 方法 會出現線程安全的問題 需要在每個set函數中加鎖。

         客戶端發送 RpcRequest↓RpcRouter::onRpcRequest()//進行處理↓_service_manager->select(method)//查找方法↓ServiceDescribe::paramCheck(params)//校驗參數↓ServiceDescribe::call() → 執行業務邏輯//執行回調↓RpcRouter::response() → 發送響應

三.客戶端-RpcRouter實現

🔹 1. describe(請求描述體)

  • 封裝一個請求的基本信息:

    • request: 包含 RID(請求 ID)、MType(消息類型)、Body(請求體)

    • std::promise<response>:用于 future 異步

    • callback:用于 callback 異步

    • RType: 標識異步類型(ASYNC / CALLBACK)

🔹 2. Requestor::send(...)

  • send(connection, request, callback) → 異步 callback 模式

  • send(connection, request, std::future<response>) → future 模式

  • describe 存入 map<rid, describe> 中,等待響應回調

🔹 3. onResponse(connection, response)

  • 收到響應后,根據 RID 從 map<rid, describe> 查找對應的 describe

  • 按照 RType 分發:

    • 如果是 CALLBACK,調用 callback

    • 如果是 ASYNC,通過 promise.set_value(...) 實現 future 結果

🔹 4. Dispatcher<mt, handler>

  • MType 進行派發處理(主要針對訂閱/通知類型的消息,不含 RID)

#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>namespace wws
{
namespace client
{class Requestor{public:using ptr=std::shared_ptr<Requestor>;using RequestCallback=std::function<void(const BaseMessage::ptr&)>;using AsyncResponse=std::future<BaseMessage::ptr>;//請求描述的結構體struct RequestDescribe{using ptr=std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;//請求消息指針RType rtype; //請求類型 異步/回調std::promise<BaseMessage::ptr> response;//用于 async 模式,設置結果set_value 給 future 返回值RequestCallback callback;//用于 callback 模式,響應到來時觸發用戶邏輯};//收到應答 根據rid找到對應的請求設置結果 或者 調用回調 void onReponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg){std::string rid=msg->rid();RequestDescribe::ptr rdp=getDescribe(rid);//根據id進行查找if(rdp.get()==nullptr){ELOG("收到響應%s,但是未找到對應的請求描述!",rid.c_str());return;}//異步請求把應答msg作為結果設置到promise中,讓future就緒if(rdp->rtype==RType::REQ_ASYNC){rdp->response.set_value(msg);//promise.set_value(value); 手動設置值 讓std::future<BaseMessage::ptr>變為就緒。}//回調請求 有回調函數就進行調用else if(rdp->rtype==RType::REQ_CALLBACK){if(rdp->callback) rdp->callback(msg);}elseELOG("請求類型未知");//收到應答 刪除rid對應的請求描述delDescribe(rid);}//1.異步請求發送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,AsyncResponse&async_rsp){//創建請求描述對象(newDescribe內部完成插入map)RequestDescribe::ptr rdp=newDescribe(req,RType::REQ_ASYNC);if(rdp.get()==nullptr){ELOG("構建請描述對象失敗");return false;}//get_future()關聯std::future<>async_rsp 和 std::promise<>response//promise.set_value(value) 被調用,就能async_rsp.get()獲取值async_rsp=rdp->response.get_future();return true;}//2.同步請求發送(發送完請求后,立刻調用get()阻塞等待set_value()設置后獲取結果)//可以在上層進行get()阻塞等待,也是同樣效果bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&req,BaseMessage::ptr&rsp){AsyncResponse rsp_future;bool ret=send(conn,req,rsp_future);if(ret==false) return false;rsp=rsp_future.get();//阻塞等待值就緒return true;}//3.回調請求發送bool send(const BaseConnection::ptr&conn,const BaseMessage::ptr&rep,RequestCallback&cb){//創建請求描述對象(newDescribe內部完成插入map)RequestDescribe::ptr rdp=newDescribe(rep,RType::REQ_CALLBACK,cb);if(rdp.get()==nullptr){ELOG("構建請描述對象失敗");return false;}conn->send(rep);return true;}private://1.新增RequestDescribe::ptr newDescribe(const BaseMessage::ptr&req,RType rtype,const RequestCallback&cb=RequestCallback()){std::unique_lock<std::mutex> lock(_mutex);//構建請求描述對象 并插入到mapRequestDescribe::ptr rd=std::make_shared<RequestDescribe>();rd->request=req;rd->rtype=rtype;if(rtype==RType::REQ_CALLBACK&&cb)rd->callback=cb;_request_desc.insert(std::make_pair(req->rid(),rd));//插入到mapreturn rd;}//2.查找RequestDescribe::ptr getDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);auto it=_request_desc.find(rid);if(it==_request_desc.end()){return RequestDescribe::ptr();}return it->second;}//3.刪除void delDescribe(const std::string&rid){std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string,RequestDescribe::ptr> _request_desc;//id->請求消息};
}
}
用戶發起請求│▼
Requestor::send(...)│▼
創建 RequestDescribe (含回調 or promise)│▼
加入 map<rid, describe>│▼
發送消息給服務器?(服務器響應)Requestor::onResponse(...)│▼找回 describe -> 判斷 rtype├── CALLBACK → 執行 callback└── ASYNC    → set promise▼清除 map<rid, describe>

四.客戶端RpcCaller實現

1. 構造函數 RpcCaller(const Requestor::ptr&)

  • 初始化傳入一個 Requestor 實例;

  • Requestor 負責發送消息、注冊回調、接收服務端響應等;

  • RpcCaller 是調用層,Requestor 是通信層。

2. 同步調用接口

bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,Json::Value& result)

1.先構建RpcRequest請求

2.調用requestor->同步send(),同步阻塞發送請求,并拿到rsp_msg。

3.將rsp_msg的Basemessage類型轉換為 RpcResponse,取出正文結果 result();

3. 異步 Future 調用接口

bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,std::future<Json::Value>& result)
  • 創建 promise 對象,用于異步回填響應;

  • 通過 shared_ptr 管理 promise 生命周期,綁定到回調函數中;

  • 通過 _requestor->send(...) 注冊異步回調;

  • 回調觸發后由 Callback() 設置 promise.set_value()

  • 調用方使用返回的 future 進行 .get() 即可拿到結果。

4. 異步回調接口

bool call(const BaseConnection::ptr& conn,const std::string& method,const Json::Value& params,JsonResponseCallback& cb)
  • 直接注冊用戶定義的回調 cb 到請求;

  • 內部通過 Callback1() 做包裝處理:

    • 類型轉換為 RpcResponse

    • 錯誤處理

    • 最后調用用戶傳入的 cb(result) 傳回結果。

#include "requestor.hpp"namespace wws
{
namespace client
{class RpcCaller{public:using ptr=std::shared_ptr<RpcCaller>;using JsonAsyncResponse=std::future<Json::Value>;using JsonResponseCallback=std::function<void(const Json::Value&)>;//requestor中的處理是針對BaseMessage進行處理的(因為要對所有請求進行處理,不單單對Rpc請求處理)//用于在rpc caller中針對結果的處理是對RpcResponse里邊的result進行的RpcCaller(const Requestor::ptr &requestor):_requestor(requestor){}//1.同步調用  1.連接conn 2.方法名method 3.方法參數params 4.結果resultbool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value&params,Json::Value&result){//1.組織請求auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);BaseMessage::ptr rsp_msg;//2.發送請求 因為send()是重載函數 參數的類型必須保持一致(req_msg子類RpcRequest->基類BaseMessage)bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),rsp_msg);if(ret==false){ELOG("同步Rpc請求失敗");return false;}//3.等待響應 響應信息存放在rsp_msg此時是Base基類,需要轉成RpcResponse應答auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(rsp_msg);if(!rpc_rsp_msg){ELOG("Rpc響應 向下類型轉換失敗");return false;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc同步請求出錯:%s",errReason(rpc_rsp_msg->rcode()));return false;}result=rpc_rsp_msg->result();//返回響應消息里面的正文return true;}//2.異步 Future 調用 向服務器發送異步回調請求 設置回調函數 //回調函數中傳入一個promise對象 在回調函數中對promise設置數據//異步請求返回的是BaseMessage對象,用戶想要的是message里面正文的結果Valuebool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value&params,std::future<Json::Value>&result){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);// //這個json_promise對象是一個局部變量,等出了call作用域就會消失// //與它關聯的future result在外部get()獲取結果時,會異常// std::promise<Json::Value> json_promise;// result=json_promise.get_future();//future和promise建立關聯 以后future.get()獲取結果auto json_promise=std::make_shared<std::promise<Json::Value>>();result=json_promise->get_future();//std::bind() 對json_promise傳值傳參,shared_ptr引用計數 +1 此時引用計數==2//退出call作用域 引用計數-- 再等callback被觸發完畢并釋放后引用計數--,才會析構//shared_ptr引用計數是否加1,只和bind對json_promise指針的捕獲方式有關,與函數的參數聲明是否引用json_promise指針無關Requestor::RequestCallback cb=std::bind(&RpcCaller::Callback,this,json_promise,std::placeholders::_1);bool ret=_requestor->send(conn,std::dynamic_pointer_cast<BaseMessage>(req_msg),cb);if(ret==false){ELOG("異步Rpc請求失敗");return false;}return true;}//3.異步回調bool call(const BaseConnection::ptr&conn,const std::string&method,const Json::Value&params,JsonResponseCallback&cb){auto req_msg=MessageFactory::create<RpcRequest>();req_msg->setId(UUID::uuid());req_msg->setMType(MType::REQ_RPC);req_msg->setMethod(method);req_msg->setParams(params);Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1,this, cb, std::placeholders::_1);bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);if (ret == false){ELOG("異步Rpc請求失敗");return false;}return true;}private:void Callback1(const JsonResponseCallback&cb,const BaseMessage::ptr&msg){//先判斷結果對不對auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc響應 向下類型轉換失敗");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc回調請求出錯:%s",errReason(rpc_rsp_msg->rcode()));return ;}cb(rpc_rsp_msg->result());}//const BaseMessage::ptr&msg Request參數是拿到響應后傳遞的void Callback(std::shared_ptr<std::promise<Json::Value>>result,const BaseMessage::ptr&msg){auto rpc_rsp_msg=std::dynamic_pointer_cast<RpcResponse>(msg);if(!rpc_rsp_msg){ELOG("Rpc響應 向下類型轉換失敗");return ;}if(rpc_rsp_msg->rcode() != RCode::RCODE_OK){ELOG("Rpc異步請求出錯:%s",errReason(rpc_rsp_msg->rcode()));return ;}//promise.set_value()設置正文結果     result->set_value(rpc_rsp_msg->result());}private:Requestor::ptr _requestor;};
}
}

為什么不讓 Requestor::send() 直接處理 RpcRequest,而是讓它只處理 BaseMessage,由 RpcCaller 來封裝具體業務(如 RpcRequest/RpcResponse)?

1.因為要對所有請求進行處理,不單單對Rpc請求處理。還可以處理主題 服務等消息類型

2.解耦業務協議與通信通道,只做消息傳遞與回調處理。

模塊職責
Requestor發送消息、注冊和觸發回調,處理響應派發
RpcCaller構造 RPC 請求、解析 RPC 響應,組織業務邏輯

為什么 bind 捕獲 shared_ptr 能延長 promise 的生命周期?

shared_ptr+bind(值捕獲)+外部拷貝保存

本質就是:

只要某個 shared_ptr 指向的對象,引用計數 不為 0,那它就不會被銷毀

所以我們通過創建另一個生命周期更長的 shared_ptr(比如通過 bind() 值捕獲并在外部保存),來延長這段資源的生命周期。

(不建議std::ref()引用捕獲?如果外部shared_ptr_obj的被銷毀,cb 里的引用就變成懸垂指針

auto cb = std::bind(&func, std::ref(shared_ptr_obj));

如果std::promise<Json::Value> json_promise;直接創建一個promise對象,這是一個局部對象等出了call函數(離開作用域),就會析構。

這就會導致與它關聯的future result在外部get()獲取結果時,會異常。

auto json_promise=std::make_shared<std::promise<Json::Value>>();創建promise對象并用shared指針來管理它,雖然出了call函數 引用計數-- 為0還是會析構。

但我們在bind綁定的時候,對json_promise進行了傳值,拷貝了一份 shared_ptr 對象 進入 bind 內部的閉包,引用計數+1.

auto cb = std::bind(&RpcCaller::Callback,this,json_promise,   // 👈 這里復制了一份 shared_ptr,引用計數 +1std::placeholders::_1);

寫成void Callback(const std::shared_ptr<std::promise<Json::Value>>&result,const BaseMessage::ptr&msg) 接收引用(不增加引用計數)。對bind傳值引用計數+1無影響

如果是下面這種拷貝引用計數會再+1,但僅限Callback運行時,調用結束后立刻-1,只是暫時的。

所以說現在,在call函數中有被shared指針指向的promise對象引用計數為2,一個make_shared返回的,一個bind閉包值捕獲的。

bind閉包對象cb(auto cb=std::bind(...)),它本質不也是一個局部對象嗎?call函數結束不還是和另一個局部變量一樣會銷毀,promise的生命周期怎么會延長呢?

call函數中的cb是否銷毀已經無關,因為在函數的外部已經拷貝保存了一份了。

它通常會被傳出 call() 函數,交給 Requestor::send() 并被保存在異步響應回調表里!

send()會調用newDescibe,把閉包對象存入rd請求描述對象中,再插入到map進行統一管理。

(注意newDescribe雖然是const&獲取的cb 不增加引用計數,但這根本無關緊要,因為rd->callback=cb;會把它拷貝到了 RequestDescribe::callback 中,這是才生命周期延長的關鍵)

等到onResponse收到應答并處理完,就delDescribe()刪除對應請求描述信息,保存的回調函數cb也析構,里面保存的json_promise也會析構,所以說ptr+bind值捕獲+外部保存

保證了 json_promise 會存活到 callback 被調用。

總結:

1.Callback(param) 中的 shared_ptr 是按值傳遞,因此會導致引用計數 +1,

但這個 +1 的生命周期僅限于函數執行期間,Callback() 結束后 param 被銷毀,引用計數立即 -1。

2.而 bind(...) 捕獲 shared_ptr 時(通過值捕獲),會將其拷貝一份保存在閉包對象中(如 auto cb),

這會導致引用計數 +1,無論是否調用 Callback,引用計數都存在,直到閉包對象銷毀為止

那閉包對象什么時候銷毀呢?delDescribe()

3.需要注意的是,cb 本身是 call() 函數內的局部變量,但它并不會隨著 call() 結束而失效。

雖然 call() 返回后 cb 變量本身會被銷毀,但在此之前 cb 已經被作為參數傳入 _requestor->send(),

并被內部保存到了 _request_desc 表中,作為 RequestDescribe 的一部分長期持有。

4.也就是說:call() 中定義的 cb 雖然是局部變量,但它在作用域結束前被拷貝并傳出,生命周期已經被延長。

所以 cb 中捕獲的 json_promise(shared_ptr)依然活著,call 函數中的 cb 就算析構,也不影響閉包內部捕獲的 promise 的生命周期。

5.只有當服務端響應到達,回調被觸發后,執行 cb(msg),再通過 delDescribe(rid) 刪除請求描述對象rd保存的回調函數cb閉包進行析構,值捕獲的json_promise被釋放,引用計數==0 promise 被析構.

6.相比之下,如果只是創建了一個局部的 shared_ptr(比如 json_promise),沒有通過 bind、lambda、線程等傳出去,

它就會在函數結束時被立即析構,future 將失效,調用 .get() 會拋出異常(broken promise)。

? 因此,雖然 cb 是局部變量,但它在 call() 結束前被拷貝傳出,生命周期被框架托管,保證了回調所需的 promise 安全存活。

這是異步通信框架中延長資源生命周期的關鍵機制,確保異步流程完整閉環。

局部變量當作參數傳參,并不會延長它的生命周期; 它的生命周期仍然只屬于它原來的作用域; 如果你想延長生命周期,必須使用 shared_ptr,并在外部再持有一份拷貝!

舉個例子:

std::shared_ptr<int> create() {auto p = std::make_shared<int>(100);  // p 是局部變量return p;  // ? 返回值是“拷貝一份”,原來的 p 會被銷毀,但返回值還持有控制塊
}auto x = create(); // 返回值拷貝給 x,資源不會被銷毀
情況局部變量會不會被銷毀解釋
普通局部變量(無指針托管)? 會作用域結束立即銷毀
傳值作為函數參數? 會參數是拷貝,原變量不延長
返回 shared_ptr? 原變量銷毀,但返回值持有副本,資源不析構
shared_ptr 被 bind 捕獲? 不會(+1)延長生命周期直到閉包結束

服務端測試代碼

相較于之前實現的客戶端,我們不再是直接給dispatcher傳業務層函數,而是傳RpcRouter的處理對應請求的回調函數,由該函數再從注冊到RpcRouter中的具體實現函數中找用戶需要的函數,并進行回調返回響應結果


#include "../common/message.hpp"
#include "../common/net.hpp"#include "../common/dispatcher.hpp"
#include "../server/rpc_router.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{auto router=std::make_shared<wws::server::RpcRouter>();std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//1.注冊Add函數到RpcRouterrouter->registerMethod(desc_factory->build());//bind綁定RpcRouter收到消息的回調函數->cbauto cb=std::bind(&wws::server::RpcRouter::onRpcRequest,router.get(),std::placeholders::_1,std::placeholders::_2);auto dispatcher=std::make_shared<wws::Dispatcher>();//2.把RpcRouter回調函數onRpcRequest設置到dispatcher中dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC,cb);auto server=wws::ServerFactory::create(9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);//3.把Dispatcher的回調函數onMessage設置到server中server->setMessageCallback(message_cb);server->start();return 0;
}

從圖示上可以看到,整個鏈路依次是:

  1. server 收到消息

  2. 調用 messageCallback

  3. 觸發 onMessage 進入 dispatcher

  4. Dispatcher 發現消息類型是 REQ_RPC,調用 RpcRouter::onRpcRequest

  5. Router 找到 Add 方法并調用其回調函數

  6. Add 函數執行計算并返回結果

  • server->setMessageCallback(message_cb);

    • 這里的 message_cb 是通過 std::bind(&wws::Dispatcher::onMessage, dispatcher.get(), ...) 綁定的。

    • 當服務器收到任何消息時,就會調用 dispatcher->onMessage(...)

  • dispatcher->onMessage(...)

    • Dispatcher 根據消息的類型(這里為 REQ_RPC)找到先前注冊的回調處理函數

    • 這一步對應代碼 dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, cb);

    • 所以當消息類型匹配到 REQ_RPC 時,就執行 cb

  • cb -> std::bind(&wws::server::RpcRouter::onRpcRequest, router.get(), ...)

    • 這個 cb 本質上就是對 RpcRouter::onRpcRequest 的一次包裝。

    • 調用 cb 時,實際上就是執行 router->onRpcRequest(...)

  • RpcRouter::onRpcRequest(...)

    • 在路由器里,根據請求中的“方法名稱”(比如 "Add")找到對應的回調函數

    • 此處就是在之前 router->registerMethod(...) 時注冊的 Add 方法回調。

  • 調用 Add(const Json::Value& req, Json::Value& rsp)

    • 最終執行我們自定義的邏輯(如取 nums1nums2,相加后存入 rsp)。

客戶端測試代碼

#include "../common/dispatcher.hpp"
#include "../client/requestor.hpp"
#include "../client/rpc_caller.hpp"
#include <thread>
#include <chrono>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{auto requestor=std::make_shared<wws::client::Requestor>();auto caller=std::make_shared<wws::client::RpcCaller>(requestor);auto dispatcher=std::make_shared<wws::Dispatcher>();auto rsp_cb=std::bind(&wws::client::Requestor::onResponse,requestor.get(),std::placeholders::_1,std::placeholders::_2);//wws::RpcResponse->wws::BaseMessage //rsp_cb綁定的函數參數為Requestor::onResponse(const BaseConnection::ptr&conn,const BaseMessage::ptr&msg)//而registerHandler注冊需要的函數類型 std::function<void(const BaseConnection::ptr&conn,std::shared_ptr<T> &msg)>; 第二個參數必須也為BaseMessage::ptr,所以T傳BaseMessagedispatcher->registerHandler<wws::BaseMessage>(wws::MType::RSP_RPC,rsp_cb);auto client=wws::ClientFactory::create("127.0.0.1",9090);auto message_cb=std::bind(&wws::Dispatcher::onMessage,dispatcher.get(),std::placeholders::_1,std::placeholders::_2);client->setMessageCallback(message_cb);client->connect();auto conn=client->connection();//1.同步調用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=caller->call(conn,"Add",params,result);if(ret!=false){DLOG("result: %d", result.asInt());}//2.異步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=caller->call(conn,"Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回調params["nums1"]=55;params["nums2"]=66;ret = caller->call(conn,"Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));client->shutdown();return 0;
}
  1. RpcCaller 調用 call("Add", params, rsp)

    • 用戶在業務代碼里直接寫 caller->call(...),想要調用服務端的 “Add” 方法并等待結果。

  2. RpcCaller 內部執行 “AddDesc(...)”

    • call(...) 方法內部會先構造一個帶唯一請求ID的 RpcRequest,然后調用 Requestor 的相關方法(示意中稱作 AddDesc)來“登記”該請求:

      • 存儲該請求ID

      • 記錄調用類型(同步/異步/回調)

      • 如果是回調式,還會記錄用戶傳入的回調函數

  3. 發送請求到服務端

    • Requestor 記完請求描述后,就會通過網絡連接將 RpcRequest 發送給遠程服務端。

  4. 服務端處理并返回 RpcResponse

    • 當服務端收到 “Add” 請求后,進行實際的加法運算或其他業務邏輯,然后打包 RpcResponse 返回給客戶端。

  5. 客戶端接收響應 -> 分發到 Requestor::onResponse()

    • 客戶端網絡層讀到響應后,先通過 Dispatcher 分發,根據消息類型(如 RSP_RPC)找到之前綁定的回調,即 Requestor::onResponse(...)

    • Requestor::onResponse() 根據響應里的 “請求ID=111” 查到對應的“請求描述 (desc)”,確定是哪個請求、用什么方式處理(同步阻塞喚醒或執行用戶回調等),并把結果交給調用方。

  6. RpcCaller 最終返回結果給用戶

    • 對于異步調用 call(...),當響應到來時,onResponse() 會調用? ? ? ? ? ? ? ? ? ? ? ? rdp->response.set_value(msg),把響應 msg 設置到 promise 中。用戶future.get()獲取結果。

    • 若是回調式調用,則 Requestor::onResponse()直接執行用戶的回調函數,把結果帶給用戶。

Requestor 和 RpcCaller 的關系:

  • Requestor 負責管理請求–響應映射。它用一個請求描述(包含唯一 ID、請求類型和處理機制)來記錄每次發送的請求。當響應到來時,根據請求 ID 查找描述,

    • 如果是異步請求(REQ_ASYNC),則調用 promise.set_value(msg) 使關聯 future 就緒;

    • 如果是回調請求(REQ_CALLBACK),則直接調用用戶注冊的回調函數。

  • RpcCaller 則對外提供 RPC 調用接口。它負責構造 RPC 請求(設置方法名、參數、生成請求 ID),并調用 Requestor 的 send() 方法來登記請求并發送消息。用戶可以選擇同步(阻塞等待 future.get())、異步(返回 future)或回調式調用。

二者協同工作:RpcCaller 構造并發送請求,而 Requestor 負責匹配響應并將結果傳遞給上層。

RpcCaller用戶調用的接口,用戶傳入要調用的對象 參數 方式,根據方式(同步 異步 回調)的不同選擇不同的call進行調用,1.先根據參數構建請求消息2.調用Requester里面對應的send()。

Requester中send()會先構建請求描述對象(call傳入的請求消息 請求類型...) 并建立請求-id間的映射(等收到應答時根據id找到對應的請求描述對象),再完成發送。

等服務端返回應答,Dispatcher根據消息的類型,找到client的Requestor中onResponse處理應答的回調函數,它根據id找到對應的請求描述,再根據請求描述中的類型,進行set_value設置結果或者callback()調用回調。最后刪除該請求描述

set_value后,get()獲取到結果,阻塞結束返回上層再返回到call進行檢查并返回結果

RpcCaller::call() 的三種調用方式流程

同步調用 (call(conn, method, params, result))

客戶端調用call,把連接 函數名 參數 用于獲取結果的Value對象

進入同步調用call. 先根據傳入的參數組織請求(里面設置了請求類型REQ_RPC)

? ? ? ? 調用Requester中send()發送請求

?using AsyncResponse=std::future<BaseMessage::ptr>;先創建一個future對象用于后面get()阻塞獲取結果。再調用異步send(),因為異步和同步的區別只是在于用戶什么時候get()阻塞獲取結果,異步+立刻get()==同步。

再看看異步send()

先調用newDescribe,里面會創建請求描述對象(傳入回調函數會設置回調函數)和UUID建立映射關系用map管理起來。

conn->send()發送請求。

之后服務端進行處理,返回應答,server::messageCallback->Disoatcher::onMessage->根據請求類型找到對應的回調函數,RSP_RPC對應的就是requestor::onResponse()處理應答的回調函數。

?進入onResponse,先根據UUID找到對應的請求描述,根據請求描述的類型,看是異步(同步里面調用的異步),還是回調,進行相對的處理。

是同步,就set_value設置結果。


設置完結果,futrue就緒get()獲取結果

上層的result就獲取到了結果,進行輸出。

RpcCaller::call(conn, method, params, result)  // 同步調用└──> Requestor::send(conn, req, rsp)       // 調用同步版本 send()└──> send(conn, req, rsp_future)     // 調用異步 Future 版本 send()└──> 創建 RequestDescribe 并存儲└──> 阻塞等待 future.get()└──> Dispatcher::onMessage(conn, msg)└──> Requestor::onResponse(conn, msg)└──> rdp->response.set_value(msg) // future.set_value() 解除阻塞└──> 解析 RpcResponse 并返回 result  // get() 獲取結果并返回

經過的關鍵函數:

  • RpcCaller::call(conn, method, params, result)

  • Requestor::send(conn, req, rsp)

  • Requestor::send(conn, req, rsp_future) (異步版本)

  • Dispatcher::onMessage(conn, msg)

  • Requestor::onResponse(conn, msg)

  • rdp->response.set_value(msg)

  • future.get() 解除阻塞

異步 Future 調用 (call(conn, method, params, future))

不傳Json::Value result直接獲取結果,std::future<Json::Value> res_futre,讓用戶自己get()獲取結果。

result=json_promise->get_future();管理promise,set_value后用戶就可以get()獲取結果。

還綁定了Callback,傳入回調函數cb,創建請求描述對象時設置回調函數cb

此時設置的類型為REQ_CALLBACK,收到應答,找到對應請求描述,會調用設置的回調函數cb

cb bind綁定的是Callbcak函數 它會set_value設置結果,讓用戶的future就緒,get()獲取結果

RpcCaller::call(conn, method, params, future)  // 異步 Future 調用└──> 創建 RpcRequest 并設置參數└──> 創建 std::promise<Json::Value> 和 future 關聯└──> 綁定 Callback (關聯 promise 和 result)└──> Requestor::send(conn, req, cb)       // 傳入回調函數 cb,onResponse() 解析后觸發└──> 創建 RequestDescribe 并存儲└──> 發送請求,不阻塞└──> Dispatcher::onMessage(conn, msg)      // 收到服務端響應└──> Requestor::onResponse(conn, msg)└──> 通過 rid 查找 RequestDescribe└──> Callback 觸發 set_value(msg) // 解析結果并設置 promise└──> future.get() 解除阻塞,獲取結果       // 用戶上層調用 future.get() 阻塞獲取結果

經過的關鍵函數:

  • RpcCaller::call(conn, method, params, res_future)

  • Requestor::send(conn, req, cb)

  • Requestor::onResponse(conn, msg)

  • rdp->callback(msg)

  • Callback 解析 msgset_value()

  • res_future.get() 解除阻塞

異步回調調用 (call(conn, method, params, cb))

回調,不直接設置結果,而是調用用戶傳來的函數(callback)。

call異步回調,和異步futrue一樣設置回調函數Callback1.(但回調函數不同)

?也是調用的這個send()?接下來也是 調用請求描述對象中設置的回調函數即Callback1?而Callback1不和Callback一樣set_value設置結果,而是調用上層傳來的函數,它返回結果給用戶。

RpcCaller::call(conn, method, params, cb)     // 異步回調調用└──> 創建 RpcRequest 并設置參數└──> 綁定 Callback1└──> Requestor::send(conn, req, cb)       // 調用異步回調版本 send()└──> 創建 RequestDescribe 并存儲└──> 發送請求,不阻塞└──> Dispatcher::onMessage(conn, msg)      // 收到服務端返回消息└──> Requestor::onResponse(conn, msg)└──> 通過 rid 查找 RequestDescribe└──> Callback1 觸發用戶自定義的 cb(msg)└──> 用戶自定義的回調函數解析結果
  • RpcCaller::call(conn, method, params, cb)

  • Requestor::send(conn, req, cb)

  • Requestor::onResponse(conn, msg)

  • rdp->callback(msg) 觸發 Callback1

  • 用戶自定義的 callback(msg) 解析結果

五.注冊中心---服務端rpc_registry

服務端如何實現服務信息的管理:

服務端需要1.提供注冊服務2.發現的請求業務處理

1.需要將 哪個服務 能夠由 哪個主機提供 管理起來 hash<method,vector<provide>>

? ? ? ? 進行服務發現時,能返回誰能提供指定服務

2.需要將 哪個主機 發現過 哪個服務 管理起來

? ? ? ? 當進行服務通知的時候,能通知給對應發現者 <method,vector<discoverer>>

3.需要 哪個連接 對應 哪個服務提供者 管理起來 hash<conn,provider>

? ? ? ? 當一個連接斷開時 能知道哪個主機的哪些服務下線了,然后才能給發現者通知xxx的xxx服務下線了。

4.需要 哪個連接 對應 哪個服務發現者 管理起來 hash<conn.discoverer>

? ? ? ? 當一個連接斷開時 如果有服務上下線 就不需要給它進行通知了

  • 1?? ProviderManager(服務提供者管理)

  • 維護服務提供者信息,進行服務注冊、刪除和查詢。

  • 提供 addProvider()delProvider()getProvider()methodHosts() 等方法。


  • 2?? DiscovererManager(服務發現者管理)

  • 維護服務發現者信息,進行服務發現、刪除和通知。

  • 提供 addDisecoverer()delDisoverer()onlineNotify()offlineNotify() 等方法。


  • 3?? PDManager(核心管理器)

  • 處理服務請求、注冊、發現、上線/下線通知以及連接斷開后的清理邏輯。

  • 提供 onServiceRequest()onConnShutdown() 等核心邏輯。

  • 處理服務的響應,包括:

    • registryResponse():服務注冊應答

    • discoverResponse():服務發現應答

    • errorResponse():錯誤處理

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set> 
namespace wws
{
namespace server
{//服務提供者class ProviderManager{public:using ptr=std::shared_ptr<ProviderManager>;struct Provider{using ptr=std::shared_ptr<Provider>;std::mutex _mutex; BaseConnection::ptr conn;Address host; //主機信息ip+portstd::vector<std::string> methods; //所有提供的方法Provider(const BaseConnection::ptr&c,const Address&h):conn(c),host(h){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.emplace_back(method);}};//新的服務提供者進行服務注冊void addProvider(const BaseConnection::ptr&c,const Address &h,const std::string&method){Provider::ptr provider;{std::unique_lock<std::mutex> lock(_mutex);auto it=_conns.find(c);//找連接對應的提供者if(it!=_conns.end()){provider=it->second;}//找不到就創建 并新增連接->提供者else{provider=std::make_shared<Provider>(c,h);_conns.insert(std::make_pair(c,provider));}//method方法被哪些提供者提供 增加提供者auto &providers=_providers[method];providers.insert(provider);}//提供者內更新記錄 能提供的方法provider->appendMethod(method);}//服務提供者斷開連接時 獲取它的信息 用于服務下線通知Provider::ptr getProvider(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return Provider::ptr();return it->second;}//服務提供者斷開連接時 刪除它的關聯信息void delProvider(const BaseConnection::ptr&c)//連接->提供者->所有提供方法{std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;for(auto&method:it->second->methods)//找到該服務提供者的所有方法{auto&providers=_providers[method];//根據方法 從提供該方法的提供者中再進行刪除//1.providers容器是vectro版本// //手動查找并按迭代器刪除 providers.erase(it->second)// //但是 erase(it->second) 并不會起到按值刪除的作用,因為 erase() 按值刪除時,只有 std::vector 在 C++20 之后才引入 erase 和 erase_if// auto provider_it = std::find(providers.begin(), providers.end(), it->second);// if (provider_it != providers.end())// {//     providers.erase(provider_it);// }//set 直接支持 erase(it->second)providers.erase(it->second);}//刪除連接與服務提供者的關系_conns.erase(it);}//返回 method對應的提供者std::vector<Address> methodHosts(const std::string &method){std::unique_lock<std::mutex> lock(_mutex);auto it = _providers.find(method);if (it == _providers.end())return std::vector<Address>();std::vector<Address> result(it->second.begin(), it->second.end());return result;}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Provider::ptr>> _providers;//方法->提供者主機std::unordered_map<BaseConnection::ptr,Provider::ptr> _conns;//連接->提供者}; //服務發現者class DiscovererManager{public:using ptr=std::shared_ptr<DiscovererManager>;struct Discoverer{using ptr=std::shared_ptr<Discoverer>;std::mutex _mutex;BaseConnection::ptr conn;//發現者關聯的客戶端std::vector<std::string> methods;//發現過的服務Discoverer(const BaseConnection::ptr&c):conn(c){}void appendMethod(const std::string&method){std::unique_lock<std::mutex> lock(_mutex);methods.push_back(method);}};//當客戶端進行服務發現的時候新增發現者 新增服務名稱?Discoverer::ptr addDisecoverer(const BaseConnection::ptr&c,const std::string &method){Discoverer::ptr discoverer;{std::unique_lock<std::mutex> lock(_mutex);//找連接對應的服務發現者auto it=_conns.find(c);if(it!=_conns.end()){discoverer=it->second;}else{discoverer=std::make_shared<Discoverer>(c);_conns.insert(std::make_pair(c,discoverer));}//method方法被哪些發現者發現了 增加發現者auto &discoverers=_discoverers[method];discoverers.insert(discoverer);}//在發現者中 增加已經發現的方法discoverer->appendMethod(method);return discoverer;}//發現者不需要被get() 發現者下線不需要通知 所以不需要進行get()獲取對象后進行下線通知//發現者客戶端斷開連接時 找到發現者信息 刪除關聯信息void delDisoverer(const BaseConnection::ptr&c){std::unique_lock<std::mutex> _mutex;auto it=_conns.find(c);if(it==_conns.end())return;//找到發現過的方法for(auto&method:it->second->methods){//從發現過method的所有發現者 中找要進行刪除的發現者auto&discoverers=_discoverers[method];discoverers.erase(it->second);}//刪除conn->discoverer_conns.erase(it);}//新的服務提供者上線 上線通知void onlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_ONLINE);}//服務提供者斷開連接 下線通知void offlineNotify(const std::string &method,const Address&host){notify(method,host,ServiceOptype::SERVICE_OFFLINE);}private:void notify(const std::string &method,const Address&host,wws::ServiceOptype optype){std::unique_lock<std::mutex> _mutex;//先判斷該方法有沒有被人發現過 auto it=_discoverers.find(method);//沒有就不用進行任何處理if(it==_discoverers.end())return;//對發現過該方法的發現者一個個進行通知auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_req->setMethod(method);msg_req->setOptype(optype);//服務操作類型 for(auto&discoverers:it->second){discoverers->conn->send(msg_req);}}private:std::mutex _mutex;std::unordered_map<std::string,std::set<Discoverer::ptr>> _discoverers;//該方法被哪些發現者發現了std::unordered_map<BaseConnection::ptr,Discoverer::ptr> _conns;//連接->發現者(連接斷開->對應發現者->刪除vector中的發現者)};class PDManager{public:using ptr=std::shared_ptr<PDManager>;//處理服務請求 并返回應答void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr &msg) {//先判斷服務操作請求:服務注冊/服務發現auto optype=msg->optype();if(optype==ServiceOptype::SERVICE_REGISTRY)//服務注冊{//1.新增服務提供者 _providers->addProvider(conn,msg->host(),msg->method());//2.對該方法的發現者進行服務上線通知_discoverers->onlineNotify(msg->method(),msg->host());//3.返回應答return registryResponse(conn,msg);}else if(optype==ServiceOptype::SERVICE_DISCOVERY)//服務發現{//新增服務發現者 _discoverers->addDisecoverer(conn,msg->method());return discoverResponse(conn,msg);}else{ELOG("收到服務操作請求,但操作類型錯誤"); return errorResponse(conn,msg);}}//連接斷開void onConnShutdown(const BaseConnection::ptr&conn)//{//這個要斷開的連接1.提供者下線 2.發現者下線//1.獲取提供者信息 為空說明不是提供者auto provider=_providers->getProvider(conn);if(provider.get()!=nullptr){//提供者下線//1.提供者的每個方法都要下線 通知對應發現者for(auto&method:provider->methods){_discoverers->offlineNotify(method,provider->host);}//2.刪除對該提供者的管理_providers->delProvider(conn);}//2.到這 可能是發現者 就算不是會直接返回空//直接刪除對該發現者的管理_discoverers->delDisoverer(conn);}private://錯誤響應void errorResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_rsp->setRcode(RCode::RCODE_INVALID_OPTYPE); //無效 msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);//服務操作類型 未知 conn->send(msg_rsp);}//注冊應答void registryResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);//服務操作類型 注冊conn->send(msg_rsp);}//發現應答 method方法有哪些主機可以提供 void discoverResponse(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid()); msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);//服務操作類型 發現msg_rsp->setMType(wws::MType::RSP_SERVICE);// 服務請求(注冊、發現、上線、下線)std::vector<Address> hosts=_providers->methodHosts(msg->method());if(hosts.empty()){msg_rsp->setRcode(RCode::RCODE_NOT_FOUND_SERVICE);return conn->send(msg_rsp);}msg_rsp->setRcode(RCode::RCODE_OK);msg_rsp->setMethod(msg->method());msg_rsp->setHost(hosts);return conn->send(msg_rsp);}private:ProviderManager::ptr _providers;DiscovererManager::ptr _discoverers;};
}
}

📡 服務注冊流程

1?? 客戶端(Provider)通過 `registryMethod()` 發送 `SERVICE_REGISTRY` 請求
2?? `PDManager::onServiceRequest()` 處理注冊請求
3?? `ProviderManager::addProvider()` 注冊服務
4?? `DiscovererManager::onlineNotify()` 通知發現者服務上線
5?? `PDManager::registryResponse()` 發送注冊結果

🔍 服務發現流程

1?? 客戶端(Discoverer)通過 `addDisecoverer()` 發送 `SERVICE_DISCOVERY` 請求
2?? `PDManager::onServiceRequest()` 處理發現請求
3?? `DiscovererManager::addDisecoverer()` 記錄發現者
4?? `ProviderManager::methodHosts()` 獲取 `method` 對應的主機
5?? `PDManager::discoverResponse()` 發送發現結果

🔥 服務下線流程

1?? 連接斷開時觸發 `PDManager::onConnShutdown()`
2?? 通過 `ProviderManager::getProvider()` 獲取 `Provider`
3?? `DiscovererManager::offlineNotify()` 通知發現者服務下線
4?? `ProviderManager::delProvider()` 刪除 `Provider`

🕵??♂? 發現者下線流程

1?? 連接斷開時觸發 `PDManager::onConnShutdown()`
2?? 通過 `DiscovererManager::delDisoverer()` 刪除 `Discoverer`
3?? 清理 `_conns` 和 `_discoverers` 的映射

六.注冊中心---客戶端rpc_registry

客戶端的功能比較分離,注冊端和發現端根本就不在同一個主機上。

因此客戶端的注冊和發現功能是完全分開的。

1.作為服務提供者 需要一個能進行服務注冊的接口

? ? ? ? 連接注冊中心 進行服務注冊

2.作為服務發現者 需要一個能進行服務發現的接口,需要將獲取到的提供對應服務的主機信息管理起來 hash<method,vector<host>> 一次發現,多次使用,沒有的話再次進行發現。

? ? ? ? ? ? ? ? ? ? ? ? ? ? ?需要進行服務上線/下線通知請求的處理(需要向dispatcher提供一個請求處理的回調函數)

#pragma 
#include"requestor.hpp"namespace wws
{
namespace client
{// 服務提供者類:負責將服務注冊到服務注冊中心class Provider{public:using ptr=std::shared_ptr<Provider>;Provider(const Requestor::ptr&requestor):_requestor(requestor){}//進行服務注冊的接口bool registryMethod(const BaseConnection::ptr&conn,const std::string &method,const Address&host){// 1. 創建 ServiceRequest 請求消息auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setHost(host);msg_req->setId(UUID::uuid());msg_req->setMType(wws::MType::REQ_SERVICE);// 服務請求(注冊、發現、上線、下線)msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);//服務操作類型 注冊// 2. 發送請求并同步等待響應BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);//同步請求if(ret==false){ELOG("%s服務注冊失敗",method.c_str());return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(service_rsp.get()==nullptr){ELOG("響應類型向下轉換失敗");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服務注冊失敗 原因%s",errReason(service_rsp->rcode()));return false;}return true;}private:Requestor::ptr _requestor;//負責發送請求、接收響應};class MethodHost{public:using ptr=std::shared_ptr<MethodHost>;MethodHost(const std::vector<Address>&hosts):_hosts(hosts.begin(),hosts.end()),_idx(0){}void appendHost(const Address&host){//新的服務上線后進行調用std::unique_lock<std::mutex> lock(_mutex);_hosts.push_back(host);}void removeHost(const Address&host){//服務下線進行調用std::unique_lock<std::mutex> lock(_mutex);//vector刪除效率O(n)效率低,但更多的操作還是 隨機訪問[] 進行RR輪轉,所以vector是最合適的for(auto it=_hosts.begin();it!=_hosts.end();it++){if(*it!=host){_hosts.erase(it);break;}}}Address chooseHost(){std::unique_lock<std::mutex> lock(_mutex);size_t pos=_idx++ %_hosts.size();//1.pos=_idx%size 2._idx+=1return _hosts[pos];}bool empty(){std::unique_lock<std::mutex> lock(_mutex);return _hosts.empty();}private:std::mutex _mutex;size_t _idx;//當前 RR 輪轉索引//vector 提供了 O(1) 的索引訪問,可以快速實現RR輪轉機制。std::vector<Address> _hosts;};class Discoverer{public:Discoverer(const Requestor::ptr &requestor){}//服務發現的接口bool serviceDiscovery(const BaseConnection::ptr&conn,const std::string&method,Address&host){//當前有method方法對應的提供服務者 直接返回host地址{std::unique_lock<std::mutex> lock(_mutex);auto it=_method_hosts.find(method);if(it!=_method_hosts.end()){if (it->second->empty() == false){host = it->second->chooseHost();return true;}}}//當前沒有對應的服務者//1.構建服務發現請求auto msg_req=MessageFactory::create<ServiceRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_SERVICE);//消息類型msg_req->setMethod(method);msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);BaseMessage::ptr msg_rsp;bool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("服務發現失敗!");return false;}auto service_rsp=std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);if(!service_rsp.get()){ELOG("服務發現失敗! 響應類型轉換失敗");return false;}if(service_rsp->rcode()!=RCode::RCODE_OK){ELOG("服務發現失敗! 錯誤原因%s",errReason(service_rsp->rcode()).c_str());return false;}//2.看服務發現完后有沒有新提供者增加std::unique_lock<std::mutex> _mutex;auto method_host=std::make_shared<MethodHost>(service_rsp->hosts());if(method_host->empty()){ELOG("%s服務發現失敗!沒有能提供服務的主機",method.c_str());return false;}host=method_host->chooseHost();_method_hosts[method]=method_host;//更新method方法->提供者address(可能method方法已經在map中所以=賦值)return true;}//給Dispathcer模塊進行服務上線下線請求處理的回調函數void onServiceRequest(const BaseConnection::ptr&conn,const ServiceRequest::ptr&msg){//1.先判斷是上線/下線 都不是就不處理auto optype=msg->optype();std::string method=msg->method();std::unique_lock<std::mutex> lock(_mutex);//2.上線通知if(optype==ServiceOptype::SERVICE_ONLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//該method方法不存在 創建MethodHost初始化并添加入map中auto method_host=std::make_shared<MethodHost>();method_host->appendHost(msg->host());_method_hosts[method]=method_host;}else{//存在直接加it->second->appendHost(msg->host());}}//3.下線通知else if(optype==ServiceOptype::SERVICE_OFFLINE){auto it=_method_hosts.find(method);if(it==_method_hosts.end()){//該method方法不存在 直接return//不需要把method方法從map中移除 前面已經判斷過map中method方法為空的情況return;}else{//存在直接刪除it->second->removeHost(msg->host());}}}private:std::mutex _mutex;std::unordered_map<std::string,MethodHost::ptr> _method_hosts;Requestor::ptr _requestor;}; 
}
}

1.客戶端中的provider服務提供者,有服務注冊接口registryMethod,構建服務注冊請求借助Requestor send()到服務端的的注冊接口。收到應答后判斷應用是否正確。

2.客戶端的discoverer服務發現者,有服務發現接口serviceDiscovery,先開當前有沒有method方法對應的提供者,有就返回一個提供者。沒有就構建服務發現請求,并檢查是否有誤,無錯誤后 再判斷是否有新增的提供者,有就返回提供者并更新method->提供者的映射,沒有就直接返回.

3.MethodHost類 管理一個method方法對應的所以提供者的主機信息,并提供RR輪轉功能,按順序返回method方法對應的提供者。

1?? Provider 類:服務提供者

  • 作用:

    • 負責將服務注冊到服務注冊中心。

  • 主要功能:

    • registryMethod()

      • 發送 ServiceRequest 注冊請求到服務中心。

      • 檢查 msg_rsp 響應是否成功注冊。

    • 內部成員:

      • _requestor

        • Requestor::ptr 類型,負責請求發送和接收響應。


2?? MethodHost 類:服務主機管理

  • 作用:

    • 維護多個 Address 主機地址,并提供負載均衡(RR 輪轉)功能。

  • 主要功能:

    • appendHost():新增一個服務主機地址。

    • removeHost():移除一個服務主機地址。

    • chooseHost()

      • 通過 RR 輪轉選擇一個主機地址。

    • empty():判斷是否為空。

  • 內部成員:

    • _mutex:互斥鎖保護 _hosts 訪問安全。

    • _idx:當前 RR 輪轉索引。

    • _hostsstd::vector<Address> 存儲主機地址列表。


3?? Discoverer 類:服務發現者

  • 作用:

    • 發現服務并維護 MethodHost 對象,進行服務上線/下線通知的管理。

  • 主要功能:

    • serviceDiscovery()

      • 發現服務并更新 _method_hosts 緩存。

      • 選擇主機地址 host 并返回。

    • onServiceRequest()

      • 處理服務上線/下線的請求。

      • 動態添加/刪除 MethodHost 及其地址信息。

  • 內部成員:

    • _mutex:互斥鎖保護 _method_hosts

    • _method_hosts

      • std::unordered_map<std::string, MethodHost::ptr>,映射 method -> host

    • _requestor

      • Requestor::ptr 發送發現請求。


4?? Requestor 類:請求器

  • 作用:

    • 負責向服務中心發送請求并獲取響應。

  • 核心功能:

    • send()

      • 向服務中心同步發送 msg_req 請求。

      • 接收 msg_rsp 響應并返回 bool 標志位。


🔥 類之間的關系

  1. Provider 通過 _requestor 發送 ServiceRequest 進行服務注冊。

  2. Discoverer 通過 _requestor 發送 ServiceRequest 進行服務發現。

  3. MethodHostDiscoverer 維護,并提供 RR 輪轉機制選擇主機。

  4. Discoverer 監聽 onServiceRequest() 來處理服務上線/下線。

RR輪詢(Round-Robin)

一個method方法可以對應多個提供者,用戶請求method方法,我們應該返回哪一個提供者,才能實現最大資源利用呢?

我們可以通過RR輪詢按照固定順序輪流將請求分配到不同的主機。

1.維護一個遞增索引 _idx。

2.每次請求時,選擇 _hosts[_idx % _hosts.size()] 作為當前主機。

3._idx 自增,當 _idx >= _hosts.size() 時自動重置。

RR輪詢需要隨機訪問[ ],所以管理提供者的容器最好選擇vector< >。

RR 輪詢機制的價值

? 1. 負載均衡: 均勻分配請求,防止主機過載,提高系統吞吐量。
? 2. 實現簡單: 只需維護一個 _idx 遞增索引,選擇主機只需 O(1) 時間復雜度。
? 3. 故障規避: 結合 removeHost() 機制,可以自動剔除故障主機,提升可靠性。
? 4. 提升系統吞吐量: 通過多個主機并行處理請求,提升系統的整體性能。
? 5. 維護成本低: 邏輯簡單,維護成本極低,不需要監控主機狀態。

為什么選擇 vector<Address> 作為主機管理容器


? 1. 訪問速度快

  • vector 提供 O(1) 的隨機訪問能力,可以通過 pos 索引直接獲取 provider

  • 輪詢核心邏輯依賴于 vector[pos] 進行主機選擇,比 mapO(log n) 查找速度更快,適合高頻訪問場景。


? 2. 內存布局緊湊

  • vector 采用 連續內存存儲,有利于 CPU 緩存命中,提升訪問效率。

  • 在輪詢過程中,只需訪問固定內存位置,避免了內存跳轉帶來的性能損耗。


? 3. 刪除主機速度適中

  • 刪除主機時雖然 removeHost() 的復雜度是 O(n),但主機上下線事件發生頻率遠低于請求頻率。

  • 即使主機上下線處理稍慢,但 chooseHost() 仍然保持 O(1) 的快速訪問。


?? 注意:

  • vector 刪除主機時會觸發內存移動,導致性能下降,因此不適合頻繁上下線的場景。

  • 但在主機變更較少的場景下,vector 的整體性能優于 listmap

七.對服務發現與注冊的封裝

一.客戶端 rpc_client

封裝客戶端:三大功能模塊

一、業務功能:

  1. 基礎 RPC 功能

  2. 服務注冊功能

  3. 服務發現功能

二、基礎底層模塊:

  • 網絡通信客戶端模塊(由 BaseClient 封裝)


🧱 類結構封裝解析

1. RegistryClient:服務注冊客戶端

  • 構造時連接注冊中心地址

  • 提供 registryMethod() 方法:業務提供者向注冊中心注冊服務

  • 成員模塊包含:

    • _provider:服務提供者

    • _requestor:發送請求組件

    • _dispatcher:調度器

    • _client:基礎通信客戶端

2. DiscoveryClient:服務發現客戶端

  • 構造時連接注冊中心地址

  • 提供 registryDiscovery() 方法:業務調用方向注冊中心發現服務

  • 成員模塊包含:

    • _discoverer:服務發現器

    • _requestor_dispatcher_client:同上

3. RpcClient:RPC 核心客戶端

  • 構造參數 enableDiscovery 決定是否開啟服務發現模式:

    • 若為 true:連接的是注冊中心

    • 若為 false:連接的是具體的服務提供者

  • 提供多種調用方式:

    • 同步調用(返回 result

    • 異步 future 調用(返回 std::future

    • 異步 callback 調用(傳入回調函數)

  • 內部組合:

    • _discovery_client:可選服務發現客戶端

    • _caller:RPC 調用管理器

    • _requestor_dispatcher_client:同樣是基礎通信組件

在構建rpc客戶端時,我們用長連接還是短連接?

1.當客戶端調用call()請求對應方法時,RpcClient 內部調用 DiscoveryClient 進行服務發現?向 注冊中心 Registry Server 發送服務發現請求

2.注冊中心再返回對應方法的提供者主機地址。

3.RpcClient 用RR輪詢從中選出來一個地址創建rpc client客戶端并連接對應方法的提供者主機

4.服務提供者 Provider 接收到請求處理完返回結果給客戶端

5.客戶端回調觸發,返回響應給用戶。

短鏈接:創建一個rpc client客戶端對象,連接服務提供者,進行rpc調用,調用結束后就銷毀關閉客戶端。

? 優點:

  • 實現簡單,按需連接、按需釋放

  • 沒有資源長期占用問題。

? 缺點:

  • 性能差:每次調用都要建立和銷毀 TCP 連接,連接成本高

  • 不利于高頻 RPC 場景;

  • 異步處理時管理麻煩:可能連接剛斷,回調結果還沒處理。

長連接:調用完后并不會銷毀關閉客戶端,而是將客戶端放入連接池。后續還需要訪問該主機的該方法,就會從連接池中找到原本的客戶端對象,進行rpc調用。若該主機的該服務下線,需要從池中刪除對應客戶端連接。

? 優點:

  • 高性能:避免頻繁連接/斷開,尤其是重復調用同一服務時;

  • 適合高并發、低延遲系統。

? 缺點:

  • 管理復雜,需要處理:

    • 服務下線、連接失效的自動剔除;

    • 異步/并發安全;

    • 池容量、連接空閑策略等。

在我們這個項目中我們選擇長連接。

主要還是短連接異步處理時管理麻煩。

短鏈接,客戶端進行完rpc調用就會關閉,后面服務提供者返回結果給客戶端,客戶端沒了收到收到應答的回調函數onResponse???也就不能把結果設置道promise中,上層futrue就不能就緒。

  • 回調不觸發,業務邏輯“卡住”
    尤其是你用 std::futurepromise 等異步等待對象,結果永遠收不到。

  • 觸發回調時訪問已被釋放的連接對象,導致崩潰
    比如回調中引用了 RpcClientconnection,但連接已經析構。

  • 異步響應結果丟失,日志無記錄,bug 難排查
    你會覺得“調用失敗了但程序沒報錯”,其實是 TCP 在你沒注意時被關掉了。

異步+短連接的問題在于連接的生命周期和異步結果不一致,導致回調無法安全執行。

解決方法:在rpc調用結束后不關閉客戶端,而是設置一個回調函數確保收到收到響應處理完再關閉客戶端。

#include "../common/dispatcher.hpp"
#include "requestor.hpp"
#include "rpc_caller.hpp"
#include "rpc_registry.hpp"namespace wws
{
namespace client
{//服務注冊客戶端class RegistryClient{public:using ptr=std::shared_ptr<RegistryClient>;//構造函數傳入注冊中心的地址信息 用于連接注冊中心RegistryClient(const std::string&ip,int port):_requestor(std::make_shared<Requestor>()),_provider(std::make_shared<client::Provider>(_requestor)),_dispatcher(std::make_shared<Dispatcher>()){//注冊中心返回響應消息時觸發的回調函數auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服務注冊接口bool registryMethod(const std::string&method, Address&host){return _provider->registryMethod(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Provider::ptr _provider;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//服務發現客戶端class DiscoveryClient{public:using ptr=std::shared_ptr<DiscoveryClient>;//構造函數傳入注冊中心的地址信息 用于連接注冊中心DiscoveryClient(const std::string&ip,int port,const Discoverer::OfflineCallback &cb):_requestor(std::make_shared<Requestor>()),_discoverer(std::make_shared<client::Discoverer>(_requestor,cb)),_dispatcher(std::make_shared<Dispatcher>()){//注冊中心返回響應消息時觸發的回調函數auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//當注冊中心向客戶端進行上線/下線通知時觸發的回調函數auto req_cb=std::bind(&client::Discoverer::onServiceRequest,_discoverer.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE,req_cb);//消息回調函數 所有收到的消息,統一交由 Dispatcher::onMessage 分發給對應 handlerauto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_client=ClientFactory::create(ip,port);_client->setMessageCallback(message_cb);_client->connect();}//向外提供的服務發現接口bool registryDiscovery(const std::string&method, Address&host){return _discoverer->serviceDiscovery(_client->connection(), method, host);}private:Requestor::ptr _requestor;client::Discoverer::ptr _discoverer;Dispatcher::ptr _dispatcher;BaseClient::ptr _client;};//rpc客戶端class RpcClient{public:using ptr=std::shared_ptr<RpcClient>;// enableDiscovery--是否啟用服務發現功能 也決定了傳入地址信息是注冊中心地址 還是提供者的地址RpcClient(bool enableDiscovery, const std::string &ip, int port):_enableDiscovery(enableDiscovery),_requestor(std::make_shared<Requestor>()),_dispatcher(std::make_shared<Dispatcher>()),_caller(std::make_shared<wws::client::RpcCaller>(_requestor)){//注冊中心返回響應消息時觸發的回調函數auto rsp_cb=std::bind(&client::Requestor::onResponse,_requestor.get(),std::placeholders::_1,std::placeholders::_2);_dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE,rsp_cb);//1.如果啟用的服務發現 地址信息是注冊中心的地址 是服務發現客戶端需要連接的地址 則通過地址信息實例化discover_client//需要經過服務發現獲取提供者address再獲取對應的clientif(_enableDiscovery){//設置服務下線回調auto offline_cb=std::bind(&RpcClient::delClient,this,std::placeholders::_1);_discovery_client=std::make_shared<DiscoveryClient>(ip,port,offline_cb);}//2.如果沒有啟用服務發現 則地址信息是服務提供者的地址 則直接創建客戶端實例化好rpc_client//直接根據提供的ip+port創建對應的clientelse{auto message_cb=std::bind(&Dispatcher::onMessage,_dispatcher.get(),std::placeholders::_1,std::placeholders::_2);_rpc_client=ClientFactory::create(ip,port);_rpc_client->setMessageCallback(message_cb);_rpc_client->connect();}}//1.同步bool call( const std::string &method,const Json::Value &params, Json::Value &result){//獲取clientBaseClient::ptr client=getClient(method);if(client.get()==nullptr)return false;//通過客戶端連接 發送rpc請求return _caller->call(client->connection(),method,params,result);}//2.異步futurebool call( const std::string &method,const Json::Value &params, std::future<Json::Value> &result){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, result); }//3.異步回調bool call(const std::string &method,const Json::Value &params, const RpcCaller::JsonResponseCallback &cb){BaseClient::ptr client = getClient(method);if (client.get() == nullptr)return false;return _caller->call(client->connection(), method, params, cb); }private://創建clientBaseClient::ptr newClient(const Address &host){auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);auto client = ClientFactory::create(host.first, host.second);client->setMessageCallback(message_cb);client->connect();//添加到連接池putClient(host,client);return client;}//根據address從連接池查找client 沒有返回空BaseClient::ptr getClient(const Address&host){std::unique_lock<std::mutex> lock(_mutex);auto it=_rpc_clients.find(host);if(it==_rpc_clients.end()){return BaseClient::ptr();}return it->second;}//根據method獲取client://1.method->服務發現->獲取目標Address->從連接池獲取/沒有直接創建//2.用戶傳入的ip+port->直接獲取已經創建的clientBaseClient::ptr getClient(const std::string method){//1.服務發現獲取的ip+port BaseClient::ptr client;if(_enableDiscovery){//1.通過服務發現 獲取服務提供者地址信息Address host;bool ret=_discovery_client->registryDiscovery(method,host);if(ret==false){ELOG("當前%s服務 沒找到服務提供者",method.c_str());return BaseClient::ptr();}//2.查看連接池中是否有對應的客戶端 有就直接用 沒有就創建client=getClient(host);if(client.get()==nullptr){client=newClient(host);}}//2.用戶提供的ip+port創建的clientelse{client=_rpc_client;}return client;}void putClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.insert(std::make_pair(host,client));}void delClient(const Address&host,BaseClient::ptr&client){std::unique_lock<std::mutex> lock(_mutex);_rpc_clients.erase(host);}private:struct AddressHash{size_t operator()(const Address&host){std::string addr=host.first+std::to_string(host.second);return std::hash<std::string>{}(addr);}};bool _enableDiscovery;DiscoveryClient::ptr _discovery_client;//網絡服務發現 用戶傳方法名method client用方法名找提供者地址進行連接RpcCaller::ptr _caller;Requestor::ptr _requestor;Dispatcher::ptr _dispatcher;BaseClient::ptr _rpc_client;//未啟用服務發現// RpcClient rpc(false, "127.0.0.1", 8080);// rpc.call("Add", ...);  // 直接用 _rpc_client 調用std::mutex _mutex;//<"127.0.0.1",client1>std::unordered_map<Address,BaseClient::ptr,AddressHash>_rpc_clients;//服務發現的客戶端連接池// RpcClient rpc(true, registry_ip, port);// rpc.call("Add", ...);  //  自動發現、自動連接、自動發請求};
}
}

unordered_map<>自定義類型作key

我們用哈希表來管理客戶端連接池時:

我們知道在哈希表中是通過key值找到對應的val值的,但并不是直接用我們傳過去的數當key,需要進行哈希值計算。(1.int size_t bool直接強轉 2.char* string?

h = h * 131 + static_cast<unsigned char>(c); ?// 類似 BKDR hash)

而庫中實現了string、int、float 等基本類型的哈希值計算,但這個Address pair<string,int>是個自定義類型,需要我們自己重載哈希值計算其實就算把string+int融合成string,再套用庫中對string類型計算的哈希函數。

STL中 pair 的哈希組合方法

template <typename T1, typename T2>
struct hash<std::pair<T1, T2>> {size_t operator()(const std::pair<T1, T2>& p) const {size_t h1 = std::hash<T1>()(p.first);size_t h2 = std::hash<T2>()(p.second);return h1 ^ (h2 << 1); }
};
操作目的
std::hash<X>()獲取單個字段的 hash 值
<< 1 左移擾動哈希位,使兩個字段 hash 分布更開
^ 異或混合兩個 hash,避免簡單疊加導致沖突
效果更均勻、穩定、不易沖突的哈希組合值

既然選擇長連接+連接池的做法,那就要處理當服務下線時 在連接池中刪除對應的client

怎么做?設置回調函數,在服務下線時進行調用。

1.RpcClinet初始化DiscoveryClient時傳入回調函數

2.DiscoveryClient 傳給->?Discoverer

3.Discoverer再把回調函數cb設置到成員變量中

4.Client::onServiceRequest處理服務下線時,除了刪除該方法中下線的主機地址Address,還要刪除連接池中連接它的client

二.服務端rpc_server (包含注冊中心服務端 rpc服務端)

🌐 服務端實現業務功能:

  1. 提供 RPC 服務

  2. 服務注冊與發現機制中提供者的管理(服務注冊)和消費者的管理(服務發現)


📦 封裝的三類服務端組件:

  1. RPC 服務端

    • 負責接收并處理 RPC 請求。

  2. 注冊中心服務端

    • 負責管理服務提供者與消費者的注冊信息。

  3. 發布訂閱服務端(后續實現)

    • 用于實現基于事件的通信機制(如 pub-sub 模式),暫未實現。


🛠 實現細節說明:

1. 注冊中心服務端

  • 是一個純粹的服務端,用于管理提供者和消費者信息

  • 核心功能是處理服務注冊與發現的請求。

2. RPC 服務端

  • 實際由兩部分組成:

    • RPC 服務端:用于接收和響應 RPC 請求。

    • 服務注冊客戶端:啟動后自動連接注冊中心,并將自己能提供的服務注冊上去。

#pragma once
#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_router.hpp"
#include "rpc_registry.hpp"#include <set> 
namespace wws
{
namespace server
{//服務注冊服務端class RegistryServer{public:using ptr=std::shared_ptr<RegistryServer>;RegistryServer(int port):_pd_manager(std::make_shared<PDManager>()),_dispatcher(std::make_shared<Dispatcher>()){auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);_server = wws::ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);auto close_cb=std::bind(&RegistryServer::onConnShutdown,this,std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}private:void onConnShutdown(const BaseConnection::ptr&conn){_pd_manager->onConnShutdown(conn);}private:PDManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};//rpc服務端class RpcServer{public:using ptr=std::shared_ptr<RpcServer>;RpcServer(const Address&access_addr,bool enableRegistry=false,const Address&registry_server_addr=Address()):_enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<wws::server::RpcRouter>()),_dispatcher(std::make_shared<wws::Dispatcher>()){//啟用服務注冊if(_enableRegistry){_reg_client=std::make_shared<client::RegistryClient>(registry_server_addr.first,registry_server_addr.second);}//成員server是一個rpcserver 用于提供rpc服務auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(),std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<wws::RpcRequest>(wws::MType::REQ_RPC, rpc_cb);//創建一個監聽指定端口(如 8080)的 RPC 服務器對象_server = wws::ServerFactory::create(access_addr.second);//默認監聽 0.0.0.0:port 我監聽所有可用 IPauto message_cb = std::bind(&wws::Dispatcher::onMessage, _dispatcher.get(),std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}void registerMethod(const ServiceDescribe::ptr &service)//服務描述類{if(_enableRegistry){_reg_client->registryMethod(service->method(),_access_addr);//把 method->本主機地址 發給注冊中心 表示自己可以提供method方法}_router->registerMethod(service);//本地注冊 把服務描述service注冊到RpcRouter中的服務管理類 onRpcRequest接收到客戶端請求時進行路由分發調用}void start(){_server->start();}private:bool _enableRegistry;Address _access_addr;//自己的對外服務地址(客戶端要連接我就來這)client::RegistryClient::ptr _reg_client;//注冊客戶端,用于連接注冊中心并注冊本地服務RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}

注冊中心 服務端 客戶端代碼測試

1.先啟動注冊中心服務端 并設置port. 處理服務注冊請求 服務發現請求 以及服務上下線通知

2.再啟動rpc服務端 ,rpc服務端可以通過_reg_client注冊中心客戶端進行服務注冊,但需要啟動服務注冊_enableRegistry=ture,提供注冊中心客戶端的Address。

還可以注冊本地服務方法,內部注冊到 RpcRouter 路由器中,用于請求到來時快速查找并調用對應的回調函數。

3.啟動rpc客戶端,rpc客戶端有兩種連接方式 啟動服務發現 根據方法名向注冊中心查詢服務地址,然后連接對應的服務端。不啟動 直接連用戶傳入的服務提供者的地址

用戶再調用call 通過客戶端連接 發送rpc請求

rpc_server.cpp

#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"void Add(const Json::Value&req,Json::Value&rsp)
{int nums1=req["nums1"].asInt();int nums2=req["nums2"].asInt();rsp=nums1+nums2;
}
int main()
{//build生產一個 服務描述類 函數名 + 參數類型+結果類型 + 函數地址(進行回調)std::unique_ptr<wws::server::SDescribeFactory> desc_factory(new wws::server::SDescribeFactory());desc_factory->setMethodName("Add");desc_factory->setParamsDesc("nums1",wws::server::VType::INTEGRAL);desc_factory->setParamsDesc("nums2",wws::server::VType::INTEGRAL);desc_factory->setReturnType(wws::server::VType::INTEGRAL);desc_factory->setCallback(Add);//wws::Address("127.0.0.1",9090) 監聽9090 在9090端口提供服務 true 表示啟動服務注冊 "127.0.0.1",8080 注冊中心的地址信息,創建client用于連接注冊中心wws::server::RpcServer server(wws::Address("127.0.0.1",9090),true,wws::Address("127.0.0.1",8080));//server.registerMethod(desc_factory->build());server.start();return 0;
}

registry_sever.cpp

#include "../../server/rpc_server.hpp"
#include "../../common/detail.hpp"int main()
{//實例化服務端wws::server::RegistryServer reg_server(8080);reg_server.start();return 0;
}

rpc_client.cpp


#include "../../client/rpc_client.hpp"
#include "../../common/detail.hpp"
#include <thread>void callback(const Json::Value &result) {DLOG("callback result: %d", result.asInt());
}int main()
{wws::client::RpcClient client(true,"127.0.0.1",8080);//1.同步調用Json::Value params,result;params["nums1"]=11;params["nums2"]=22;bool ret=client.call("Add",params,result);//client內找對應Add方法對應提供者的連接if(ret!=false){DLOG("result: %d", result.asInt());}//2.異步Futurewws::client::RpcCaller::JsonAsyncResponse res_future;params["nums1"]=33;params["nums2"]=44;ret=client.call("Add",params,res_future);if(ret!=false){result=res_future.get();DLOG("result: %d", result.asInt());} //3.回調params["nums1"]=55;params["nums2"]=66;ret = client.call("Add",params, callback);DLOG("-------\n");std::this_thread::sleep_for(std::chrono::seconds(1));return 0;
}

八.發布訂閱服務端實現rpc_topic

1. Dispatcher 模塊(右上角)

作用:

  • JSON-RPC 框架中的請求分發核心

  • 判斷 RPC 消息類型 → 調用對應業務模塊(如 PubSubManager)

Dispatcher::registerMethod("topic", &PubSubManager::onTopicRequest)

2. onTopicRequest 回調函數

位置: PubSubManager 中的統一入口函數
作用:

  • 接收來自 Dispatcher 的請求

  • 根據操作類型調用對應處理函數:

類型功能函數調用
創建主題topicCreate()
刪除主題topicRemove()
訂閱主題topicSubscribe()
取消訂閱topicCancel()
發布消息topicPublish()

3. PubSubManager 核心模塊(圖中心)

職責:

  • 管理兩個核心 map

  • 操作對應的 TopicSubscriber 數據結構

std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;

4.Subscriber 結構(圖左下)

struct Subscriber {BaseConnection::ptr conn;std::unordered_set<std::string> topics;
};

圖示結構:

  • 每個訂閱者關聯一個連接

  • 還記錄了自己訂閱的所有主題名

5. topic 結構(圖左上)

struct Topic {std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers;
};

圖示結構:

  • 每個主題有一個名稱

  • 內部維護一組訂閱它的訂閱者指針

主要用途:

  • 當消息發布到該主題時: → 遍歷 set<subscriber> 并調用 conn->send(msg)

6. 兩張 map 映射關系(圖中左)

map<topic_name, topic>
map<Connection, Subscriber>

構成了典型的雙向映射系統:

  • topic_name -> topic -> subscribers

  • conn -> subscriber -> topics

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>
namespace wws
{
namespace server
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;TopicManager();// 給Dispathcer模塊進行服務上線下線請求處理的回調函數void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {TopicOptype topic_optype=msg->optype();bool ret=true;switch(topic_optype){//主題創建case TopicOptype::TOPIC_CREATE: topicCreate(conn,msg);break;//主題刪除case TopicOptype::TOPIC_REMOVE: topicRemove(conn,msg);break;//主題訂閱case TopicOptype::TOPIC_SUBSCRIBE: topicSubscribe(conn,msg);break;//取消主題訂閱case TopicOptype::TOPIC_CANCEL: topicCancel(conn,msg);break;//主題消息發布case TopicOptype::TOPIC_PUBLISH: topicPublish(conn,msg);break;//返回應答 無效操作類型default: return errorResponse(conn,msg,RCode::RCODE_INVALID_OPTYPE);}if(!ret) return errorResponse(conn,msg,RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn,msg);}//一個訂閱者在連接斷開時的處理 刪除其關聯的數據void onShutdown(const BaseConnection::ptr&conn){//消息發布者斷開連接 不處理; 消息訂閱者斷開連接需要刪除管理數據//1.判斷斷開連接的是否是訂閱者 不是直接返回Subscriber::ptr subscriber;//斷開連接的訂閱者對象std::vector<Topic::ptr> topics;//受影響的主題對象{auto it = _subscribers.find(conn);if (it == _subscribers.end())return;// 2.獲取受影響的主題對象subscriber=it->second;for(auto&topic_name:subscriber->topics){auto topic_it=_topics.find(topic_name);if(topic_it==_topics.end())continue;topics.push_back(topic_it->second);}//3.從訂閱者映射map中刪除 訂閱者_subscribers.erase(it);}//4.從對應主題對象中刪除訂閱者for(auto&topic:topics){topic->removeSubscriber(subscriber);}}private://錯誤響應void errorResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg,RCode rcode){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);//主題響應 msg_rsp->setRcode(rcode);conn->send(msg_rsp);}//注冊應答void topicResponse(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){auto msg_rsp=MessageFactory::create<ServiceResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(wws::MType::RSP_TOPIC);msg_rsp->setRcode(RCode::RCODE_OK);conn->send(msg_rsp);}//創建主題void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {std::unique_lock<std::mutex> lock(_mutex);//構建一個主題對象 添加映射關系的管理std::string topic_name=msg->topicKey();//主題名稱auto topic=std::make_shared<Topic>(topic_name);_topics.insert(std::make_pair(topic_name,topic));}//刪除主題void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg) {//1.查看當前主題 有哪些訂閱者 再從訂閱者中刪除主題信息//_topics[topic_name]->subscribers  subscribers->topics訂閱的所有主題名稱//2.刪除主題數據  _topics[topic_name]->Subscriber 主題名稱和主題對象的映射關系std::string topic_name=msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers;{std::unique_lock<std::mutex> lock(_mutex);//刪除主題前 先找出訂閱該主題的訂閱者auto it=_topics.find(topic_name);if(it==_topics.end())return;subscribers=it->second->subscribers;_topics.erase(it);//刪除主題名稱->topic}       for(auto&subscriber:subscribers)subscriber->removeSTopic(topic_name);}// 主題訂閱bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1.先找出主題對象topic 訂閱者對象subscriber//  沒有主題對象就報錯 沒有訂閱者對象就構建Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}// 2.在主題對象中 新增一個訂閱者對象管理的連接; 在訂閱者對象中新增一個訂閱的主題topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}}//取消主題訂閱void topicCancel(const BaseConnection::ptr&conn,const TopicRequest::ptr&msg){// 1.先找出主題對象topic 訂閱者對象subscriberTopic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end())topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end())subscriber = sub_it->second;// 2.在主題對象中 刪除當前訂閱者對象管理的連接; 在訂閱者對象中刪除對應訂閱的主題if(subscriber) subscriber->removeSTopic(msg->topicKey());if(subscriber && topic) topic->removeSubscriber(subscriber);}}// 主題發布bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end())return false;topic=topic_it->second;}topic->pushMessage(msg);return true;}private:// 每個客戶端連接會對應一個訂閱者對象,記錄它當前訂閱了哪些主題struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr conn;std::unordered_set<std::string> topics; // 訂閱者訂閱的主題名稱Subscriber(const BaseConnection::ptr &c): conn(c){}// 增加主題void appendTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.insert(topic_name);}// 刪除主題void removeSTopic(const std::string &topic_name){std::unique_lock<std::mutex> lock(_mutex);topics.erase(topic_name);}};// 每個主題包含一個主題名 + 當前所有的訂閱者struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string topic_name;std::unordered_set<Subscriber::ptr> subscribers; // 當前主題訂閱者Topic(const std::string &name): topic_name(name){}// 增加訂閱者void appendSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.insert(subscriber);}// 刪除訂閱者void removeSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);subscribers.erase(subscriber);}// 給該主題的所有訂閱者發消息void pushMessage(const BaseMessage::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : subscribers){subscriber->conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};
}
}

九.發布訂閱客戶端實現rpc_topic

一、發布訂閱客戶端的角色劃分

  • 消息發布客戶端

    • 創建主題

    • 刪除主題

    • 發布消息(向某個主題發布)

  • 消息訂閱客戶端

    • 創建主題

    • 刪除主題

    • 訂閱某主題的消息

    • 取消訂閱某主題


二、整體模塊設計思路

  1. 對外的五個操作接口

    • 針對“主題”的操作,包括:

      • 創建

      • 刪除

      • 訂閱

      • 取消訂閱

      • 發布

  2. 對外的一個消息處理接口

    • 提供給 dispatcher 模塊,進行消息分發處理

    • 相當于 dispatcher 收到消息發布請求后,查找有哪些訂閱者,并調用對應的回調函數將消息推送過去

  3. 內部的數據管理

    • 管理“主題名稱”與“消息處理回調函數”的映射關系

#pragma once
#include"requestor.hpp"namespace wws
{
namespace client
{class TopicManager{public:using ptr=std::shared_ptr<TopicManager>;using SubCallback=std::function<void(const std::string &key,const std::string&msg)>;//主題創建bool create(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_CREATE);}//刪除bool remove(const BaseConnection::ptr&conn,const std::string &key){return commonRequest(conn,key,TopicOptype::TOPIC_REMOVE);}//訂閱主題 SubCallback收到主題新消息的進行的回調bool subscribe(const BaseConnection::ptr&conn,const std::string &key,const SubCallback&cb){//當我們訂閱了主題 可能發布者會馬上發布該主題的內容 //這時候如果cb還沒有設置到map中就無法執行回調函數 所以先設置回調函數到map中addSubscribe(key,cb);bool ret=commonRequest(conn,key,TopicOptype::TOPIC_SUBSCRIBE);if(ret==false){//請求發送失敗 刪除map中對應的cbdelSubscribe(key);return false;}return true;}//取消訂閱bool cancel(const BaseConnection::ptr&conn,const std::string &key){delSubscribe(key);return commonRequest(conn,key,TopicOptype::TOPIC_CANCEL);}//發布消息(向某個主題發布)bool publish(const BaseConnection::ptr&conn,const std::string &key,const std::string &msg){return commonRequest(conn,key,TopicOptype::TOPIC_PUBLISH,msg);}// 當收到服務端推送的消息時調用,觸發對應訂閱者的回調處理邏輯 (設置給dispatcher收到對應主題消息 進行回調處理)void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){//1.先判斷該消息的操作類型是否為 發布消息auto type=msg->optype();if(type!=TopicOptype::TOPIC_PUBLISH){ELOG("收到了錯誤類型的主題操作");return;}//2.取出主題名稱 以及消息內容 后面調用cbstd::string topic_key=msg->topicKey();std::string topic_msg=msg->topicMsg();//3.調用cbauto callback=getSubscribe(topic_key);if(!callback){ELOG("收到了%s主題信息 但該主題無對應回調",topic_key.c_str());return;}callback(topic_key,topic_msg);}private:void addSubscribe(const std::string &key,const SubCallback&cb){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.insert(std::make_pair(key,cb));}void delSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);_topic_callbacks.erase(key);}const SubCallback& getSubscribe(const std::string &key){std::unique_lock<std::mutex> lock(_mutex);auto it=_topic_callbacks.find(key);if(it==_topic_callbacks.end())return SubCallback();return it->second;}bool commonRequest(const BaseConnection::ptr&conn,const std::string &key,TopicOptype type,const std::string &msg=""){//1.構造請求對象 并填充數據auto msg_req=MessageFactory::create<TopicRequest>();msg_req->setId(UUID::uuid());msg_req->setMType(MType::REQ_TOPIC);msg_req->setTopicKey(key);msg_req->setOptype(type);if(type==TopicOptype::TOPIC_PUBLISH)msg_req->setTopicMsg(msg);//2.向服務端發送請求 等待響應BaseMessage::ptr msg_rsp;//發請求 + 等響應 + 反序列化響應 + 返回響應對象msg_rspbool ret=_requestor->send(conn,msg_req,msg_rsp);if(ret==false){ELOG("主題創建請求失敗");return false;}//3.判斷請求處理是否成功auto topic_rsp_msg=std::dynamic_pointer_cast<TopicResponse>(msg_rsp);if(!topic_rsp_msg){ELOG("主題響應 向下類型轉換失敗");return false;}if(topic_rsp_msg->rcode()!=RCode::RCODE_OK);{ELOG("主題創建請求出錯:%s",errReason(topic_rsp_msg->rcode()).c_str());return false;}return true;}private:std::mutex _mutex;//根據主題查找對應的回調函數執行std::unordered_map<std::string,SubCallback> _topic_callbacks;Requestor::ptr _requestor;};
}
}

十.topicServer topicClient封裝

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/75777.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/75777.shtml
英文地址,請注明出處:http://en.pswp.cn/web/75777.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++算法優化實戰:破解性能瓶頸,提升程序效率

C算法優化實戰&#xff1a;破解性能瓶頸&#xff0c;提升程序效率 在現代軟件開發中&#xff0c;算法優化是提升程序性能的關鍵手段之一。無論是在高頻交易系統、實時游戲引擎&#xff0c;還是大數據處理平臺&#xff0c;算法的高效性直接關系到整體系統的性能與響應速度。C作…

【PostgreSQL教程】PostgreSQL 特別篇之 語言接口連接PHP

博主介紹:?全網粉絲22W+,CSDN博客專家、Java領域優質創作者,掘金/華為云/阿里云/InfoQ等平臺優質作者、專注于Java技術領域? 技術范圍:SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大數據、物聯網、機器學習等設計與開發。 感興趣的可…

山東大學軟件學院創新項目實訓開發日志(12)之將對話記錄保存到數據庫中

在之前的功能開發中&#xff0c;已經成功將deepseekAPI接口接入到springbootvue項目中&#xff0c;所以下一步的操作是將對話和消息記錄保存到數據庫中 在之前的開發日志中提到數據庫建表&#xff0c;所以在此刻需要用到兩個表&#xff0c;conversation表和message表&#xff…

Spring-注解編程

注解基礎概念 1.什么是注解編程 指的是在類或者方法上加入特定的注解(XXX) 完成特定功能的開發 Component public classXXX{} 2.為什么要講注解編程 1.注解開發方便 代碼簡潔 開發速度大大提高 2.Spring開發潮流 Spring2.x引入注解 Spring3.x完善注解 Springboot普及 推廣注解…

Dify智能體平臺源碼二次開發筆記(5) - 多租戶的SAAS版實現(2)

目錄 前言 用戶的查詢 controller層 添加路由 service層 用戶的添加 controller層 添加路由 service層-添加用戶 service層-添加用戶和租戶關系 驗證結果 結果 前言 完成租戶添加功能后&#xff0c;下一步需要實現租戶下的用戶管理。基礎功能包括&#xff1a;查詢租…

基于若依的ruoyi-vue-plus的nbmade-boot在線表單的設計(一)架構方面的設計

希望大家一起能參與我的新開源項目nbmade-boot: 寧波智能制造低代碼實訓平臺 主要目標是類似設計jeecgboot那樣的online表單功能,因為online本身沒有開源這部分代碼,而我設計這個是完全開源的,所以希望大家支持支持,開源不容易。 1、數據庫方面設計考慮 是在原來gen_table和…

WebFlux應用中獲取x-www-form-urlencoded數據的六種方法

&#x1f9d1; 博主簡介&#xff1a;CSDN博客專家&#xff0c;歷代文學網&#xff08;PC端可以訪問&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移動端可微信小程序搜索“歷代文學”&#xff09;總架構師&#xff0c;15年工作經驗&#xff0c;精通Java編…

[Python基礎速成]1-Python規范與核心語法

本系列旨在快速掌握Python&#xff0c;實現能夠快速閱讀和理解 Python 代碼&#xff0c;并在可查閱語法的情況下進行 AI 學習。 本篇先了解一下Python基礎知識。 本篇內容較菜鳥教程有所刪減、方便快速構建大綱&#xff0c;且加入了PEP 8規范說明等有助于理解和編寫代碼的說明。…

農民劇團的春天與改變之路

楊天義&#xff0c;男&#xff0c;1966年9月生&#xff0c;中共黨員&#xff0c;江西省吉安市吉水縣水南農民劇團團長。 楊天義從廢品收購起家&#xff0c;憑借自身的努力和奮斗&#xff0c;自籌資金100余萬元建設了水南鎮的第一座影劇院&#xff0c;組建了江西省吉安市吉水縣…

python asyncio 的基本使用

1、引言 asyncio 是 Python 標準庫中的一個庫&#xff0c;提供了對異步 I/O 、事件循環、協程和任務等異步編程模型的支持。 asyncio 文檔 2、進程、線程、協程 線程 線程是操作系統調度的基本單位&#xff0c;同一個進程中的多個線程共享相同的內存空間。線程之間的切換由操…

Leedcode刷題 | Day30_貪心算法04

一、學習任務 452. 用最少數量的箭引爆氣球代碼隨想錄435. 無重疊區間763. 劃分字母區間 二、具體題目 1.452用最少數量的箭引爆氣球452. 用最少數量的箭引爆氣球 - 力扣&#xff08;LeetCode&#xff09; 在二維空間中有許多球形的氣球。對于每個氣球&#xff0c;提供的輸…

Ant Design Vue 表格復雜數據合并單元格

Ant Design Vue 表格復雜數據合并單元格 官方合并效果 官方示例 表頭只支持列合并&#xff0c;使用 column 里的 colSpan 進行設置。 表格支持行/列合并&#xff0c;使用 render 里的單元格屬性 colSpan 或者 rowSpan 設值為 0 時&#xff0c;設置的表格不會渲染。 <temp…

C++ 標準庫中的 <algorithm> 頭文件算法總結

C 常用 <algorithm> 算法概覽 C 標準庫中的 <algorithm> 頭文件提供了大量有用的算法&#xff0c;主要用于操作容器&#xff08;如 vector, list, array 等&#xff09;。這些算法通常通過迭代器來操作容器元素。 1. 非修改序列操作 std::all_of, std::any_of, s…

程序化廣告行業(84/89):4A廣告代理公司與行業資質解讀

程序化廣告行業&#xff08;84/89&#xff09;&#xff1a;4A廣告代理公司與行業資質解讀 大家好&#xff01;在探索程序化廣告行業的道路上&#xff0c;每一次知識的分享都是我們共同進步的階梯。一直以來&#xff0c;我都希望能和大家攜手前行&#xff0c;深入了解這個充滿機…

deepin使用autokey添加微信快捷鍵一鍵顯隱ctrl+alt+w

打開deepin商店&#xff0c;搜索快捷鍵&#xff0c;找到autokey 快捷鍵管理&#xff0c;點擊安裝 點擊右鍵新建文件夾 點擊右鍵新建腳本 打開腳本并添加以下內容 import subprocess import time# ------------------ 配置項 ------------------ WM_CLASS "wechat…

文件內容課堂總結

Spark SQL是Spark用于結構化數據處理的模塊&#xff0c;前身是Shark。Shark基于Hive開發&#xff0c;雖提升了SQL-on-Hadoop效率&#xff0c;但對Hive依賴過多。2014年6月1日Shark項目停止開發&#xff0c;團隊將資源投入Spark SQL項目。Spark SQL具有諸多優點&#xff0c;如擺…

Zotero PDF Translate 翻譯插件使用OpenAI API配置教程

PDF Translate&#xff1a;提升 Zotero 內置 PDF 閱讀器的翻譯功能 “PDF Translate” 是一款為 Zotero 設計的插件&#xff0c;旨在方便用戶在 Zotero 內置的 PDF 閱讀器中進行劃詞或段落翻譯&#xff0c;輔助閱讀外文文獻。 一、 安裝插件 下載插件&#xff1a; 訪問 PDF T…

火山引擎旗下的產品

用戶問的是火山引擎旗下的產品&#xff0c;我需要詳細列出各個類別下的產品。首先&#xff0c;我得確認火山引擎有哪些主要業務領域&#xff0c;比如云計算、大數據、人工智能這些。然后&#xff0c;每個領域下具體有哪些產品呢&#xff1f;比如云計算方面可能有云服務器、容器…

C/C++程序中實現Python綁定多種技術路線

在C/C程序中實現Python綁定有多種技術路線&#xff0c;選擇合適的方法取決于項目需求、性能要求和開發效率。以下是常見的幾種方案&#xff0c;按易用性排序&#xff1a; 1. PyBind11&#xff08;推薦首選&#xff09; 特點&#xff1a;現代C庫&#xff0c;語法簡潔&#xff0…

【位運算】消失的兩個數字

文章目錄 面試題 17.19. 消失的兩個數字解題思路 面試題 17.19. 消失的兩個數字 面試題 17.19. 消失的兩個數字 ? 給定一個數組&#xff0c;包含從 1 到 N 所有的整數&#xff0c;但其中缺了兩個數字。你能在 O(N) 時間內只用 O(1) 的空間找到它們嗎&#xff1f; ? 以任意…