深入解析RabbitMQ與AMQP-CPP:從原理到實戰應用

一、RabbitMQ安裝

1.安裝 RabbitMQ

sudo apt install rabbitmq-server

RabbitMQ 的簡單使用

# 啟動服務
sudo systemctl start rabbitmq-server.service
# 查看服務狀態
sudo systemctl status rabbitmq-server.service
# 安裝完成的時候默認有個用戶 guest ,但是權限不夠,要創建一個administrator 用戶,才可以做為遠程登錄和發表訂閱消息:
#添加用戶
sudo rabbitmqctl add_user root 123456
#設置用戶 tag
sudo rabbitmqctl set_user_tags root administrator
#設置用戶權限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自帶了 web 管理界面,執行下面命令開啟
sudo rabbitmq-plugins enable rabbitmq_management# 列出所有用戶及其權限信息
sudo rabbitmqctl list_users
# 刪除用戶
sudo rabbitmqctl delete_user root
# 修改密碼
sudo rabbitmqctl change_password root 123456

查看 rabbitmq-server 的狀態

service rabbitmq-server status 

訪問 webUI 界面, 默認端口為 15672。至此 rabbitmq 安裝成功。

2.安裝 RabbitMQ 客戶端庫

如果需要在其他應用程序中使用 RabbitMQ,則需要安裝 RabbitMQ 的客戶端庫。可以使用以下命令安裝 librabbitmq 庫:

sudo apt-get install librabbitmq-dev

這將在系統上安裝 RabbitMQ 的客戶端庫,包括頭文件和靜態庫文件。

安裝 RabbitMQ 的 C++客戶端庫

C 語言庫:https://github.com/alanxz/rabbitmq-c

C++庫: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master

我們這里使用 AMQP-CPP 庫來編寫客戶端程序。

安裝 AMQP-CPP

sudo apt install libev-dev #libev 網絡庫組件
git clone https://github.com/CopernicaMarketingSoftware/AMQPCPP.git
cd AMQP-CPP/
make 
sudo make install

至此可以通過 AMQP-CPP 來操作 rabbitmq。
安裝報錯:

/usr/include/openssl/macros.h:147:4: error: #error 
"OPENSSL_API_COMPAT expresses an impossible API compatibility 
level"147 | # error "OPENSSL_API_COMPAT expresses an impossible API 
compatibility level"| ^~~~~
In file included from /usr/include/openssl/ssl.h:18,from linux_tcp/openssl.h:20,from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor, 
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, 
unsigned short *port_ptr))

這種錯誤,表示 ssl 版本出現問題。
解決方案:卸載當前的 ssl 庫,重新進行修復安裝

dpkg -l |grep ssl
ii erlang-ssl 
ii libevent-openssl-2.1-7:amd64 
pi libgnutls-openssl27:amd64 
ii libssl-dev:amd64 
ii libssl3:amd64 
ii libxmlsec1-openssl:amd64 
ii libzstd-dev:amd64 
ii libzstd1:amd64 
ii openssl 
ii python3-openssl 
ii zstd 
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev 
sudo apt --fix-broken install

修復后,重新進行 make

ev 相關接口:

AMQP-CPP 庫的簡單使用

二、RabbitMQ介紹

1.AMQP-CPP

AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫。它能解析從 RabbitMq服務發送來的數據,也可以生成發向 RabbitMq 的數據包。AMQP-CPP 庫不會向RabbitMq 建立網絡連接,所有的網絡 io 由用戶完成。
當然,AMQP-CPP 提供了可選的網絡層接口,它預定義了 TCP 模塊,用戶就不用自己實現網絡 io,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動安裝對應的組件。

RabbitMQ-消息隊列組件:實現兩個客戶端主機之間消息傳輸的功能(發布&訂閱)

AMQP:標準的高級消息隊列協議

核心概念:交換機(交換機類型) 隊列 綁定 消息

image-20250218214605433

AMQP-CPP 完全異步,沒有阻塞式的系統調用,不使用線程就能夠應用在高性能應用中。注意:它需要 c++17 的支持

AMQP-CPP 的使用有兩種模式:

可以使用默認的 TCP 模塊進行網絡通信,使用擴展的 libevent、libev、libuv、asio 異步通信組件進行通信

TCP 模式:實現一個類繼承自 AMQP::TcpHandler 類, 它負責網絡層的 TCP 連接

重寫相關函數, 其中必須重寫 monitor 函數,在 monitor 函數中需要實現的是將 fd 放入 eventloop(select、epoll)中監控, 當 fd可寫可讀就緒之后,調用 AMQP-CPP 的 connection->process(fd, flags)方法。

