一、RabbitMQ安裝
1.安裝 RabbitMQ
sudo apt install rabbitmq-server
RabbitMQ 的簡單使用
# 啟動服務
sudo systemctl start rabbitmq-server.service
# 查看服務狀態
sudo systemctl status rabbitmq-server.service
# 安裝完成的時候默認有個用戶 guest ,但是權限不夠,要創建一個administrator 用戶,才可以做為遠程登錄和發表訂閱消息:
#添加用戶
sudo rabbitmqctl add_user root 123456
#設置用戶 tag
sudo rabbitmqctl set_user_tags root administrator
#設置用戶權限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自帶了 web 管理界面,執行下面命令開啟
sudo rabbitmq-plugins enable rabbitmq_management# 列出所有用戶及其權限信息
sudo rabbitmqctl list_users
# 刪除用戶
sudo rabbitmqctl delete_user root
# 修改密碼
sudo rabbitmqctl change_password root 123456
查看 rabbitmq-server 的狀態
service rabbitmq-server status
訪問 webUI 界面, 默認端口為 15672。至此 rabbitmq 安裝成功。
2.安裝 RabbitMQ 客戶端庫
如果需要在其他應用程序中使用 RabbitMQ,則需要安裝 RabbitMQ 的客戶端庫。可以使用以下命令安裝 librabbitmq 庫:
sudo apt-get install librabbitmq-dev
這將在系統上安裝 RabbitMQ 的客戶端庫,包括頭文件和靜態庫文件。
安裝 RabbitMQ 的 C++客戶端庫
C 語言庫:https://github.com/alanxz/rabbitmq-c
C++庫: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master
我們這里使用 AMQP-CPP 庫來編寫客戶端程序。
安裝 AMQP-CPP
sudo apt install libev-dev #libev 網絡庫組件
git clone https://github.com/CopernicaMarketingSoftware/AMQPCPP.git
cd AMQP-CPP/
make
sudo make install
至此可以通過 AMQP-CPP 來操作 rabbitmq。
安裝報錯:
/usr/include/openssl/macros.h:147:4: error: #error
"OPENSSL_API_COMPAT expresses an impossible API compatibility
level"147 | # error "OPENSSL_API_COMPAT expresses an impossible API
compatibility level"| ^~~~~
In file included from /usr/include/openssl/ssl.h:18,from linux_tcp/openssl.h:20,from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor,
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str,
unsigned short *port_ptr))
這種錯誤,表示 ssl 版本出現問題。
解決方案:卸載當前的 ssl 庫,重新進行修復安裝
dpkg -l |grep ssl
ii erlang-ssl
ii libevent-openssl-2.1-7:amd64
pi libgnutls-openssl27:amd64
ii libssl-dev:amd64
ii libssl3:amd64
ii libxmlsec1-openssl:amd64
ii libzstd-dev:amd64
ii libzstd1:amd64
ii openssl
ii python3-openssl
ii zstd
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
sudo apt --fix-broken install
修復后,重新進行 make
ev 相關接口:
AMQP-CPP 庫的簡單使用
二、RabbitMQ介紹
1.AMQP-CPP
AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫。它能解析從 RabbitMq服務發送來的數據,也可以生成發向 RabbitMq 的數據包。AMQP-CPP 庫不會向RabbitMq 建立網絡連接,所有的網絡 io 由用戶完成。
當然,AMQP-CPP 提供了可選的網絡層接口,它預定義了 TCP 模塊,用戶就不用自己實現網絡 io,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動安裝對應的組件。
RabbitMQ-消息隊列組件:實現兩個客戶端主機之間消息傳輸的功能(發布&訂閱)
AMQP:標準的高級消息隊列協議
核心概念:交換機(交換機類型) 隊列 綁定 消息
AMQP-CPP 完全異步,沒有阻塞式的系統調用,不使用線程就能夠應用在高性能應用中。注意:它需要 c++17 的支持
AMQP-CPP 的使用有兩種模式:
可以使用默認的 TCP 模塊進行網絡通信,使用擴展的 libevent、libev、libuv、asio 異步通信組件進行通信
TCP 模式:實現一個類繼承自 AMQP::TcpHandler 類, 它負責網絡層的 TCP 連接
重寫相關函數, 其中必須重寫 monitor 函數,在 monitor 函數中需要實現的是將 fd 放入 eventloop(select、epoll)中監控, 當 fd可寫可讀就緒之后,調用 AMQP-CPP 的 connection->process(fd, flags)方法。
2.相關的類與接口:
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler
{/***AMQP 庫在創建新連接時調用的方法*與處理程序相關聯。這是對處理程序的第一次調用*@param connection 附加到處理程序的連接*/virtual void onAttached(AMQP::TcpConnection *connection) override{// @todo// 添加您自己的實現,例如初始化事物// 以處理連接。}/***當 TCP 連接時由 AMQP 庫調用的方法*已經建立。調用此方法后,庫*仍然需要設置可選的 TLS 層和*在 TCP 層的頂部建立 AMQP 連接。這種方法總是與稍后對 onLost()的調用配對。*@param connection 現在可以使用的連接*/virtual void onConnected(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現(可能不需要)}/***在建立安全 TLS 連接時調用的方法。*這只對 amqps://連接調用。它允許您檢查連接是否足夠安全,以滿足您的喜好*(例如,您可以檢查服務器證書)。AMQP 協議仍然需要啟動。*@param connection 已被保護的連接*@param ssl 來自 openssl 庫的 ssl 結構*@return bool如果可以使用連接,則為 True*/virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override{//@todo// 添加您自己的實現,例如讀取證書并檢查它是否確實是您的return true;}/***當登錄嘗試成功時由 AMQP 庫調用的方法。在此之后,連接就可以使用了。*@param connection 現在可以使用的連接*/virtual void onReady(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現,例如通過創建一個通道實例,然后開始發布或使用}/***該方法在服務器嘗試協商檢測信號間隔時調用,*并被覆蓋以擺脫默認實現(否決建議的檢測信號間隔),轉而接受該間隔。*@param connection 發生錯誤的連接*@param interval 建議的間隔(秒)*/virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval){// 我們接受服務器的建議,但如果間隔小于一分鐘,我們將使用一分鐘的間隔if (interval < 60)interval = 60;//@todo// 在事件循環中設置一個計時器,// 如果在這段時間內沒有發送其他指令,// 請確保每隔 interval 秒調用 connection->heartbeat()。// 返回我們要使用的間隔return interval;}/***發生致命錯誤時由 AMQP 庫調用的方法 例如,因為無法識別從 RabbitMQ 接收的數據,或者基礎連接丟失。* 此調用之后通常會調用 onLost()(如果錯誤發生在 TCP 連接建立之后)和 onDetached()。*@param connection 發生錯誤的連接*@param message 一條人類可讀的錯誤消息*/virtual void onError(AMQP::TcpConnection *connection, const char *message) override{//@todo// 添加您自己的實現,例如,通過向程序的用戶報告錯誤并記錄錯誤}/***該方法在 AMQP 協議結束時調用的方法。這是調用 connection.close()以正常關閉連接的計數器部分。請注意,TCP 連接此時仍處于活動狀態,* 您還將收到對 onLost()和 onDetached()的調用* @param connection AMQP 協議結束的連接*/virtual void onClosed(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現, 可能沒有必要,// 但如果您想在 amqp 連接結束后立即執行某些操作,// 又不想等待 tcp 連接關閉,則這可能會很有用}/***當 TCP 連接關閉或丟失時調用的方法。*如果同時調用了 onConnected(),則始終調用此方法* @param connection 已關閉但現在無法使用的連接*/virtual void onLost(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現(可能沒有必要)}/***調用的最終方法。這表示將不再對處理程序進行有關連接的進一步調用。*@param connection 可以被破壞的連接*/virtual void onDetached(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現,如清理資源或退出應用程序}/***當 AMQP-CPP 庫想要與主事件循環交互時,它會調用該方法。*AMQP-CPP 庫是完全不阻塞的,*并且只有在事先知道這些調用不會阻塞時才進行“write()”或“read()”系統調用。*要在事件循環中注冊文件描述符,它會調用這個“monitor()”方法,*該方法帶有一個文件描述符和指示是否該檢查文件描述符的可讀性或可寫性的標志。**@param connection 想要與事件循環交互的連接*@param fd 應該檢查的文件描述符*@param 標記位或 AMQP::可讀和/或 AMQP::可寫*/virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override{//@todo// 添加您自己的實現,// 例如將文件描述符添加到主應用程序事件循環(如 select()或 poll()循環)。// 當事件循環報告描述符變為可讀和或可寫時,// 由您通過調用 connection->process(fd,flags)方法// 通知 AMQP-CPP 庫文件描述符處于活動狀態。}
};
3.交換機類型
1.廣播交換:當交換機收到消息,則將消息發布到所有綁定的隊列中
2.直接交換:根據消息中的bkey與綁定rkey對比一致則放入隊列
3.主題交換:使用bkey與綁定的rkey進行規則匹配,成功則放入隊列
4.Channel
channel 是一個虛擬連接,一個連接上可以建立多個通道。并且所有的 RabbitMq 指令都是通過 channel 傳輸,所以連接建立后的第一步,就是建立 channel。因為所有操作是異步的,所以在 channel 上執行指令的返回值并不能作為操作執行結果,實際上它返回的是 Deferred 類,可以使用它安裝處理函數。
namespace AMQP
{/*** Generic callbacks that are used by many deferred objects*/using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(const char *message)>;using FinalizeCallback = std::function<void()>;/*** Declaring and deleting a queue*/using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;// 當使用發布者確認時,當服務器確認消息已被接收和處理時,將調用AckCallback using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;// 使用確認包裹通道時,當消息被 ack/nacked 時,會調用這些回調using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;class Channel{Channel(Connection *connection);bool connected();/***聲明交換機*如果提供了一個空名稱,則服務器將分配一個名稱。*以下 flags 可用于交換機:*-durable 持久化,重啟后交換機依然有效*-autodelete 刪除所有連接的隊列后,自動刪除交換*-passive 僅被動檢查交換機是否存在*-internal 創建內部交換*@param name 交換機的名稱*@param-type 交換類型enum ExchangeType{fanout, 廣播交換,綁定的隊列都能拿到消息direct, 直接交換,只將消息交給 routingkey 一致的隊列topic, 主題交換,將消息交給符合 bindingkey 規則的隊列headers,consistent_hash,message_deduplication};*@param flags 交換機標志*@param arguments 其他參數*此函數返回一個延遲處理程序。可以安裝回調using onSuccess(), onError() and onFinalize() methods.*/Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments);/***聲明隊列*如果不提供名稱,服務器將分配一個名稱。*flags 可以是以下值的組合:*-durable 持久隊列在代理重新啟動后仍然有效*-autodelete 當所有連接的使用者都離開時,自動刪除隊列*-passive僅被動檢查隊列是否存在*-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除*@param name 隊列的名稱*@param flags 標志組合*@param arguments 可選參數*此函數返回一個延遲處理程序。可以安裝回調*使用 onSuccess()、onError()和 onFinalize()方法。Deferred &onError(const char *message)*可以安裝的 onSuccess()回調應該具有以下簽名:void myCallback(const std::string &name,uint32_t messageCount,uint32_t consumerCount);例如:channel.declareQueue("myqueue").onSuccess([](const std::string &name,uint32_t messageCount,uint32_t consumerCount) {std::cout << "Queue '" << name << "' ";std::cout << "has been declared with ";std::cout << messageCount;std::cout << " messages and ";std::cout << consumerCount;std::cout << " consumers" << std::endl;* });*/DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments);/***將隊列綁定到交換機*@param exchange 源交換機*@param queue 目標隊列*@param routingkey 路由密鑰*@param arguments 其他綁定參數*此函數返回一個延遲處理程序。可以安裝回調*使用 onSuccess()、onError()和 onFinalize()方法。*/Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments);/***將消息發布到 exchange*您必須提供交換機的名稱和路由密鑰。然后,RabbitMQ 將嘗試將消息發送到一個或多個隊列。使用可選的 flags 參數,可以指定如果消息無法路由到隊列時應該發生的情況。默認情況下,不可更改的消息將被靜默地丟棄。*如果設置了'mandatory'或'immediate'標志,則無法處理的消息將返回到應用程序。在開始發布之前,請確保您已經調用了 recall()-方法,并設置了所有適當的處理程序來處理這些返回的消息。*可以提供以下 flags:*-mandatory 如果設置,服務器將返回未發送到隊列的消息*-immediate 如果設置,服務器將返回無法立即轉發給使用者的消息。*@param exchange 要發布到的交易所*@param routingkey 路由密鑰*@param envelope 要發送的完整信封*@param message 要發送的消息*@param size 消息的大小*@param flags 可選標志*/bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0);/***告訴 RabbitMQ 服務器我們已準備好使用消息-也就是訂閱隊列消息*調用此方法后,RabbitMQ 開始向客戶端應用程序傳遞消息。consumer tag 是一個字符串標識符,*如果您以后想通過channel::cancel()調用停止它,可以使用它來標識使用者。*如果您沒有指定使用者 tag,服務器將為您分配一個。*支持以下 flags:*-nolocal 如果設置了,則不會同時消耗在此通道上發布的消息*-noack 如果設置了,則不必對已消費的消息進行確認*-exclusive 請求獨占訪問,只有此使用者可以訪問隊列*@param queue 您要使用的隊列*@param tag 將與此消費操作關聯的消費者標記*@param flags 其他標記*@param arguments 其他參數*此函數返回一個延遲處理程序。可以使用 onSuccess()、onError()和 onFinalize()方法安裝回調。可以安裝的 onSuccess()回調應該具有以下格式:void myCallback(const std::string_view&tag);樣例:channel.consume("myqueue").onSuccess([](const std::string_view& tag) {std::cout << "Started consuming under tag ";std::cout << tag << std::endl;});*/DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag,int flags, const Table &arguments);/***確認接收到的消息*當在 DeferredConsumer::onReceived()方法中接收到消息時,必須確認該消息,*以便 RabbitMQ 將其從隊列中刪除(除非使用 noack 選項消費)。*支持以下標志:*-多條確認多條消息:之前傳遞的所有未確認消息也會得到確認*@param deliveryTag 消息的唯一 delivery 標簽*@param flags 可選標志*@return bool*/bool ack(uint64_t deliveryTag, int flags = 0);};class DeferredConsumer{/*注冊一個回調函數,該函數在消費者啟動時被調用。void onSuccess(const std::string &consumertag)*/DeferredConsumer &onSuccess(const ConsumeCallback &callback);/*注冊回調函數,用于接收到一個完整消息的時候被調用void MessageCallback(const AMQP::Message &message,uint64_t deliveryTag, bool redelivered)*/DeferredConsumer &onReceived(const MessageCallback &callback);/* Alias for onReceived() */DeferredConsumer &onMessage(const MessageCallback &callback);/*注冊要在服務器取消消費者時調用的函數void CancelCallback(const std::string &tag)*/DeferredConsumer &onCancelled(const CancelCallback &callback);};class Message : public Envelope{const std::string &exchange();const std::string &routingkey();};class Envelope : public MetaData{const char *body();uint64_t bodySize();};
}
5.ev
typedef struct ev_async
{EV_WATCHER(ev_async)EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{EVBREAK_CANCEL = 0, /* undo unloop */EVBREAK_ONE = 1, /* unloop once */EVBREAK_ALL = 2 /* unloop all loops */
};
// 實例化并獲取IO事件監控結構的句柄
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0));
#define EV_DEFAULT ev_default_loop(0)// 開始運行IO事件監控,這是一個阻塞接口
int ev_run(struct ev_loop *loop);
/* break out of the loop */
// 結束IO監控
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);// 如果當前線程進行ev_run則可以直接調用,如果在其他線程進行ev_run需要通過異步線程通知進行
// 初始化異步事件結構,并設置回調函數
void ev_async_init(ev_async *w, callback cb);
// 啟動事件監控循環中的異步任務處理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 發送當前異步事件到異步線程中執行
void ev_async_send(struct ev_loop *loop, ev_async *w);
我們主要使用到如下接口:
AMQP::Channel //信道類
Channel(Connection* connection); bool connected()
Deferred& declareExchange() //聲明交換機
DeferredQueue& declaredQueue() // 聲明隊列
Deferred& bindQueue() // 將交換機與隊列進行關系綁定
bool publish() // 發布消息
DeferredConsumer& consume() // 訂閱消息
bool ack() // 消費者客戶端對收到的消息進行確認應答
class Message: // 消息類
const char* body() // 獲取消息正文
uint64_t bodySize() //獲取消息大小
// libev使用到的結構體與接口
struct ev_loop* ev_default_loop() // 實例化并獲取IO事件監控句柄
#define EV_DEFAULT ev_default_loop(0);
int ev_run(struct ev_loop* loop); // 開始運行IO事件監控,這是一個阻塞接口
void ev_break(struct ev_loop* loop,int32_t break_type); // 結束IO監控
// 如果在當前線程進行ev_run則可以直接調用,如果在其他線程中進行ev_run,需要通過異步通知進行
void ev_async_init(ev_async *w, callback cb);
// 啟動事件監控循環中的異步任務處理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 發送當前異步事件到異步線程中執行
void ev_async_send(struct ev_loop *loop, ev_async *w);
三、使用案例
1.publish
發布程序流程:
//1. 實例化底層網絡通信框架的I/O事件監控句柄
//2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來
//3. 實例化連接對象
//4. 實例化信道對象
//5. 聲明交換機
//6. 聲明隊列
//7.針對交換機和隊列進行綁定
//8.向交換機發布消息
//9.啟動底層網絡通信框架--開啟I/O
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <gflags/gflags.h>
#include "../../common/logger.hpp"DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");int main(int argc,char* argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1. 實例化底層網絡通信框架的I/O事件監控句柄// struct ev_loop *loop = ev_default_loop();struct ev_loop *loop = EV_DEFAULT;// 2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);// 3. 實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);// 4. 實例化信道對象AMQP::TcpChannel channel(&connection);// 5. 聲明交換機channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message){LOG_ERROR("聲明交換機失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange交換機創建成功"); });// 6. 聲明隊列channel.declareQueue("test-queue").onError([](const char *message){LOG_ERROR("聲明隊列失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-queue隊列創建成功"); });// 7.針對交換機和隊列進行綁定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message){LOG_ERROR("test-exchange - test-queue 綁定失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange - test-queue 綁定成功!"); });// 8.向交換機發布消息for (int i = 0; i < 10; i++){std::string msg = "Hello World" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if (ret == false){LOG_ERROR("publish 失敗");exit(0);}}// 9.啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}
2.consume
訂閱程序流程
//1. 實例化底層網絡通信框架的I/O事件監控句柄
//2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來
//3. 實例化連接對象
//4. 實例化信道對象
//5. 聲明交換機
//6. 聲明隊列
//7.針對交換機和隊列進行綁定
//8.訂閱隊列消息--設置消息處理回調函數
//9.啟動底層網絡通信框架--開啟I/O
consume.cc:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <gflags/gflags.h>
#include "../../common/logger.hpp"DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");// 消息回調函數的實現
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool relivered)
{std::string msg;msg.assign(message.body(), message.bodySize());LOG_INFO("{}", msg);channel->ack(deliveryTag);
}int main(int argc,char* argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1.實例化底層網絡通信框架的I/O事件監控句柄// struct ev_loop *loop = ev_default_loop();struct ev_loop *loop = EV_DEFAULT;// 2.實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);// 3.實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);// 4.實例化信道對象AMQP::TcpChannel channel(&connection);// 5.聲明交換機channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message){LOG_ERROR("聲明交換機失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange交換機創建成功"); });// 6.聲明隊列channel.declareQueue("test-queue").onError([](const char *message){LOG_ERROR("聲明隊列失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-queue隊列創建成功"); });// 7.針對交換機和隊列進行綁定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message){LOG_ERROR("test-exchange - test-queue 綁定失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange - test-queue 綁定成功!"); });// 8.訂閱隊列消息--設置消息處理回調函數auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag").onReceived(callback).onError([](const char *message){LOG_ERROR("訂閱 test-queue 隊列消息失敗:{},message");exit(0); });// 9.啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}
makefile:
all:publish consume
publish:publish.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt
consume:consume.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt.PHONY:clean
clean:rm -rf publish consume
測試結果:
四、二次封裝
1.二次封裝思想
在使用 rabbitmq 的時候,我們目前只需要交換機與隊列的直接交換,實現一臺主機將消息發布給另一臺主機進行處理的功能,因此在這里可以對 mq 的操作進行簡單的封裝,使 mq 的操作在項目中更加簡便:
封裝思想:
1.將事件監控結構loop,通信連接connection,以及信道channel封裝起來
2.提供創建交換機,隊列并進行直接綁定的接口
3.提供向指定交換機發布消息的接口
4.提供訂閱指定消息隊列的接口
封裝一個 MQClient:
提供聲明指定交換機與隊列,并進行綁定的功能;
提供向指定交換機發布消息的功能
提供訂閱指定隊列消息,并設置回調函數進行消息消費處理的功能
2.rabbitmq.hpp
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <thread>
#include "logger.hpp"namespace hdp
{class MQClient{public:using MessageCallback = std::function<void(const char *, size_t)>;MQClient(const std::string &username, const std::string &password, const std::string &host){_loop = EV_DEFAULT;_handler = std::make_unique<AMQP::LibEvHandler>(_loop);// amqp://root:123456@127.0.0.1:5672/std::string url = "amqp://" + username + ":" + password + "@" + host + "/";AMQP::Address address(url);_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());_loop_thread = std::thread([this]() { ev_run(_loop, 0); });}~MQClient(){ev_async_init(&_async_watcher, watch_callback);ev_async_start(_loop, &_async_watcher);ev_async_send(_loop, &_async_watcher);_loop_thread.join();// 內部會進行釋放,不需要我們進行手動釋放_loop = nullptr;}void declareComponents(const std::string &exchange, const std::string &queue,const std::string &routing_key = "routing-key",AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct){_channel->declareExchange(exchange, exchange_type).onError([](const char *message) {LOG_ERROR("聲明交換機失敗: {}",message);exit(0); }).onSuccess([exchange]() { LOG_INFO("{} 交換機創建成功", exchange); });_channel->declareQueue(queue).onError([](const char *message) {LOG_ERROR("聲明隊列失敗: {}",message);exit(0); }).onSuccess([queue](){ LOG_INFO("{}隊列創建成功", queue); });_channel->bindQueue(exchange, queue, routing_key).onError([exchange, queue](const char *message) {LOG_ERROR("{} - {} 綁定失敗:{}",exchange,queue,message);exit(0); }).onSuccess([exchange, queue, routing_key]() { LOG_INFO("{} - {} - {} 綁定成功", exchange, queue, routing_key); });}bool publish(const std::string &exchange, const std::string &msg,const std::string &routing_key = "routing_key"){LOG_DEBUG("向交換機 {}-{} 發布消息!", exchange, routing_key);bool ret = _channel->publish(exchange, routing_key, msg);if (ret == false){LOG_ERROR("{} 發布消息失敗:", exchange);return false;}return true;}void consume(const std::string &queue, const MessageCallback &cb){LOG_DEBUG("開始訂閱 {} 隊列消息!", queue);_channel->consume(queue, "consume-tag") // 返回值 DeferredConsumer.onReceived([this, cb](const AMQP::Message &message,uint64_t deliveryTag, bool redelivered) {cb(message.body(),message.bodySize());_channel->ack(deliveryTag); }).onError([queue](const char *message) {LOG_ERROR("訂閱 {} 隊列消息失敗: {}",queue,message);exit(0); });}private:static void watch_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents){ev_break(loop, EVBREAK_ALL);}private:struct ev_async _async_watcher;struct ev_loop *_loop;std::unique_ptr<AMQP::LibEvHandler> _handler;std::unique_ptr<AMQP::TcpConnection> _connection;std::unique_ptr<AMQP::TcpChannel> _channel;std::thread _loop_thread;};
}
3.consume.cc
#include <gflags/gflags.h>
#include "../../../common/rabbitmq.hpp"DEFINE_string(username, "root", "rabbitmq訪問用戶名");
DEFINE_string(password, "123456", "rabbitmq訪問用戶名密碼");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服務器地址信息 host:port");DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");void callback(const char *body, size_t size)
{std::string msg;msg.assign(body, size);LOG_INFO("訂閱消息: {}", msg);
}int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);hdp::MQClient client(FLAGS_username, FLAGS_password, FLAGS_host);client.declareComponents("test-exchange", "test-queue");client.consume("test-queue", callback);std::this_thread::sleep_for(std::chrono::seconds(60));return 0;
}
4.publish.cc
#include <gflags/gflags.h>
#include "../../../common/rabbitmq.hpp"DEFINE_string(username, "root", "rabbitmq訪問用戶名");
DEFINE_string(password, "123456", "rabbitmq訪問用戶名密碼");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服務器地址信息 host:port");DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);hdp::MQClient client(FLAGS_username, FLAGS_password, FLAGS_host);client.declareComponents("test-exchange", "test-queue");for (int i = 0; i < 10; i++){std::string msg = "Hello World " + std::to_string(i);bool ret = client.publish("test-exchange", msg);if (ret == false){LOG_ERROR("publish 失敗");}}std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}
5.makefile
all:publish consume
publish:publish.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt
consume:consume.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt.PHONY:clean
clean:rm -rf publish consume
6.測試結果
發布:
訂閱:
e::ParseCommandLineFlags(&argc, &argv, true);
hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
hdp::MQClient client(FLAGS_username, FLAGS_password, FLAGS_host);
client.declareComponents("test-exchange", "test-queue");for (int i = 0; i < 10; i++)
{std::string msg = "Hello World " + std::to_string(i);bool ret = client.publish("test-exchange", msg);if (ret == false){LOG_ERROR("publish 失敗");}
}
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
## 5.makefile```makefile
all:publish consume
publish:publish.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt
consume:consume.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt.PHONY:clean
clean:rm -rf publish consume
6.測試結果
發布:
[外鏈圖片轉存中…(img-aXENtItP-1755520555473)]
訂閱: