brpc的二次封裝以及brpc與etcd的聯合

目的:

搭配etcd的注冊中心管理能知道誰能提供什么服務,并用rpc進行服務調用

封裝思想:

信道管理,將不同服務主機的通信信道管理起來

封裝:

1.指定的信道管理類

一個服務通常會有多個節點,每個節點都會有自己的信道類,建立信道與服務的映射關系,服務一對多信道。

2.總體的信道管理類

管理多個服務的信道管理類管理起來

tips:我們沒必要將所有的服務信道都建立起來,我們得申明我們關心的服務,不關心的就可以不管理這個服務

#pragma once
#include <brpc/channel.h>
#include <string>
#include <vector>
#include <unordered_map>
#include <mutex>
#include <iostream>
#include "./logger.hpp"
namespace common{
class ServerChannel
{
public:using ChannelPtr = std::shared_ptr<brpc::Channel>;using Ptr = std::shared_ptr<ServerChannel>;ServerChannel(const std::string &servername): _service_name(servername), _index(0){}void append(const std::string &host){ChannelPtr newchannel=std::make_shared<brpc::Channel>();brpc::ChannelOptions options;options.connect_timeout_ms = -1; // 嘗試連接時長Default: 200 (milliseconds)options.timeout_ms = -1; // rpc調用等待時間// Max duration of RPC over this Channel RPC調用在信道的期間options.protocol = "baidu_std";options.max_retry = 3;int ret = newchannel->Init(host.c_str(), &options);if (ret == -1){LOG_ERROR("初始化{}-{}信道失敗!", _service_name, host);return;}std::unique_lock<std::mutex> lock(_mutex);_channels.push_back(newchannel);_hosts.insert({host, newchannel});}// 服務器下線了一個節點,則把這個信道刪除void remove(const std::string &host){std::unique_lock<std::mutex> lock(_mutex);auto score = _hosts.find(host);if (score == _hosts.end()){// 沒找到LOG_WARN("沒有找到該服務中的服務為{}主機名為{}的信道", _service_name, host);return;}// 找到了ChannelPtr scoreptr = score->second;for (auto it = _channels.begin(); it != _channels.end(); ++it){if (*it == scoreptr){_channels.erase(it);break;}}_hosts.erase(host);LOG_INFO("服務為{}主機名為{}的信道已經刪除", _service_name, host);}// 選擇一個信道給這個服務ChannelPtr choose(){std::unique_lock<std::mutex> lock(_mutex);if (_channels.size() == 0){LOG_INFO("該服務{}中暫時沒有信道", _service_name);return ChannelPtr();}int pos = _index++ % _channels.size();return _channels[pos];}private:int32_t _index;std::mutex _mutex;std::string _service_name;std::vector<ChannelPtr> _channels;// 節點與信道的位圖std::unordered_map<std::string, ChannelPtr> _hosts;
};class ServerManager
{
public:using Ptr = std::shared_ptr<ServerManager>;ServerManager() {}ServerChannel::ChannelPtr choose(const std::string &instance_name){std::string servicename=instance_name;std::unique_lock<std::mutex> lock(_mutex);auto serversret = _servers.find(servicename);if (serversret == _servers.end()){LOG_INFO("當前服務{}沒有可使用的節點!", servicename);return ServerChannel::ChannelPtr();}// 該服務被訂閱且上線了,提供一個該服務的節點else{       auto score = _servers.find(servicename);ServerChannel::Ptr serverchannal = score->second;return serverchannal->choose();}}void onlinechannel(const std::string &instance_name, const std::string &host){ServerChannel::Ptr onlineserver;std::string servicename=get_service_name(instance_name);{std::unique_lock<std::mutex> lock(_mutex);auto followret = _follows.find(servicename);if (followret == _follows.end()){LOG_INFO("該服務{}-{}上線了,暫時沒有被訂閱", servicename, host);return;}// 新增該服務中的一個節點// 當該服務未上線auto serversret = _servers.find(servicename);if (serversret == _servers.end()){// 沒有找到該服務 則添加該服務onlineserver = std::make_shared<ServerChannel>(servicename);_servers.insert({servicename, onlineserver});}else{onlineserver = serversret->second;}if (!onlineserver){LOG_ERROR("新增 {} 服務管理節點失敗!", servicename);return;}// 已經存在該服務}onlineserver->append(host);LOG_DEBUG("{}-{} 服務上線新節點,進行添加管理!", servicename, host);}void offlinechannel(const std::string &instance_name, const std::string &host){ServerChannel::Ptr onlineserver;std::string servicename=get_service_name(instance_name);{std::unique_lock<std::mutex> lock(_mutex);auto followret = _follows.find(servicename);if (followret == _follows.end()){LOG_INFO("該服務{}-{}下線了,暫時沒有被訂閱", servicename, host);return;}// 刪除該服務中的一個節點// 當該服務未上線auto serversret = _servers.find(servicename);if (serversret == _servers.end()){// 沒有找到該服務LOG_INFO("沒有找到該服務", servicename);return;}else{onlineserver = serversret->second;}if (!onlineserver){LOG_ERROR("刪除 {} 服務管理節點失敗!", servicename);return;}// 已經存在該服務}onlineserver->remove(host);LOG_DEBUG("{}-{} 服務下線該節點", servicename, host);}void declared(const std::string &servername){std::unique_lock<std::mutex> lock(_mutex);_follows.insert(servername);}std::string get_service_name(const std::string &server_instance_name){int pos=server_instance_name.find_last_of('/');if(pos==std::string::npos){return server_instance_name;}std::string retstring = server_instance_name.substr(0,pos);return retstring;}private:std::mutex _mutex;// 訂閱的服務 沒有被訂閱的服務不提供節點使用std::unordered_set<std::string> _follows;// 服務名稱 和 該服務的信道管理器std::unordered_map<std::string, ServerChannel::Ptr> _servers;
};
}

brpc與etcd的聯合

服務端:1.構建echo服務 2.搭建RPC服務器 3.啟動RPC服務器 4.在etcd上注冊這個RPC服務

客戶端: 1.構造RPC信道管理對象 2.再構造etcd的監控對象 3.再從RPC信道管理對象中獲取提供echo服務的信道 4.發起echo業務

??

??

由于choose找的是servicename : /service/echo

而在新增服務時是 /service/echo/instance 所以在新增的回調函數中要采取截斷的方法

?rigistry

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"
#include "../common/etcd.hpp"
#include "../common/channel.hpp"
#include<gflags/gflags.h>
#include <functional>
#include <brpc/server.h>
#include <brpc/closure_guard.h>
#include <butil/logging.h>
#include "main.pb.cc"
DEFINE_bool(run_mode,false,"這是運行的模式默認為調試模式false");
DEFINE_string(log_file,"","發布模式下指定的文件默認為空");
DEFINE_int32(log_level,0,"調試模式下默認的等級TRACE");DEFINE_string(Host,"http://127.0.0.1:2379","服務注冊中心地址");
DEFINE_string(instance_Host,"127.0.0.1:7070","新上線服務的訪問地址");
DEFINE_string(base_service,"/service","服務器監控目錄");
DEFINE_string(instance_service,"/echo/instance","新上線的服務");
//一個 echo 中應該有多個 key - value ,如果定義成echo的話 只能對應一個服務的訪問地址
//一個 echo 中應該有多個instance - value
DEFINE_int32(listen_post,7070,"新上線服務的訪問地址");
class EchoServiceImpl:public example::EchoService               
{public:EchoServiceImpl(){}~EchoServiceImpl() override{} void Echo(google::protobuf::RpcController* controller,const ::example::EchoRequest* request,::example::EchoResponse* response,::google::protobuf::Closure* done) override{brpc::ClosureGuard rpc_guard(done);// ~ClosureGuard() {//if (_done) {//_done->Run();//}std::cout<<"收到了消息"<<request->message()<<std::endl;response->set_message(request->message()+"好的,我知道了");//_done->Run();}};
int main(int argc,char * argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode,FLAGS_log_file,FLAGS_log_level);//1.啟動rpc服務器 并添加服務//2.etcd內注冊該服務 以及 訪問該服務的 地址logging::LoggingSettings logger;logger.logging_dest= logging :: LoggingDestination::LOG_TO_NONE;logging::InitLogging(logger);//2.創建服務器并添加業務brpc::Server server;EchoServiceImpl echosservice;int n=server.AddService(&echosservice,brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE);if(n<0){std::cout<<"AddService error"<<std::endl;exit(0);}//函數不匹配,源碼有可能設置的是指針類型,tips//3.設置服務器選項,并且啟動服務器brpc::ServerOptions opt;opt.idle_timeout_sec=-1;//嘗試連接時間 超時則退出opt.num_threads=1;//number of pthreads that server runs on. 單線程; //number of requests processed in parallel(并行)// Default: 0 (unlimited)int m=server.Start(FLAGS_listen_post,&opt);if(m<0){std::cout<<"server Start error"<<std::endl;exit(1);}Rigistry::ptr rigserver=std::make_shared<Rigistry>(FLAGS_Host);rigserver->registry(FLAGS_base_service+FLAGS_instance_service,FLAGS_instance_Host);//運行等待服務結束server.RunUntilAskedToQuit();//運行等待服務結束
}

discovey

#include <iostream>
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include "../common/logger.hpp"
#include "../common/etcd.hpp"
#include "../common/channel.hpp"
#include <gflags/gflags.h>
#include <functional>
#include <brpc/server.h>
#include <brpc/closure_guard.h>
#include <butil/logging.h>
#include "main.pb.cc"
DEFINE_bool(run_mode, false, "這是運行的模式默認為調試模式false");
DEFINE_string(log_file, "", "發布模式下指定的文件默認為空");
DEFINE_int32(log_level, 0, "調試模式下默認的等級TRACE");DEFINE_string(Host, "http://127.0.0.1:2379", "服務注冊中心地址");
DEFINE_string(instance_Host, "168.198.0.1::8080", "新上線服務的訪問地址");DEFINE_string(instance_service, "/echo", "新上線的服務");
DEFINE_string(base_dir, "/service", "新上線的服務");
DEFINE_string(call_service, "/service/echo", "新上線的服務");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1.創建服務管理類 并申明訂閱的服務ServerManager::Ptr server = std::make_shared<ServerManager>();server->declared(FLAGS_call_service);// 2.將兩個回調函數綁定,創建發現對象auto onlineput = std::bind(&ServerManager::onlinechannel, (server.get()), std::placeholders::_1, std::placeholders::_2);auto offlineput = std::bind(&ServerManager::offlinechannel, (server.get()), std::placeholders::_1, std::placeholders::_2);// 3.發現對象發現該服務,創建Echo_ServicestubDiscovery::ptr Disserver = std::make_shared<Discovery>(FLAGS_Host, FLAGS_base_dir, onlineput, offlineput);while (1){auto channel = server->choose(FLAGS_call_service);if (!channel){std::this_thread::sleep_for(std::chrono::seconds(1));return -1;}example::EchoService_Stub echoServiceStub(channel.get());brpc::Controller *control = new brpc::Controller();control->Reset();example::EchoRequest request1;request1.set_message("你好啊少年");example::EchoResponse *response1 = new example::EchoResponse();// 4.Echo_Servicestub調用Echo服務// rpc業務調用echoServiceStub.Echo(control, &request1, response1, nullptr);if (control->Failed()){std::cout << "rpc echo service failed" << control->ErrorText() << std::endl;delete control;delete response1;std::this_thread::sleep_for(std::chrono::seconds(1));continue;}std::cout << "client收到響應" << response1->message() << std::endl;std::this_thread::sleep_for(std::chrono::seconds(1));}return 0;
}

?

?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/62277.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/62277.shtml
英文地址,請注明出處:http://en.pswp.cn/web/62277.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【提升效率】如何寫好一份詳細設計文檔

版本日期修訂人描述V1.02024/12/6nick huang創建文檔 背景 CSDN在發起“如何做好一份技術文檔”的活動。 想起我最近在寫一份詳細設計&#xff0c;有一些感受&#xff1a; 一份考慮較周全的“詳細設計文檔模板”能起到質量保底的作用。 當一名初級技術人員需要編寫詳細設計文…

電阻計RM3544、RM3545的使用

目錄&#xff1a; 一、電阻計與PC通訊 1、硬件連接 2、RmLogger.exe的使用 二、RM3545測量35uΩ電阻 一、電阻計與PC通訊 1、硬件連接 可以設置USB或COM口(串口)連接PC&#xff0c;也可以設置為“打印”輸出。 1&#xff09;使用USB連接PC 2&#xff09;使用串口連接PC …

Jenkins 的HTTP Request 插件為什么不能配置Basic認證了

本篇遇到的問題 還是因為Jenkins需要及其所在的OS需要升級&#xff0c;升級策略是在一臺新服務器上安裝和配置最新版本的Jenkins&#xff0c; 當前的最新版本是&#xff1a; 2.479.2 LTS。 如果需要這個版本的話可以在官方站點下載&#xff0c;也可以到如下地址下載&#xff1…

uniapp 封裝自定義頭部導航欄

封裝原因 項目中有時候需要使用自定義的頭部導航欄&#xff0c;原生的無法滿足需求 參數 屬性名描述示例title標題字符串&#xff1a;首頁bgColor背景色字符串&#xff1a;#ffftype左側的操作內容字符串&#xff1a;all&#xff0c;詳細值請在下方查看 參數解釋 type all…

docker學習筆記(五)--docker-compose

文章目錄 常用命令docker-compose是什么yml配置指令詳解versionservicesimagebuildcommandportsvolumesdepends_on docker-compose.yml文件編寫 常用命令 命令說明docker-compose up啟動所有docker-compose服務&#xff0c;通常加上-d選項&#xff0c;讓其運行在后臺docker-co…

Linux中inode

磁盤的空間管理 如何對磁盤空間進行管理&#xff1f; 假設在一塊大小為500G的磁盤中&#xff0c;500*1024*1024524288000KB。在磁盤中&#xff0c;扇區是磁盤的基本單位&#xff08;一般大小為512byte&#xff09;&#xff0c;而文件系統訪問磁盤的基本單位是4KB&#xff0c;因…

5G揚帆乘勁風,遨游通訊賦能千行百業譜新篇

在大型工廠&#xff0c;輕觸手機屏幕&#xff0c;實時庫存數據、人員定位等信息便躍然眼前、一目了然&#xff1b;在邊遠油田&#xff0c;動動手指&#xff0c;即可實時查詢設備溫度、危險氣體濃度等信息&#xff0c;大數據瞬間盡在“掌”握……在遨游5G防爆智能手機的助力下&a…

RT Thread Studio新建STM32F407IG工程文件編譯提示錯誤

編譯提示錯誤 原因: RT 源碼使用4.0.3的話&#xff0c;請用STM32F4支持包的0.2.2版本&#xff0c;就不會出錯了。 如果支持包用0.2.3版本的話&#xff0c;需要用RT內核4.1.0版本。0.2.3 版本更新了一些針對內核4.1.0的驅動代碼&#xff0c;這幾個定義都是4.1.0里的。

學生管理系統(java)

1.前期準備 &#xff08;1&#xff09;新建java項目 &#xff08;2&#xff09;新建java軟件包以及三個文件Student.java,Student.txt,StuSystem.java Student.java package student_management_system;public class Student {private String id;private String name;private…

JavaWeb學習(2)(Cookie原理(超詳細)、HTTP無狀態)

目錄 一、HTTP無狀態。 &#xff08;1&#xff09;"記住我"&#xff1f; &#xff08;2&#xff09;HTTP無狀態。 &#xff08;3&#xff09;信息存儲客戶端中。如何處理&#xff1f; 1、loaclStorage與sessionStorage。 2、Cookie。 二、Cookie。 &#xff08;1&…

SpringBoot教程(三十二) SpringBoot集成Skywalking鏈路跟蹤

SpringBoot教程&#xff08;三十二&#xff09; | SpringBoot集成Skywalking鏈路跟蹤 一、Skywalking是什么&#xff1f;二、Skywalking與JDK版本的對應關系三、Skywalking下載四、Skywalking 數據存儲五、Skywalking 的啟動六、部署探針 前提&#xff1a; Agents 8.9.0 放入 …

flask創建templates目錄存放html文件

首先&#xff0c;創建flask項目&#xff0c;在pycharm中File --> New Project&#xff0c;選擇Flask項目。 然后&#xff0c;在某一目錄下&#xff0c;新建名為templates的文件夾&#xff0c;這時會是一個普通的文件夾。 然后右擊templates文件夾&#xff0c;選擇Unmark as …

Java進階(注解,設計模式,對象克隆)

Java進階(注解&#xff0c;設計模式&#xff0c;對象克隆) 一. 注解 1.1 什么是注解 java中注解(Annotation)&#xff0c;又稱java標注&#xff0c;是一種特殊的注釋 可以添加在包&#xff0c;類&#xff0c;成員變量&#xff0c;方法&#xff0c;參數等內容上 注解會隨同…

部署loki,grafana 以及springcloud用法舉例

文章目錄 場景docker 部署grafanadocker-compose部署loki維護配置文件 local-config.yaml維護docker-compose.yml配置啟動 grafana 添加loki數據源springcloud用法舉例查看loki的explore,查看日志 場景 小公司缺少運維崗位&#xff0c;需要研發自己部署日志系統&#xff0c;elk…

keil報錯---connection refused due to device mismatch

解決辦法如下&#xff1a; 記得改成1 把Enable取消

第三節、電機定速轉動【51單片機-TB6600驅動器-步進電機教程】

摘要&#xff1a;本節介紹用定時器定時的方式&#xff0c;精準控制脈沖時間&#xff0c;從而控制步進電機速度 一、計算過程 1.1 電機每一步的角速度等于走這一步所花費的時間&#xff0c;走一步角度等于步距角&#xff0c;走一步的時間等于一個脈沖的時間 w s t e p t … ……

vue中pdf.js的使用,包括pdf顯示,跳轉指定頁面,高亮關鍵詞

目錄 一、下載pdf.js 二、引入到本地的項目中 三、實現預覽pdf 四、跳轉到指定頁面 五、利用pdf里面的find查找關鍵詞 六、修改頁面大小為實際大小 一、下載pdf.js https://github.com/mozilla/pdf.js 里面有很多的版本&#xff0c; 高版本的可能瀏覽器不兼容或者還要考…

OD B卷【連續字母長度】

題目 給定一個字符串&#xff0c;只包含大寫字母&#xff0c;求在包含同一字母的子串中&#xff0c;長度第k長的子串的長度&#xff0c;相同字母只取最長的那個子串。 輸入描述&#xff1a; 第一行輸入一個子串&#xff08;長【1,100】&#xff09;&#xff0c;只包含大寫字母…

python中的 Pydantic 框架介紹

Pydantic 框架介紹 Pydantic 是一個用于數據驗證和設置管理的 Python 庫。它主要通過數據模型類的定義來處理 JSON 數據、解析請求和響應數據&#xff0c;并提供自動化的驗證和轉換。Pydantic 主要用于處理 Python 類型的安全性和驗證&#xff0c;尤其在 FastAPI 等現代 Pytho…

橋接模式和組合模式的區別

橋接模式&#xff08;Bridge Pattern&#xff09;和組合模式&#xff08;Composite Pattern&#xff09;都是結構型設計模式&#xff0c;旨在解決對象結構的復雜性問題&#xff0c;但它們的應用場景和目的有所不同。以下是它們的區別&#xff1a; 1. 定義與目的 橋接模式&…