2.相關的類與接口:

#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler
{/***AMQP 庫在創建新連接時調用的方法*與處理程序相關聯。這是對處理程序的第一次調用*@param connection 附加到處理程序的連接*/virtual void onAttached(AMQP::TcpConnection *connection) override{// @todo// 添加您自己的實現,例如初始化事物// 以處理連接。}/***當 TCP 連接時由 AMQP 庫調用的方法*已經建立。調用此方法后,庫*仍然需要設置可選的 TLS 層和*在 TCP 層的頂部建立 AMQP 連接。這種方法總是與稍后對 onLost()的調用配對。*@param connection 現在可以使用的連接*/virtual void onConnected(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現(可能不需要)}/***在建立安全 TLS 連接時調用的方法。*這只對 amqps://連接調用。它允許您檢查連接是否足夠安全,以滿足您的喜好*(例如,您可以檢查服務器證書)。AMQP 協議仍然需要啟動。*@param connection 已被保護的連接*@param ssl 來自 openssl 庫的 ssl 結構*@return bool如果可以使用連接,則為 True*/virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override{//@todo// 添加您自己的實現,例如讀取證書并檢查它是否確實是您的return true;}/***當登錄嘗試成功時由 AMQP 庫調用的方法。在此之后,連接就可以使用了。*@param connection 現在可以使用的連接*/virtual void onReady(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現,例如通過創建一個通道實例,然后開始發布或使用}/***該方法在服務器嘗試協商檢測信號間隔時調用,*并被覆蓋以擺脫默認實現(否決建議的檢測信號間隔),轉而接受該間隔。*@param connection 發生錯誤的連接*@param interval 建議的間隔(秒)*/virtual uint16_t onNegotiate(AMQP::TcpConnection *connection, uint16_t interval){// 我們接受服務器的建議,但如果間隔小于一分鐘,我們將使用一分鐘的間隔if (interval < 60)interval = 60;//@todo// 在事件循環中設置一個計時器,// 如果在這段時間內沒有發送其他指令,// 請確保每隔 interval 秒調用 connection->heartbeat()。// 返回我們要使用的間隔return interval;}/***發生致命錯誤時由 AMQP 庫調用的方法 例如,因為無法識別從 RabbitMQ 接收的數據,或者基礎連接丟失。* 此調用之后通常會調用 onLost()(如果錯誤發生在 TCP 連接建立之后)和 onDetached()。*@param connection 發生錯誤的連接*@param message 一條人類可讀的錯誤消息*/virtual void onError(AMQP::TcpConnection *connection, const char *message) override{//@todo// 添加您自己的實現,例如,通過向程序的用戶報告錯誤并記錄錯誤}/***該方法在 AMQP 協議結束時調用的方法。這是調用 connection.close()以正常關閉連接的計數器部分。請注意,TCP 連接此時仍處于活動狀態,* 您還將收到對 onLost()和 onDetached()的調用* @param connection AMQP 協議結束的連接*/virtual void onClosed(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現, 可能沒有必要,// 但如果您想在 amqp 連接結束后立即執行某些操作,// 又不想等待 tcp 連接關閉,則這可能會很有用}/***當 TCP 連接關閉或丟失時調用的方法。*如果同時調用了 onConnected(),則始終調用此方法* @param connection 已關閉但現在無法使用的連接*/virtual void onLost(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現(可能沒有必要)}/***調用的最終方法。這表示將不再對處理程序進行有關連接的進一步調用。*@param connection 可以被破壞的連接*/virtual void onDetached(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現,如清理資源或退出應用程序}/***當 AMQP-CPP 庫想要與主事件循環交互時,它會調用該方法。*AMQP-CPP 庫是完全不阻塞的,*并且只有在事先知道這些調用不會阻塞時才進行“write()”或“read()”系統調用。*要在事件循環中注冊文件描述符,它會調用這個“monitor()”方法,*該方法帶有一個文件描述符和指示是否該檢查文件描述符的可讀性或可寫性的標志。**@param connection 想要與事件循環交互的連接*@param fd 應該檢查的文件描述符*@param 標記位或 AMQP::可讀和/或 AMQP::可寫*/virtual void monitor(AMQP::TcpConnection *connection, int fd, int flags) override{//@todo// 添加您自己的實現,// 例如將文件描述符添加到主應用程序事件循環(如 select()或 poll()循環)。// 當事件循環報告描述符變為可讀和或可寫時,// 由您通過調用 connection->process(fd,flags)方法// 通知 AMQP-CPP 庫文件描述符處于活動狀態。}
};

3.交換機類型

1.廣播交換:當交換機收到消息,則將消息發布到所有綁定的隊列中

2.直接交換:根據消息中的bkey與綁定rkey對比一致則放入隊列

3.主題交換:使用bkey與綁定的rkey進行規則匹配,成功則放入隊列

4.Channel

channel 是一個虛擬連接,一個連接上可以建立多個通道。并且所有的 RabbitMq 指令都是通過 channel 傳輸,所以連接建立后的第一步,就是建立 channel。因為所有操作是異步的,所以在 channel 上執行指令的返回值并不能作為操作執行結果,實際上它返回的是 Deferred 類,可以使用它安裝處理函數。

namespace AMQP
{/*** Generic callbacks that are used by many deferred objects*/using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(const char *message)>;using FinalizeCallback = std::function<void()>;/*** Declaring and deleting a queue*/using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>;// 當使用發布者確認時,當服務器確認消息已被接收和處理時,將調用AckCallback using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;// 使用確認包裹通道時,當消息被 ack/nacked 時,會調用這些回調using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;class Channel{Channel(Connection *connection);bool connected();/***聲明交換機*如果提供了一個空名稱,則服務器將分配一個名稱。*以下 flags 可用于交換機:*-durable 持久化,重啟后交換機依然有效*-autodelete 刪除所有連接的隊列后,自動刪除交換*-passive 僅被動檢查交換機是否存在*-internal 創建內部交換*@param name 交換機的名稱*@param-type 交換類型enum ExchangeType{fanout, 廣播交換,綁定的隊列都能拿到消息direct, 直接交換,只將消息交給 routingkey 一致的隊列topic, 主題交換,將消息交給符合 bindingkey 規則的隊列headers,consistent_hash,message_deduplication};*@param flags 交換機標志*@param arguments 其他參數*此函數返回一個延遲處理程序。可以安裝回調using onSuccess(), onError() and onFinalize() methods.*/Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments);/***聲明隊列*如果不提供名稱,服務器將分配一個名稱。*flags 可以是以下值的組合:*-durable 持久隊列在代理重新啟動后仍然有效*-autodelete 當所有連接的使用者都離開時,自動刪除隊列*-passive僅被動檢查隊列是否存在*-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除*@param name 隊列的名稱*@param flags 標志組合*@param arguments 可選參數*此函數返回一個延遲處理程序。可以安裝回調*使用 onSuccess()、onError()和 onFinalize()方法。Deferred &onError(const char *message)*可以安裝的 onSuccess()回調應該具有以下簽名:void myCallback(const std::string &name,uint32_t messageCount,uint32_t consumerCount);例如:channel.declareQueue("myqueue").onSuccess([](const std::string &name,uint32_t messageCount,uint32_t consumerCount) {std::cout << "Queue '" << name << "' ";std::cout << "has been declared with ";std::cout << messageCount;std::cout << " messages and ";std::cout << consumerCount;std::cout << " consumers" << std::endl;* });*/DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments);/***將隊列綁定到交換機*@param exchange 源交換機*@param queue 目標隊列*@param routingkey 路由密鑰*@param arguments 其他綁定參數*此函數返回一個延遲處理程序。可以安裝回調*使用 onSuccess()、onError()和 onFinalize()方法。*/Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments);/***將消息發布到 exchange*您必須提供交換機的名稱和路由密鑰。然后,RabbitMQ 將嘗試將消息發送到一個或多個隊列。使用可選的 flags 參數,可以指定如果消息無法路由到隊列時應該發生的情況。默認情況下,不可更改的消息將被靜默地丟棄。*如果設置了'mandatory'或'immediate'標志,則無法處理的消息將返回到應用程序。在開始發布之前,請確保您已經調用了 recall()-方法,并設置了所有適當的處理程序來處理這些返回的消息。*可以提供以下 flags:*-mandatory 如果設置,服務器將返回未發送到隊列的消息*-immediate 如果設置,服務器將返回無法立即轉發給使用者的消息。*@param exchange 要發布到的交易所*@param routingkey 路由密鑰*@param envelope 要發送的完整信封*@param message 要發送的消息*@param size 消息的大小*@param flags 可選標志*/bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0);/***告訴 RabbitMQ 服務器我們已準備好使用消息-也就是訂閱隊列消息*調用此方法后,RabbitMQ 開始向客戶端應用程序傳遞消息。consumer tag 是一個字符串標識符,*如果您以后想通過channel::cancel()調用停止它,可以使用它來標識使用者。*如果您沒有指定使用者 tag,服務器將為您分配一個。*支持以下 flags:*-nolocal 如果設置了,則不會同時消耗在此通道上發布的消息*-noack 如果設置了,則不必對已消費的消息進行確認*-exclusive 請求獨占訪問,只有此使用者可以訪問隊列*@param queue 您要使用的隊列*@param tag 將與此消費操作關聯的消費者標記*@param flags 其他標記*@param arguments 其他參數*此函數返回一個延遲處理程序。可以使用 onSuccess()、onError()和 onFinalize()方法安裝回調。可以安裝的 onSuccess()回調應該具有以下格式:void myCallback(const std::string_view&tag);樣例:channel.consume("myqueue").onSuccess([](const std::string_view& tag) {std::cout << "Started consuming under tag ";std::cout << tag << std::endl;});*/DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag,int flags, const Table &arguments);/***確認接收到的消息*當在 DeferredConsumer::onReceived()方法中接收到消息時,必須確認該消息,*以便 RabbitMQ 將其從隊列中刪除(除非使用 noack 選項消費)。*支持以下標志:*-多條確認多條消息:之前傳遞的所有未確認消息也會得到確認*@param deliveryTag 消息的唯一 delivery 標簽*@param flags 可選標志*@return bool*/bool ack(uint64_t deliveryTag, int flags = 0);};class DeferredConsumer{/*注冊一個回調函數,該函數在消費者啟動時被調用。void onSuccess(const std::string &consumertag)*/DeferredConsumer &onSuccess(const ConsumeCallback &callback);/*注冊回調函數,用于接收到一個完整消息的時候被調用void MessageCallback(const AMQP::Message &message,uint64_t deliveryTag, bool redelivered)*/DeferredConsumer &onReceived(const MessageCallback &callback);/* Alias for onReceived() */DeferredConsumer &onMessage(const MessageCallback &callback);/*注冊要在服務器取消消費者時調用的函數void CancelCallback(const std::string &tag)*/DeferredConsumer &onCancelled(const CancelCallback &callback);};class Message : public Envelope{const std::string &exchange();const std::string &routingkey();};class Envelope : public MetaData{const char *body();uint64_t bodySize();};
}

5.ev

typedef struct ev_async
{EV_WATCHER(ev_async)EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{EVBREAK_CANCEL = 0, /* undo unloop */EVBREAK_ONE = 1,    /* unloop once */EVBREAK_ALL = 2     /* unloop all loops */
};
// 實例化并獲取IO事件監控結構的句柄
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0));
#define EV_DEFAULT ev_default_loop(0)// 開始運行IO事件監控,這是一個阻塞接口
int ev_run(struct ev_loop *loop);
/* break out of the loop */
// 結束IO監控
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);// 如果當前線程進行ev_run則可以直接調用,如果在其他線程進行ev_run需要通過異步線程通知進行
// 初始化異步事件結構,并設置回調函數
void ev_async_init(ev_async *w, callback cb);
// 啟動事件監控循環中的異步任務處理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 發送當前異步事件到異步線程中執行
void ev_async_send(struct ev_loop *loop, ev_async *w);

