C++負載均衡遠程調用學習之上報功能與存儲線程池

目錄

1. Lars-reportV0.1 report模塊介紹

2.Lars-reporterV0.1 reporter項目目錄構建

3.Lars-ReporterV0.1 數據表和proto協議環境搭建

4.Lars-ReporterV0.1上報請求業務處理

5.Lars-ReporterV0.1上報請求模塊的測試

6.Lars-ReporterV0.2開辟存儲線程池-網絡存儲分離


1. Lars-reportV0.1 report模塊介紹

?5) 存儲線程池及消息隊列

?????????我們現在的reporter_service的io入庫操作,完全是在消息的callback中進行的,那么實際上,這回占用我們server的工作線程的阻塞時間,從而浪費cpu。所以我們應該將io的入庫操作,交給一個專門做入庫的消息隊列線程池來做,這樣我們的callback就會立刻返回該業務,從而可以繼續處理下一個conn鏈接的消息事件業務。

?????????所以我們就要在此給reporter_service設計一個存儲數據的線程池及配套的消息隊列。當然這里面我們還是直接用寫好的`lars_reactor`框架里的接口即可。

> lars_reporter/src/reporter_service.cpp

```c
#include "lars_reactor.h"
#include "lars.pb.h"
#include "store_report.h"
#include <string>

thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
int thread_cnt = 0;

void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
????lars::ReportStatusRequest req;

????req.ParseFromArray(data, len);

????//將上報數據存儲到db?
????StoreReport sr;
????sr.store(req);

????//輪詢將消息平均發送到每個線程的消息隊列中
????static int index = 0;
????//將消息發送給某個線程消息隊列
????reportQueues[index]->send(req);
????index ++;
????index = index % thread_cnt;
}

void create_reportdb_threads()
{
????thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
????
????//開線程池的消息隊列
????reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];

????if (reportQueues == NULL) {
????????fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
????????exit(1);
????}

????for (int i = 0; i < thread_cnt; i++) {
????????//給當前線程創建一個消息隊列queue
????????reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
????????if (reportQueues == NULL) {
????????????fprintf(stderr, "create thread_queue error\n");
????????????exit(1);
????????}

????????pthread_t tid;
????????int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
????????if (ret == -1)??{
????????????perror("pthread_create");
????????????exit(1);
????????}

????????pthread_detach(tid);
????}
}

2.Lars-reporterV0.1 reporter項目目錄構建

int main(int argc, char **argv)
{
????event_loop loop;

????//加載配置文件
????config_file::setPath("./conf/lars_reporter.conf");
????std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
????short port = config_file::instance()->GetNumber("reactor", "port", 7779);


????//創建tcp server
????tcp_server server(&loop, ip.c_str(), port);

????//添加數據上報請求處理的消息分發處理業務
????server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);

????//為了防止在業務中出現io阻塞,那么需要啟動一個線程池對IO進行操作的,接受業務的請求存儲消息
????create_reportdb_threads();
??
????//啟動事件監聽
????loop.event_process();?

????return 0;
}
```

?????????這里主線程啟動了線程池,根據配置文件的`db_thread_cnt`數量來開辟。每個線程都會執行`store_main`方法,我們來看一下實現

> lars_reporter/src/store_thread.cpp

```c
#include "lars.pb.h"
#include "lars_reactor.h"
#include "store_report.h"

struct Args?
{
????thread_queue<lars::ReportStatusRequest>* first;
????StoreReport *second;
};

//typedef void io_callback(event_loop *loop, int fd, void *args);
void thread_report(event_loop *loop, int fd, void *args)
{
????//1. 從queue里面取出需要report的數據(需要thread_queue)
????thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
????StoreReport *sr = ((Args*)args)->second;

????std::queue<lars::ReportStatusRequest> report_msgs;

????//1.1 從消息隊列中取出全部的消息元素集合
????queue->recv(report_msgs);
????while ( !report_msgs.empty() ) {
????????lars::ReportStatusRequest msg = report_msgs.front();
????????report_msgs.pop();

????????//2. 將數據存儲到DB中(需要StoreReport)
????????sr->store(msg);
????}
}

3.Lars-ReporterV0.1 數據表和proto協議環境搭建

void *store_main(void *args)
{
????//得到對應的thread_queue
????thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;

????//定義事件觸發機制
????event_loop loop;

????//定義一個存儲對象
????StoreReport sr;?

????Args callback_args;
????callback_args.first = queue;
????callback_args.second = &sr;

????queue->set_loop(&loop);
????queue->set_callback(thread_report, &callback_args);


????//啟動事件監聽
????loop.event_process();

????return NULL;
}
```

