介紹
RabbitMQ 是一個開源的消息代理和隊列服務器,用于在分布式系統之間傳遞消息。它實現了高級消息隊列協議(AMQP),同時也支持其他協議如 STOMP、MQTT 等。
核心概念
-
Producer(生產者): 發送消息的應用程序
-
Consumer(消費者): 接收消息的應用程序
-
Queue(隊列): 存儲消息的緩沖區
-
Exchange(交換機): 接收生產者發送的消息并根據規則路由到隊列
-
Binding(綁定): 連接交換機和隊列的規則
-
Message(消息): 包含有效載荷(payload)和標簽(label)的數據
安裝
安裝 RabbitMQ
sudo apt install rabbitmq-server
RabbitMQ 的簡單使用
# 啟動服務
sudo systemctl start rabbitmq-server.service
# 查看服務狀態
sudo systemctl status rabbitmq-server.service
# 安裝完成的時候默認有個用戶 guest ,但是權限不夠,要創建一個
administrator 用戶,才可以做為遠程登錄和發表訂閱消息:
#添加用戶
sudo rabbitmqctl add_user root 123456
#設置用戶 tag
sudo rabbitmqctl set_user_tags root administrator
#設置用戶權限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自帶了 web 管理界面,執行下面命令開啟
sudo rabbitmq-plugins enable rabbitmq_management
訪問 webUI 界面的默認端口為 15672。
安裝 RabbitMQ 的 C++客戶端庫
- C 語言庫:https://github.com/alanxz/rabbitmq-c
- C++庫:https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master
AMQP 是一個開放標準的應用層協議,專為面向消息的中間件設計,RabbitMQ 是其最著名的實現之一。 我們這里使用 AMQP-CPP 庫來編寫客戶端程序。
sudo apt install libev-dev #libev 網絡庫組件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
make install
安裝報錯:
/usr/include/openssl/macros.h:147:4: error: #error
"OPENSSL_API_COMPAT expresses an impossible API compatibility level"
147 | # error "OPENSSL_API_COMPAT expresses an impossible API
compatibility level" | ^~~~~
In file included from /usr/include/openssl/ssl.h:18,
from linux_tcp/openssl.h:20,
from linux_tcp/openssl.cpp:12:
/usr/include/openssl/bio.h:687:1: error: expected constructor,
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0’
687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
這種錯誤,表示 ssl 版本出現問題。
解決方案:卸載當前的 ssl 庫,重新進行修復安裝
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
sudo apt --fix-broken install
修復后,重新進行 make。
AMQP-CPP 庫的簡單使用
介紹
- AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫。它能解析從 RabbitMq 服務發送來的數據,也可以生成發向 RabbitMq 的數據包。AMQP-CPP 庫不會向 RabbitMq 建立網絡連接,所有的網絡 io 由用戶完成。
- 當然,AMQP-CPP 提供了可選的網絡層接口,它預定義了 TCP 模塊,用戶就不用自己實現網絡 io,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動安裝對應的組件。
- AMQP-CPP 完全異步,沒有阻塞式的系統調用,不使用線程就能夠應用在高性能應用中。
- 注意:它需要 c++17 的支持。
使用
AMQP-CPP 的使用有兩種模式:
- 使用默認的 TCP 模塊進行網絡通信
- 使用擴展的 libevent、libev、libuv、asio 異步通信組件進行通信
TCP 模式
該模式下需要實現一個類繼承自 AMQP::TcpHandler 類, 它負責網絡層的 TCP 連接,重寫相關函數, 其中必須重寫 monitor 函數。在 monitor 函數中需要實現的是將 fd 放入 eventloop(select 、 epoll) 中監控, 當 fd 可寫可讀就緒之后, 調用 AMQP-CPP 的 connection->process(fd, flags) 方法
TCP 模式使用較為麻煩,不過提供了靈活的網絡層集成能力,可以根據項目需求選擇合適的網絡庫進行集成。在實際應用中,建議結合事件循環庫(如libuv、Boost.Asio等)使用以獲得最佳性能。
擴展模式
以 libev 為例, 我們不必要自己實現 monitor 函數,可以直接使用 AMQP::LibEvHandler。
常用類與接口介紹
Channel
channel 是一個虛擬連接,一個連接上可以建立多個通道。并且所有的 RabbitMq 指令都是通過 channel 傳輸,所以連接建立后的第一步,就是建立 channel 。因為所有操作是異步的,所以在 channel 上執行指令的返回值并不能作為操作執行結果,實際上它返回的是 Deferred 類,可以使用它安裝處理函數。
namespace AMQP
{using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(const char *message)>;using FinalizeCallback = std::function<void()>;using QueueCallback = std::function<void(const std::string &name,uint32_t messagecount, uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message,uint64_t deliveryTag, bool redelivered)>;// 當使用發布者確認時,當服務器確認消息已被接收和處理時,將調用AckCallbackusing AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;// 使用確認包裹通道時,當消息被 ack/nacked 時,會調用這些回調using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;class Channel{Channel(Connection *connection);bool connected();// 聲明交換機,如果提供了一個空名稱,則服務器將分配一個名稱。// @param name 交換機的名稱// @param type 交換類型// enum ExchangeType// {// fanout, 廣播交換,綁定的隊列都能拿到消息// direct, 直接交換,只將消息交給 routingkey 一致的隊列// topic, 主題交換,將消息交給符合 bindingkey 規則的隊列// headers,// consistent_hash,// message_deduplication// };// @param flags 交換機標志// 以下 flags 可用于交換機:// *-durable 持久化,重啟后交換機依然有效// *-autodelete 刪除所有連接的隊列后,自動刪除交換// *-passive 僅被動檢查交換機是否存在// *-internal 創建內部交換// @param arguments 其他參數Deferred &declareExchange(const std::string_view &name,ExchangeType type, int flags, const Table &arguments);// 聲明隊列,如果不提供名稱,服務器將分配一個名稱。// @param name 隊列的名稱// @param flags 標志組合// flags 可以是以下值的組合:// -durable 持久隊列在代理重新啟動后仍然有效// -autodelete 當所有連接的使用者都離開時,自動刪除隊列// -passive 僅被動檢查隊列是否存在*-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除// @param arguments 可選參數DeferredQueue &declareQueue(const std::string_view &name,int flags, const Table &arguments);// 將隊列綁定到交換機// @param exchange 源交換機// @param queue 目標隊列// @param routingkey 路由密鑰// @param arguments 其他綁定參數Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue,const std::string_view &routingkey, const Table &arguments);// 將消息發布到 exchange,必須提供交換機的名稱和路由密鑰。然后RabbitMQ 將嘗試將消息發送到一個或多個隊列。// 使用可選的 flags 參數,可以指定如果消息無法路由到隊列時應該發生的情況。// @param exchange 要發布到的交易所// @param routingkey 路由密鑰// @param envelope 要發送的完整信封// @param message 要發送的消息// @param size 消息的大小// @param flags 可選標志// 可以提供以下 flags:// -mandatory 如果設置,服務器將返回未發送到隊列的消息// -immediate 如果設置,服務器將返回無法立即轉發給使用者的消息。bool publish(const std::string_view &exchange, const std::string_view &routingKey,const std::string &message, int flags = 0);// 告訴 RabbitMQ 服務器我們已準備好使用消息-也就是訂閱隊列消息,調用此方法后,RabbitMQ 開始向客戶端應用程序傳遞消息。// @param queue 您要使用的隊列// @param tag 將與此消費操作關聯的消費者標記// consumer tag 是一個字符串標識符,如果以后想通過 channel::cancel()調用停止它,可以使用它來標識使用者。// 如果您沒有指定使用者 tag,服務器將為您分配一個。// @param flags 其他標記// @param arguments 其他參數// 支持以下 flags:// -nolocal 如果設置了,則不會同時消耗在此通道上發布的消息// -noack 如果設置了,則不必對已消費的消息進行確認// -exclusive 請求獨占訪問,只有此使用者可以訪問隊列DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag, int flags, const Table &arguments);// 確認接收到的消息,當在 DeferredConsumer::onReceived()方法中接收到消息時,必須確認該消息,// 以便 RabbitMQ 將其從隊列中刪除(除非使用 noack 選項消費)。// @param deliveryTag 消息的唯一 delivery 標簽// @param flags 可選標志bool ack(uint64_t deliveryTag, int flags = 0);};class DeferredConsumer{// 注冊一個回調函數,該函數在消費者啟動時被調用。DeferredConsumer &onSuccess(const ConsumeCallback &callback);// 注冊回調函數,用于接收到一個完整消息的時候被調用void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered);DeferredConsumer &onReceived(const MessageCallback &callback);DeferredConsumer &onMessage(const MessageCallback &callback);};class Message : public Envelope{const std::string &exchange();const std::string &routingkey();};class Envelope : public MetaData{const char *body();uint64_t bodySize();};
}
ev
typedef struct ev_async
{EV_WATCHER(ev_async);EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{EVBREAK_CANCEL = 0, /* undo unloop */EVBREAK_ONE = 1, /* unloop once */EVBREAK_ALL = 2 /* unloop all loops */
};
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0));
#define EV_DEFAULT ev_default_loop(0)
int ev_run(struct ev_loop *loop);
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
void ev_async_init(ev_async *w, callback cb);
void ev_async_start(struct ev_loop *loop, ev_async *w);
void ev_async_send(struct ev_loop *loop, ev_async *w);
使用案例
二次封裝思想:
實現一臺主機將消息發布給另一臺主機進行處理的功能,可以對 mq 的操作進行簡單的封裝,使 mq 的操作更加簡便,封裝一個 MQClient :
- 提供聲明指定交換機與隊列,并進行綁定的功能
- 提供向指定交換機發布消息的功能
- 提供訂閱指定隊列消息,并設置回調函數進行消息消費處理的功能
rabbitmq.hpp
#pragma once
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
#include <iostream>
#include <functional>
#include "logger.hpp"class MQClient
{
public:using ptr = std::shared_ptr<MQClient>;using MessageCallback = std::function<void(const char *, size_t)>;MQClient(const std::string &user, const std::string &password, const std::string &host){// 1.實例化底層網絡通信框架的IO事件監控句柄_loop = EV_DEFAULT;// 2.實例化LibEvHandler句柄,將AMQP框架與事件監控關聯起來_handler = std::make_unique<AMQP::LibEvHandler>(_loop);// 3.實例化連接對象// amqp://root:2162627569@127.0.0.1:5672/std::string url = "amqp://" + user + ":" + password + "@" + host + "/";AMQP::Address address(url);_connection = std::make_unique<AMQP::TcpConnection>(_handler.get(), address);// 4.實例化信道對象_channel = std::make_unique<AMQP::TcpChannel>(_connection.get());// 5.啟動底層網絡通信框架,開啟IO_loop_thread = std::thread([this](){ ev_run(_loop, 0); });}~MQClient(){ev_async_init(&_async_watcher, watcher_callback);ev_async_start(_loop, &_async_watcher);ev_async_send(_loop, &_async_watcher);_loop_thread.join();_loop = nullptr;}void declareComponents(const std::string &exchange, const std::string &queue,const std::string &routing_key = "routing_key", AMQP::ExchangeType exchange_type = AMQP::ExchangeType::direct){// 聲明交換機_channel->declareExchange(exchange, exchange_type).onError([&exchange](const char *msg){LOG_ERROR("{}交換機創建失敗:{}",exchange,msg);exit(1); }).onSuccess([&exchange](){ LOG_INFO("{}交換機創建成功!", exchange); });// 聲明隊列_channel->declareQueue(queue).onError([&queue](const char *msg){LOG_ERROR("{}隊列創建失敗:{}",queue,msg);exit(1); }).onSuccess([&queue](){ LOG_INFO("{}隊列創建成功!", queue); });// 6.綁定交換機和隊列_channel->bindQueue(exchange, queue, routing_key).onError([&exchange, &queue](const char *msg){LOG_ERROR("{} - {}綁定失敗:{}",exchange,queue,msg);exit(1); }).onSuccess([&exchange, &queue](){ LOG_INFO("{} - {}綁定成功!", exchange, queue); });}bool publish(const std::string &exchange, const std::string &msg, const std::string &routing_key = "routing_key"){LOG_DEBUG("向交換機 {}-{} 發布消息!", exchange, routing_key);bool ret = _channel->publish(exchange, routing_key, msg);if (ret == false){LOG_ERROR("{} 發布消息失敗:", exchange);return false;}return true;}void consume(const std::string &queue, const MessageCallback &cb){LOG_DEBUG("開始訂閱 {} 隊列消息!", queue);_channel->consume(queue, "consume-tags").onReceived([this, &cb](const AMQP::Message &message, uint32_t deliveryTag, bool redelivered){cb(message.body(),message.bodySize());_channel->ack(deliveryTag); }).onError([&queue](const char *message){LOG_ERROR("訂閱 {} 隊列消息失敗: {}", queue, message);exit(1); });}private:static void watcher_callback(struct ev_loop *loop, ev_async *watcher, int32_t revents){ev_break(loop, EVBREAK_ALL);}private:struct ev_async _async_watcher;struct ev_loop *_loop;std::unique_ptr<AMQP::LibEvHandler> _handler;std::unique_ptr<AMQP::TcpConnection> _connection;std::unique_ptr<AMQP::TcpChannel> _channel;std::thread _loop_thread;
};
consume.cc
#include "rabbitmq.hpp"
#include "logger.hpp"
#include <gflags/gflags.h>DEFINE_string(user, "root", "rabbitmq訪問用戶名");
DEFINE_string(password, "2162627569", "rabbitmq訪問密碼");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服務器地址信息 host:port");DEFINE_bool(run_mode, false, "程序的運行模式,false-調試; true-發布;");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志輸出等級");void callback(const char *body, size_t sz)
{std::string msg;msg.assign(body, sz);LOG_DEBUG("{}", msg);
}
int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);MQClient client(FLAGS_user, FLAGS_password, FLAGS_host);client.declareComponents("test-exchange", "test-queue", "test-queue-key");client.consume("test-queue", callback);std::this_thread::sleep_for(std::chrono::seconds(60));return 0;
}
publish.cc
#include "rabbitmq.hpp"
#include "logger.hpp"
#include <gflags/gflags.h>DEFINE_string(user, "root", "rabbitmq訪問用戶名");
DEFINE_string(password, "2162627569", "rabbitmq訪問密碼");
DEFINE_string(host, "127.0.0.1:5672", "rabbitmq服務器地址信息 host:port");DEFINE_bool(run_mode, false, "程序的運行模式,false-調試; true-發布;");
DEFINE_string(log_file, "", "發布模式下,用于指定日志的輸出文件");
DEFINE_int32(log_level, 0, "發布模式下,用于指定日志輸出等級");void callback(const char *body, size_t sz)
{std::string msg;msg.assign(body, sz);LOG_DEBUG("{}", msg);
}
int main(int argc, char *argv[])
{google::ParseCommandLineFlags(&argc, &argv, true);init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);MQClient client(FLAGS_user, FLAGS_password, FLAGS_host);client.declareComponents("test-exchange", "test-queue", "test-queue-key");for (int i = 0; i < 10; i++){std::string msg = "hello world - " + std::to_string(i);bool ret = client.publish("test-exchange", msg, "test-queue-key");if (ret == false){std::cout << "publish 失敗!\n";}}std::this_thread::sleep_for(std::chrono::seconds(3));return 0;
}
makefile
all:publish consume
publish:publish.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags
consume:consume.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags.PHONY:clean
clean:rm -f publish consume