我們主要使用到如下接口:

AMQP::Channel //信道類
Channel(Connection* connection); bool connected()
Deferred& declareExchange() //聲明交換機
DeferredQueue& declaredQueue() // 聲明隊列
Deferred& bindQueue() // 將交換機與隊列進行關系綁定
bool publish() // 發布消息
DeferredConsumer& consume() // 訂閱消息
bool ack() // 消費者客戶端對收到的消息進行確認應答
class Message: // 消息類
const char* body() // 獲取消息正文
uint64_t bodySize() //獲取消息大小
// libev使用到的結構體與接口
struct ev_loop* ev_default_loop()  // 實例化并獲取IO事件監控句柄
#define EV_DEFAULT ev_default_loop(0);
int ev_run(struct ev_loop* loop); // 開始運行IO事件監控,這是一個阻塞接口
void ev_break(struct ev_loop* loop,int32_t break_type); // 結束IO監控
// 如果在當前線程進行ev_run則可以直接調用,如果在其他線程中進行ev_run,需要通過異步通知進行
void ev_async_init(ev_async *w, callback cb);
// 啟動事件監控循環中的異步任務處理
void ev_async_start(struct ev_loop *loop, ev_async *w);
// 發送當前異步事件到異步線程中執行
void ev_async_send(struct ev_loop *loop, ev_async *w);