?????????每個線程都會綁定一個`thread_queue<lars::ReportStatusRequest>`,然后一個線程里面有一個loop,來監控消息隊列是否有消息事件過來,如果有消息實現過來,針對每個消息會觸發`thread_report()`方法, 在`thread_report()`中,我們就直接將`lars::ReportStatusRequest`消息存儲到db中。

?????????那么,由誰來給每個線程的`thread_queue`發送消息呢,就是agent/客戶端發送的請求,我們在處理`lars::ID_ReportStatusRequest`?消息分發業務的時候調用`get_report_status()`來觸發。

> lars_reporter/src/reporter_service.cpp

4.Lars-ReporterV0.1上報請求業務處理

```c
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
????lars::ReportStatusRequest req;

????req.ParseFromArray(data, len);

????//將上報數據存儲到db?
????StoreReport sr;
????sr.store(req);

????//輪詢將消息平均發送到每個線程的消息隊列中
????static int index = 0;
????//將消息發送給某個線程消息隊列
????reportQueues[index]->send(req);
????index ++;
????index = index % thread_cnt;
}
```

?????????這里的分發機制,是采用最輪詢的方式,是每個線程依次分配,去調用`thread_queue`的`send()`方法,將消息發送給消息隊列。



?????????最后我們進行測試,效果跟之前的效果是一樣的。我們現在已經集成進來了存儲線程池,現在就不用擔心在處理業務的時候,因為DB等的io阻塞,使cpu得不到充分利用了。





###?

5.Lars-ReporterV0.1上報請求模塊的測試

# 六、Lars-Load Balance Agent負載代理



## 1) 簡介



?????????一個服務稱為一個模塊,一個模塊由modid+cmdid來標識
modid+cmdid的組合表示一個遠程服務,這個遠程服務一般部署在多個節點上

LB Agent以UDP方式為業務方提供:1、節點獲取服務;2、節點調用結果上報服務

### 1.1 業務1-節點獲取服務:

?????????業務方每次要向遠程服務發送消息時,先利用modid+cmdid去向LB Agent獲取一個可用節點,然后向該節點發送消息,完成一次遠程調用;具體獲取modid+cmdid下的哪個節點是由LB Agent負責的



### 1.2 業務2-節點調用結果上報服務

?????????對LB Agent節點的一次遠程調用后,調用結果會匯報給LB Agent,以便LB Agent根據自身的LB算法來感知遠程服務節點的狀態是空閑還是過載,進而控制節點獲取時的節點調度.

![4-Lars-agent](./pictures/4-Lars-agent.png)

6.Lars-ReporterV0.2開辟存儲線程池-網絡存儲分離

LB Agent擁有5個線程,一個LB算法:

-?UDP Server服務,并運行LB算法,對業務提供節點獲取和節點調用結果上報服務;為了增大系統吞吐量,使用3個UDP Server服務互相獨立運行LB算法:`modid+cmdid % 3 = i`的那些模塊的服務與調度,由第`i+1`個UDP Server線程負責
-?Dns Service Client:是dnsserver的客戶端線程,負責根據需要,向dnsserver獲取一個模塊的節點集合(或稱為獲取路由);UDP Server會按需向此線程的MQ寫入獲取路由請求,DSS Client將MQ到來的請求轉發到dnsserver,之后將dnsserver返回的路由信息更新到對應的UDP Server線程維護的路由信息中
-?Report Service Client:是reporter的客戶端線程,負責將每個模塊下所有節點在一段時間內的調用結果、過載情況上報到reporter Service端,便于觀察情況、做報警;本身消費MQ數據,UDP Server會按需向MQ寫入上報狀態請求

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/78518.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/78518.shtml
英文地址,請注明出處:http://en.pswp.cn/web/78518.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

LabVIEW三軸電機控制

