目錄
1. confirm確認模式
1.配置RabbitMQ
2.設置確認回調邏輯并發送消息
2.Return退回模式
1.配置RabbitMQ
2.設置返回回調邏輯并發送消息
在使用RabbitMQ的時候, 可以通過消息持久化來解決因為服務器的異常崩潰而導致的消息丟失, 但是還有?個問題, 當消息的生產者將消息發送出去之后, 消息到底有沒有正確地到達服務器呢? 如果在消息到達服務器之前已經丟失(比如RabbitMQ重啟, 那么RabbitMQ重啟期間生產者消息投遞失敗), 持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?
RabbitMQ為我們提供了兩種解決方案:
? ?
? ? ? 1. 通過事務機制實現
? ? ? 2. 通過發送方確認機制實現
事務機制比較消耗性能,咱們主要介紹confirm機制來實現發送方的確認。
RabbitMQ為我們提供了兩個方式來控制消息的可靠性投遞:
? ? ? 1. confirm確認模式
? ? ? 2. return退回模式
1. confirm確認模式
Producer 在發送消息的時候, 對發送端設置?個ConfirmCallback的監聽, ?論消息是否到達
Exchange, 這個監聽都會被執行, 如果Exchange成功收到, ACK( Acknowledge character , 確認
字符)為true, 如果沒有收到,則ACK為false。
步驟如下:
1.配置RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@你的服務器IP:15673/你的虛擬機名listener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
2.設置確認回調邏輯并發送消息
無論消息確認成功還是失敗, 都會調用ConfirmCallback的confirm方法. 如果消息成功發送到Broker,
ack為true.
如果消息發送失敗, ack為false, 并且cause提供失敗的原因
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory
connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack,
String cause) {System.out.printf("");if (ack){System.out.printf("消息接收成功, id:%s \n",
correlationData.getId());}else {System.out.printf("消息接收失敗, id:%s, cause: %s",
correlationData.getId(), cause);}}});return rabbitTemplate;
}@Resource(name = "confirmRabbitTemplate")
private RabbitTemplate confirmRabbitTemplate;
@RequestMapping("/confirm")
public String confirm() throws InterruptedException CorrelationData correlationData1 = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE_NAME,
"confirm", "confirm test...", correlationData1);return "確認成功";
}
方法說明:
public interface ConfirmCallback {/*** 確認回調* @param correlationData: 發送消息時的附加信息, 通常?于在確認回調中識別特定的消
息* @param ack: 交換機是否收到消息, 收到為true, 未收到為false* @param cause: 當消息確認失敗時,這個字符串參數將提供失敗的原因.這個原因可以?于調
試和錯誤處理.* 成功時, cause為null */void confirm(@Nullable CorrelationData correlationData, boolean ack,
@Nullable String cause);
}
RabbitTemplate.ConfirmCallback 和 ConfirmListener 區別
在RabbitMQ中, ConfirmListener和ConfirmCallback都是?來處理消息確認的機制, 但它們屬于不同
的客戶端庫, 并且使用的場景和方式有所不同.
1 . ConfirmListener 是 RabbitMQ Java Client 庫中的接口. 這個庫是 RabbitMQ 官?提供的?個直
接與RabbitMQ服務器交互的客戶端庫. ConfirmListener 接?提供了兩個方法: handleAck 和
handleNack, ?于處理消息確認和否定確認的事件.
2 . ConfirmCallback 是 Spring AMQP 框架中的?個接口. 專門為Spring環境設計. 用于簡化與
RabbitMQ交互的過程. 它只包含?個 confirm 方法,?于處理消息確認的回調.
在 Spring Boot 應?中, 通常會使用?ConfirmCallback, 因為它與 Spring 框架的其他部分更加整合, 可以利用Spring 的配置和依賴注入功能. 而在使用?RabbitMQ Java Client 庫時, 則可能會直接實現
ConfirmListener 接口, 更直接的與RabbitMQ的Channel交互
2.Return退回模式
消息到達Exchange之后, 會根據路由規則匹配, 把消息放?Queue中. Exchange到Queue的過程, 如果?條消息無法被任何隊列消費(即沒有隊列與消息的路由鍵匹配或隊列不存在等), 可以選擇把消息退回給發送者. 消息退回給發送者時, 我們可以設置一個返回回調方法,對消息進行處理
步驟如下:
1.配置RabbitMQ
spring:rabbitmq:addresses: amqp://study:study@你的服務器IP:15673/你的虛擬機名listener:simple:acknowledge-mode: manual #消息接收確認publisher-confirm-type: correlated #消息發送確認
2.設置返回回調邏輯并發送消息
消息無法被路由到任何隊列,它將返回給發送者,這時setReturnCallback設置的回調將被觸發
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(CachingConnectionFactory
connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.printf("消息被退回: %s", returned);}
});return rabbitTemplate;
}@RequestMapping("/msgReturn")
public String msgReturn(){CorrelationData correlationData = new CorrelationData("2");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,
"confirm11", "message return test...", correlationData);return "消息發送成功";
}
使用RabbitTemplate的setMandatory方法設置消息的mandatory屬性為true(默認為false). 這個屬性 的作用是告訴RabbitMQ, 如果?條消息無法被任何隊列消費, RabbitMQ應該將消息返回給發送者, 此時 ReturnCallback 就會被觸發
回調函數中有一個參數,ReturnMessage,包含以下屬性:
public class ReturnedMessage {//返回的消息對象,包含了消息體和消息屬性private final Message message;//由Broker提供的回復碼, 表?消息?法路由的原因. 通常是?個數字代碼,每個數字代表不同
的含義. private final int replyCode;//?個?本字符串, 提供了?法路由消息的額外信息或錯誤描述.private final String replyText;//消息被發送到的交換機名稱private final String exchange;//消息的路由鍵,即發送消息指定的鍵private final String routingKey;
}