三、使用案例

1.publish

發布程序流程:

//1. 實例化底層網絡通信框架的I/O事件監控句柄
//2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來
//3. 實例化連接對象
//4. 實例化信道對象
//5. 聲明交換機
//6. 聲明隊列
//7.針對交換機和隊列進行綁定
//8.向交換機發布消息
//9.啟動底層網絡通信框架--開啟I/O
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <gflags/gflags.h>
#include "../../common/logger.hpp"DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");int main(int argc,char* argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1. 實例化底層網絡通信框架的I/O事件監控句柄// struct ev_loop *loop = ev_default_loop();struct ev_loop *loop = EV_DEFAULT;// 2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);// 3. 實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);// 4. 實例化信道對象AMQP::TcpChannel channel(&connection);// 5. 聲明交換機channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message){LOG_ERROR("聲明交換機失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange交換機創建成功"); });// 6. 聲明隊列channel.declareQueue("test-queue").onError([](const char *message){LOG_ERROR("聲明隊列失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-queue隊列創建成功"); });// 7.針對交換機和隊列進行綁定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message){LOG_ERROR("test-exchange - test-queue 綁定失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange - test-queue 綁定成功!"); });// 8.向交換機發布消息for (int i = 0; i < 10; i++){std::string msg = "Hello World" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if (ret == false){LOG_ERROR("publish 失敗");exit(0);}}// 9.啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}

