文章目錄
- 一、消息確認
- 1.1、消息確認機制
- 1.2、手動確認方法
- 1.2.1、AcknowledgeMode.NONE
- 1.2.2、AcknowledgeMode.AUTO
- 1.3.3、AcknowledgeMode.MANUAL
- 二、持久性
- 2.1、 交換機持久化
- 2.2、隊列持久化
- 2.3、消息持久化
- 三、發送方確認
- 3.1、confirm確認模式
- 3.2、return退回模式
- 3.3、常見面試題
- 結語
一、消息確認
1.1、消息確認機制
生產者發送消息之后, 到達消費端之后, 可能會有以下情況:
a. 消息處理成功
b. 消息處理異常
RabbitMQ向消費者發送消息之后, 就會把這條消息刪掉, 那么第兩種情況, 就會造成消息丟失. 那么如何確保消費端已經成功接收了, 并正確處理了呢?
為了保證消息從隊列可靠地到達消費者, RabbitMQ提供了消息確認機制(message acknowledgement)。 消費者在訂閱隊列時,可以指定 autoAck 參數, 根據這個參數設置, 消息確認機制分為以下兩種:
? 自動確認: 當autoAck 等于true時, RabbitMQ 會?動把發送出去的消息置為確認, 然后從內存(或者磁盤)中刪除, 而不管消費者是否真正地消費到了這些消息. 自動確認模式適合對于消息可靠性要求不高的場景.
? 手動確認: 當autoAck等于false時,RabbitMQ會等待消費者顯式地調用Basic.Ack命令, 回復確認信號后才從內存(或者磁盤) 中移去消息. 這種模式適合對消息可靠性要求比較高的場景
代碼示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}};channel.basicConsume(Constants.TOPIC_QUEUE_NAME1,true,consumer);
當autoAck參數置為false, 對于RabbitMQ服務端額而言, 隊列中的消息分成了兩個部分: ?是等待投遞給消費者的消息. ?是已經投遞給消費者, 但是還沒有收到消費者確認信號的消息. 如果RabbitMQ一直沒有收到消費者的確認信號, 并且消費此消息的消費者已經斷開連接, 則RabbitMQ會安排該消息重新進入隊列,等待投遞給下?個消費者,當然也有可能還是原來的那個消費者
從RabbitMQ的Web管理平臺上, 也可以看到當前隊列中Ready狀態和Unacked狀態的消息數:
Ready: 等待投遞給消費者的消息數
Unacked: 已經投遞給消費者, 但是未收到消費者確認信號的消息數
1.2、手動確認方法
消費者在收到消息之后, 可以選擇確認, 也可以選擇直接拒絕或者跳過, RabbitMQ也提供了不同的確認應答的方式, 消費者客戶端可以調?與其對應的channel的相關?法, 共有以下三種:
1. 肯定確認: Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道該消息并且成功的處理消息. 可以將其丟棄了.
參數說明:
- deliveryTag: 消息的唯?標識,它是?個單調遞增的64 位的?整型值. deliveryTag 是每個通道(Channel)獨?維護的, 所以在每個通道上都是唯?的. 當消費者確認(ack)?條消息時, 必須使?對應 的通道上進?確認.
2 ) multiple: 是否批量確認. 在某些情況下, 為了減少?絡流量, 可以對?系列連續的 deliveryTag 進行批量確認. 值為 true 則會?次性 ack所有?于或等于指定 deliveryTag 的消息. 值為false, 則只確認當 前指定deliveryTag的消息
若deliverTag = 8時,如果此時multiple 為true ,那么此時8以前的消息都會被確認
反之,如果multiple為false ,那么此時只確認消息8。
deliveryTag 是RabbitMQ中消息確認機制的?個重要組成部分, 它確保了消息傳遞的可靠性和順序性。
2. 否定確認: Channel.basicReject(long deliveryTag, boolean requeue)
RabbitMQ在2.0.0版本開始引?了 Basic.Reject 這個命令, 消費者客?端可以調用channel.basicReject方法來告訴RabbitMQ拒絕這個消息.
參數說明:
- deliveryTag: 參考channel.basicAck
- requeue: 表?拒絕后, 這條消息如何處理. 如果requeue 參數設置為true, 則RabbitMQ會重新將這條 消息存?隊列,以便可以發送給下?個訂閱的消費者.如果requeue參數設置為false, 則RabbitMQ會把 消息從隊列中移除, ?不會把它發送給新的消費者
3. 否定確認: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令?次只能拒絕?條消息,如果想要批量拒絕消息,則可以使?Basic.Nack這個命令. 消費者客戶端可以調用channel.basicNack方法來實現.
參數介紹參考上面兩個方法.
multiple參數設置為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的消息.
代碼示例:
Spring-AMQP 對消息確認機制提供了三種策略
public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
- AcknowledgeMode.NONE
? 這種模式下, 消息?旦投遞給消費者, 不管消費者是否成功處理了消息, RabbitMQ 就會自動確認 消息, 從RabbitMQ隊列中移除消息. 如果消費者處理消息失敗, 消息可能會丟失. - AcknowledgeMode.AUTO(默認)
? 這種模式下, 消費者在消息處理成功時會自動確認消息, 但如果處理過程中拋出了異常, 則不會確認消息. - AcknowledgeMode.MANUAL
? ?動確認模式下, 消費者必須在成功處理消息后顯式調? basicAck ?法來確認消息. 如果消息未被確認, RabbitMQ 會認為消息尚未被成功處理, 并且會在消費者可用時重新投遞該消息, 這 種模式提?了消息處理的可靠性, 因為即使消費者處理消息后失敗, 消息也不會丟失, 而是可以被重新處理.
主要流程:
- 配置確認機制(自動確認/手動機制)
- 生產者發送消息
- 消費端邏輯
- 測試
1.2.1、AcknowledgeMode.NONE
- 配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: none
- 發送消息:
隊列,交換機配置
public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}/*以下為消費端?動應答代碼?例配置*/@Bean("ackExchange")public Exchange ackExchange() {returnExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();}//2. 隊列@Bean("ackQueue")public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();}//3. 隊列和交換機綁定 Binding@Bean("ackBinding")public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();}
通過接口發送消息:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController@RequestMapping("/producer")public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack","consumer ack test...");return "發送成功!";}}
- 寫消費端邏輯
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Componentpublic class AckQueueListener {//指定監聽隊列的名稱@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//模擬處理失敗//int num = 3 / 0;System.out.println("處理完成");}
這個代碼運行的結果是正常的, 運行后消息會被簽收: Ready為0, unacked為0
- 運行程序
調用接口, 發送消息 可以看到隊列中有?條消息, unacked的為0(需要先把消費者注掉)
開啟消費者, 控制臺輸出:
接收到消息: consumer ack test..., deliveryTag: 1
2 2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....
管理界面:
1.2.2、AcknowledgeMode.AUTO
1、配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: auto
- 重新運行程序
調用接口, 發送消息 可以看到隊列中有?條消息, unacked的為0(需要先把消費者注掉)
開啟消費者, 控制臺不斷輸出錯誤信息:
接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1]
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:
Listener method 'public void
com.bite.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
從日志上可以看出, 當消費者出現異常時, RabbitMQ會不斷的重發. 由于異常,多次重試還是失敗,消息沒被確認,也無法nack,就?直是unacked狀態,導致消息積壓
1.3.3、AcknowledgeMode.MANUAL
- 配置確認機制
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/bitelistener:simple:acknowledge-mode: manual
- 消費端?動確認邏輯
import com.bite.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Componentpublic class AckQueueListener {//指定監聽隊列的名稱@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//2. 處理業務邏輯System.out.println("處理業務邏輯");//?動設置?個異常, 來測試異常拒絕機制// int num = 3/0;//3. ?動簽收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 異常了就拒絕簽收//第三個參數requeue, 是否重新發送, 如果為true, 則會重新發送,,若為false, 則直接丟棄channel.basicNack(deliveryTag, true, true);}}}
這個代碼運行的結果是正常的, 運行后消息會被簽收: Ready為0, unacked為0
控制臺輸出:
接收到消息: consumer ack test..., deliveryTag: 1
處理業務邏輯.....
管理界面:
- 異常時拒絕簽收
@Componentpublic class AckQueueListener {//指定監聽隊列的名稱@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throwsException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息 System.out.printf("接收到消息: %s, deliveryTag: %d%n", newString(message.getBody(), "UTF-8"),message.getMessageProperties().getDeliveryTag());//2. 處理業務邏輯System.out.println("處理業務邏輯");//?動設置?個異常, 來測試異常拒絕機制int num = 3/0;//3. ?動簽收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 異常了就拒絕簽收//第三個參數requeue, 是否重新發送, 如果為true, 則會重新發送,,若為false, 則直接丟棄channel.basicNack(deliveryTag, true, true);}}}
運行結果: 消費異常時不斷重試, deliveryTag 從1遞增控制臺日志:
接收到消息: consumer ack test..., deliveryTag: 1
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 2
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 3
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 4
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 5
處理業務邏輯 接收到消息: consumer ack test..., deliveryTag: 6
處理業務邏輯.....
管理界?上unacked也變成了1
Unacked的狀態變化很快, 為方便觀察, 消費消息前增加?下休眠時間Thread.sleep(10000);
二、持久性
在前?講了消費端處理消息時, 消息如何不丟失, 但是如何保證當RabbitMQ服務停掉以后, 生產者發送的消息不丟失呢. 默認情況下, RabbitMQ 退出或者由于某種原因崩潰時, 會忽視隊列和消息, 除非告知他不要這么做.
RabbitMQ的持久化分為三個部分:交換器的持久化、隊列的持久化和消息的持久化.
2.1、 交換機持久化
交換器的持久化是通過在聲明交換機時是將durable參數置為true實現的.相當于將交換機的屬性在服務器內部保存,當MQ的服務器發生意外或關閉之后,重啟 RabbitMQ 時不需要重新去建立交換機, 交換 機會自動建立,相當于?直存在. 如果交換器不設置持久化, 那么在 RabbitMQ 服務重啟之后, 相關的交換機元數據會丟失, 對?個?期 使用的交換器來說,建議將其置為持久化的.
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
2.2、隊列持久化
隊列的持久化是通過在聲明隊列時將 durable 參數置為 true實現的. 如果隊列不設置持久化, 那么在RabbitMQ服務重啟之后,該隊列就會被刪掉, 此時數據也會丟失. (隊列沒 有了, 消息也無處可存了)。 隊列的持久化能保證該隊列本身的元數據不會因異常情況而丟失, 但是并不能保證內部所存儲的消息不會丟失. 要確保消息不會丟失, 需要將消息設置為持久化. 咱們前面用的創建隊列的方式都是持久化的
QueueBuilder.durable(Constant.ACK_QUEUE).build();
點進去看源碼會發現,該?法默認durable 是true
public static QueueBuilder durable(String name) {return (new QueueBuilder(name)).setDurable();
}
private QueueBuilder setDurable() {this.durable = true;return this;
}
通過下?代碼,可以創建非持久化的隊列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3、消息持久化
消息實現持久化, 需要把消息的投遞模式( MessageProperties 中的 deliveryMode )設置為2,也就是MessageDeliveryMode.PERSISTENT
NON_PERSISTENT,//?持久化
PERSISTENT;//持久化
設置了隊列和消息的持久化, 當 RabbitMQ 服務重啟之后, 消息依舊存在. 如果只設置隊列持久化, 重啟之后消息會丟失. 如果只設置消息的持久化, 重啟之后隊列消失, 繼而消息也丟失. 所以單單設置消息 持久化而不設置隊列的持久化顯得毫無意義
//?持久化信息
channel.basicPublish(“”,QUEUE_NAME,null,msg.getBytes());
//持久化信息
channel.basicPublish(“”,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
MessageProperties.PERSISTENT_TEXT_PLAIN 實際就是封裝了這個屬性
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2, //deliveryMode0, null, null, null,null, null, null, null,null, null);
如果使?RabbitTemplate 發送持久化消息, 代碼如下:
// 要發送的消息內容
String message = "This is a persistent message";
// 創建?個Message對象,設置為持久化
Message messageObject = new Message(message.getBytes(), newMessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使?RabbitTemplate發送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
注意:RabbitMQ默認情況下會將消息視為持久化的,除?隊列被聲明為非持久化,或者消息在發送時被標記為非持久化
將所有的消息都設置為持久化, 會嚴重影響RabbitMQ的性能(隨機). 寫入磁盤的速度比寫入內 存的速度慢得不只一點點. 對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量. 在選擇是否要將消息持久化時, 需要在可靠性和吐吞量之間做?個權衡
將交換器、隊列、消息都設置了持久化之后就能百分之百保證數據不丟失了嗎? 答案是否定的
- 從消費者來說, 如果在訂閱消費隊列時將autoAck參數設置為true, 那么當消費者接收到相關消息之 后, 還沒來得及處理就宕機了, 這樣也算數據居丟失. 這種情況很好解決, 將autoAck參數設置為false, 并進行手動確認。
2 . 在持久化的消息正確存?RabbitMQ之后,還需要有?段時間(雖然很短,但是不可忽視)才能存入磁盤 中.RabbitMQ并不會為每條消息都進行同步存盤(調用內核的fsync方法)的處理, 可能僅僅保存到操 作系統緩存之中而不是物理磁盤之中. 如果在這段時間內RabbitMQ服務節點發生了宕機、重啟等異 常情況, 消息保存還沒來得及落盤, 那么這些消息將會丟失
這個問題怎么解決呢?
可以在發送端引入事務機制或者發送?確認機制來保證消息已經正確地發送并存儲至RabbitMQ中,即"發送方確認"
三、發送方確認
在使用 RabbitMQ的時候, 可以通過消息持久化來解決因為服務器的異常崩潰?導致的消息丟失, 但是還 有?個問題, 當消息的?產者將消息發送出去之后, 消息到底有沒有正確地到達服務器呢? 如果在消息到 達服務器之前已經丟失(比如RabbitMQ重啟, 那么RabbitMQ重啟期間生產者消息投遞失敗), 持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?
RabbitMQ為我們提供了兩種解決?案:
a. 通過事務機制實現
b. 通過發送方確認(publisher confirm) 機制實現
事務機制?較消耗性能, 在實際工作中使用也不多, 咱們主要介紹confirm機制來實現發送方的確認.RabbitMQ為我們提供了兩個方式來控制消息的可靠性投遞
- confirm確認模式
- return退回模式
3.1、confirm確認模式
Producer 在發送消息的時候, 對發送端設置?個ConfirmCallback的監聽, 無論消息是否到達Exchange, 這個監聽都會被執行, 如果Exchange成功收到, ACK( Acknowledge character , 確認字符)為true, 如果沒收到消息, ACK就為false.
步驟如下:
- 配置RabbitMQ
- 設置確認回調邏輯并發送消息
接下來看實現步驟
- 配置RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/numslistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
- 設置確認回調邏輯并發送消息
?論消息確認成功還是失敗, 都會調?ConfirmCallback的confirm方法. 如果消息成功發送到Broker,ack為true.如果消息發送失敗, ack為false, 并且cause提供失敗的原因
@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack,String cause) {System.out.printf("");if (ack) {System.out.printf("消息接收成功, id:%s \n",correlationData.getId());} else {System.out.printf("消息接收失敗, id:%s, cause: %s",correlationData.getId(), cause);}}});return rabbitTemplate;}@Resource(name = "confirmRabbitTemplate")private RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() throws InterruptedExceptionCorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,"confirm","confirm test...",correlationData1);return"確認成功";
}
?法說明:
public interface ConfirmCallback {/*** 確認回調* @param correlationData: 發送消息時的附加信息, 通常?于在確認回調中識別特定的消 息* @param ack: 交換機是否收到消息, 收到為true, 未收到為false* @param cause: 當消息確認失敗時,這個字符串參數將提供失敗的原因.這個原因可以?于調 試和錯誤處理.* 成功時, cause為null */void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);
}
RabbitTemplate.ConfirmCallback 和 ConfirmListener 區別
在RabbitMQ中, ConfirmListener和ConfirmCallback都是用來處理消息確認的機制, 但它們屬于不同的客戶端庫, 并且使用的場景和方式有所不同.
1 . ConfirmListener 是 RabbitMQ Java Client 庫中的接口. 這個庫是 RabbitMQ 官方提供的一個直接與RabbitMQ服務器交互的客戶端庫. ConfirmListener 接口提供了兩個方法: handleAck 和handleNack, 用于處理消息確認和否定確認的事件.
2 . ConfirmCallback 是 Spring AMQP 框架中的?個接口. 專門為Spring環境設計. 用于簡化與RabbitMQ交互的過程. 它只包含?個 confirm方法,?于處理消息確認的回調.
在 Spring Boot 應用中, 通常會使用 ConfirmCallback, 因為它與 Spring 框架的其他部分更加整合, 可 以利用 Spring 的配置和依賴注入功能. 而在使用 RabbitMQ Java Client 庫時, 則可能會直接實現ConfirmListener 接口, 更直接的RabbitMQChannel交互
3.2、return退回模式
消息到達Exchange之后, 會根據路由規則匹配, 把消息放?Queue中. Exchange到Queue的過程, 如果一條消息無法被任何隊列消費(即沒有隊列與消息的路由鍵匹配或隊列不存在等), 可以選擇把消息退回 給發送者. 消息退回給發送者時, 我們可以設置?個返回回調方法, 對消息進行處理.
步驟如下:
- 配置RabbitMQ
- 設置返回回調邏輯并發送消息
1 . 配置RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@110.41.51.65:15673/numslistener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
- 設置返回回調邏輯并發送消息
這里是引用消息無法被路由到任何隊列, 它將返回給發送者,這時setReturnCallback設置的回調將被觸發
@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactoryconnectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}});return rabbitTemplate;}@RequestMapping("/msgReturn")public String msgReturn() {CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm11", "message return test...", correlationData);return "消息發送成功";}
使?RabbitTemplate的setMandatory方法設置消息的mandatory屬性為true(默認為false). 這個屬性的作用是告訴RabbitMQ, 如果?條消息無法被任何隊列消費, RabbitMQ應該將消息返回給發送者, 此 時 ReturnCallback 就會被觸發.
回調函數中有?個參數: ReturnedMessage, 包含以下屬性:
public class ReturnedMessage {//返回的消息對象,包含了消息體和消息屬性private final Message message;//由Broker提供的回復碼, 表?消息?法路由的原因. 通常是?個數字代碼,每個數字代表不同 的含義. private final int replyCode;//?個?本字符串, 提供了?法路由消息的額外信息或錯誤描述.private final String replyText;//消息被發送到的交換機名稱private final String exchange;//消息的路由鍵,即發送消息時指定的鍵private final String routingKey;}
3.3、常見面試題
如何保證RabbitMQ消息的可靠傳輸?
先放?張RabbitMQ消息傳遞圖:
從這個圖中, 可以看出, 消息可能丟失的場景以及解決?案:
- ?產者將消息發送到 RabbitMQ失敗
a. 可能原因: 網絡問題等
b. 解決辦法: 參考本章節[發送方確認-confirm確認模式] - 消息在交換機中無法路由到指定隊列:
a. 可能原因: 代碼或者配置層面錯誤, 導致消息路由失敗
b. 解決辦法: 參考本章節[發送方確認-return模式] - 消息隊列自身數據丟失
a. 可能原因: 消息到達RabbitMQ之后, RabbitMQ Server 宕機導致消息丟失.
b. 解決辦法: 參考本章節[持久性]. 開啟 RabbitMQ持久化, 就是消息寫入之后會持久化到磁盤, 如果RabbitMQ 掛了, 恢復之后會自動讀取之前存儲的數據. (極端情況下, RabbitMQ還未持久化就掛了, 可能導致少量數據丟失, 這個概率極低, 也可以通過集群的方式提高可靠性) - 消費者異常, 導致消息丟失
a. 可能原因: 消息到達消費者, 還沒來得及消費, 消費者宕機. 消費者邏輯有問題.
b. 解決辦法: RabbitMQ 提供了 消費者應答機制來使 RabbitMQ 能夠感知 到消費者是否消費成功消息. 默認情況下消費者應答機制是自動應答的, 可以開啟手動確認, 當消費者確認消費成功后才會刪除消息, 從而避免消息丟失. 除此之外, 也可以配置重試機制(下篇文章會為大家介紹), 當消息消費異常時, 通過消息重試確保消息的可靠性
結語
本篇文章主要介紹了RAbbitMQ中的部分高級特性,主要從消息確認,持久化,發送方確認三個方面展開
以上就是本文全部內容,感謝各位能夠看到最后,如有問題,歡迎各位大佬在評論區指正,希望大家可以有所收獲!創作不易,希望大家多多支持!
最后,大家再見!祝好!我們下期見!