前言
這里的發布方確認是以 SpringAMQP 寫的,之前我們在前面的篇章中就學過了 使用 Java 原生的SDK編寫,當時是發布確認模式,在這里我們將用 Spring 集成的 rabbitmq 方法來編寫
開啟發布者確認機制需要進行下面的配置,以 yml 為例:
spring:rabbitmq:publisher-confirm-type: correlated #消息發送確認
在使用RabbitMQ的時候,可以通過消息持久化來解決因為服務器的異常崩潰而導致的消息丟失,但是還有一個問題,當消息的生產者將消息發送出去之后,消息到底有沒有正確地到達服務器呢?
如果在消息到達服務器之前已經丟失(比如RabbitMQ重啟,那么RabbitMQ重啟期間生產者消息投遞失敗),持久化操作也解決不了這個問題,因為消息根本沒有到達服務器,何談持久化?
RabbitMQ為我們提供了兩種解決方案:
a.通過事務機制實現
b.通過發送方確認(publisherconfirm)機制實現
事務機制比較消耗性能,在實際工作中使用也不多,咱們主要介紹confirm機制來實現發送方的確認.
RabbitMQ為我們提供了兩個方式來控制消息的可靠性投遞
1.confirm確認模式
2. return退回模式
我們提前準備一下交換機和隊列:
//發送方確認public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";public static final String CONFIRM_ROUTING_KEY = "CONFIRM_ROUTING_KEY";
//發送方確認@Bean("confirmQueue")public Queue confirmQueue() {return QueueBuilder.durable(MQConstants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public Exchange confirmExchange() {return ExchangeBuilder.directExchange(MQConstants.CONFIRM_EXCHANGE).durable(true).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmExchange") Exchange exchange, @Qualifier("confirmQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(MQConstants.CONFIRM_ROUTING_KEY).noargs();}
confirm
Producer在發送消息的時候,對發送端設置一個ConfirmCallback的監聽,無論消息是否到達
Exchange,這個監聽都會被執行,如果Exchange成功收到,ACK(Acknowledgecharacter,確認
字符)為true,如果沒收到消息,ACK就為false.
confirm 模式是用于確認發送方的消息是否到達交換機
要啟動 confirm ,我們還需要創建一個新的 RabbitTemplate,通過rabbitTemplate.setConfirmCallback 來進行設置,ack 有兩種返回值,true和false,根據不同的返回值做不同的處理
@Configuration
public class RabbitTemplateConfig {/*** 構建原生的 RabbitTemplate* @param connectionFactory* @return*/@Bean("rabbitTemplate")public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}/*** 構建發布方確認的 RabbitTemplate* @param connectionFactory* @return*/@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("調用了 confirm 方法");if(ack){System.out.printf("消息發送成功,消息ID:%s \n", correlationData == null ? null : correlationData.getId());}else {System.out.printf("消息發送失敗, 消息ID:%s, cause: %s \n", correlationData == null ? null :correlationData.getId(), cause);//響應的業務處理,如重發消息等等...}}});return rabbitTemplate;}
}
這里如果項目需要使用到原始的 RabbitTemplate 的話,需要創建一個,因為在你自定義創建過 RabbitTmeplate 的時候,Spring 就不會自動幫你創建原始的 RabbitTemplate 了。
參數介紹:
public void confirm(CorrelationData correlationData, boolean ack, String cause)
correlationData是消息的唯一標識,也就說序列號,我們可以通過生產者那邊進行設置,注意這里和消費者那邊的 deliveryTag 是不一樣的,correlationData是生產者使用的,deliveryTag 是隊列創建的
ack 是否確認消息達到
cause 存儲消息發送失敗的原因
我們來演示消息發送失敗的場景:
@RequestMapping("/confirm")public String confirm() {for (int i = 0; i < 10; i++) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());confirmRabbitTemplate.convertAndSend(MQConstants.CONFIRM_EXCHANGE, "hello", "confirm: " + i, correlationData);}return "消息發送成功";}
我們將 routingkey 設置錯誤,觀察效果:
我們會發現消息發送成功,這是因為消息確實達到的指定的交換機,但是我們查看隊列的時候是沒有消息的,因此進一步驗證了 confirm 模式只是確認消息是否達到交換機,并不能確認是否到達指定的隊列
接下來我們來指定一個不存在的交換機:
@RequestMapping("/confirm")public String confirm() {for (int i = 0; i < 10; i++) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());confirmRabbitTemplate.convertAndSend("hello 85246", MQConstants.CONFIRM_ROUTING_KEY, "confirm: " + i, correlationData);}return "消息發送成功";}
在 cause 的打印信息中:得知隊列不存在,因此 confirm 確實可以驗證消息有沒有達到指定的交換機里
returns
消息到達Exchange之后,會根據路由規則匹配,把消息放入Queue中.Exchange到Queue的過程,如果一條消息無法被任何隊列消費(即沒有隊列與消息的路由鍵匹配或隊列不存在等),可以選擇把消息退回給發送者,消息退回給發送者時,我們可以設置一個返回回調方法,對消息進行處理
簡單來說,returns 是確認交換機上的消息是否成功到達隊列上的
@Bean("returnRabbitTemplate")public RabbitTemplate returnRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("調用了 returnedMessage 方法");System.out.printf("消息被退回:%s \n", returned);}});return rabbitTemplate;}
使用RabbitTemplate的setMandatory方法設置消息的mandatory屬性為true(默認為false).這個屬性的作用是告訴RabbitMQ,如果一條消息無法被任何隊列消費,RabbitMQ應該將消息返回給發送者,此時ReturnCallback就會被觸發.
如果我們指定一個不存在的 routingkey,那就沒有隊列能接收到消息,就會觸發ReturnsCallback方法
合并使用
當然這兩個模式是可以一起使用的,代碼演示:
/*** 構建發布方確認的 RabbitTemplate* @param connectionFactory* @return*/@Bean("confirmRabbitTemplate")public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//設置回調方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("調用了 confirm 方法");if(ack){System.out.printf("消息發送成功,消息ID:%s \n", correlationData == null ? null : correlationData.getId());}else {System.out.printf("消息發送失敗, 消息ID:%s, cause: %s \n", correlationData == null ? null :correlationData.getId(), cause);//響應的業務處理,如重發消息等等...}}});//設置消息退回回調rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("調用了 returnedMessage 方法");System.out.printf("消息被退回:%s \n", returned);}});return rabbitTemplate;}