2.consume

訂閱程序流程

//1. 實例化底層網絡通信框架的I/O事件監控句柄
//2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來
//3. 實例化連接對象
//4. 實例化信道對象
//5. 聲明交換機
//6. 聲明隊列
//7.針對交換機和隊列進行綁定
//8.訂閱隊列消息--設置消息處理回調函數
//9.啟動底層網絡通信框架--開啟I/O

consume.cc:

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <gflags/gflags.h>
#include "../../common/logger.hpp"DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");// 消息回調函數的實現
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool relivered)
{std::string msg;msg.assign(message.body(), message.bodySize());LOG_INFO("{}", msg);channel->ack(deliveryTag);
}int main(int argc,char* argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);// 1.實例化底層網絡通信框架的I/O事件監控句柄// struct ev_loop *loop = ev_default_loop();struct ev_loop *loop = EV_DEFAULT;// 2.實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);// 3.實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);// 4.實例化信道對象AMQP::TcpChannel channel(&connection);// 5.聲明交換機channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message){LOG_ERROR("聲明交換機失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange交換機創建成功"); });// 6.聲明隊列channel.declareQueue("test-queue").onError([](const char *message){LOG_ERROR("聲明隊列失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-queue隊列創建成功"); });// 7.針對交換機和隊列進行綁定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message){LOG_ERROR("test-exchange - test-queue 綁定失敗:{}",message);exit(0); }).onSuccess([](){ LOG_INFO("test-exchange - test-queue 綁定成功!"); });// 8.訂閱隊列消息--設置消息處理回調函數auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag").onReceived(callback).onError([](const char *message){LOG_ERROR("訂閱 test-queue 隊列消息失敗:{},message");exit(0); });// 9.啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}

makefile:

all:publish consume
publish:publish.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt
consume:consume.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt.PHONY:clean
clean:rm -rf publish consume

測試結果:

image-20250218204158538

四、二次封裝

1.二次封裝思想

在使用 rabbitmq 的時候,我們目前只需要交換機與隊列的直接交換,實現一臺主機將消息發布給另一臺主機進行處理的功能,因此在這里可以對 mq 的操作進行簡單的封裝,使 mq 的操作在項目中更加簡便:

封裝思想:

1.將事件監控結構loop,通信連接connection,以及信道channel封裝起來

2.提供創建交換機,隊列并進行直接綁定的接口

3.提供向指定交換機發布消息的接口

4.提供訂閱指定消息隊列的接口

封裝一個 MQClient:

提供聲明指定交換機與隊列,并進行綁定的功能;

提供向指定交換機發布消息的功能

提供訂閱指定隊列消息,并設置回調函數進行消息消費處理的功能

2.rabbitmq.hpp

