目錄
?
1.lars-DnsV0.1回顧
2.Lars-DnsV0.2-訂閱功能的訂閱模塊分析
3.Lars-DnsV0.2-訂閱模塊的類的單例創建及方法屬性初始化
4.Lars-DnsV0.2-發布功能的實現
5.Lars-DnsV0.2-發布功能的總結
6.Lars-DnsV0.2-訂閱流程復習
7.Lars-DnsV0.2-訂閱模塊的集成
8.Lars-DnsV0.2訂閱模塊的測試
9.Lars-DnsV0.2訂閱模塊測試2
10.Lars-DnsV0.2的發布問題bug修復
11.Lars-DnsV0.2訂閱發布流程梳理
?
1.lars-DnsV0.1回顧
6) Route訂閱模式
### 6.1 訂閱模塊的設計與實現????????
?????????訂閱模式整體的設計.
> lars_dns/include/subscribe.h
```c
#pragma once
#include <vector>
#include <pthread.h>
#include <ext/hash_set>
#include <ext/hash_map>
#include "lars_reactor.h"
#include "lars.pb.h"
#include "dns_route.h"
using namespace __gnu_cxx;
//定義訂閱列表數據關系類型,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定義發布列表的數據關系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
2.Lars-DnsV0.2-訂閱功能的訂閱模塊分析
class SubscribeList {
public:
????//設計單例
????static void init()??{
????????_instance = new SubscribeList();
????}
????static SubscribeList *instance() {
????????//保證init方法在這個進程執行中,只執行一次
????????pthread_once(&_once, init);
????????return _instance;
????}
????//訂閱
????void subscribe(uint64_t mod, int fd);
????
????//取消訂閱
????void unsubscribe(uint64_t mod, int fd);
????
????//發布
????void publish(std::vector<uint64_t> &change_mods);
????//根據在線用戶fd得到需要發布的列表
????void make_publish_map(listen_fd_set &online_fds,?
??????????????????????????publish_map &need_publish);
????
????
private:
????//設計單例
????SubscribeList();
????SubscribeList(const SubscribeList &);
????const SubscribeList& operator=(const SubscribeList);
????static SubscribeList *_instance;
????static pthread_once_t _once;
????subscribe_map _book_list; //訂閱清單
????pthread_mutex_t _book_list_lock;
????publish_map _push_list; //發布清單
????pthread_mutex_t _push_list_lock;
};
```
?????????首先`SubscribeList`采用單例設計。這里面定義了兩種數據類型
3.Lars-DnsV0.2-訂閱模塊的類的單例創建及方法屬性初始化
``c
//定義訂閱列表數據關系類型,key->modid/cmdid, value->fds(訂閱的客戶端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定義發布列表的數據關系類型, key->fd(訂閱客戶端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
```
?????????`subscribe_map`是目前dns系統的總體訂閱列表,記錄了訂閱的modid/cmdid都有哪些fds已經訂閱了,啟動一個fd就代表一個客戶端。
?????????`publish_map`是即將發布的表,啟動這里面是subscribe_map的一個反表,可以是訂閱的客戶端fd,而value是該客戶端需要接收的訂閱modid/cmdid數據。
**屬性**:
`_book_list`:目前dns已經全部的訂閱信息清單。
`_push_list`:目前dns即將發布的客戶端及訂閱信息清單。
**方法**
`void subscribe(uint64_t mod, int fd)`: 加入modid/cmdid 和訂閱的客戶端fd到_book_list中。
`void unsubscribe(uint64_t mod, int fd)`:取消一條訂閱數據。
`void publish(std::vector<uint64_t> &change_mods)`: 發布訂閱數據,其中change_mods是需要發布的那些modid/cmdid組合。
`void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)`: 根據目前在線的訂閱用戶,得到需要通信的發布訂閱列表。
4.Lars-DnsV0.2-發布功能的實現
//根據在線用戶fd得到需要發布的列表
void SubscribeList::make_publish_map(
????????????listen_fd_set &online_fds,?
????????????publish_map &need_publish)
{
????publish_map::iterator it;
????pthread_mutex_lock(&_push_list_lock);
????//遍歷_push_list 找到 online_fds匹配的數據,放到need_publish中
????for (it = _push_list.begin(); it != _push_list.end(); it++)??{
????????//it->first 是 fd
????????//it->second 是 modid/cmdid
????????if (online_fds.find(it->first) != online_fds.end()) {
????????????//匹配到
????????????//當前的鍵值對移動到need_publish中
????????????need_publish[it->first] = _push_list[it->first];
????????????//當該組數據從_push_list中刪除掉
????????????_push_list.erase(it);
????????}
????}
????pthrea
5.Lars-DnsV0.2-發布功能的總結
//發布
void SubscribeList::publish(std::vector<uint64_t> &change_mods)
{
????//1 將change_mods已經修改的mod->fd?
????//??放到 發布清單_push_list中?
????pthread_mutex_lock(&_book_list_lock);
????pthread_mutex_lock(&_push_list_lock);
????std::vector<uint64_t>::iterator it;
????for (it = change_mods.begin(); it != change_mods.end(); it++) {
????????uint64_t mod = *it;
????????if (_book_list.find(mod) != _book_list.end()) {
????????????//將mod下面的fd set集合拷遷移到 _push_list中
????????????hash_set<int>::iterator fds_it;
????????????for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
????????????????int fd = *fds_it;
????????????????_push_list[fd].insert(mod);
????????????}
????????}
????}
????pthread_mutex_unlock(&_push_list_lock);
????pthread_mutex_unlock(&_book_list_lock);
????//2 通知各個線程去執行推送任務
????server->thread_poll()->send_task(push_change_task, this);
}
```
?????????這里需要注意的是`publish()`里的server變量是全局變量,全局唯一的server句柄。
6.Lars-DnsV0.2-訂閱流程復習
### 6.2 開啟訂閱
?????????那么訂閱功能實現了,該如何是調用觸發訂閱功能能,我們可以在一個客戶端建立連接成功之后來調用.
>??lars_dns/src/dns_service.cpp
```c
#include <ext/hash_set>
#include "lars_reactor.h"
#include "subscribe.h"
#include "dns_route.h"
#include "lars.pb.h"
tcp_server *server;
using __gnu_cxx::hash_set;
typedef hash_set<uint64_t> client_sub_mod_list;
// ...
//訂閱route 的modid/cmdid
void create_subscribe(net_connection * conn, void *args)
{
????conn->param = new client_sub_mod_list;
}
//退訂route 的modid/cmdid
void clear_subscribe(net_connection * conn, void *args)
{
????client_sub_mod_list::iterator it;
????client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;
????for (it = sub_list->begin(); it??!= sub_list->end(); it++) {
????????uint64_t mod = *it;
????????SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
????}
????delete sub_list;
????conn->param = NULL;
}
int main(int argc, char **argv)
{
????event_loop loop;
????//加載配置文件
????config_file::setPath("conf/lars_dns.conf");
????std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
????short port = config_file::instance()->GetNumber("reactor", "port", 7778);
????//創建tcp服務器
????server = new tcp_server(&loop, ip.c_str(), port);
????//==========注冊鏈接創建/銷毀Hook函數============
????server->set_conn_start(create_subscribe);
????server->set_conn_close(clear_subscribe);
??????//============================================
????//注冊路由業務
????server->add_msg_router(lars::ID_GetRouteRequest, get_route);
????//開始事件監聽????
????printf("lars dns service ....\n");
????loop.event_process();
????return 0;
}
```
?????????這里注冊了兩個鏈接Hook。`create_subscribe()`和`clear_subscribe()`。
`client_sub_mod_list`為當前客戶端鏈接所訂閱的route信息列表。主要存放當前客戶訂閱的modid/cmdid的集合。因為不同的客戶端訂閱的信息不同,所以要將該列表與每個conn進行綁定。
7.Lars-DnsV0.2-訂閱模塊的集成
## 7) Backend Thread實時監控
?????????????Backend Thread的后臺總業務流程如下:

