目錄
- 一、生產者重連
- 二、生產者確認
一、生產者重連
當網絡不穩定的時候,利用重試機制可以有效提高消息發送的成功率。不過SpringAMQP提供的重試機制是阻塞式
的重試,也就是說多次重試過程中,當前線程是被阻塞的,會影響業務性能。
如果對于業務性能有要求,建議禁用
重試機制。如果一定要使用,請合理配置等待時長和重試次數,當然也可以考慮異步
線程來執行發送消息的代碼。
spring:rabbitmq:connection-timeout: 200ms # 設置連接超時時間template:retry:enabled: true # 開啟超時重試機制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數,下次等待時長=initial-interval * multipliermax-attempts: 3 # 最大重試次數
二、生產者確認
RabbitMQ有兩種確認機制:Publisher Confirm
和Publisher Return
。開啟確認機制后,在MQ成功收到消息后會返回確認消息給生產者
,返回的結果有以下幾種情況:
- 消息投遞到了MQ,但是路由失敗。此時會通過
Publisher Return
返回路由異常原因,然后返回ACK
,告知投遞成功。 - 臨時消息投遞給了MQ,并且入隊成功,返回
ACK
,告知投遞成功。 - 持久消息投遞到MQ,并且入隊成功,返回
ACK
,告知投遞成功。 - 其他情況都會返回
NACK
,告知投遞失敗
在生產者服務YML文件中增加一下配置:
spring:rabbitmq:publisher-returns: true # 開啟publisher-return機制publisher-confirm-type: correlated # 設置機制類型:異步
publisher-confirm-type有三種模式可選:
none
:關閉confirm機制。simple
:同步阻塞等待MQ的回執消息。correlated
:MQ異步回調方式返回回執消息
每個RabbitTemplate
只能配置一個ReturnCallback
,因此需要在項目啟動過程中配置:
package com.ming.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.error(String.format("ReturnsCallback:消息發送失敗,應答碼 %s,原因 %s,交換機 %s,路由鍵 %s,消息 %s ...", returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage()));}});}
}
發送消息,指定消息ID,消息ConfirmCallback
,此回調每一次發送消息的時候都要指定:
@Test
public void confirmCallbackTest() throws InterruptedException {// 1. 創建cdCorrelationData cd = new CorrelationData();// 2. 增加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable throwable) { // Spring消息回調失敗log.warn("ConfirmCallback:Spring消息回調失敗......", throwable);}@Overridepublic void onSuccess(CorrelationData.Confirm confirm) { // 消息發送回調if (confirm.isAck()) {log.info("ConfirmCallback:RabbitMQ 消息發送成功,收到ACK......");}else {log.warn("ConfirmCallback:RabbitMQ 消息發送成功,收到NACK......", confirm.getReason());}}});// 3. 發送消息rabbitTemplate.convertAndSend("mt.topic", "china.weather", "黃色警報 ......", cd); // 成功例子// rabbitTemplate.convertAndSend("mt.topic", "aaa", "黃色警報 ......", cd); // 失敗例子Thread.sleep(2000);
}