#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <thread>
#include "logger.hpp"namespace hdp
{class MQClient{public:using MessageCallback = std::function<void(const char *, size_t)>;MQClient(const std::string &username, const std::string &password, const std::string &host){_loop = EV_DEFAULT;_handler = std::make_unique<AMQP::LibEvHandler>(_loop);// amqp://root:123456@127.0.0.1:5672/std::string url = "amqp://" + username + ":" + password + "@" + host + "/";AMQP::Address address(url);_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());_loop_thread = std::thread([this]() { ev_run(_loop, 0); });}~MQClient(){ev_async_init(&_async_watcher, watch_callback);ev_async_start(_loop, &_async_watcher);ev_async_send(_loop, &_async_watcher);_loop_thread.join();// 內部會進行釋放,不需要我們進行手動釋放_loop = nullptr;}void declareComponents(const std::string &exchange, const std::string &queue,const std::string &routing_key = "routing-key",AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct){_channel->declareExchange(exchange, exchange_type).onError([](const char *message) {LOG_ERROR("聲明交換機失敗: {}",message);exit(0); }).onSuccess([exchange]() { LOG_INFO("{} 交換機創建成功", exchange); });_channel->declareQueue(queue).onError([](const char *message) {LOG_ERROR("聲明隊列失敗: {}",message);exit(0); }).onSuccess([queue](){ LOG_INFO("{}隊列創建成功", queue); });_channel->bindQueue(exchange, queue, routing_key).onError([exchange, queue](const char *message) {LOG_ERROR("{} - {} 綁定失敗:{}",exchange,queue,message);exit(0); }).onSuccess([exchange, queue, routing_key]() { LOG_INFO("{} - {} - {} 綁定成功", exchange, queue, routing_key); });}bool publish(const std::string &exchange, const std::string &msg,const std::string &routing_key = "routing_key"){LOG_DEBUG("向交換機 {}-{} 發布消息!", exchange, routing_key);bool ret = _channel->publish(exchange, routing_key, msg);if (ret == false){LOG_ERROR("{} 發布消息失敗:", exchange);return false;}return true;}void consume(const std::string &queue, const MessageCallback &cb){LOG_DEBUG("開始訂閱 {} 隊列消息!", queue);_channel->consume(queue, "consume-tag") // 返回值 DeferredConsumer.onReceived([this, cb](const AMQP::Message &message,uint64_t deliveryTag, bool redelivered) {cb(message.body(),message.bodySize());_channel->ack(deliveryTag); }).onError([queue](const char *message) {LOG_ERROR("訂閱 {} 隊列消息失敗: {}",queue,message);exit(0); });}private:static void watch_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents){ev_break(loop, EVBREAK_ALL);}private:struct ev_async _async_watcher;struct ev_loop *_loop;std::unique_ptr<AMQP::LibEvHandler> _handler;std::unique_ptr<AMQP::TcpConnection> _connection;std::unique_ptr<AMQP::TcpChannel> _channel;std::thread _loop_thread;};
}

3.consume.cc

#include <gflags/gflags.h>
#include "../../../common/rabbitmq.hpp"DEFINE_string(username, "root", "rabbitmq訪問用戶名");
DEFINE_string(password, "123456", "rabbitmq訪問用戶名密碼");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服務器地址信息 host:port");DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");void callback(const char *body, size_t size)
{std::string msg;msg.assign(body, size);LOG_INFO("訂閱消息: {}", msg);
}int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);hdp::MQClient client(FLAGS_username, FLAGS_password, FLAGS_host);client.declareComponents("test-exchange", "test-queue");client.consume("test-queue", callback);std::this_thread::sleep_for(std::chrono::seconds(60));return 0;
}

4.publish.cc

#include <gflags/gflags.h>
#include "../../../common/rabbitmq.hpp"DEFINE_string(username, "root", "rabbitmq訪問用戶名");
DEFINE_string(password, "123456", "rabbitmq訪問用戶名密碼");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服務器地址信息 host:port");DEFINE_bool(run_mode, false, "程序的運行模式,0-調試,1-發布");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志的輸出等級");int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);hdp::MQClient client(FLAGS_username, FLAGS_password, FLAGS_host);client.declareComponents("test-exchange", "test-queue");for (int i = 0; i < 10; i++){std::string msg = "Hello World " + std::to_string(i);bool ret = client.publish("test-exchange", msg);if (ret == false){LOG_ERROR("publish 失敗");}}std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}

5.makefile

all:publish consume
publish:publish.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt
consume:consume.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt.PHONY:clean
clean:rm -rf publish consume

6.測試結果

發布:

image-20250218214220589

訂閱:

e::ParseCommandLineFlags(&argc, &argv, true);
hdp::init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

hdp::MQClient client(FLAGS_username, FLAGS_password, FLAGS_host);
client.declareComponents("test-exchange", "test-queue");for (int i = 0; i < 10; i++)
{std::string msg = "Hello World " + std::to_string(i);bool ret = client.publish("test-exchange", msg);if (ret == false){LOG_ERROR("publish 失敗");}
}
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;

}


