目錄
- 1. 什么是消息隊列?
- 2. 消息隊列的優點
- 3. RabbitMQ 消息隊列概述
- 4. RabbitMQ 安裝
- 5. Exchange 四種類型
- 5.1 direct 精準匹配
- 5.2 fanout 廣播
- 5.3 topic 正則匹配
- 6. RabbitMQ 隊列模式
- 6.1 簡單隊列模式
- 6.2 工作隊列模式
- 6.3 發布/訂閱模式
- 6.4 路由模式
- 6.5 主題模式
- 6.6 RPC模式
- 6.7 發布者確認模式
- 7. 安裝C++客戶端庫使用 RabbitMQ
- 8. AMQP-CPP 庫的簡單使用
- 9. RabbitMq 總結
1. 什么是消息隊列?
(1)消息隊列(Message queue)是一種進程間通信或同一進程的不同線程間的通信方式,軟件的貯列用來處理一系列的輸入,通常是來自用戶。
- 消息隊列提供了異步的通信協議,每一個貯列中的紀錄包含詳細說明的數據,包含發生的時間,輸入設備的種類,以及特定的輸入參數,也就是說:消息的發送者和接收者不需要同時與消息隊列互交。消息會保存在隊列中,直到接收者取回它。
- 消息隊列,一般我們會簡稱他為MQ(Message Queue),消息隊列可以簡單的理解為:把要傳輸的數據放在隊列中。
-
消息隊列說明:
- Producer:消息生產者,負責產生和發送消息到 Broker;
- Broker:消息處理中心。負責消息存儲、確認、重試等,一般其中會包含多個 queue;
- Consumer:消息消費者,負責從 Broker 中獲取消息,并進行相應處理;
-
RabbitMQ:是由erlang語言開發,基于AMQP(高級消息隊列協議)協議實現的一種消息隊列。市面上還有很多消息隊列,比如Kafka、RocketMQ、Redis等。
-
官方文檔:https://www.rabbitmq.com/tutorials
2. 消息隊列的優點
(1)應用解偶:
- 比如在我們現在公司的業務常見中:
- 給客戶打完電話之后我們需要根據通話錄音進行打標簽;
- 給客戶打完電話之后我們需要給他發送短信;
- 給客戶打完電話之后我們需要發送他的通話給機器人,讓機器人自學習;
- 簡單架構圖如下:
- 如果沒有消息隊列,在A服務里面要寫上3個API接口分別對應后面三個服務,突然有一天這個客戶說我不需要發短信功能了,如果按照上面這種方式,我們就需要聯系開發開始刪代碼,然后升級,剛升級好沒幾天,客戶說我有要這個功能,那開發又要升級代碼,這個時候開發集體離職了,(這每天干的完全是無用功)但是如果有消息隊列那就完全不一樣了,就會變成下面這個樣子:
- 所以通過引入消息隊列,A只需要寫一個接口對接MQ了。不同的應用程序之間可以通過消息隊列進行通信,而無需直接調用對方的接口或方法。這樣可以降低系統中各個應用程序之間的耦合度,使得它們可以獨立演化和擴展,而不會因為對方的變化而受到影響。
(2)異步處理:
- 還是上面那個場景來簡述這個:A是公司的主要業務,打電話業務,BCD為非主要業務。
- 假設A調用BCD 接口需要50ms,那等A把所有接口調用完成之后需要150ms,對主業務消耗特別大,如果我們不用搭理BCD的處理,A直接把他交給消息隊列,讓消息隊列去處理BCD,A只要把數據給消息隊列就行了,那么A的壓力就會很小,也不會影響主要業務流程。
(3)流量削峰:
- 打個比方,我們目前有A B兩個服務,A服務的OPS峰值為100W,但是B服務的OPS峰值只有10w,這個時候來了90w個請求,A服務能處理過來沒問題,但是這個時候B服務直接就崩潰了
- 如果這個時候我們在A和B之間加一個rabbitmq,我們讓B每次去取9w,這樣B服務就不會掛了。
3. RabbitMQ 消息隊列概述
(1)RabbitMQ 優點如下:
- 可靠性:RabbitMQ提供了多種技術可以讓你在性能和可靠性之間進行權衡。這些技術包括持久性機制、投遞確認、發布者證實和高可用性機制。
- 靈活的路由:消息在到達隊列前是通過交換機進行路由的。RabbitMQ為典型的路由邏輯提供了多種內置交換機類型。如果你有更復雜的路由需求,可以將這些交換機組合起來使用,你甚至可以實現自己的交換機類型,并且當做RabbitMQ的插件來使用。
- 集群:在相同局域網內的多個RabbitMQ服務器可以聚合在一起,作為一個獨立的邏輯代理來使用
- 聯合:對于服務器來說,他比集群需要更多的松散和非可靠鏈接,為此RabbitMQ提供了聯合模型
- 高可用的隊列:在同一個集群里,隊列可以被鏡像到多個機器中,以確保當前某些硬件出現故障后,你的消息仍然可以被使用
- 多協議:RabbitMQ支持多種消息協議的消息傳遞
- 廣泛的客戶端:只要是你能想到的編程語言幾乎都有與其相適配的RabbitMQ客戶端。
- 可視化管理工具:RabbitMQ附帶了一個易于使用的可視化管理工具,它可以幫助你監控消息代理的每一個環節。
- 追蹤:如果你的消息系統有異常行為,RabbitMQ還提供了追蹤的支持,讓你能夠發現問題所在。
- 插件系統:RabbitMQ附帶了各種各樣的插件來對自己進行擴展。你甚至也可以寫自己的插件來使用。
(2)RabbitMQ 的概念模型:
- 所有 MQ 產品從模型抽象上來說都是一樣的過程:消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。
(3)RabbitMQ 基本流程圖:
- Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
- Publisher:消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
- Exchange:交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。
- Binding:綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
- Queue:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
- Connection:網絡連接,比如一個TCP連接。
- Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
- Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
- Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
- Broker:表示消息隊列服務器實體。
(4)RabbitMQ使用中的一些概念:
- 在上面的RabbitMQ的基本流程圖里面我們可以看到,RabbitMQ的整體工作流程是,生產者產生數據交給RabbitMQ,然后RabbitMQ通過Exchange更具規則來選擇綁定到那個隊列(Queues)中,然后消費者在到對應的隊列里面去取數據。所以我們先安裝 RabbitMQ 后下面就來講解下Exchange的四種類型。
4. RabbitMQ 安裝
(1)安裝RabbitMq:
sudo apt install rabbitmq-server# 啟動服務
sudo systemctl start rabbitmq-server.service
# 查看服務狀態
sudo systemctl status rabbitmq-server.service
# 安裝完成的時候默認有個用戶 guest ,但是權限不夠,要創建一個administrator 用戶,才可以做為遠程登錄和發表訂閱消息。
# 添加用戶
sudo rabbitmqctl add_user root 123456
# 設置用戶 tag
sudo rabbitmqctl set_user_tags root administrator
# 設置用戶權限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自帶了 web 管理界面,執行下面命令開啟
sudo rabbitmq-plugins enable rabbitmq_management
(2)查看 rabbitmq-server 的狀態:
(3)訪問 webUI 界面,默認端口為 15672:
(4)瀏覽器訪問管理界面(用戶名和密碼是上述添加的用戶):
5. Exchange 四種類型
5.1 direct 精準匹配
(1)direct 交換機如下圖:
- 消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。
5.2 fanout 廣播
(1)fanout 交換機如下圖:
- 每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。
5.3 topic 正則匹配
(1)topic 交換機如下圖:
- topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”。#匹配0個或多個單詞,匹配不多不少一個單詞。
(2)還有一個是headers 交換器,它是根據頭部匹配。幾乎是不使用的,這里就不介紹了。
6. RabbitMQ 隊列模式
(1)基于Exchange交換機,RabbitMQ截至目前有七種隊列模式:
- 簡單隊列模式。
- 工作隊列模式。
- 發布/訂閱模式。
- 路由模式。
- 主題模式。
- RPC模式。
- 發布者確認模式。
6.1 簡單隊列模式
(1)一個消息生產者,一個消息消費者,一個隊列。也稱為點對點模式。
- 圖中P代表生產者,C代表消費者,Queue是隊列名稱。
- 我們看到是沒有Exchange的,但是RabbitMQ也會有一個默認的交換機。這個默認的交換機通常被稱為"amq.default"或者""(空字符串),是RabbitMQ自動創建的,用于在沒有指定交換機的情況下將消息發送到隊列。
//生產者
var factory = new ConnectionFactory { HostName = "localhost"}; //初始化連接信息
using var connection = factory.CreateConnection(); //創建連接
using var channel = connection.CreateModel(); //創建信道//聲明一個隊列,并將信道與隊列綁定
channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);
//發送消息的內容
string message = $"Hello World!";
var body = Encoding.UTF8.GetBytes(message);//信道綁定交換機
channel.BasicPublish(exchange: string.Empty,routingKey: string.Empty,basicProperties: null,body: body);Console.WriteLine($" [x] Sent {message}");Console.WriteLine(" Press [enter] to exit.");//消費者
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");
};channel.BasicConsume(queue: "hello",autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
- 此時就會生產者發送一條消息,消費者就會接收一條消息。
6.2 工作隊列模式
(1)工作隊列又叫做任務隊列,正常會按順序把消息發送給每一個訂閱的消費者,平均而言,每個消費者將獲得相同數量的消息。(不是P發送一條消息,C1和C2都會收到,而是第一條C1消費,第二條C2消費。每個消息只會被一個消費者接收和處理)。
- 這樣的好處是可以提高吞吐量,因為生產者發送了很多消息,但是消費者只有一個,消費者處理很慢,就會造成消息積壓。
6.3 發布/訂閱模式
(1)發布/訂閱模式是一種消息傳遞模式,它允許發送者(發布者)將消息發布到多個接收者(訂閱者)。消息傳遞模型的核心思想是生產者從不直接向隊列發送任何消息。實際上,生產者通常根本不知道消息是否會被傳遞到任何隊列。
- 所以消息傳遞模式,發布者不需要指定隊列。發布/訂閱模式交換機類型為Fanout。
//發布者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();//聲明一個交換機,叫做logs,并且交換機的類型是Fanout
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var message = "publish_subscribe";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",routingKey: string.Empty,basicProperties: null,body: body);
Console.WriteLine($" [x] Sent {message}");Console.WriteLine(" Press [enter] to exit.");//接收者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//創建一個具有生成名稱的非持久、獨占、自動刪除隊列
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: string.Empty);Console.WriteLine(" [*] Waiting for logs.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] {message}");
};channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
(2)注意:如果發布者已經發布消息到交換機,但還沒有隊列綁定到交換機,消息將會丟失。
6.4 路由模式
(1)路由模式也是一種消息傳遞模式,是基于消息的路由鍵(routing key)來將消息從交換機(exchange)發送到一個或多個隊列中。相比較于發布/訂閱模式,路由模式多了一個routing key的概念。
- 路由模式交換機類型為Direct。
//生產者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();//定義交換機名稱以及類型為Direct
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);//定義路由鍵
string routingKey = "direct_test";//發送消息體
string message = "direct_message";
var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "direct_logs",routingKey: routingKey,basicProperties: null,body: body);
Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");Console.WriteLine(" Press [enter] to exit.");//消費者
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);//創建一個具有生成名稱的非持久、獨占、自動刪除隊列
var queueName = channel.QueueDeclare().QueueName;//路由鍵集合
var routeKeyArr = new string[] { "direct_test", "direct_test2" };foreach (var routeKey in routeKeyArr)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: routeKey);
}Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
- 路由模式,消費者可以監聽多個路由鍵。
6.5 主題模式
(1)基于路由模式,仍然有局限性——它不能基于多個標準進行路由。也就是一個消費者只能接收完全與routing key相匹配的交換機。主題模式主要解決路由模式的不足,可以模糊匹配routing key。
- 路由模式交換機類型為Topic。
- 在生產者方面,基于 . 作為分隔符,用于routing key。比如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。可以是任何單詞,但最多只有255 個字節。在消費者方面,綁定routing key有兩種重要的情況:
(2)*(星號):匹配一個單詞。具體語法:
var routeing_key = "info.debug.error";//匹配 info
"info.*.*"
//匹配debug
"*.debug.*"
//匹配error
"*.*.error"
(3)#(散列):匹配零個或多個單詞。具體語法:
var routeing_key = "info.debug.error";//匹配 info
"info.#"
//匹配debug
"#.debug.#"
//匹配error
"*.*.error"
6.6 RPC模式
(1)RPC模式又叫"請求/回復模式"。
- RPC(Remote Procedure Call,遠程過程調用)是一種用于在分布式系統中進行通信的技術。它允許一個進程(或線程)調用另一個進程(或線程)的過程(函數或方法),就像調用本地函數一樣,而不需要開發者顯式處理底層通信細節。
- 就是生產者發送一條消息,消費者端執行某個方法,獲取值的同時,并返回到生產者。
6.7 發布者確認模式
(1)發布者確認模式(Publisher Confirmation)是 RabbitMQ 提供的一種機制,用于確保消息被成功發送到交換機(exchange)并被接收到,以及確保消息被正確地路由到隊列中。
- 在傳統的消息發布過程中,發布者發送消息到交換機后,并不知道消息是否已經被正確地處理。為了解決這個問題,RabbitMQ 提供了發布者確認模式,允許發布者確認消息是否已經被成功接收到。
7. 安裝C++客戶端庫使用 RabbitMQ
(1)安裝 RabbitMQ 的 C++客戶端庫:
- C 語言庫:https://github.com/alanxz/rabbitmq-c
- C++庫: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master
(2)我們這里使用 AMQP-CPP 庫來編寫客戶端程序。安裝 AMQP-CPP:
sudo apt install libev-dev # libev 網絡庫組件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
sudo make install
8. AMQP-CPP 庫的簡單使用
(1)概述:
- AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫。它能解析從 RabbitMq 服務發送來的數據,也可以生成發向 RabbitMq 的數據包。AMQP-CPP 庫不會向 RabbitMq 建立網絡連接,所有的網絡 io 由用戶完成。
- 當然,AMQP-CPP 提供了可選的網絡層接口,它預定義了 TCP 模塊,用戶就不用自己實現網絡 io,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動安裝對應的組件。
- AMQP-CPP 完全異步,沒有阻塞式的系統調用,不使用線程就能夠應用在高性能應用中。
- 注意:它需要 c++17 的支持。
(2)具體使用:
- AMQP-CPP 的使用有兩種模式:
- 使用默認的 TCP 模塊進行網絡通信。
- 使用擴展的 libevent、libev、libuv、asio 異步通信組件進行通信。
- TCP 模式:
- 該模式下需要實現一個類繼承自 AMQP::TcpHandler 類, 它負責網絡層的 TCP 連接,重寫相關函數, 其中必須重寫 monitor 函數。在 monitor 函數中需要實現的是將 fd 放入 eventloop(select 、 epoll) 中監控, 當 fd 可寫可讀就緒之后, 調用 AMQP-CPP 的 connection->process(fd, flags) 方法。
- TCP 模式使用較為麻煩,不過提供了靈活的網絡層集成能力,可以根據項目需求選擇合適的網絡庫進行集成。在實際應用中,建議結合事件循環庫(如libuv、Boost.Asio等)使用以獲得最佳性能。
- 擴展模式
- 以 libev 為例, 我們不必要自己實現 monitor 函數,可以直接使用 AMQP::LibEvHandler。
(3)常用類與接口介紹:
- Channel:
- channel 是一個虛擬連接,一個連接上可以建立多個通道。并且所有的 RabbitMq 指令都是通過 channel 傳輸,所以連接建立后的第一步,就是建立 channel 。
- 因為所有操作是異步的,所以在 channel 上執行指令的返回值并不能作為操作執行結果,實際上它返回的是 Deferred 類,可以使用它安裝處理函數。
namespace AMQP
{using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(const char *message)>;using FinalizeCallback = std::function<void()>;using QueueCallback = std::function<void(const std::string &name,uint32_t messagecount, uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message,uint64_t deliveryTag, bool redelivered)>;// 當使用發布者確認時,當服務器確認消息已被接收和處理時,將調用AckCallbackusing AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;// 使用確認包裹通道時,當消息被 ack/nacked 時,會調用這些回調using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;class Channel{Channel(Connection *connection);bool connected();// 聲明交換機,如果提供了一個空名稱,則服務器將分配一個名稱。// @param name 交換機的名稱// @param type 交換類型// enum ExchangeType// {// fanout, 廣播交換,綁定的隊列都能拿到消息// direct, 直接交換,只將消息交給 routingkey 一致的隊列// topic, 主題交換,將消息交給符合 bindingkey 規則的隊列// headers,// consistent_hash,// message_deduplication// };// @param flags 交換機標志// 以下 flags 可用于交換機:// *-durable 持久化,重啟后交換機依然有效// *-autodelete 刪除所有連接的隊列后,自動刪除交換// *-passive 僅被動檢查交換機是否存在// *-internal 創建內部交換// @param arguments 其他參數Deferred &declareExchange(const std::string_view &name,ExchangeType type, int flags, const Table &arguments);// 聲明隊列,如果不提供名稱,服務器將分配一個名稱。// @param name 隊列的名稱// @param flags 標志組合// flags 可以是以下值的組合:// -durable 持久隊列在代理重新啟動后仍然有效// -autodelete 當所有連接的使用者都離開時,自動刪除隊列// -passive 僅被動檢查隊列是否存在*-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除// @param arguments 可選參數DeferredQueue &declareQueue(const std::string_view &name,int flags, const Table &arguments);// 將隊列綁定到交換機// @param exchange 源交換機// @param queue 目標隊列// @param routingkey 路由密鑰// @param arguments 其他綁定參數Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue,const std::string_view &routingkey, const Table &arguments);// 將消息發布到 exchange,必須提供交換機的名稱和路由密鑰。然后RabbitMQ 將嘗試將消息發送到一個或多個隊列。// 使用可選的 flags 參數,可以指定如果消息無法路由到隊列時應該發生的情況。// @param exchange 要發布到的交易所// @param routingkey 路由密鑰// @param envelope 要發送的完整信封// @param message 要發送的消息// @param size 消息的大小// @param flags 可選標志// 可以提供以下 flags:// -mandatory 如果設置,服務器將返回未發送到隊列的消息// -immediate 如果設置,服務器將返回無法立即轉發給使用者的消息。bool publish(const std::string_view &exchange, const std::string_view &routingKey,const std::string &message, int flags = 0);// 告訴 RabbitMQ 服務器我們已準備好使用消息-也就是訂閱隊列消息,調用此方法后,RabbitMQ 開始向客戶端應用程序傳遞消息。// @param queue 您要使用的隊列// @param tag 將與此消費操作關聯的消費者標記// consumer tag 是一個字符串標識符,如果以后想通過 channel::cancel()調用停止它,可以使用它來標識使用者。// 如果您沒有指定使用者 tag,服務器將為您分配一個。// @param flags 其他標記// @param arguments 其他參數// 支持以下 flags:// -nolocal 如果設置了,則不會同時消耗在此通道上發布的消息// -noack 如果設置了,則不必對已消費的消息進行確認// -exclusive 請求獨占訪問,只有此使用者可以訪問隊列DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag, int flags, const Table &arguments);// 確認接收到的消息,當在 DeferredConsumer::onReceived()方法中接收到消息時,必須確認該消息,// 以便 RabbitMQ 將其從隊列中刪除(除非使用 noack 選項消費)。// @param deliveryTag 消息的唯一 delivery 標簽// @param flags 可選標志bool ack(uint64_t deliveryTag, int flags = 0);};class DeferredConsumer{// 注冊一個回調函數,該函數在消費者啟動時被調用。DeferredConsumer &onSuccess(const ConsumeCallback &callback);// 注冊回調函數,用于接收到一個完整消息的時候被調用void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered);DeferredConsumer &onReceived(const MessageCallback &callback);DeferredConsumer &onMessage(const MessageCallback &callback);};class Message : public Envelope{const std::string &exchange();const std::string &routingkey();};class Envelope : public MetaData{const char *body();uint64_t bodySize();};
}
- ev:
typedef struct ev_async
{EV_WATCHER(ev_async);EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{EVBREAK_CANCEL = 0, /* undo unloop */EVBREAK_ONE = 1, /* unloop once */EVBREAK_ALL = 2 /* unloop all loops */
};
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0));
#define EV_DEFAULT ev_default_loop(0)
int ev_run(struct ev_loop *loop);
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
void ev_async_init(ev_async *w, callback cb);
void ev_async_start(struct ev_loop *loop, ev_async *w);
void ev_async_send(struct ev_loop *loop, ev_async *w);
(4)使用案例:
- publish.cpp:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>int main()
{//1. 實例化底層網絡通信框架的I/O事件監控句柄auto *loop = EV_DEFAULT;//2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);//2.5. 實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 實例化信道對象AMQP::TcpChannel channel(&connection);//4. 聲明交換機AMQP::Deferred &deferred = channel.declareExchange("test-exchange", AMQP::ExchangeType::direct); deferred.onError([](const char *message){std::cout << "聲明交換機失敗:" << message << std::endl;exit(0);});deferred.onSuccess([](){std::cout << "test-exchange 交換機創建成功!" << std::endl;});//5. 聲明隊列AMQP::DeferredQueue &deferredQueue = channel.declareQueue("test-queue");deferredQueue.onError([](const char *message){std::cout << "聲明隊列失敗:" << message << std::endl;exit(0);});deferredQueue.onSuccess([](){std::cout << "test-queue 隊列創建成功!" << std::endl;});//6. 針對交換機和隊列進行綁定auto &binding_deferred = channel.bindQueue("test-exchange", "test-queue", "test-queue-key");binding_deferred.onError([](const char *message) {std::cout << "test-exchange - test-queue 綁定失敗:" << message << std::endl;exit(0);});binding_deferred.onSuccess([](){std::cout << "test-exchange - test-queue 綁定成功!" << std::endl;});//7. 向交換機發布消息for(int i = 0; i < 10; i++) {std::string msg = "Hello World-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if(ret == false) {std::cout << "publish 失敗!\n";}}//啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}
- consume.cpp:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>// 消息回調處理函數的實現
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{std::string msg;msg.assign(message.body(), message.bodySize());std::cout << msg << std::endl;channel->ack(deliveryTag); // 對消息進行確認
}int main()
{// 1. 實例化底層網絡通信框架的I/O事件監控句柄auto *loop = EV_DEFAULT;// 2. 實例化libEvHandler句柄 --- 將AMQP框架與事件監控關聯起來AMQP::LibEvHandler handler(loop);// 2.5. 實例化連接對象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);// 3. 實例化信道對象AMQP::TcpChannel channel(&connection);// 4. 聲明交換機AMQP::Deferred &deferred = channel.declareExchange("test-exchange", AMQP::ExchangeType::direct);deferred.onError([](const char *message){std::cout << "聲明交換機失敗:" << message << std::endl;exit(0); });deferred.onSuccess([](){ std::cout << "test-exchange 交換機創建成功!" << std::endl; });// 5. 聲明隊列AMQP::DeferredQueue &deferredQueue = channel.declareQueue("test-queue");deferredQueue.onError([](const char *message){std::cout << "聲明隊列失敗:" << message << std::endl;exit(0); });deferredQueue.onSuccess([]() { std::cout << "test-queue 隊列創建成功!" << std::endl; });// 6. 針對交換機和隊列進行綁定auto &binding_deferred = channel.bindQueue("test-exchange", "test-queue", "test-queue-key");binding_deferred.onError([](const char *message){std::cout << "test-exchange - test-queue 綁定失敗:" << message << std::endl;exit(0); });binding_deferred.onSuccess([](){ std::cout << "test-exchange - test-queue 綁定成功!" << std::endl; });// 7. 訂閱隊列消息 -- 設置消息處理回調函數auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag") // 返回值 DeferredConsumer.onReceived(callback).onError([](const char *message){std::cout << "訂閱 test-queue 隊列消息失敗:" << message << std::endl;exit(0); }); // 返回值是 AMQP::Deferred// 8. 啟動底層網絡通信框架--開啟I/Oev_run(loop, 0);return 0;
}
- Makefile:
all:publish consume
publish:publish.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags
consume:consume.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags.PHONY:clean
clean:rm -f publish consume
- 運行結果:
9. RabbitMq 總結
(1)核心架構與組件:
- 核心組件:
(2)Exchange 類型詳解:
(3)核心特性與機制:
- 消息可靠性:
- 高級功能:
(4)使用場景:
(5)消息生命周期:
(6)與其他消息中間件對比:
(7)總結:
- RabbitMQ 憑借其靈活的的路由機制、可靠的消息傳遞和豐富的生態系統,成為企業級異步通信的首選解決方案。掌握其核心原理和最佳實踐,能有效構建高可靠、可擴展的分布式系統。