### 7.1 數據庫表相關查詢方法實現
?????????我們先實現一些基本的數據表達查詢方法:
> lars_dns/src/dns_route.cpp
```c
/*
?*??return 0, 表示 加載成功,version沒有改變
?*?????????1, 表示 加載成功,version有改變
?*?????????-1 表示 加載失敗
?* */
int Route::load_version()
{
????//這里面只會有一條數據
????snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");
????int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
????if (ret)
????{
????????fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
????????return -1;
????}
????MYSQL_RES *result = mysql_store_result(&_db_conn);
????if (!result)
????{
????????fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
????????return -1;
????}
????long line_num = mysql_num_rows(result);
????if (line_num == 0)
????{
????????fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
????????return -1;
????}
????MYSQL_ROW row = mysql_fetch_row(result);
????//得到version
????long new_version = atol(row[0]);
????if (new_version == this->_version)
????{
????????//加載成功但是沒有修改
????????return 0;
????}
????this->_version = new_version;
????printf("now route version is %ld\n", this->_version);
????mysql_free_result(result);
????return 1;
}
8.Lars-DnsV0.2訂閱模塊的測試
//加載RouteChange得到修改的modid/cmdid
//將結果放在vector中
void Route::load_changes(std::vector<uint64_t> &change_list)?
{
????//讀取當前版本之前的全部修改?
????snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);
????int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
????if (ret)
????{
????????fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
????????return ;
????}
????MYSQL_RES *result = mysql_store_result(&_db_conn);
????if (!result)
????{
????????fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
????????return ;
????}
????long lineNum = mysql_num_rows(result);
????if (lineNum == 0)
????{
????????fprintf(stderr,??"No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
????????return ;
????}
????MYSQL_ROW row;
????for (long i = 0;i < lineNum; ++i)
????{
????????row = mysql_fetch_row(result);
????????int modid = atoi(row[0]);
????????int cmdid = atoi(row[1]);
????????uint64_t key = (((uint64_t)modid) << 32) + cmdid;
????????change_list.push_back(key);
????}
????mysql_free_result(result);????
}
9.Lars-DnsV0.2訂閱模塊測試2
//將RouteChange
//刪除RouteChange的全部修改記錄數據,remove_all為全部刪除
//否則默認刪除當前版本之前的全部修改
void Route::remove_changes(bool remove_all)
{
????if (remove_all == false)
????{
????????snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
????}
????else
????{
????????snprintf(_sql, 1000, "DELETE FROM RouteChange;");
????}
????int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
????if (ret != 0)
????{
????????fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
????????return ;
????}?
????return;
}
```
這里面提供了基本的對一些表的加載和刪除操作:
`load_version()`:加載當前route信息版本號。
`load_route_data()`:加載`RouteData`信息表,到_temp_pointer中。
`swap()`:將__temp_pointer的表數據同步到_data_temp表中.
`load_changes()`:加載RouteChange得到修改的modid/cmdid,將結果放在vector中
`remove_changes()`:清空之前的修改記錄。
10.Lars-DnsV0.2的發布問題bug修復
### 7.2 Backend Thread業務流程實現
> lars_dns/src/dns_route.cpp
```c
//周期性后端檢查db的route信息的更新變化業務
//backend thread完成
void *check_route_changes(void *args)
{
????int wait_time = 10;//10s自動修改一次,也可以從配置文件讀取
????long last_load_time = time(NULL);
????//清空全部的RouteChange
????Route::instance()->remove_changes(true);
????//1 判斷是否有修改
????while (true) {
????????sleep(1);
????????long current_time = time(NULL);
????????//1.1 加載RouteVersion得到當前版本號
????????int ret = Route::instance()->load_version();
????????if (ret == 1) {
????????????//version改版 有modid/cmdid修改
????????????//2 如果有修改
????????????//2.1 將最新的RouteData加載到_temp_pointer中
????????????if (Route::instance()->load_route_data() == 0) {
????????????????//2.2 更新_temp_pointer數據到_data_pointer map中
????????????????Route::instance()->swap();
????????????????last_load_time = current_time;//更新最后加載時間
????????????}
????????????//2.3 獲取被修改的modid/cmdid對應的訂閱客戶端,進行推送?????????
????????????std::vector<uint64_t> changes;
????????????Route::instance()->load_changes(changes);
????????????//推送
????????????SubscribeList::instance()->publish(changes);
????????????//2.4 刪除當前版本之前的修改記錄
????????????Route::instance()->remove_changes();
????????}
????????else {
????????????//3 如果沒有修改
????????????if (current_time - last_load_time >= wait_time) {
????????????????//3.1 超時,加載最新的temp_pointer
????????????????if (Route::instance()->load_route_data() == 0) {
????????????????????//3.2 _temp_pointer數據更新到_data_pointer map中
????????????????????Route::instance()->swap();
????????????????????last_load_time = current_time;
????????????????}
????????????}
????????}
????}
????return NULL;
}
```
11.Lars-DnsV0.2訂閱發布流程梳理
完成dns模塊的訂閱功能測試V0.3
?????????我們提供一個修改一個modid/cmdid的sql語句來觸發訂閱條件,并且讓dns service服務器主動給訂閱的客戶端發送該訂閱消息。
> lars_dns/test/test_insert_dns_route.sql
```sql
USE lars_dns;
SET @time = UNIX_TIMESTAMP(NOW());
INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
UPDATE RouteVersion SET version = @time WHERE id = 1;
INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);
```
客戶端代碼:
> lars_dns/test/lars_dns_test1.cpp
```c
#include <string.h>
#include <unistd.h>
#include <string>
#include "lars_reactor.h"
#include "lars.pb.h"
//命令行參數
struct Option
{
????Option():ip(NULL),port(0) {}
????char *ip;
????short port;
};
Option option;
void Usage() {
????printf("Usage: ./lars_dns_test -h ip -p port\n");
}
//解析命令行
void parse_option(int argc, char **argv)
{
????for (int i = 0; i < argc; i++) {
????????if (strcmp(argv[i], "-h") == 0) {
????????????option.ip = argv[i + 1];
????????}
????????else if (strcmp(argv[i], "-p") == 0) {
????????????option.port = atoi(argv[i + 1]);
????????}
????}
????if ( !option.ip || !option.port ) {
????????Usage();
????????exit(1);
????}
}
?