目錄
隊列數據管理
代碼實現
測試代碼
綁定信息(交換機-隊列)管理
代碼實現
測試代碼
隊列數據管理
當前隊列數據的管理,本質上是隊列描述信息的管理,描述當前服務器上有哪些隊列。
- 定義隊列描述數據類
- 隊列名稱
- 是否持久化標志
是否獨占標志是否自動刪除標志其他參數
- 定義隊列數據持久化類(數據持久化的 sqlite3 數據庫中)
- 創建/刪除隊列數據表
- 新增隊列數據
- 移除隊列數據
- 查詢所有隊列數據
- 定義隊列數據管理類
- 創建隊列,并添加管理(存在則 OK,不存在則創建)
- 刪除隊列
- 獲取指定隊列
- 獲取所有隊列
- 判斷指定隊列是否存在
- 獲取隊列數量
- 銷毀所有隊列數據
代碼實現
與交換機數據管理的實現非常相似,只需要修改表結構即可
#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>namespace jiuqi
{struct MsgQueue{using ptr = std::shared_ptr<MsgQueue>;std::string name;bool durable;bool exclusive;bool auto_delete;std::unordered_map<std::string, std::string> args;MsgQueue() {}MsgQueue(const std::string &qname,bool qdurable,bool qexclusive,bool qauto_delete,const std::unordered_map<std::string, std::string> qargs): name(qname), durable(qdurable), exclusive(qexclusive), auto_delete(qauto_delete), args(qargs) {}void setArgs(const std::string &str_args){// key=val&key=val.....std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &arg : sub_args){size_t pos = arg.find("=");std::string key = arg.substr(0, pos);std::string val = arg.substr(pos + 1);args.insert(std::make_pair(key, val));}}std::string getArgs(){if (args.empty())return "";std::string result;for (auto &arg : args){result += arg.first + "=" + arg.second + "&";}result.pop_back();return result;}};using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;class QueueMapper{public:QueueMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path); assert(_sql_helper.open());createTable();}void createTable(){std::stringstream ss;ss << "create table if not exists queue_table("<< "name varchar(32) primary key, "<< "durable int, "<< "exclusive int, "<< "auto_delete int, "<< "args varchar(128));";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("創建隊列數據庫表失敗");abort();}}void removeTable(){std::string sql = "drop table if exists queue_table;";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("刪除交換機數據庫表失敗");abort();}}bool insert(MsgQueue::ptr &queue){std::stringstream ss;ss << "insert into queue_table values('"<< queue->name << "', "<< queue->durable << ", "<< queue->exclusive << ", "<< queue->auto_delete << ", '"<< queue->getArgs() << "');";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool remove(const std::string &name){std::stringstream ss;ss << "delete from queue_table where "<< "name = " << "'" << name << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}QueueMap recovery(){QueueMap result;std::string sql = "select name, durable, exclusive, auto_delete, args from queue_table";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){QueueMap *result = (QueueMap *)arg;MsgQueue::ptr mqp = std::make_shared<MsgQueue>();mqp->name = row[0];mqp->durable = (bool)std::stoi(row[1]);mqp->exclusive = (bool)std::stoi(row[2]);mqp->auto_delete = (bool)std::stoi(row[3]);if (row[4])mqp->setArgs(row[4]);result->insert(std::make_pair(mqp->name, mqp));return 0;}private:SqliteHelper _sql_helper;};class QueueManager{public:using ptr = std::shared_ptr<QueueManager>;QueueManager(const std::string &dbfile) : _mapper(dbfile){_queues = _mapper.recovery();}void declareQueue(const std::string &name,bool durable,bool exclusive,bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it != _queues.end())return;auto queue = std::make_shared<MsgQueue>(name, durable, exclusive, auto_delete, args);_queues.insert(std::make_pair(name, queue));if (durable)_mapper.insert(queue);}void deleteQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return;if (it->second->durable)_mapper.remove(name);_queues.erase(name);}MsgQueue::ptr selectQueue(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return nullptr;return it->second;}QueueMap allQueue(){std::unique_lock<std::mutex> lock(_mutex);return _queues;}bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);auto it = _queues.find(name);if (it == _queues.end())return false;return true; }size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _queues.size();}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_queues.clear();}private:std::mutex _mutex;QueueMapper _mapper;QueueMap _queues;};
}
測試代碼
#include "../mqserver/queue.hpp"
#include <gtest/gtest.h>jiuqi::QueueManager::ptr qmp;class ExchangeTest : public testing::Environment
{
public:virtual void SetUp() override{qmp = std::make_shared<jiuqi::QueueManager>("./data/queue.db");}virtual void TearDown() override{qmp->clear();}
};TEST(ExchangeTest, insert_test)
{std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};std::unordered_map<std::string, std::string> map_empty;qmp->declareQueue("queue1", true, false, false, map);qmp->declareQueue("queue2", true, false, false, map);qmp->declareQueue("queue3", true, false, false, map);qmp->declareQueue("queue4", true, false, false,map_empty);qmp->declareQueue("queue5", true, false, false,map_empty);qmp->declareQueue("queue6", true, false, false,map_empty);ASSERT_EQ(qmp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::MsgQueue::ptr mqp = qmp->selectQueue("queue3");ASSERT_EQ(mqp->name, "queue3");ASSERT_EQ(mqp->durable, true);ASSERT_EQ(mqp->exclusive, false);ASSERT_EQ(mqp->auto_delete, false);ASSERT_EQ(mqp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}TEST(ExchangeTest, delete_test)
{qmp->deleteQueue("queue1");jiuqi::MsgQueue::ptr mqp = qmp->selectQueue("queue1");ASSERT_EQ(mqp.get(), nullptr);ASSERT_EQ(qmp->exists("queue1"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new ExchangeTest);return RUN_ALL_TESTS();
}
綁定信息(交換機-隊列)管理
綁定信息,本質上就是一個交換機關聯了哪些隊列的描述。
- 定義綁定信息類
- 交換機名稱
- 隊列名稱
- binding_key(分發匹配規則-決定了哪些數據能被交換機放入隊列)
- 定義綁定信息數據持久化類(數據持久化的 sqlite3 數據庫中)
- 創建/刪除綁定信息數據表
- 新增綁定信息數據
- 移除指定綁定信息數據
- 移除指定交換機相關綁定信息數據:移除交換機的時候會被調用
- 移除指定隊列相關綁定信息數據:移除隊列的時候會被調用f. 查詢所有綁定信息數據:用于重啟服務器時進行歷史數據恢復
- 定義綁定信息數據管理類
- 創建綁定信息,并添加管理(存在則 OK,不存在則創建)
- 解除指定的綁定信息
- 刪除指定隊列的所有綁定信息
- 刪除交換機相關的所有綁定信息
- 獲取交換機相關的所有綁定信息:交換機收到消息后,需要分發給自己關聯的隊列
- 判斷指定綁定信息是否存在
- 獲取當前綁定信息數量
- 銷毀所有綁定信息數據
代碼實現
同樣與上述類的實現類似
?
#pragma once
#include "../mqcommon/helper.hpp"
#include "../mqcommon/log.hpp"
#include "../mqcommon/msg.pb.h"
#include <unordered_map>
#include <memory>namespace jiuqi
{struct Binding{using ptr = std::shared_ptr<Binding>;std::string exchangeName;std::string queueName;std::string bindingKey;Binding() {}Binding(const std::string &ename, const std::string &qname, const std::string &key): exchangeName(ename), queueName(qname), bindingKey(key){}};using QueueBindingMap = std::unordered_map<std::string, Binding::ptr>;using BindingMap = std::unordered_map<std::string, QueueBindingMap>;class BindingMapper{public:BindingMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);_sql_helper.open();createTable();}void createTable(){std::string sql = "create table if not exists binding_table(exchangeName varchar(32), queueName varchar(32), bindingKey varchar(128), PRIMARY KEY (exchangeName, queueName));";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("創建綁定信息數據庫表失敗");abort();}}void removeTable(){std::string sql = "drop table if exists binding_table;";bool ret = _sql_helper.exec(sql, nullptr, nullptr);if (!ret){ERROR("刪除綁定信息數據庫表失敗");abort();}}bool insert(Binding::ptr &binding){std::stringstream ss;ss << "insert into binding_table values('"<< binding->exchangeName << "', '"<< binding->queueName << "', '"<< binding->bindingKey << "');";std::string sql = ss.str();if (!_sql_helper.exec(sql, nullptr, nullptr)){ERROR("插入綁定記錄失敗");return false;}return true;}bool remove(const std::string &ename, const std::string &qname){std::stringstream ss;ss << "delete from binding_table where "<< "exchangeName = '" << ename << "' "<< "and queueName = '" << qname << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool removeExchangeBindings(const std::string &ename){std::stringstream ss;ss << "delete from binding_table where "<< "exchangeName = '" << ename << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}bool removeQueueBindings(const std::string &qname){std::stringstream ss;ss << "delete from binding_table where "<< "queueName = '" << qname << "';";std::string sql = ss.str();bool ret = _sql_helper.exec(sql, nullptr, nullptr);return ret;}BindingMap recovery(){BindingMap result;std::string sql = "select exchangeName, queueName, bindingKey from binding_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){BindingMap *result = (BindingMap *)arg;Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);// 為了防止交換機相關綁定信息已經存在,不能直接創建隊列映射// 因此要先獲取交換機對應的映射對象// 使用引用的好處, 如果不存在就會創建QueueBindingMap &qbm = (*result)[bp->exchangeName];qbm.insert(std::make_pair(bp->queueName, bp));return 0;}private:SqliteHelper _sql_helper;};class BindingManager{public:using ptr = std::shared_ptr<BindingManager>;BindingManager(const std::string &dbfile) : _mapper(dbfile){_bindings = _mapper.recovery();}bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit != _bindings.end() && eit->second.find(qname) != eit->second.end())return true;Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);if (durable){if(!_mapper.insert(bp))return false;}QueueBindingMap &qbm = _bindings[ename];qbm.insert(std::make_pair(qname, bp));return true;}void unbind(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return;auto qit = eit->second.find(qname);if (qit == eit->second.end())return;_bindings[ename].erase(qname);if (eit->second.empty())_bindings.erase(ename);_mapper.remove(ename, qname);}void removeExchangeBinding(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return;_mapper.removeExchangeBindings(ename);_bindings.erase(ename);}void removeQueueBinding(const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeQueueBindings(qname);// 遍歷所有交換機的綁定for (auto it = _bindings.begin(); it != _bindings.end();){// 刪除該隊列在當前交換機的綁定it->second.erase(qname);// 如果當前交換機的綁定為空,則刪除該交換機的條目if (it->second.empty()){it = _bindings.erase(it); // erase返回下一個有效的迭代器}else{++it;}}}QueueBindingMap getExchangeBindings(const std::string &ename){std::unique_lock<std::mutex> lock(_mutex);auto it = _bindings.find(ename);if (it == _bindings.end())return QueueBindingMap();return _bindings[ename];}Binding::ptr getBinding(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return nullptr;auto qit = eit->second.find(qname);if (qit == eit->second.end())return nullptr;return qit->second;}bool exists(const std::string &ename, const std::string &qname){std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end())return false;auto qit = eit->second.find(qname);if (qit == eit->second.end())return false;return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);size_t size = 0;for (auto start = _bindings.begin(); start != _bindings.end(); start++)size += start->second.size();return size;}void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_bindings.clear();}private:std::mutex _mutex;BindingMapper _mapper;BindingMap _bindings;};
}
????????值得注意的是:在使用unordered_map保存綁定信息的時候,插入和刪除的方式與之前不同,具體在注釋中給出了解釋。?
測試代碼
#include "../mqserver/binding.hpp"
#include <gtest/gtest.h>jiuqi::BindingManager::ptr bmp;class BindingTest : public testing::Environment
{
public:virtual void SetUp() override{bmp = std::make_shared<jiuqi::BindingManager>("./data/binding.db");}virtual void TearDown() override{bmp->clear();}
};TEST(ExchangeTest, insert_test)
{bmp->bind("exchange1", "queue1", "651", 1);bmp->bind("exchange1", "queue2", "651", 1);bmp->bind("exchange1", "queue3", "651", 1);bmp->bind("exchange2", "queue1", "651", 1);bmp->bind("exchange2", "queue2", "651", 1);bmp->bind("exchange2", "queue3", "651", 1);ASSERT_EQ(bmp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::Binding::ptr bp = bmp->getBinding("exchange2", "queue1");ASSERT_EQ(bp->exchangeName, "exchange2");ASSERT_EQ(bp->queueName, "queue1");ASSERT_EQ(bp->bindingKey, "651");
}TEST(ExchangeTest, delete_test)
{bmp->unbind("exchange1", "queue3");jiuqi::Binding::ptr bp = bmp->getBinding("exchange1", "queue3");ASSERT_EQ(bp.get(), nullptr);ASSERT_EQ(bmp->exists("exchange1", "queue3"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new BindingTest);return RUN_ALL_TESTS();
}
? ? ? ? 在測試的過程中,發現了一種錯誤,就是創建數據庫表時發生了out of memory錯誤,開始還以為是系統內存不足,后來發現在構造mapper時忘記了打開數據庫,所以得知如果沒有打開數據庫就創建表就會發生out of memory錯誤。