目錄
1. Lars-reportV0.1 report模塊介紹
2.Lars-reporterV0.1 reporter項目目錄構建
3.Lars-ReporterV0.1 數據表和proto協議環境搭建
4.Lars-ReporterV0.1上報請求業務處理
5.Lars-ReporterV0.1上報請求模塊的測試
6.Lars-ReporterV0.2開辟存儲線程池-網絡存儲分離
1. Lars-reportV0.1 report模塊介紹
?5) 存儲線程池及消息隊列
?????????我們現在的reporter_service的io入庫操作,完全是在消息的callback中進行的,那么實際上,這回占用我們server的工作線程的阻塞時間,從而浪費cpu。所以我們應該將io的入庫操作,交給一個專門做入庫的消息隊列線程池來做,這樣我們的callback就會立刻返回該業務,從而可以繼續處理下一個conn鏈接的消息事件業務。
?????????所以我們就要在此給reporter_service設計一個存儲數據的線程池及配套的消息隊列。當然這里面我們還是直接用寫好的`lars_reactor`框架里的接口即可。
> lars_reporter/src/reporter_service.cpp
```c
#include "lars_reactor.h"
#include "lars.pb.h"
#include "store_report.h"
#include <string>
thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
int thread_cnt = 0;
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
????lars::ReportStatusRequest req;
????req.ParseFromArray(data, len);
????//將上報數據存儲到db?
????StoreReport sr;
????sr.store(req);
????//輪詢將消息平均發送到每個線程的消息隊列中
????static int index = 0;
????//將消息發送給某個線程消息隊列
????reportQueues[index]->send(req);
????index ++;
????index = index % thread_cnt;
}
void create_reportdb_threads()
{
????thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
????
????//開線程池的消息隊列
????reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];
????if (reportQueues == NULL) {
????????fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
????????exit(1);
????}
????for (int i = 0; i < thread_cnt; i++) {
????????//給當前線程創建一個消息隊列queue
????????reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
????????if (reportQueues == NULL) {
????????????fprintf(stderr, "create thread_queue error\n");
????????????exit(1);
????????}
????????pthread_t tid;
????????int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
????????if (ret == -1)??{
????????????perror("pthread_create");
????????????exit(1);
????????}
????????pthread_detach(tid);
????}
}
2.Lars-reporterV0.1 reporter項目目錄構建
int main(int argc, char **argv)
{
????event_loop loop;
????//加載配置文件
????config_file::setPath("./conf/lars_reporter.conf");
????std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
????short port = config_file::instance()->GetNumber("reactor", "port", 7779);
????//創建tcp server
????tcp_server server(&loop, ip.c_str(), port);
????//添加數據上報請求處理的消息分發處理業務
????server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);
????//為了防止在業務中出現io阻塞,那么需要啟動一個線程池對IO進行操作的,接受業務的請求存儲消息
????create_reportdb_threads();
??
????//啟動事件監聽
????loop.event_process();?
????return 0;
}
```
?????????這里主線程啟動了線程池,根據配置文件的`db_thread_cnt`數量來開辟。每個線程都會執行`store_main`方法,我們來看一下實現
> lars_reporter/src/store_thread.cpp
```c
#include "lars.pb.h"
#include "lars_reactor.h"
#include "store_report.h"
struct Args?
{
????thread_queue<lars::ReportStatusRequest>* first;
????StoreReport *second;
};
//typedef void io_callback(event_loop *loop, int fd, void *args);
void thread_report(event_loop *loop, int fd, void *args)
{
????//1. 從queue里面取出需要report的數據(需要thread_queue)
????thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
????StoreReport *sr = ((Args*)args)->second;
????std::queue<lars::ReportStatusRequest> report_msgs;
????//1.1 從消息隊列中取出全部的消息元素集合
????queue->recv(report_msgs);
????while ( !report_msgs.empty() ) {
????????lars::ReportStatusRequest msg = report_msgs.front();
????????report_msgs.pop();
????????//2. 將數據存儲到DB中(需要StoreReport)
????????sr->store(msg);
????}
}
3.Lars-ReporterV0.1 數據表和proto協議環境搭建
void *store_main(void *args)
{
????//得到對應的thread_queue
????thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;
????//定義事件觸發機制
????event_loop loop;
????//定義一個存儲對象
????StoreReport sr;?
????Args callback_args;
????callback_args.first = queue;
????callback_args.second = &sr;
????queue->set_loop(&loop);
????queue->set_callback(thread_report, &callback_args);
????//啟動事件監聽
????loop.event_process();
????return NULL;
}
```
?????????每個線程都會綁定一個`thread_queue<lars::ReportStatusRequest>`,然后一個線程里面有一個loop,來監控消息隊列是否有消息事件過來,如果有消息實現過來,針對每個消息會觸發`thread_report()`方法, 在`thread_report()`中,我們就直接將`lars::ReportStatusRequest`消息存儲到db中。
?????????那么,由誰來給每個線程的`thread_queue`發送消息呢,就是agent/客戶端發送的請求,我們在處理`lars::ID_ReportStatusRequest`?消息分發業務的時候調用`get_report_status()`來觸發。
> lars_reporter/src/reporter_service.cpp
4.Lars-ReporterV0.1上報請求業務處理
```c
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
????lars::ReportStatusRequest req;
????req.ParseFromArray(data, len);
????//將上報數據存儲到db?
????StoreReport sr;
????sr.store(req);
????//輪詢將消息平均發送到每個線程的消息隊列中
????static int index = 0;
????//將消息發送給某個線程消息隊列
????reportQueues[index]->send(req);
????index ++;
????index = index % thread_cnt;
}
```
?????????這里的分發機制,是采用最輪詢的方式,是每個線程依次分配,去調用`thread_queue`的`send()`方法,將消息發送給消息隊列。
?????????最后我們進行測試,效果跟之前的效果是一樣的。我們現在已經集成進來了存儲線程池,現在就不用擔心在處理業務的時候,因為DB等的io阻塞,使cpu得不到充分利用了。
###?
5.Lars-ReporterV0.1上報請求模塊的測試
# 六、Lars-Load Balance Agent負載代理
## 1) 簡介
?????????一個服務稱為一個模塊,一個模塊由modid+cmdid來標識
modid+cmdid的組合表示一個遠程服務,這個遠程服務一般部署在多個節點上
LB Agent以UDP方式為業務方提供:1、節點獲取服務;2、節點調用結果上報服務
### 1.1 業務1-節點獲取服務:
?????????業務方每次要向遠程服務發送消息時,先利用modid+cmdid去向LB Agent獲取一個可用節點,然后向該節點發送消息,完成一次遠程調用;具體獲取modid+cmdid下的哪個節點是由LB Agent負責的
### 1.2 業務2-節點調用結果上報服務
?????????對LB Agent節點的一次遠程調用后,調用結果會匯報給LB Agent,以便LB Agent根據自身的LB算法來感知遠程服務節點的狀態是空閑還是過載,進而控制節點獲取時的節點調度.

6.Lars-ReporterV0.2開辟存儲線程池-網絡存儲分離
LB Agent擁有5個線程,一個LB算法:
-?UDP Server服務,并運行LB算法,對業務提供節點獲取和節點調用結果上報服務;為了增大系統吞吐量,使用3個UDP Server服務互相獨立運行LB算法:`modid+cmdid % 3 = i`的那些模塊的服務與調度,由第`i+1`個UDP Server線程負責
-?Dns Service Client:是dnsserver的客戶端線程,負責根據需要,向dnsserver獲取一個模塊的節點集合(或稱為獲取路由);UDP Server會按需向此線程的MQ寫入獲取路由請求,DSS Client將MQ到來的請求轉發到dnsserver,之后將dnsserver返回的路由信息更新到對應的UDP Server線程維護的路由信息中
-?Report Service Client:是reporter的客戶端線程,負責將每個模塊下所有節點在一段時間內的調用結果、過載情況上報到reporter Service端,便于觀察情況、做報警;本身消費MQ數據,UDP Server會按需向MQ寫入上報狀態請求