文章目錄
- 前言
- 一、生產者重試機制
- 二、生產者確認機制
- 實現生產者確認
- (1)定義ReturnCallback
- (2)定義ConfirmCallback
- 總結
前言
生產者重試機制、生產者確認機制。
一、生產者重試機制
- 問題:生產者發送消息時,出現了網絡故障,導致與MQ的連接中斷。
- 解決:SpringAMQP提供的消息發送時的重試機制。即:當RabbitTemplate與MQ連接超時后,多次重試。
實現:
需要配置application.yaml文件
spring:rabbitmq:connection-timeout: 1s # 設置MQ的連接超時時間template:retry:enabled: true # 開啟超時重試機制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數,下次等待時長 = initial-interval * multipliermax-attempts: 3 # 最大重試次數
我們利用命令停掉RabbitMQ服務:
docker stop mq
然后測試發送一條消息,會發現會每隔1秒重試1次,總共重試了3次。消息發送的超時重試機制配置成功了!
注意:
當網絡不穩定的時候,利用重試機制可以有效提高消息發送的成功率。不過SpringAMQP提供的重試機制是阻塞式
的重試,也就是說多次重試等待的過程中,當前線程是被阻塞的。
如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長和重試次數,當然也可以考慮使用異步線程來執行發送消息的代碼。
二、生產者確認機制
- 一般情況下,只要生產者與MQ之間的網路連接順暢,基本不會出現發送消息丟失的情況,因此大多數情況下我們無需考慮這種問題。
- 不過,在少數情況下,也會出現消息發送到MQ之后丟失的現象,比如:
- MQ內部處理消息的進程發生了異常
- 生產者發送消息到達MQ后未找到Exchange
- 生產者發送消息到達MQ的Exchange后,未找到合適的Queue,因此無法路由
- 針對上述情況,RabbitMQ提供了生產者消息確認機制,包括Publisher Confirm和Publisher Return兩種。在開啟確認機制的情況下,當生產者發送消息給MQ后,MQ會根據消息處理的情況返回不同的回執。
總結如下:
- 當消息投遞到MQ,但是路由失敗時,通過Publisher Return返回異常信息,同時返回ack的確認信息,代表投遞成功
- 臨時消息投遞到了MQ,并且入隊成功,返回ACK,告知投遞成功
- 持久消息投遞到了MQ,并且
入隊完成持久化
,返回ACK ,告知投遞成功 - 其它情況都會返回NACK,告知投遞失敗
- 其中ack和nack屬于Publisher Confirm機制,ack是投遞成功;nack是投遞失敗。而return則屬于Publisher Return機制。
默認兩種機制都是關閉狀態,需要通過配置文件來開啟。
實現生產者確認
發送者的application.yaml配置:
spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型publisher-returns: true # 開啟publisher return機制
這里publisher-confirm-type有三種模式可選:
- none:關閉confirm機制
- simple:同步阻塞等待MQ的回執
- correlated:MQ異步回調返回回執
一般我們推薦使用correlated,回調機制。
(1)定義ReturnCallback
每個RabbitTemplate只能配置一個ReturnCallback,因此我們可以在配置類中統一設置。我們在發送者定義一個配置類:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class MqConfirmConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {// 獲取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 設置ReturnCallback(回調)rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.debug("收到消息的return callback, exchange:{},key:{}, msg:{}, code:{},text:{}", returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getMessage(),returnedMessage.getReplyCode(),returnedMessage.getReplyText());}});}
}
(2)定義ConfirmCallback
由于每個消息發送時的處理邏輯不一定相同,因此ConfirmCallback需要在每次發消息時定義。具體來說,是在調用RabbitTemplate中的convertAndSend方法時,多傳遞一個參數:
這里的CorrelationData中包含兩個核心的東西:
- id:消息的唯一標示,MQ對不同的消息的回執以此做判斷,避免混淆
- SettableListenableFuture:回執結果的Future對象
將來MQ的回執就會通過這個Future來返回,我們可以提前給CorrelationData中的Future添加回調函數來處理消息回執:
發送消息的測試類(添加ConfirmCallback):
@Test
void testPublisherConfirm() {// 1.創建CorrelationDataCorrelationData cd = new CorrelationData();// 2.給Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future發生異常時的處理邏輯,基本不會觸發log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回執的處理邏輯,參數中的result就是回執內容if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執,false 代表 nack回執log.debug("發送消息成功,收到 ack!");}else{ // result.getReason(),String類型,返回nack時的異常描述log.error("發送消息失敗,收到 nack, reason : {}", result.getReason());}}});// 3.發送消息 RoutingKey是錯誤的rabbitTemplate.convertAndSend("dragon.direct", "q", "hello", cd);
}
由于傳遞的RoutingKey是錯誤的,路由失敗后,觸發了return callback,同時也收到了ack。當我們修改為正確的RoutingKey以后,就不會觸發return callback了,只收到ack。而如果連交換機都是錯誤的,則只會收到nack。
注意:
開啟生產者確認比較消耗MQ性能,一般不建議開啟。而且大家思考一下觸發確認的幾種情況:
- 路由失敗:一般是因為RoutingKey錯誤導致,往往是編程導致
- 交換機名稱錯誤:同樣是編程錯誤導致
- MQ內部故障:這種需要處理,但概率往往較低。因此只有對消息可靠性要求非常高的業務才需要開啟,而且僅僅需要開啟ConfirmCallback處理nack就可以了。
總結
以上就是全部講解。