【從零實現Json-Rpc框架】- 項目實現 - 服務端主題實現及整體封裝

📢博客主頁:https://blog.csdn.net/2301_779549673
📢博客倉庫:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
📢本文由 JohnKi 原創,首發于 CSDN🙉
📢未來很長,值得我們全力奔赴更美好的生活?

在這里插入圖片描述

在這里插入圖片描述

文章目錄

  • 📢前言
  • 🏳??🌈一、服務端主題管理模塊?
    • 1.1 核心功能
    • 1.2 核心設計思路
    • 1.3 主題 結構構造
    • 1.4 訂閱者 結構構造
    • 1.5 主題 和 訂閱者 統籌管理
      • (1) onTopicRequest
      • (2) onShutdown
      • (3) 細節實現方法
    • 1.8 框架代碼
    • 1.7 整體代碼
  • 🏳??🌈二、服務端整體實現
    • 2.1 邏輯框架
    • 2.1.1 注冊中心(RegistryServer)?
    • 2.1.2 RPC 服務核心(RpcServer)?
      • 2.1.3 主題服務(TopicServer)?
    • 2.2 邏輯代碼
    • 2.3 整體代碼
  • 👥總結


📢前言

截至現在,在項目實現上,我們已經封裝好了 零碎接口,對 各種消息 及 常用結構體 進行了封裝

也實現了 dispatcher 路由轉發的功能。

服務端 方面,完成了

  • 業務函數回調總結 - rpc_route.hpp
  • 服務的提供、發現、注冊 - rpc_registry.hpp

客戶端 方面,完成了

  • 消息請求及其回調 - requestor.hpp
  • 消息請求發送 - rpc_caller.hpp

服務端 未完成部分

  • 服務端主題管理模塊 - rpc_topic.hpp
  • 服務端功能整合 - rpc_server.hpp

客戶端 未完成部分

  • 客戶的提供、發現、注冊 - rpc_registry.hpp
  • 客戶端主題管理模塊 - rpc_topic.hpp
  • 客戶端功能整合 - rpc_client.hpp

這一篇文章,筆者就為服務端的 主題實現整體封裝 畫上句號


🏳??🌈一、服務端主題管理模塊?

1.1 核心功能

這段代碼實現了一個 ?服務端主題管理模塊?(TopicManager),支持 ?發布-訂閱模式 的核心功能,包括:

  • ?主題的創建與刪除
  • 客戶端的訂閱與取消訂閱
  • 消息的發布與推送
  • 連接斷開時的自動清理

1.2 核心設計思路

?線程安全:通過 std::mutex 保護共享數據(_topics 和 _subscribers)。
?數據映射

  • _topics:維護主題名稱到 Topic 對象的映射。
  • _subscribers:維護客戶端連接到 Subscriber 對象的映射。

?操作統一入口:通過 onTopicRequest 分發不同類型的主題操作請求。

1.3 主題 結構構造

作用:表示一個主題,管理其所有訂閱者。
?成員

  • _subscribers:存儲所有訂閱者的集合(Subscriber::ptr)。

?方法

  • appendSubscriber / removeSubscriber:增刪訂閱者。
  • pushMessage:向所有訂閱者發送消息。
// 主題名稱 和 其訂閱者連接 的映射
struct Topic {using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string _topic_name;std::unordered_set<Subscriber::ptr> _subscribers; // 當前主題的訂閱者的連接Topic(const std::string& topic_name) : _topic_name(topic_name) {}// 增加訂閱者void appendSubscriber(const Subscriber::ptr& subscriber);// 刪除訂閱者void removeSubscriber(const Subscriber::ptr& subscriber);// 推送消息void pushMessage(const BaseMessage::ptr& msg);
};

1.4 訂閱者 結構構造

作用:表示一個訂閱者,記錄其訂閱的主題。
?成員

  • _conn:訂閱者的網絡連接對象。
  • _topics:訂閱者當前訂閱的所有主題名稱集合。

?方法

  • appendTopic / removeTopic:增刪訂閱的主題。
// 定義一個訂閱者對象,記錄其訂閱的 所有主題名稱
struct Subscriber {using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr _conn;std::unordered_set<std::string> _topics; // 該訂閱者所訂閱的主題名稱Subscriber(const BaseConnection::ptr& conn) : _conn(conn) {};// 增加訂閱的主題void appendTopic(const std::string& topic);// 刪除訂閱的主題void removeTopic(const std::string& topic);
};