## 5.makefile```makefile
all:publish consume
publish:publish.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt
consume:consume.ccg++ -o $@ $^ -std=c++17 -lamqpcpp -lev -lpthread -ldl -lgflags -lspdlog -lfmt.PHONY:clean
clean:rm -rf publish consume

6.測試結果

發布:

[外鏈圖片轉存中…(img-aXENtItP-1755520555473)]

訂閱:

image-20250218214245704

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/96011.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/96011.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/96011.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

(論文速讀)ViDAR:視覺自動駕駛預訓練框架

論文題目&#xff1a;Visual Point Cloud Forecasting enables Scalable Autonomous Driving&#xff08;視覺點云預測實現可擴展的自動駕駛&#xff09; 會議&#xff1a;CVPR2024 摘要&#xff1a;與對通用視覺的廣泛研究相比&#xff0c;可擴展視覺自動駕駛的預訓練很少被探…

《Unity Shader入門精要》學習筆記二

1、基礎光照&#xff08;1&#xff09;看世界的光模擬真實的光照環境來生成一張圖像&#xff0c;需要考慮3種物理現象。光線從光源中被發射出來。光線和場景中的一些物體相交&#xff1a;一些光線被物體吸收了&#xff0c;而另一些光線被散射到其他方向攝像機吸收了一些光&…

Windchill 11.0使用枚舉類型自定義實用程序實現生命周期狀態管理

一、Enumerated Type Customization Utility 枚舉類型自定義實用程序,可用于添加或編輯枚舉類型的值,在Windchill 12.0+中可直接在類型和屬性管理中編輯,如下圖所示,而在Windchill 11.0中只能通過windchill shell啟動程序,下面將詳細介紹Windchill 11.0中啟動并使用枚舉類…

UGUI源碼剖析(10):總結——基于源碼分析的UGUI設計原則與性能優化策略

UGUI源碼剖析&#xff08;第十章&#xff09;&#xff1a;總結——基于源碼分析的UGUI設計原則與性能優化策略 本系列文章對UGUI的核心組件與系統進行了深入的源代碼級分析。本章旨在對前述內容進行系統性總結&#xff0c;提煉出UGUI框架最核心的設計原則&#xff0c;并基于這些…

STM32N6引入NPU,為邊緣AI插上“隱形的翅膀”

2025年的春天格外特別。伴隨著人形機器人、DeepSeek的強勢刷屏&#xff0c;AI成了最有前景的賽道。萬物皆可AI&#xff0c;萬物也在尋覓用上AI或者讓AI“轉正”的“aha moment”。 幫助機器更好地“思考”&#xff0c;讓更多的AI走向邊緣&#xff0c;是AI發展的重要趨勢之一。…

演練:使用VB開發多智能體協作的榮格八維分析器

在大語言模型高速發展的時代&#xff0c;我們面對困難的語義分析任務&#xff0c;通過構建智能體進行處理是一個流行趨勢。本文將介紹如何使用 Visual Basic .NET 開發一個多智能體協作系統&#xff0c;用于分析聊天記錄中特定人物的榮格八維人格類型。 本文使用 CC-BY-NC-SA …

llamafactory使用qlora訓練

llamafactory使用qlora訓練 1.環境搭建 conda create -n qlora python3.10 -y conda activate qlora# 克隆LLaMA-Factory倉庫 git clone https://github.com/hiyouga/LLaMA-Factory.git# 進入倉庫目錄 cd LLaMA-Factory# 切換到0.9.4版本 git checkout v0.9.4pip install -e .2…

模型微調/量化技術整理

一、模型微調技術1.模型微調簡介大模型微調(Fine-tuning)&#xff0c;是指在已經預訓練好的大語言模型基礎上&#xff08;基座模型&#xff09;&#xff0c;使用特定的數據集進行進一步訓練&#xff0c;讓模型適應特定任務或領域。通常LLM的預訓練是無監督的&#xff0c;但微調…

實踐筆記-VSCode與IDE同步問題解決指南;程序總是進入中斷服務程序。

一、VSCode 修改文件后&#xff0c;IDE 未同步如果你在 VSCode 中異步修改了項目文件內容&#xff0c;但 S32DS 或 Keil&#xff08;等集成開發環境&#xff09;中的項目沒有同步更新&#xff0c;有兩個解決方法&#xff1a;檢查文件是否已保存&#xff1a;確保 VSCode 中修改的…

C#WPF實戰出真汁04--登錄功能實現