在工業自動化迅猛發展的當下&#xff0c;多軸伺服電機控制系統在制造業、3D 打印等眾多領域的需求與日俱增。它不僅要實現高精度的單軸運動控制&#xff0c;還需保障多軸協同作業的精準度&#xff0c;對響應速度也有嚴格要求。LabVIEW 開發多軸伺服電機控制系統&#xff0c;有效…

驅動開發硬核特訓 · Day 27(下篇):深入掌握 Common Clock Framework 架構與實戰開發

節。 在本篇內容中&#xff0c;我們將圍繞 Linux 內核中的時鐘子系統核心架構 —— Common Clock Framework&#xff08;簡稱 CCF&#xff09;展開深入講解&#xff0c;目標是幫助你全面理解其設計理念、主要數據結構、注冊流程、驅動實現方式&#xff0c;以及如何基于 NXP i.M…

數據庫基礎:數據庫類型與MySQL特點詳解

一、數據庫的主要類型 1. 關系型數據庫(RDBMS) 特點:基于關系模型,數據以表格形式存儲 代表產品:MySQL、Oracle、SQL Server、PostgreSQL 優勢:ACID事務支持、強一致性、成熟的SQL標準 適用場景:需要復雜查詢和事務支持的場景 2. 非關系型數據庫(NoSQL) 文檔型數據庫:Mo…

49認知干貨:產品的生命周期及類型匯總

49章:產品的生命周期與類型劃分 宇宙是運動的而非靜止的,任何事物亦是如此。只要是存在的事物,便必然存在周期性變化,就像四季更替中的冬日枯樹、春日新芽、夏日繁茂與秋日凋零。 這也意味著:事物的發展,離不開周期的更迭與演化,死亡并非終點,而是一種新的循環轉變。 …

【2025最新】為什么用ElasticSearch?和傳統數據庫MySQL與什么區別?

Elasticsearch 深度解析&#xff1a;從原理到實踐 一、為什么選擇 Elasticsearch&#xff1f; 數據模型 Elasticsearch 是基于文檔的搜索引擎&#xff0c;它使用 JSON 文檔來存儲數據。在 Elasticsearch 中&#xff0c;相關的數據通常存儲在同一個文檔中&#xff0c;而不是分散…

Docker安裝Gitblit(圖文教程)

本章教程,使用Docker安裝部署Gitblit。 一、Gitblit簡介 Gitblit 是一個基于 Java 的 Git 倉庫管理工具,主要用于在局域網或小型團隊環境中搭建私有 Git 服務器。它提供了一個簡單易用的 Web 界面,用于瀏覽代碼、管理倉庫和用戶權限等。 二、拉取鏡像 sudo docker pull git…

nDCG(歸一化折損累計增益) 是衡量排序質量的指標,常用于搜索引擎或推薦系統

nDCG&#xff08;歸一化折損累計增益&#xff09; 是衡量排序質量的指標&#xff0c;常用于搜索引擎或推薦系統。核心思想是&#xff1a;排名越靠前的高質量結果&#xff0c;對整體評分的貢獻越大&#xff0c;但后續結果的貢獻會逐漸“打折”。最終通過對比實際排序與理想排序的…

《從線性到二維:CSS Grid與Flex的布局范式革命與差異解析》

在前端開發的廣袤宇宙中&#xff0c;CSS布局技術宛如閃耀的星辰&#xff0c;不斷革新與演進&#xff0c;為構建絢麗多彩的網頁世界提供了堅實的支撐。其中&#xff0c;CSS Grid布局與Flex布局作為兩顆璀璨的明星&#xff0c;以其獨特的魅力和強大的功能&#xff0c;深受開發者們…

2025年01月03日美蜥(杭州普瑞兼職)一面

目錄 vue2 vue3 的區別react 性能優化react 組件傳值v-for 和 v-if 的優先級react 中多個接口請求的數據&#xff0c;需要渲染到一個列表上怎么處理百萬條數據怎么渲染vue2、vue3 的響應式原理微前端了解嗎git 版本控制git mearge 和 git rebase 的區別垂直水平居中react 中實…

【聚類分析】基于copula的風光聯合場景生成與縮減

目錄 1 主要內容 風光出力場景生成方法 2 部分程序 3 程序結果 4 下載鏈接 1 主要內容 該程序方法復現《融合風光出力場景生成的多能互補微網系統優化配置》風光出力場景生成部分&#xff0c;目前大多數研究的是不計風光出力之間的相關性影響&#xff0c;但是地理位置相近…