1.5 主題 和 訂閱者 統籌管理

我們需要建立兩個映射關系

  • 主題名 -> 主題結構
  • 訂閱者連接 -> 訂閱者結構
std::mutex _mutex;
std::unordered_map<std::string, Topic::ptr> _topics;
std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;

核心接口解析

(1) onTopicRequest

?功能:處理客戶端發送的主題操作請求(總入口)。
?操作類型

  • TOPIC_CREATE:調用 topicCreate 創建主題。
  • TOPIC_REMOVE:調用 topicRemove 刪除主題。
  • TOPIC_SUBSCRIBE:調用 topicSubscribe 訂閱主題。
  • TOPIC_CANCEL:調用 topicCancel 取消訂閱。
  • TOPIC_PUBLISH:調用 topicPublish 發布消息。
  • 錯誤處理:若操作失敗,返回錯誤響應(errorResponse)。

(2) onShutdown

?功能:處理客戶端連接斷開時的清理邏輯。
?步驟

  • _subscribers 中移除訂閱者。
  • 遍歷訂閱者的所有主題,從主題的訂閱列表中移除該訂閱者。

(3) 細節實現方法

變更類

// 構造一個主題對象,添加映射關系的管理
void topicCreate(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 刪除一個主題對象,刪除映射關系的管理
void topicRemove(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 將訂閱者訂閱到指定主題。
bool topicSubscribe(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 取消該主題的訂閱者
void topicCancel(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);// 向各個該主題的 訂閱者 發布主題消息
bool topicPublish(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

返回類

//  返回一個錯誤響應
void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);//  返回一個主題響應
void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);

1.8 框架代碼

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>namespace rpc
{namespace server{class TopicManager{public:using ptr = std::shared_ptr<TopicManager>;// 統一處理有關主題的請求void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 關閉連接void onShutdown(const BaseConnection::ptr &conn);private://  返回一個錯誤響應void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode);//  返回一個主題響應void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg);private:// 構造一個主題對象,添加映射關系的管理void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 刪除一個主題對象,刪除映射關系的管理void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 將訂閱者訂閱到指定主題。bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 取消該主題的訂閱者 void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);// 向各個該主題的 訂閱者 發布主題消息bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg);private:// 定義一個訂閱者對象,記錄其訂閱的 所有主題名稱struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr _conn;std::unordered_set<std::string> _topics; // 該訂閱者所訂閱的主題名稱Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};// 增加訂閱的主題void appendTopic(const std::string &topic);// 刪除訂閱的主題void removeTopic(const std::string &topic);};// 主題名稱 和 其訂閱者連接 的映射struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string _topic_name;std::unordered_set<Subscriber::ptr> _subscribers; // 當前主題的訂閱者的連接Topic(const std::string &topic_name) : _topic_name(topic_name) {}// 增加訂閱者void appendSubscriber(const Subscriber::ptr &subscriber);// 刪除訂閱者void removeSubscriber(const Subscriber::ptr &subscriber);// 推送消息void pushMessage(const BaseMessage::ptr &msg);};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};}
}

1.7 整體代碼

#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>namespace rpc
{namespace server{class TopicManager{public:using ptr = std::shared_ptr<TopicManager>;// 統一處理有關主題的請求void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){TopicOptype topic_optype = msg->optype();bool ret = true;switch(topic_optype){case TopicOptype::TOPIC_CREATE: topicCreate(conn, msg); break;              // 主題創建case TopicOptype::TOPIC_REMOVE: topicRemove(conn, msg); break;              // 主題刪除case TopicOptype::TOPIC_SUBSCRIBE: ret = topicSubscribe(conn, msg); break;  // 主題訂閱case TopicOptype::TOPIC_CANCEL: topicCancel(conn, msg); break;              // 主題取消訂閱case TopicOptype::TOPIC_PUBLISH: ret = topicPublish(conn, msg); break;      // 主題發布default: return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE); break;}if(!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);return topicResponse(conn, msg);}// 關閉連接void onShutdown(const BaseConnection::ptr &conn){// 消息發布者斷開連接,不需要任何操作// 1. 判斷斷開連接的是否為訂閱者,不是的話直接返回// 2. 獲取到訂閱者退出,受影響的主題對象// 3. 從主題對象中,移除訂閱者// 4. 從訂閱者映射信息中,刪除訂閱者std::vector<Topic::ptr> topics;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto sub_it = _subscribers.find(conn);if(sub_it == _subscribers.end()){ELOG("該訂閱者連接不存在");return;}// 獲取該訂閱者鎖定月的主題subscriber = sub_it->second;// 2. 獲取到訂閱者退出,受影響的主題對象for(auto& topic_name : subscriber->_topics){auto topic_it = _topics.find(topic_name);if(topic_it == _topics.end())continue;topics.push_back(topic_it->second);}_subscribers.erase(sub_it);}}private:void errorResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg, RCode rcode){auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRcode(rcode);return conn->send(msg_rsp);}void topicResponse(const BaseConnection::ptr& conn, const TopicRequest::ptr& msg){auto msg_rsp = MessageFactory::create<TopicResponse>();msg_rsp->setId(msg->rid());msg_rsp->setMType(MType::RSP_TOPIC);msg_rsp->setRcode(RCode::RCODE_OK);return conn->send(msg_rsp);}private:// 構造一個主題對象,添加映射關系的管理void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);// 獲取主題名字std::string topic_name = msg->topicKey();// 構造一個主題對象auto topic = std::make_shared<Topic>(topic_name);// 增加訂閱者_topics.insert(std::make_pair(topic_name, topic));std::cout << "創建主題" <<  topic_name << std::endl;}// 刪除一個主題對象,刪除映射關系的管理void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1. 查看當前主題,有哪些訂閱者,然后從訂閱者中將主題信息刪掉// 2. 刪除主題的數據 -- 主題名稱余出題對象的映射std::string topic_name = msg->topicKey();std::unordered_set<Subscriber::ptr> subscribers; // 記錄 當前主題 的 所有訂閱者連接{std::unique_lock<std::mutex> lock(_mutex);auto it = _topics.find(msg->topicKey());if (it == _topics.end()){ELOG("沒有找到 %s 主題的訂閱者", msg->topicKey().c_str());return;}subscribers = it->second->_subscribers;_topics.erase(it); // 刪除主題對象}for (auto &subscriber : subscribers){subscriber->removeTopic(topic_name);}}// 將訂閱者訂閱到指定主題。bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);// 1. 查找或創建訂閱者對象(Subscriber)auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()){return false;}topic = topic_it->second;auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}else{subscriber = std::make_shared<Subscriber>(conn);_subscribers.insert(std::make_pair(conn, subscriber));}}// 2. 在主題對象中,新增一個訂閱者對象關聯的連接;  在訂閱者對象中新增一個訂閱的主題topic->appendSubscriber(subscriber);subscriber->appendTopic(msg->topicKey());return true;}// 取消該主題的訂閱者 void topicCancel(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){// 1. 先找出主題對象,和訂閱者對象Topic::ptr topic;Subscriber::ptr subscriber;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it != _topics.end()){topic = topic_it->second;}auto sub_it = _subscribers.find(conn);if (sub_it != _subscribers.end()){subscriber = sub_it->second;}}// 2. 從主對象中刪除當前的訂閱者連接if(subscriber)subscriber->removeTopic(msg->topicKey());if(topic && subscriber)topic->removeSubscriber(subscriber);}// 向各個該主題的 訂閱者 發布主題消息bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg){Topic::ptr topic;{std::unique_lock<std::mutex> lock(_mutex);auto topic_it = _topics.find(msg->topicKey());if (topic_it == _topics.end()){ELOG("沒有找到 %s 主題的訂閱者", msg->topicKey().c_str());return false;}topic = topic_it->second;}topic->pushMessage(msg);return true;}private:// 定義一個訂閱者對象,記錄其訂閱的 所有主題名稱struct Subscriber{using ptr = std::shared_ptr<Subscriber>;std::mutex _mutex;BaseConnection::ptr _conn;std::unordered_set<std::string> _topics; // 該訂閱者所訂閱的主題名稱Subscriber(const BaseConnection::ptr &conn) : _conn(conn) {};// 增加訂閱的主題void appendTopic(const std::string &topic){std::unique_lock<std::mutex> lock(_mutex);_topics.insert(topic);}void removeTopic(const std::string &topic){std::unique_lock<std::mutex> lock(_mutex);_topics.erase(topic);}};// 主題名稱 和 其訂閱者連接 的映射struct Topic{using ptr = std::shared_ptr<Topic>;std::mutex _mutex;std::string _topic_name;std::unordered_set<Subscriber::ptr> _subscribers; // 當前主題的訂閱者的連接Topic(const std::string &topic_name) : _topic_name(topic_name) {}// 增加訂閱者void appendSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);_subscribers.insert(subscriber);}// 刪除訂閱者void removeSubscriber(const Subscriber::ptr &subscriber){std::unique_lock<std::mutex> lock(_mutex);_subscribers.erase(subscriber);}// 推送消息void pushMessage(const BaseMessage::ptr &msg){std::unique_lock<std::mutex> lock(_mutex);for (auto &subscriber : _subscribers){subscriber->_conn->send(msg);}}};private:std::mutex _mutex;std::unordered_map<std::string, Topic::ptr> _topics;std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;};}
}

🏳??🌈二、服務端整體實現

2.1 邏輯框架

一個 ?分布式 RPC 服務端系統

包含三個核心模塊:?服務注冊中心RPC 服務核心 和 ?主題服務

以下是各模塊的作用及整體架構:

2.1.1 注冊中心(RegistryServer)?

?功能

  • ?服務注冊與發現:接收服務提供者(Provider)注冊的服務信息,供消費者(Consumer)查詢可用服務。
  • 連接管理:在客戶端斷開時清理相關資源(如服務下線通知)。

?關鍵成員

  • _pd_manager:管理服務注冊與發現的業務邏輯(如維護服務列表)。
  • _dispatcher:分發客戶端請求到對應的處理邏輯。
  • _server:底層網絡服務器,監聽端口并處理連接。

?使用場景

  • 服務提供者啟動時向注冊中心注冊自身服務。
  • 消費者通過注冊中心查詢可用的服務地址和方法。

2.1.2 RPC 服務核心(RpcServer)?

?功能

  • ?RPC 服務管理:啟動 RPC 服務,處理客戶端調用請求。
  • 服務注冊(可選)?:將服務方法注冊到注冊中心(若啟用)。
  • ?請求路由:將 RPC 請求路由到對應的業務處理邏輯。

?關鍵成員

  • _router:路由請求到具體的服務方法(如根據方法名匹配處理函數)。
  • _req_client:與注冊中心通信的客戶端(用于服務注冊或發現)。
  • _dispatcher:分發網絡消息到業務邏輯。
    ?
    配置選項
    _enableRegistry:是否啟用注冊中心(決定是否自動注冊服務)。

2.1.3 主題服務(TopicServer)?

?功能

  • ?發布-訂閱模式:管理主題的創建、訂閱和消息推送。
  • ?消息廣播:向訂閱特定主題的客戶端推送消息。

?關鍵成員

  • _topic_manager:管理主題和訂閱者(如 TopicManager 類)。
  • _dispatcher:處理客戶端的訂閱/發布請求。

2.2 邏輯代碼

#pragma once#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"#include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"namespace rpc
{namespace server{// 注冊中心的服務端實現// 啟動服務:監聽指定端口,接收客戶端(服務提供者/發現者)連接。// ?消息路由:將服務注冊/發現請求分發給業務處理器(PDManager)。// ?連接管理:在客戶端斷開時清理相關資源(如服務下線通知)。class RegistryServer{// 注冊中心服務端,只需要針對服務注冊與發現請求進行處理即可public:using ptr = std::shared_ptr<RegistryServer>;RegistryServer(int port);void start();private:void onConnShutdown(const BaseConnection::ptr &conn);private:ProviderDiscovererManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};// ?RPC 服務端核心類,負責管理 RPC 服務的生命周期// 啟動 RPC 服務:監聽指定端口,處理客戶端 RPC 請求。// ?服務注冊?(可選):將服務方法注冊到注冊中心,供客戶端發現。// ?請求路由:將接收到的 RPC 請求分發給對應的業務處理邏輯。class RpcServer{public:using ptr = std::shared_ptr<RpcServer>;RpcServer(const Address &access_addr, bool enableRegistry = false, const Address &registry_server_addr = Address());// 注冊服務到注冊中心void registerMethod(const ServiceDescribe::ptr &service);void start();private:bool _enableRegistry;Address _access_addr;rpc::client::RegistryClient::ptr _req_client;RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class TopicServer{public:using ptr = std::shared_ptr<TopicServer>;TopicServer(int port);void start();private:void onConnShutdown(const BaseConnection::ptr &conn);private:TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}

2.3 整體代碼

#pragma once#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp" #include "rpc_route.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"namespace rpc{namespace server{// 注冊中心的服務端實現// 啟動服務:監聽指定端口,接收客戶端(服務提供者/發現者)連接。// ?消息路由:將服務注冊/發現請求分發給業務處理器(PDManager)。// ?連接管理:在客戶端斷開時清理相關資源(如服務下線通知)。class RegistryServer{// 注冊中心服務端,只需要針對服務注冊與發現請求進行處理即可public:using ptr = std::shared_ptr<RegistryServer>;RegistryServer(int port): _pd_manager(std::make_shared<ProviderDiscovererManager>()),_dispatcher(std::make_shared<Dispatcher>()){ // 1. 注冊服務請求處理器// 將 PDManager::onServiceRequest 綁定到 MType::REQ_SERVICE 消息類型。當收到服務注冊或發現請求時,調用此方法處理auto service_cb = std::bind(&ProviderDiscovererManager::onServiceRequest, _pd_manager.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);// 2. 創建底層服務器并設置回調// 通過 ServerFactory 創建底層服務器,設置消息總入口為 Dispatcher::onMessage_server = rpc::ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);// 3. 設置連接關閉回調// 當客戶端斷開連接時,調用 onConnShutdown 清理相關資源auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();   // 啟動服務器,開始監聽端口}private:void onConnShutdown(const BaseConnection::ptr& conn){_pd_manager->onConnShutdown(conn);  // 通知 PDManager 處理連接斷開}private:ProviderDiscovererManager::ptr _pd_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};// ?RPC 服務端核心類,負責管理 RPC 服務的生命周期// 啟動 RPC 服務:監聽指定端口,處理客戶端 RPC 請求。// ?服務注冊?(可選):將服務方法注冊到注冊中心,供客戶端發現。// ?請求路由:將接收到的 RPC 請求分發給對應的業務處理邏輯。class RpcServer{public:using ptr = std::shared_ptr<RpcServer>;RpcServer(const Address& access_addr, bool enableRegistry = false, const Address& registry_server_addr = Address()): _enableRegistry(enableRegistry),_access_addr(access_addr),_router(std::make_shared<rpc::server::RpcRouter>()),_dispatcher(std::make_shared<Dispatcher>()){// 1. ?創建注冊客戶端?(若啟用注冊)if(enableRegistry){_req_client = std::make_shared<client::RegistryClient>(registry_server_addr.first, registry_server_addr.second);}// 2. 注冊 RPC 請求處理回調auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, rpc_cb);// 3. 創建底層服務器并設置回調_server = rpc::ServerFactory::create(access_addr.second);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);}// 注冊服務到注冊中心void registerMethod(const ServiceDescribe::ptr& service){if(_enableRegistry)_req_client->registryMethod(service->method(), _access_addr);_router->registerMethod(service);}void start(){_server->start();}private:bool _enableRegistry;Address _access_addr;rpc::client::RegistryClient::ptr _req_client;RpcRouter::ptr _router;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};class TopicServer{public:using ptr = std::shared_ptr<TopicServer>;TopicServer(int port): _topic_manager(std::make_shared<TopicManager>()),_dispatcher(std::make_shared<Dispatcher>()){// 1. 注冊主題請求處理器auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(), std::placeholders::_1, std::placeholders::_2);_dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);// 2. 創建底層服務器并設置回調_server = ServerFactory::create(port);auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);_server->setMessageCallback(message_cb);// 3. 設置連接關閉回調auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);_server->setCloseCallback(close_cb);}void start(){_server->start();}private:void onConnShutdown(const BaseConnection::ptr& conn){_topic_manager->onShutdown(conn);}private:TopicManager::ptr _topic_manager;Dispatcher::ptr _dispatcher;BaseServer::ptr _server;};}
}

👥總結

本篇博文對 從零實現Json-Rpc框架】- 項目實現 - 服務端主題實現及整體封裝 做了一個較為詳細的介紹,不知道對你有沒有幫助呢

覺得博主寫得還不錯的三連支持下吧!會繼續努力的~

請添加圖片描述

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/74420.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/74420.shtml
英文地址,請注明出處:http://en.pswp.cn/web/74420.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

AI與玩具結合的可行性分析

文章目錄 一、市場需求&#xff1a;教育與陪伴的雙重驅動&#xff08;一&#xff09;教育需求&#xff08;二&#xff09;情感陪伴需求&#xff08;三&#xff09;消費升級 二、技術發展&#xff1a;賦能玩具智能化&#xff08;一&#xff09;AI技術的成熟&#xff08;二&#…

基于 RK3588 的 YOLO 多線程推理多級硬件加速引擎框架設計(代碼框架和實現細節)

一、前言 接續上一篇文章&#xff0c;這個部分主要分析代碼框架的實現細節和設計理念。 基于RK3588的YOLO多線程推理多級硬件加速引擎框架設計&#xff08;項目總覽和加速效果&#xff09;-CSDN博客https://blog.csdn.net/plmm__/article/details/146542002?spm1001.2014.300…

LeetCode Hot100 刷題筆記(7)—— 貪心

目錄 前言 一、貪心 1. 買賣股票的最佳時機 2. 跳躍游戲 3. 跳躍游戲 II 4. 劃分字母區間 前言 一、貪心&#xff1a;買賣股票的最佳時機&#xff0c;跳躍游戲&#xff0c;跳躍游戲 II&#xff0c;劃分字母區間。 一、貪心 1. 買賣股票的最佳時機 原題鏈接&#xff1a;121. …

SQL語句的訓練

DELECT FROM 蜀國 WHEHE name 劉玄德 AND 創業進度<0.5 AND 存活狀態 true&#xff1b; 基礎的sql語句 SELECT >選擇列FROM >確認數據源JOIN >聯合操作WHERE >篩選數據GROUP BY >分組 HAVING >過濾分組的數據DISTINCT >去重ORDEY BY > 排序…

汽車 HMI 設計的發展趨勢與設計要點

一、汽車HMI設計的發展歷程與現狀 汽車人機交互界面&#xff08;HMI&#xff09;設計經歷了從簡單到復雜、從單一到多元的演變過程。2012年以前&#xff0c;汽車HMI主要依賴物理按鍵進行操作&#xff0c;交互方式較為單一。隨著特斯拉Model S的推出&#xff0c;觸控屏逐漸成為…

基于51單片機的模擬條形碼識別系統proteus仿真

地址&#xff1a; https://pan.baidu.com/s/1AtAry19X3BgavLqXcM4scg 提取碼&#xff1a;1234 仿真圖&#xff1a; 芯片/模塊的特點&#xff1a; AT89C52/AT89C51簡介&#xff1a; AT89C51 是一款常用的 8 位單片機&#xff0c;由 Atmel 公司&#xff08;現已被 Microchip 收…

CD22.【C++ Dev】類和對象(13) 流提取運算符的重載和const成員

目錄 1.流提取運算符>>的重載 知識回顧 重載方法 operator<<格式 operator>>格式 使用cin對日期類對象寫入數據 如果想指定格式輸入 方法1:getchar() 方法2:使用臨時變量接收字符 完善operator>>代碼(修bug) 2.類中的權限問題(const成員) …

Spring 核心技術解析【純干貨版】- XIX:Spring 日志模塊 Spring-Jcl 模塊精講

在現代 Java 開發中&#xff0c;日志是調試、監控和維護應用程序的重要工具。Spring 作為企業級框架&#xff0c;提供了 Spring-Jcl 作為日志抽象層&#xff0c;使開發者可以靈活切換不同的日志實現&#xff0c;而無需修改業務代碼。本篇文章將深入解析 Spring-Jcl 模塊&#x…

Hadoop集群---運維管理和技巧

一. daemon 守護進程管理 1. NameNode守護進程管理 hadoop-daemon.sh start namenode 2. DataNode守護進程管理 hadoop-daemon.sh start datanode 3. ResourceManager守護進程管理 yarn-daemon.sh start resourcemanager 4. NodeManager守護進程管理 yarn-daemon.sh st…

ngx_log_init

定義在 src\core\ngx_log.c ngx_log_t * ngx_log_init(u_char *prefix, u_char *error_log) {u_char *p, *name;size_t nlen, plen;ngx_log.file &ngx_log_file;ngx_log.log_level NGX_LOG_NOTICE;if (error_log NULL) {error_log (u_char *) NGX_ERROR_LOG_PATH;}…

網絡華為HCIA+HCIP 策略路由,雙點雙向

目錄 路由策略&#xff0c;策略路由 策略路由優勢 策略路由分類 接口策略路由 雙點雙向 雙點雙向路由引入特點: 聯系 路由回灌和環路問題 路由策略&#xff0c;策略路由 路由策略:是對路由條目進行控制&#xff0c;通過控制路由條目影響報文的轉發路徑&#xff0c;即路…

水下成像機理分析

一般情況下, 水下環境泛指浸入到人工水體 (如水庫、人工湖等)或自然水體(如海洋、河流、湖 泊、含水層等)中的區域。在水下環境中所拍攝 的圖像由于普遍受到光照、波長、水中懸浮顆粒物 等因素的影響&#xff0c;導致生成的水下圖像出現模糊、退 化、偏色等現象&#xff0c;圖像…

MySQL的數據目錄以及日志

1.MySQL數據目錄 MySQL服務器的管理信息、業務數據、?志?件、磁盤緩沖?件默認存儲在數據?錄下.數據目錄保存了我們用戶的信息,以及我們創建的數據庫和表的數據.維護了日志文件等.mysqld主要操作的就是我們的數據目錄. 如何查看數據目錄: ll /var/lib/mysql#ll 是查看指令 …

論文閱讀:Dual Anchor Graph Fuzzy Clustering for Multiview Data

論文地址:Dual Anchor Graph Fuzzy Clustering for Multiview Data | IEEE Journals & Magazine | IEEE Xplore 代碼地址&#xff1a;https://github.com/BBKing49/DAG_FC 摘要 多視角錨圖聚類近年來成為一個重要的研究領域&#xff0c;催生了多個高效的方法。然而&#…

32f4,串口1,usart.c.h2025

usart.c #include "sys.h" #include "usart.h" #include "led.h" // #include "stdlib.h" #include "stdarg.h" #include "stdio.h" //加入以下代碼,支持printf函數,而不需要選擇use MicroLIB #if 1#pragma…

C語言:一組位操作宏

解析協議時&#xff0c;取得位域的值是一種常見操作&#xff0c;這些宏可以輔助我們工作。 /* ** 將x的第n位置1 ** ** x 0x00000000 ** BIT_SET(x, 7) 0x00000080 */ #define BIT_SET(x, n) ((x) | (1 << (n)))/* ** 將x的第n位置為0 ** ** x 0x00000080 ** …

記一個使用BigDecimal所有類型變為整數的問題

場景 通過 Excel 導入數據&#xff0c;數據中包含金額。數據庫類型 decimal(18, 6) 問題 Excel 導入后所有的金額列都被四舍五入。經過測試&#xff0c;只有數據有整數時所有數據才會被四舍五入&#xff0c;全部為浮點類型沒有問題。 解決 強制設置小數位數 // RoundingM…

nodejs、socket.io、express + 實時線上聊天系統(自用筆記)

留個鏈接給自己參考用&#xff1a; socket.io官方文檔&#xff1a;介紹 | Socket.IO nodejs基礎語法&#xff1a;大前端技能講解&#xff1a;NodeJS、Npm、Es6、Webpack_nodejs webpack-CSDN博客 socket.io教學&#xff1a;半小時學會socket.io【中英字幕】Learn Socket.Io …

配置網絡編輯器

網絡斷開的原因 1.由于網絡未連接的情況 解決方法 方法1&#xff1a;檢查網卡配置 cd /etc/syscongfig/network_scripts vi ifcfg_ens31 方法2&#xff1a;打開虛擬機編輯--- 虛擬網絡編輯器 查看ip地址是否在可用的網段范圍內 修改后重啟網絡 systemctl restart netwo…

vscode代碼片段的設置與使用

在 Visual Studio Code (VS Code) 中&#xff0c;可以通過自定義**代碼片段&#xff08;Snippets&#xff09;**快速插入常用代碼模板。以下是詳細設置步驟&#xff1a; 步驟 1&#xff1a;打開代碼片段設置 按下快捷鍵 Ctrl Shift P&#xff08;Windows/Linux&#xff09;或…