第 4 章?RabbitMQ 進階
mandatory 參數?Returning | RabbitMQ
?????????當 mandatory 參數設為 true 時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列,那么RabbitMQ 會調用 Basic.Return 命令將消息返回給生產者,通過調用channel.addReturnListener 來添加 ReturnListener 監聽器實現。當 mandatory 參數設置為 false 時,出現上述情形,則消息直接被丟棄。
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText,String exchange, String routingKey,AMQP.BasicProperties basicProperties,byte[] body) throws IOException {String message = new String(body);System.out.println("Basic.Return返回的結果是:" + message);}});
備份交換器?Alternate Exchanges | RabbitMQ
// 創建備份交換機、隊列、聲明綁定關系String myAeExchange = "myAe";channel.exchangeDeclare(myAeExchange, "fanout", true, false, null);channel.queueDeclare("unroutedQueue", true, false, false, null);channel.queueBind("unroutedQueue", myAeExchange, "");// 創建普通交換機, 并設置 alternate-exchange,隊列、聲明綁定關系Map<String, Object> args = new HashMap<String, Object>();args.put("alternate-exchange", myAeExchange);channel.exchangeDeclare("normalExchange", "direct", true, false, args);channel.queueDeclare("normalQueue", true, false, false, null);channel.queueBind("normalQueue", "normalExchange", "normalKey");
????????如果備份交換器和 mandatory 參數一起使用,那么 mandatory 參數無效。?
4.2過期時間 TTL?Time-To-Live and Expiration | RabbitMQ
????????第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。channel.queueDeclare 方法中加入x-message-ttl 參數實現的,這個參數的單位是毫秒。
????????第二種方法是對消息本身進行單獨設置,
每條消息的TTL 可以不同。如果兩種方法一起使用,則消息的 TTL 以兩者之間較小的那個數值為準。消息在隊列中的生存時間一旦超過設置的 TTL 值時,就會變成“死信”(Dead Message),消費者將無法再收到該消息(這點不是絕對的,可以參考 4.3 節)。?
// 聲明帶TTL的隊列Map<String, Object> queueArgs = new HashMap<>();queueArgs.put("x-message-ttl", 30000);channel.queueDeclare("ttl.queue", true, false, false, queueArgs);// 發送帶TTL屬性的消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().expiration("5000").build();channel.basicPublish("", "ttl.queue", props, "Test TTL Message".getBytes())
4.3 死信&延時隊列
????????DLX,全稱為 Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當消息在一個隊列中變成死信(dead message)之后,它能被重新被發送到另一個交換器中,這個交換器就是DLX,綁定DLX 的隊列就稱之為死信隊列。
出現的場景
- 消息被拒絕(Basic.Reject/Basic.Nack),并且設置 requeue 參數為 false;?
- 消息過期;?
- 隊列達到最大長度。
// 創建一個持久化、非排他的、非自動刪除的隊列channel.exchangeDeclare("exchange.dlx", "direct", true);channel.queueDeclare("queue.dlx", true, false, false, null);channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");// 創建一個不用隊列 并設置 TTL 以及 DLX、DLXroutingkeychannel.exchangeDeclare("exchange.normal", "fanout", true);Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 10000);args.put("x-dead-letter-exchange", "exchange.dlx");args.put("x-dead-letter-routing-key", "routingkey");channel.queueDeclare("queue.normal", true, false, false, args);channel.queueBind("queue.normal", "exchange.normal", "");channel.basicPublish("exchange.normal", "rk",MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
????????由Web 管理頁面(圖4-3)可以看出,兩個隊列都被標記了“D”,這個是durable 的縮寫,即設置了隊列持久化。queue.normal 這個隊列還配置了 TTL、DLX 和 DLK,其中 DLX 指的是x-dead-letter-routing-key 這個屬性。?
????????在 AMQP 協議中,或者 RabbitMQ 本身沒有直接支持延遲隊列的功能,但是可以通過前面所介紹的 DLX 和 TTL 模擬出延遲隊列的功能。?
4.7 持久化
????????RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化。?
deliveryMode(2) // 設置為持久化
????????將交換器、隊列、消息都設置了持久化之后就能百分之百保證數據不丟失了嗎?答案是否定的。?
????????首先從消費者來說,如果在訂閱消費隊列時將 autoAck 參數設置為true,那么當消費者接收到相關消息之后,還沒來得及處理就宕機了,這樣也算數據丟失。這種情況很好解決,將autoAck 參數設置為 false,并進行手動確認,詳細可以參考3.5 節。?
????????其次,在持久化的消息正確存入 RabbitMQ 之后,還需要有一段時間(雖然很短,但是不可忽視)才能存入磁盤之中。RabbitMQ 并不會為每條消息都進行同步存盤(調用內核的fsync1方法)的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內
RabbitMQ 服務節點發生了宕機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將丟失。RabbitMQ 在運行時會根據統計的消息傳送速度定期計算一個當前內存中能夠保存的最大消息數量(target_ram_count),如果alpha 狀態的消息數量大于此值時,就會引起消息的狀態轉換會丟失。?這里可以引入 RabbitMQ 的鏡像隊列機制(詳細參考 9.4 節)
4.8 生產者確認?
????????生產者如何知道消息有沒有正確地到達服務器
4.8.1 事務機制
- 通過事務機制實現;?
- 通過發送方確認(publisher confirm)機制實現。
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit();
} catch (Exception e) { e.printStackTrace(); channel.txRollback();
}
4.8.2 發送方確認機制
??采用事務機制實現會嚴重降低 RabbitMQ 的消息吞吐量,這里就引入了一種輕量級的方式——發送方確認(publisher confirm)機制。?
????????生產者通過調用 channel.confirmSelect 方法(即 Confirm.Select 命令)將信道設置為 confirm 模式,之后 RabbitMQ 會返回 Confirm.Select-Ok 命令表示同意生產者將當前信道設置為 confirm 模式。所有被發送的后續消息都被 ack 或者 nack 一次,不會出現一條消
息既被 ack 又被 nack 的情況,并且 RabbitMQ 也并沒有對消息被 confirm 的快慢做任何保證
try {channel.confirmSelect();//將信道置為publisher confirm模式//之后正常發送消息channel.basicPublish("exchange", "routingKey", null,"publisher confirm test".getBytes());if (!channel.waitForConfirms()) {// 當消息發送不成功時候進入 if 代碼塊 do something else....System.out.println("send message failed");}} catch (InterruptedException e) {e.printStackTrace();}
注意要點:?
(1)事務機制和 publisher confirm 機制兩者是互斥的,不能共存。如果企圖將已開啟事務模式的信道再設置為 publisher confirm 模式,RabbitMQ 會報錯:{amqp_error, precondition_?failed, "cannot switch from tx to confirm mode", 'confirm.select'};或者如果企圖將已開啟 publisher confirm 模式的信道再設置為事務模式,RabbitMQ 也會報錯:{amqp_error, precondition_failed, "cannot switch from confirm to tx?
mode", 'tx.select' }。?
(2)事務機制和 publisher confirm 機制確保的是消息能夠正確地發送至 RabbitMQ,這里的“發送至 RabbitMQ”的含義是指消息被正確地發往至 RabbitMQ 的交換器,如果此交換器沒有匹配的隊列,那么消息也會丟失。所以在使用這兩種機制的時候要確保所涉及的交換器能夠有匹配的隊列。更進一步地講,發送方要配合 mandatory 參數或者備份交換器一起使用來提高消息傳輸的可靠性。?
????????publisher confirm 的優勢在于并不一定需要同步確認。這里我們改進了一下使用方式,總結有如下兩種:
- 批量 confirm 方法:每發送一批消息后,調用 channel.waitForConfirms 方法,等待服務器的確認返回。?
????????相比于前面示例中的普通 confirm 方法,批量極大地提升了 confirm 的效率,但是問題在于出現返回 Basic.Nack 或者超時情況時,客戶端需要將這一批次的消息全部重發,這會帶來明顯的重復消息數量,并且當消息經常丟失時,批量 confirm 的性能應該是不升反降的。?
channel.confirmSelect(); // 開啟確認模式
for(int i=0; i<100; i++){channel.basicPublish("", "queue", null, message.getBytes());
}
// 批量確認所有未確認消息
channel.waitForConfirmsOrDie(5000); // 超時5秒// 缺點:簡單但會阻塞生產者線程,批量失敗需重發全部消息
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
channel.addConfirmListener((sequenceNumber, multiple) -> {if(multiple) {outstandingConfirms.headMap(sequenceNumber, true).clear();} else {outstandingConfirms.remove(sequenceNumber);}
}, (sequenceNumber, multiple) -> {// NACK處理邏輯
});// 批量發送100條消息for (int i = 0; i < 100; i++) {String message = "Msg-" + i;long seqNo = channel.getNextPublishSeqNo();outstandingConfirms.put(seqNo, message);channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());}
4.9 消費端要點介紹
????????消費者客戶端可以通過推模式或者拉模式(推薦方式)的方式來獲取并消費消息,當消費者處理完業務邏輯需要手動確認消息已被接收,這樣 RabbitMQ才能把當前消息從隊列中標記清除。當然如果消費者由于某些原因無法處理當前接收到的消息,可以通過 channel.basicNack 或者 channel.basicReject 來拒絕掉。?
- 消息分發;?
- 消息順序性;?
- 棄用 QueueingConsumer。?
消息順序
在RabbitMQ中保證消息順序性需結合隊列特性和業務設計,以下是核心方案:
一、基礎保障機制
-
?單隊列單消費者模式?
- 利用隊列FIFO特性,僅允許一個消費者處理隊列,避免并發消費導致亂序
- 缺點:吞吐量受限,需配合消息持久化和手動ACK確保可靠性
-
?分區消費策略?
- 通過路由鍵將關聯消息(如相同訂單ID)固定路由到同一隊列,每個隊列對應獨立消費者
- 示例:使用Direct交換機按業務ID路由,實現"局部順序性"
二、增強控制手
-
?消息序列化標記?
- 在消息體中嵌入序列號,消費者端通過緩存排序實現邏輯順序控制
- 需配合冪等處理避免重復消息干擾
-
?單活消費者模式?
- 通過
x-single-active-consumer
參數確保隊列同一時間僅有一個活躍消費者,故障時自動切換
// Spring AMQP配置示例 Map<String, Object> args = new HashMap<>(); args.put("x-single-active-consumer", true); new Queue("seq_queue", true, false, false, args);
- 通過
三、高級方案
-
?事務與發布確認?
- 生產者啟用事務或發布確認機制,確保消息按發送順序持久化到隊列
- 事務適用于批量消息,確認機制適合單條消息
-
?死信隊列重試?
- 對處理失敗的消息進入死信隊列延時重試,避免立即重入破壞順序
4.10 消息傳輸保障
第九章 RabbitMQ 高階
9.1 存儲機制?
????????不管是持久化的消息還是非持久化的消息都可以被寫入到磁盤。這兩種類型的消息的落盤處理都在RabbitMQ 的“持久層”中完成。?
- 持久化的消息也會在內存中保存一份備份,這樣可以提高一定的性能,當內存吃緊的時候會從內存中清除。
- 非持久化的消息一般只保存在內存中,在內存吃緊的時候會被換入到磁盤中,以節省內存空間。
持久層
- 隊列索引(rabbit_queue_index):負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否已被交付給消費者、是否已被消費者ack 等。每個隊列都有與之對應的一個rabbit_queue_index
- 消息存儲(rabbit_msg_store):以鍵值對的形式存儲消息,它被所有隊列共享,在每個節點中有且只有一個。從技術層面上來說,rabbit_msg_store 具體還可以分為msg_store_persistent 和 msg_store_transient
- ????????msg_store_persistent 負責持久化消息的持久化,重啟后消息不會丟失;
- ????????msg_store_transient 負責非持久化消息的持久化,重啟后消息會丟失。
結構查看??/opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1??
?隊列的結構
- rabbit_amqqueue_process 負責協議相關的消息處理,即接收生產者發布的消息、向消費者交付消息、處理消息的確認(包括生產端的 confirm 和消費端的 ack)等。
- backing_queue 是消息存儲的具體形式和引擎,并向rabbit_amqqueue_process提供相關的接口以供調用。
隊列消息狀態
- alpha:消息內容(包括消息體、屬性和 headers)和消息索引都存儲在內存中。 (alpha狀態最耗內存,但很少消耗CPU)
- beta:消息內容保存在磁盤中,消息索引保存在內存中(只需要一次I/O 操作就可以讀取到消息(從 rabbit_msg_store 中))。?
- gamma:消息內容保存在磁盤中,消息索引在磁盤和內存中都有(只需要一次I/O 操作就可以讀取到消息(從 rabbit_msg_store 中))。?
- delta:消息內容和索引都在磁盤中。(狀態基本不消耗內存,但是需要消耗更多的 CPU 和磁盤 I/O 操作,delta 狀態需要執行兩次I/O 操作才能讀取到消息,一次是讀消息索引(從 rabbit_queue_index 中),一次是讀消息
內容(從 rabbit_msg_store 中))