1、登錄功能實現要點對于登錄系統&#xff0c;應該注意幾個要點&#xff1a;用戶認證流程設計&#xff0c;密碼存儲與驗證&#xff0c;會話管理&#xff0c;防暴力破解措施&#xff0c;錯誤處理與提示2、登錄功能的視圖模型首先在xaml文件中必須指定該頁面使用的視圖模型&#…

鴻蒙入門簡化版

第一步&#xff1a; 首先下載DEVStudio https://developer.huawei.com/consumer/cn/deveco-studio/ 第二步&#xff1a; 了解基本的ArkTs語言 https://developer.huawei.com/consumer/cn/doc/harmonyos-guides/introduction-to-arkts 第三步 &#xff1a; 教學視頻有兩個途徑&a…

day25|學習前端js

函數聲明&#xff0c;被提升&#xff08;hoisting&#xff09;。函數表達式必須先定義才能用。對象解構&#xff0c;按屬性名數組解構按順序點運算符. 對象.屬性名哪些可迭代&#xff08;可以被for..of循環的東西&#xff09;&#xff1a;array&#xff0c;string&#xff0c;m…

quic協議與應用開發

quic為什么出現&#xff1f;quic主要是為了解決TCP協議的局限性而提出的&#xff0c;具體來說是要解決如下問題&#xff1a;1. 加密連接建立時間長TCP協議是傳輸層協議&#xff0c;而TLS是會話層協議&#xff0c;在Linux等主流操作系統中TCP在內核實現而TLS一般在用戶態實現&am…

【淺學】tflite-micro + ESP32S3 + VScode + ESP-IDF 基于例程快速實現自己的圖像分類模型訓練部署全流程

如果你用Pytorch訓練的模型那么可以參考我的步驟&#xff0c;使用的是Tensorflow的話參考官方文檔即可&#xff0c;但流程都是一樣的&#xff0c;每一步我都會提到部分操作細節及注意事項 官方教程 要詳細學習的話tflite-micro里的微控制器章節下都詳細看&#xff08;頁面左側…

【HarmonyOS】應用設置全屏和安全區域詳解

【HarmonyOS】應用設置全屏和安全區域詳解 一、前言 IDE創建的鴻蒙應用&#xff0c;默認采取組件安全區布局方案。頂部會預留狀態欄區域&#xff0c;底部會預留導航條區域。這就是所謂的安全區域。 如果不處理&#xff0c;界面效果很割裂。所以業內UI交互設計&#xff0c;都會設…

openfeign 只有接口如何創建bean的

OpenFeign 能夠為純接口創建 Spring Bean&#xff0c;其核心機制是通過動態代理和 Spring 的 FactoryBean 機制實現的。以下是詳細的工作原理&#xff1a;1. EnableFeignClients 注解的啟動在 Spring Boot 主類上添加 EnableFeignClients 注解&#xff1a;SpringBootApplicatio…

【展廳多媒體】互動地磚屏怎么提升展廳互動感的?

在數字化展廳設計中&#xff0c;互動地磚屏 正成為提升觀眾參與度的重要工具。這種融合視覺科技與交互體驗的裝置&#xff0c;通過動態影像與即時反饋&#xff0c;讓參觀者從被動觀看轉變為主動探索&#xff0c;從而大幅增強展廳的互動感。 Led地面互動屏的優勢在于其強大的視…

AI賦能電力巡檢:變壓器漏油智能檢測系統全解析

&#x1f525; AI賦能電力巡檢&#xff1a;變壓器漏油智能檢測系統全解析 &#x1f4d6; 前言 在電力系統的日常運維中&#xff0c;變壓器作為核心設備&#xff0c;其安全運行直接關系到整個電網的穩定性。傳統的人工巡檢方式不僅效率低下&#xff0c;還存在安全隱患和漏檢風險…

GitHub上值得Star的計算機視覺項目

GitHub上值得Star的計算機視覺項目 前言 一、OpenCV:計算機視覺領域的瑞士軍刀 1.1 項目簡介 1.2 核心功能與技術特點 1.3 代碼示例 二、YOLO 系列:實時目標檢測的領導者 2.1 項目簡介 2.2 核心功能與技術特點 2.3 代碼示例 三、Detectron2:Facebook AI Research 的目標檢測…

【深度學習】pytorch深度學習框架的環境配置

文章目錄1. 配置cuda環境2. 配置conda環境3. 配置pytorch gpu環境1. 配置cuda環境 在命令行輸入以下命令可以查看當前顯卡驅動版本和最高支持的cuda版本 nvidia-smi根據cuda版本去官網下載并安裝cuda 下載鏈接&#xff1a;https://developer.nvidia.com/cuda-toolkit-archive…