RabbitMq 消息隊列組件
- 1. RabbitMq介紹
- 2. 安裝RabbitMQ
- 3. 安裝 RabbitMQ 的 C++客戶端庫
- 4. AMQP-CPP 庫的簡單使用
- 4.1 使用
- 4.1.1 TCP 模式
- 4.1.2 擴展模式
- 4.2 常用類與接口介紹
- 4.2.1 Channel
- 4.3.2 ev
- 5. RabbitMQ樣例編寫
- 5.1 發布消息
- 5.2 訂閱消息
1. RabbitMq介紹
RabbitMq - 消息隊列組件:實現兩個客戶端主機之間消息傳輸的功能(發布&訂閱)。
一端發布消息,一端訂閱消息,消息就會被推送到訂閱消息那一端然后進行處理。
RabbitMq遵守AMQP協議(標準的高級消息隊列協議)
AMQP協議核心概念:交換機(交換機類型)、隊列,綁定,消息。
兩個客戶端之間進行消息傳輸,一端產生消息另一端接收消息然后處理。按照以前的思想就是兩個客戶端直接進行網絡通信socket,通過網絡消息將一條消息發送給對方讓對方進行處理,這是一種最基礎數據傳輸過程。
但是這種消息傳輸是存在缺陷的!如果有一端連接斷開了,那另一端消息到底還發不發,是等,還是將這條消息丟棄掉。如果一直等,新產生的消息又該怎么辦,總不能一直存著。所以這種安全性是很低的。而且一對一這種客戶端里面,通常數據的產生和數據的處理所消耗的時間是不成正比的。通常消息的處理消耗時間更多。
基于兩端消息進行安全傳輸的需求,所以高級消息隊列組件就產生了。兩端不直接進行消息傳輸了。而是通過消息隊列服務器來進行一個中間的數據轉發功能。發布消息客戶端將信息發布到服務器上,服務器在將這條消息推送給訂閱消息隊列客戶端讓它來進行處理。
但是針對一個高級消息隊列設計的話,單純一個只是做中間數據轉發其實是不夠的。我們希望它能在做中間數據轉發更加靈活,在不同場景提供不同的功能。這個時候就有了AMQP的核心概念(交換機、隊列、綁定、消息)。
消息隊列服務器里面首先有一個交換機,它是用來處理數據轉發邏輯功能模塊。然后還有隊列。訂閱客戶端連接服務器告訴服務器訂閱那個隊列。發布客戶端進行消息發布并不是直接把消息發布到某個隊列中,而是把信息發布到交換機,由交換機來決定把這條消息放到那個隊列。決定了這條消息推送到那個訂閱客戶端哪里去進行處理。
交換機該把消息放到那一個隊列中呢?這個時候就有了不同的交換機類型:
- 廣播交換:當交換機收到消息,則將消息發布到所有綁定的隊列中
交換機和隊列都創建好了之后,會把交換機和隊列進行關系綁定,也就是交換機和隊列建立一個關聯關系。而且會設置一個routing key(路由密鑰:一定規則的字符串)用來標識這是一個放置什么類型消息的隊列。
- 直接交換:根據消息中的binding_key與綁定的routing_key對比,一致則放到隊列中
- 主題交換:使用binding_key與綁定的routing_key進行規則匹配,成功則放入隊列
2. 安裝RabbitMQ
sudo apt install rabbitmq-server
# 啟動服務
sudo systemctl start rabbitmq-server.service
# 查看服務狀態
sudo systemctl status rabbitmq-server.service# 安裝完成的時候默認有個用戶 guest ,但是權限不夠,要創建一個
# administrator 用戶,才可以做為遠程登錄和發表訂閱消息:#添加用戶
sudo rabbitmqctl add_user root 123456#設置用戶 tag
sudo rabbitmqctl set_user_tags root administrator#設置用戶權限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"# RabbitMQ 自帶了 web 管理界面,執行下面命令開啟
sudo rabbitmq-plugins enable rabbitmq_management
訪問 webUI 界面, 默認端口為 15672
至此RabbitMQ安裝成功。
3. 安裝 RabbitMQ 的 C++客戶端庫
我們這里使用 AMQP-CPP 庫來編寫客戶端程序。
先安裝libev網絡通信庫。在搭建RabbitMQ客戶端的時候需要進行一個網絡通信的事件監控。事件監控我們可以自己寫poll,epoll但是太麻煩了。這里我們使用第三方網絡通信框架。RabbitMQ對libevent、libev等等這些都支持。這里我們選擇的是libvev。
sudo apt install libev-dev #libev 網絡庫組件
git clone https://gitee.com/iOceanPlus_Forked/AMQP-CPP.git
cd AMQP-CPP/
make
make install
至此可以通過 AMQP-CPP 來操作 rabbitmq
4. AMQP-CPP 庫的簡單使用
AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫。它能解析從 RabbitMq
服務發送來的數據,也可以生成發向 RabbitMq 的數據包。AMQP-CPP 庫不會向
RabbitMq 建立網絡連接,所有的網絡I/O由用戶完成。
- 當然,AMQP-CPP 提供了可選的網絡層接口,它預定義了 TCP 模塊,用戶就不用自己實現網絡IO,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動安裝對應的組件。
- AMQP-CPP 完全異步,沒有阻塞式的系統調用,不使用線程就能夠應用在高性能
應用中。 - 注意:它需要 c++17 的支持
4.1 使用
AMQP-CPP 的使用有兩種模式:
- 使用默認的 TCP 模塊進行網絡通信
- 使用擴展的 libevent、libev、libuv、asio 異步通信組件進行通信
4.1.1 TCP 模式
- 實現一個類繼承自 AMQP::TcpHandler 類, 它負責網絡層的 TCP 連接
- 重寫相關函數, 其中必須重寫 monitor 函數
- 在 monitor 函數中需要實現的是將 fd 放入 eventloop(select、epoll)中監控, 當 fd可寫可讀就緒之后, 調用 AMQP-CPP 的 connection->process(fd, flags)方法
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler
{/***AMQP 庫在創建新連接時調用的方法*與處理程序相關聯。這是對處理程序的第一次調用*@param connection 附加到處理程序的連接*/virtual void onAttached(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現,例如初始化事物// 以處理連接。}/***當 TCP 連接時由 AMQP 庫調用的方法*已經建立。調用此方法后,庫*仍然需要設置可選的 TLS 層和*在 TCP 層的頂部建立 AMQP 連接。,這種方法*總是與稍后對 onLost()的調用配對。*@param connection 現在可以使用的連接*/virtual void onConnected(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現(可能不需要)}/***在建立安全 TLS 連接時調用的方法。*這只對 amqps://連接調用。它允許您檢查連接是否足夠安全,以滿足您的喜好*(例如,您可以檢查服務器證書)。AMQP 協議仍然需要啟動。*@param connection 已被保護的連接*@param ssl 來自 openssl 庫的 ssl 結構*@return bool 如果可以使用連接,則為 True*/virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override{//@todo// 添加您自己的實現,例如讀取證書并檢查它是否確實是您的return true;}/***當登錄嘗試成功時由 AMQP 庫調用的方法。在此之后,連接就可以使用了。*@param connection 現在可以使用的連接*/virtual void onReady(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現,例如通過創建一個通道實例,然后開始發布或使用}/***該方法在服務器嘗試協商檢測信號間隔時調用,*并被覆蓋以擺脫默認實現(否決建議的檢測信號間隔),轉而接受該間隔。*@param connection 發生錯誤的連接*@param interval 建議的間隔(秒)*/virtual uint16_t onNegotiate(AMQP::TcpConnection *connection,uint16_t interval){// 我們接受服務器的建議,但如果間隔小于一分鐘,我們將使用一分鐘的間隔if (interval < 60)interval = 60;//@todo// 在事件循環中設置一個計時器,// 如果在這段時間內沒有發送其他指令,// 請確保每隔 interval 秒調用 connection->heartbeat()。// 返回我們要使用的間隔return interval;}/*** *發生致命錯誤時由 AMQP 庫調用的方法
例如,因為無法識別從 RabbitMQ 接收的數據,或者基礎連接丟失。
此調用之后通常會調用 onLost()(如果錯誤發生在 TCP 連接建立之
后)和 onDetached()。
*@param connection 發生錯誤的連接
*@param message 一條人類可讀的錯誤消息
*/virtual void onError(AMQP::TcpConnection *connection, const char *message) override{//@todo// 添加您自己的實現,例如,通過向程序的用戶報告錯誤并記錄錯誤}/***該方法在 AMQP 協議結束時調用的方法。這是調用 connection.close()以正常關閉連接的計數器部分。請注意,TCP 連接此時仍處于活動狀態,您還將收到對 onLost()和 onDetached()的調用@param connection AMQP 協議結束的連接*/virtual void onClosed(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現, 可能沒有必要,// 但如果您想在 amqp 連接結束后立即執行某些操作,// 又不想等待 tcp 連接關閉,則這可能會很有用}/***當 TCP 連接關閉或丟失時調用的方法。*如果同時調用了 onConnected(),則始終調用此方法*@param connection 已關閉但現在無法使用的連接*/virtual void onLost(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現(可能沒有必要)}/***調用的最終方法。這表示將不再對處理程序進行有關連接的進一步調用。*@param connection 可以被破壞的連接
*/virtual void onDetached(AMQP::TcpConnection *connection) override{//@todo// 添加您自己的實現,如清理資源或退出應用程序}/***當 AMQP-CPP 庫想要與主事件循環交互時,它會調用該方法。*AMQP-CPP 庫是完全不阻塞的,*并且只有在事先知道這些調用不會阻塞時才進行“write()”或“read()”系統調用。*要在事件循環中注冊文件描述符,它會調用這個“monitor()”方法,*該方法帶有一個文件描述符和指示是否該檢查文件描述符的可讀性或可寫性的標志。**@param connection 想要與事件循環交互的連接*@param fd 應該檢查的文件描述符*@param 標記位或 AMQP::可讀和/或 AMQP::可寫*/virtual void monitor(AMQP::TcpConnection *connection, int fd,int flags) override{//@todo// 添加您自己的實現,// 例如將文件描述符添加到主應用程序事件循環(如 select()或 poll()循環)。// 當事件循環報告描述符變為可讀和或可寫時,// 由您通過調用 connection->process(fd,flags)方法// 通知 AMQP-CPP 庫文件描述符處于活動狀態。}
};
4.1.2 擴展模式
以 libev 為例, 我們不必要自己實現 monitor 函數, 可以直接使用AMQP::LibEvHandler
4.2 常用類與接口介紹
4.2.1 Channel
channel(信道類) 是一個虛擬連接,大佬認為一個socket只用于一個連接太浪費了,所有在socket之上又做了封裝,一個連接上可以建立多個信道。每個信道都可以支持一個客戶端和服務器進行通信。并且所有的 RabbitMq 指令都是通過 channel 傳輸,所以連接建立后的第一步,就是建立 channel。因為所有操作是異步的,所以在 channel 上執行指令的返回值并不能作為操作執行結果,實際上它返回的是 Deferred 類,可以使用它安裝處理函數。
namespace AMQP
{/*** Generic callbacks that are used by many deferred objects*/using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(const char* message)>;using FinalizeCallback = std::function<void()>;/*** Declaring and deleting a queue*/using QueueCallback = std::function<void(const std::string& name,uint32_t messagecount, uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message,uint64_t deliveryTag,bool redelivered)>;// 當使用發布者確認時,當服務器確認消息已被接收和處理時,將調用AckCallback using AckCallback = std::function<void(uint64_t deliveryTag,bool multiple)>;// 使用確認包裹通道時,當消息被 ack/nacked 時,會調用這些回調using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;class Channel{//構造函數Channel(Connection *connection);//判斷是否連接成功bool connected();/***聲明交換機,交換機已經存在就ok,不存在就創建*如果提供了一個空名稱,則服務器將分配一個名稱。*以下 flags 可用于交換機:**-durable 持久化,重啟后交換機依然有效*-autodelete 刪除所有連接的隊列后,自動刪除交換*-passive 僅被動檢查交換機是否存在*-internal 創建內部交換**@param name 交換機的名稱*@param-type 交換類型enum ExchangeType{fanout, 廣播交換,綁定的隊列都能拿到消息direct, 直接交換,只將消息交給 routingkey 一致的隊列topic, 主題交換,將消息交給符合 bindingkey 規則的隊列headers,consistent_hash,message_deduplication};*@param flags 交換機標志*@param arguments 其他參數**此函數返回一個延遲處理程序Deferred類。可以設置回調函數using onSuccess(), onError() and onFinalize() methods.*/Deferred &declareExchange(const std::string_view &name,ExchangeType type,int flags,const Table &arguments)/***聲明隊列*如果不提供名稱,服務器將分配一個名稱。*flags 可以是以下值的組合:**-durable 持久隊列在代理重新啟動后仍然有效*-autodelete 當所有連接的使用者都離開時,自動刪除隊列*-passive 僅被動檢查隊列是否存在*-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除**@param name 隊列的名稱*@param flags 標志組合*@param arguments 可選參數**此函數返回一個延遲處理程序DeferredQueue類。可以設置回調函數*使用 onSuccess()、onError()和 onFinalize()方法。*Deferred &onError(const char *message)**可以安裝的 onSuccess()回調應該具有以下簽名:void myCallback(const std::string &name,uint32_t messageCount,uint32_t consumerCount);例如:channel.declareQueue("myqueue").onSuccess([](const std::string &name,uint32_t messageCount,uint32_t consumerCount) {std::cout << "Queue '" << name << "' ";std::cout << "has been declared with ";std::cout << messageCount;std::cout << " messages and ";std::cout << consumerCount;std::cout << " consumers" << std::endl;* });*/DeferredQueue &declareQueue(const std::string_view &name,int flags,const Table &arguments)/***將隊列綁定到交換機**@param exchange 源交換機*@param queue 目標隊列*@param routingkey 路由密鑰*@param arguments 其他綁定參數**此函數返回一個延遲處理程序。可以安裝回調*使用 onSuccess()、onError()和 onFinalize()方法。*/Deferred &bindQueue(const std::string_view &exchange,const std::string_view &queue,const std::string_view &routingkey,const Table &arguments)/***將消息發布到 exchange*您必須提供交換機的名稱和路由密鑰。然后,RabbitMQ 將嘗試將消息發送到一個或多個隊列。使用可選的 flags 參數,可以指定如果消息無法路由到隊列時應該發生的情況。默認情況下,不可更改的消息將被靜默地丟棄。**如果設置了'mandatory'或'immediate'標志,則無法處理的消息將返回到應用程序。在開始發布之前,請確保您已經調用了 recall()-方法,并設置了所有適當的處理程序來處理這些返回的消息。**可以提供以下 flags:**-mandatory 如果設置,服務器將返回未發送到隊列的消息*-immediate 如果設置,服務器將返回無法立即轉發給使用者的消息。*@param exchange 要發布到的交易所*@param routingkey 路由密鑰*@param envelope 要發送的完整信封*@param message 要發送的消息*@param size 消息的大小*@param flags 可選標志*/bool publish(const std::string_view &exchange,const std::string_view &routingKey,const std::string &message,int flags = 0)/***告訴 RabbitMQ 服務器我們已準備好使用消息-也就是訂閱那個隊列消息**調用此方法后,RabbitMQ 開始向客戶端應用程序傳遞消息。consumer tag 是一個字符串標識符,如果您以后想通過 channel::cancel()調用停止它,可以使用它來標識使用者。*如果您沒有指定使用者 tag,服務器將為您分配一個。**支持以下 flags:**-nolocal 如果設置了,則不會同時消耗在此通道上發布的消息*-noack 如果設置了,則不必對已消費的消息進行確認*-exclusive 請求獨占訪問,只有此使用者可以訪問隊列**@param queue 您要使用的隊列*@param tag 將與此消費操作關聯的消費者標記*@param flags 其他標記*@param arguments 其他參數**此函數返回一個延遲處理程序。可以使用 onSuccess()、onError()和 onFinalize()方法安裝回調。可以安裝的 onSuccess()回調應該具有以下格式:void myCallback(const std::string_view&tag);樣例:channel.consume("myqueue").onSuccess([](const std::string_view& tag) {std::cout << "Started consuming under tag ";std::cout << tag << std::endl;});*/DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag,int flags,const Table &arguments)/***確認接收到的消息**當在 DeferredConsumer::onReceived()方法中接收到消息進行處理之后,必須確認該消息,以便 RabbitMQ 將其從隊列中刪除(除非使用 noack 選項消費)。**支持以下標志:**-多條確認多條消息:之前傳遞的所有未確認消息也會得到確認**@param deliveryTag 消息的唯一 delivery 標簽*@param flags 可選標志*@return bool*/bool ack(uint64_t deliveryTag, int flags = 0)} class DeferredConsumer{/*注冊一個回調函數,該函數在消費者啟動時被調用。void onSuccess(const std::string &consumertag)*/DeferredConsumer &onSuccess(const ConsumeCallback &callback)/*注冊回調函數,用于接收到一個完整消息的時候被調用void MessageCallback(const AMQP::Message &message,uint64_t deliveryTag, bool redelivered)*/DeferredConsumer &onReceived(const MessageCallback &callback)/* Alias for onReceived() */DeferredConsumer &onMessage(const MessageCallback &callback)/*注冊要在服務器取消消費者時調用的函數void CancelCallback(const std::string &tag)*/DeferredConsumer &onCancelled(const CancelCallback &callback)} class Message : public Envelope{const std::string &exchange()const std::string &routingkey():q} class Envelope : public MetaData{const char *body()uint64_t bodySize()}
}
類與接口的介紹總結:
AMQP::Channel:信道類
- Channel(Connection *connection) 構造
- bool connected() 判斷連接
- Deferred &declareExchange() 聲明交換機
- DeferredQueue &declareQueue() 聲明隊列
- Deferred& bindQueue)() 將交換機與隊列進行關系綁定的功能
- bool publish() 發布消息
- DeferredConsumer&consume() 定訂閱隊列消息
- bool ack() 消費者客戶端對收到的消息進行確認應答
class Message:消息類
- const char* body() 獲取消息正文
- uint64_t bodySize() 獲取消息正文大小
4.3.2 ev
typedef struct ev_async
{EV_WATCHER(ev_async)EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{EVBREAK_CANCEL = 0, /* undo unloop */EVBREAK_ONE = 1, /* unloop once */EVBREAK_ALL = 2 /* unloop all loops */
};
//實例化并獲取I/O事件監控結構句柄
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0))
#define EV_DEFAULT ev_default_loop(0)(使用宏獲取上面結構)//開始運行I/O事件監控,這是一個阻塞接口(創建一個線程執行該接口)
int ev_run(struct ev_loop *loop);/* break out of the loop */
//結束I/O監控
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents)
//如果在當前線程進行ev_run則可以直接調用,如果在其他線程中進行ev_run需要通過異步通知進行
void ev_async_init(ev_async *w, callback cb);//初始化異步事件結構,并設置回調函數
void ev_async_start(struct ev_loop *loop, ev_async *w);//啟動事件監控循環中的異步任務處理
void ev_async_send(struct ev_loop *loop, ev_async *w);//發送當前異步事件到異步線程中執行
第三方庫鏈接
g++ -o example example.cpp -lamqpcpp -lev
5. RabbitMQ樣例編寫
5.1 發布消息
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>int main()
{//1. 實例化底層網絡通信框架的I/O事件監控句柄auto *loop = EV_DEFAULT;//2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);//3. 實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//4. 實例化信道對象AMQP::TcpChannel channel(&connection);//5. 聲明交換機channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {std::cout << "聲明交換機失敗:" << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange 交換機創建成功!" << std::endl;});//6. 聲明隊列channel.declareQueue("test-queue").onError([](const char *message) {std::cout << "聲明隊列失敗:" << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-queue 隊列創建成功!" << std::endl;});//7. 針對交換機和隊列進行綁定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {std::cout << "test-exchange - test-queue 綁定失敗:" << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange - test-queue 綁定成功!" << std::endl;});//8. 向交換機發布消息for (int i = 0; i < 10; i++) {std::string msg = "Hello Bite-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if (ret == false) {std::cout << "publish 失敗!\n";}}//啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}
5.2 訂閱消息
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>//消息回調處理函數的實現
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{std::string msg;msg.assign(message.body(), message.bodySize());std::cout << msg << std::endl;channel->ack(deliveryTag); // 對消息進行確認
}int main()
{//1. 實例化底層網絡通信框架的I/O事件監控句柄auto *loop = EV_DEFAULT;//2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);//2.5. 實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 實例化信道對象AMQP::TcpChannel channel(&connection);//4. 聲明交換機channel.declareExchange("test-exchange", AMQP::ExchangeType::direct).onError([](const char *message) {std::cout << "聲明交換機失敗:" << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange 交換機創建成功!" << std::endl;});//5. 聲明隊列channel.declareQueue("test-queue").onError([](const char *message) {std::cout << "聲明隊列失敗:" << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-queue 隊列創建成功!" << std::endl;});//6. 針對交換機和隊列進行綁定channel.bindQueue("test-exchange", "test-queue", "test-queue-key").onError([](const char *message) {std::cout << "test-exchange - test-queue 綁定失敗:" << message << std::endl;exit(0);}).onSuccess([](){std::cout << "test-exchange - test-queue 綁定成功!" << std::endl;});//7. 訂閱隊列消息 -- 設置消息處理回調函數auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag") //返回值 DeferredConsumer.onReceived(callback).onError([](const char *message){std::cout << "訂閱 test-queue 隊列消息失敗:" << message << std::endl;exit(0);}); // 返回值是 AMQP::Deferred//8. 啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}
all : publish consume
publish : publish.ccg++ -std=c++17 $^ -o $@ -lamqpcpp -lev
consume : consume.ccg++ -std=c++17 $^ -o $@ -lamqpcpp -lev