LeetCode 1128 等價多米諾骨牌對的數量 題解

今天的每日一題&#xff0c;我的思路還是硬做&#xff0c;不如評論區通過狀壓寫的簡單&#xff0c;但是答題思路加算法實現是沒有問題的&#xff0c;且時間復雜度也是可以通過的&#xff0c;畢竟全是o(n) 那么我就來說一下我的思路&#xff0c;根據dominoes[i] [a, b] 與 domi…

技術部測試規范

簡短測試流程&#xff1a; 開發完成 -> 本地自測 -> 測試環境自測 -> 通知測試同事復測 -> 確認無誤后上生產 -> 生產環境自測 -> 再次通知測試同事復測 -> 提交產品驗收。 當然可以&#xff01;以下是進一步優化后的測試流程規范&#xff0c;特別強調了開…

算法每日一題 | 入門-順序結構-大象喝水

大象喝水 題目描述 一只大象口渴了&#xff0c;要喝 20 升水才能解渴&#xff0c;但現在只有一個深 h 厘米&#xff0c;底面半徑為 r 厘米的小圓桶 &#xff08;h 和 r 都是整數&#xff09;。問大象至少要喝多少桶水才會解渴。 這里我們近似地取圓周率 π 3.14 \pi3.14 π…

Qt中實現工廠模式

在Qt中實現工廠模式可以通過多種方式&#xff0c;具體選擇取決于需求和場景。以下是幾種常見的實現方法&#xff1a; 1. 簡單工廠模式通過一個工廠類根據參數創建不同對象。cppclass Shape {public: virtual void draw() 0; virtual ~Shape() default;};class Circle : publ…

【前端】ES6一本通_劃重點_補充面試題

近兩天更新完基本內容&#xff0c;后續長期更新&#xff0c;建議關注收藏點贊。 ES6&#xff08;ECMAScript 2015&#xff09;是現代 JavaScript 的基礎&#xff0c;在前端面試中非常常見。 本文已匯總的本站筆記 ES6最重要10特性 對象新增 數組新增 異步、生成器 Promise 模塊…

初識 iOS 開發中的證書固定

引言 在移動應用安全領域&#xff0c;HTTPS/TLS 是數據傳輸的第一道防線&#xff0c;但僅依賴系統默認的證書驗證仍有被中間人&#xff08;MITM&#xff09;攻擊的風險。Certificate Pinning&#xff08;證書固定&#xff09;通過將客戶端信任“釘”在指定的服務器證書或公鑰上…

單片機的各個種類及其詳細介紹

一、按架構分類的深度解析 1. ARM Cortex-M系列 核心優勢&#xff1a; 統一架構&#xff1a;ARM生態完善&#xff0c;工具鏈&#xff08;Keil、IAR、GCC&#xff09;通用。 性能分層&#xff1a;M0&#xff08;低功耗&#xff09;、M3&#xff08;平衡&#xff09;、M4/M7&am…

5.7/Q1,GBD數據庫最新文章解讀

文章題目&#xff1a;Global, regional, and national burden and trends of rheumatoid arthritis among the elderly population: an analysis based on the 2021 Global Burden of Disease study DOI&#xff1a;10.3389/fimmu.2025.1547763 中文標題&#xff1a;全球、區域…

從微服務到AI服務:Nacos 3.0如何重構下一代動態治理體系?

在現代微服務架構的浪潮中&#xff0c;Nacos早已成為開發者手中的“瑞士軍刀”。作為阿里巴巴開源的核心中間件&#xff0c;它通過動態服務發現、統一配置管理和服務治理能力&#xff0c;為云原生應用提供了堅實的基石。從初創公司到全球500強企業&#xff0c;Nacos憑借其開箱即…

Unity與Unreal Engine(UE)的深度解析及高級用法

以下是Unity與Unreal Engine(UE)的深度解析及高級用法對比,結合技術特性、行業應用與未來發展進行綜合闡述: 一、核心差異與適用場景對比 1. 技術架構與編程模式 Unity 語言與腳本:主要使用C#,語法簡潔且易于學習,適合快速原型開發和中小型項目。支持可視化腳本工具(如…