12.消息可靠性
1.消息丟失的情況
- 生產者向消息代理傳遞消息的過程中,消息丟失了
- 消息代理( RabbitMQ )把消息弄丟了
- 消費者把消息弄丟了
那怎么保證消息的可靠性呢,我們可以從消息丟失的情況入手——從生產者、消息代理( RabbitMQ )、消費者三個方面來保證消息的可靠性
2.生產者的可靠性
1.生產者重連
由于網絡問題,可能會出現客戶端連接 RabbitMQ 失敗的情況,我們可以通過配置開啟連接 RabbitMQ 失敗后的重連機制
application.yml(將 host 更改為部署 RabbitMQ 的服務器的地址)
spring:rabbitmq:host: 127.0.0.1port: 5672virtual-host: /blogusername: CaiXuKunpassword: T1rhFXMGXIOYCoyiconnection-timeout: 1s # 連接超時時間template:retry:enabled: true # 開啟連接超時重試機制initial-interval: 1000ms # 連接失敗后的初始等待時間multiplier: 1 # 連接失敗后的等待時長倍數,下次等待時長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數
注意事項: 當網絡不穩定的時候,利用重試機制可以有效提高消息發送的成功率,但 SpringAMOP 提供的重試機制是阻塞式的重試,也就是說多次重試等待的過程中,線程會被阻塞,影響業務性能
如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長(比如 200 ms)和重試次數,也可以考慮使用異步線程來執行發送消息的代碼
2.生產者確認
RabbitMQ 提供了 Publisher Confirm
和 Publisher Return
兩種確認機制。開啟確機制認后,如果 MQ 成功收到消息后,會返回確認消息給生產者,返回的結果有以下幾種情況
-
消息投遞到了 MQ,但是路由失敗(業務原因),此時會通過 PublisherReturn 機制返回路由異常的原因,然后返回 ACK,告知生產者消息投遞成功
-
臨時消息投遞到了 MQ,并且入隊成功,返回 ACK,告知生產者消息投遞成功
-
持久消息投遞到了MQ,并且入隊完成持久化,返回 ACK,告知生產者消息投遞成功
-
其它情況都會返回 NACK,告知生產者消息投遞失敗
生產者確認機制有關的配置信息( application.yml 文件)
spring:rabbitmq:publisher-returns: truepublisher-confirm-type: correlated
publisher-confirm-type 有三種模式:
- none:關閉 confirm 機制
- simple:以同步阻塞等待的方式返回 MQ 的回執消息
- correlated:以異步回調方式的方式返回 MQ 的回執消息
每個 RabbitTemplate 只能配置一個 ReturnCallback
新增一個名為 RabbitMQConfig
的配置類,并讓該類實現 ApplicationContextAware
接口
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回調rabbitTemplate.setReturnsCallback((returnedMessage) -> {System.out.println("收到消息的return callback, " +"exchange = " + returnedMessage.getExchange() + ", " +"routingKey = " + returnedMessage.getRoutingKey() + ", " +"replyCode = " + returnedMessage.getReplyCode() + ", " +"replyText = " + returnedMessage.getReplyText() + ", " +"message = " + returnedMessage.getMessage());});}}
添加一個測試類,測試 ReturnCallback 的效果
@Test
void testConfirmCallback() throws InterruptedException {CorrelationData correlationData = new CorrelationData();correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {if (confirm.isAck()) {// 消息發送成功System.out.println("消息發送成功,收到ack");} else {// 消息發送失敗System.err.println("消息發送失敗,收到nack,原因是" + confirm.getReason());}if (throwable != null) {// 消息回調失敗System.err.println("消息回調失敗");}});rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData);// 測試方法執行結束后程序就結束了,所以這里需要阻塞線程,否則程序看不到回調結果Thread.sleep(2000);
}
如何看待和處理生產者的確認信息:
- 生產者確認需要額外的網絡開銷和系統資源開銷,盡量不要使用
- 如果一定要使用,無需開啟 Publisher-Return 機制,因為路由失敗一般是業務出了問題
- 對于返回 nack 的消息,可以嘗試重新投遞,如果依然失敗,則記錄異常消息
3.消息代理(RabbitMQ)的可靠性
在默認情況下,RabbitMQ 會將接收到的信息保存在內存中以降低消息收發的延遲,這樣會導致兩個問題:
- 一旦 RabbitMQ 宕機,內存中的消息會丟失
- 內存空間是有限的,當消費者處理過慢或者消費者出現故障或時,會導致消息積壓,引發 MQ 阻塞( Paged Out 現象
**MQ 阻塞:**當隊列的空間被消息占滿了之后,RabbitMQ 會先把老舊的信息存到磁盤,為新消息騰出空間,在這個過程中,整個 MQ 是被阻塞的,也就是說,在 MQ 完成這一系列工作之前,無法處理已有的消息和接收新的消息
1.數據持久化
RabbitMQ 實現數據持久化包括 3 個方面:
- 交換機持久化
- 隊列持久化
- 消息持久化
注意事項:利用 SpringAMQP 創建的交換機、隊列、消息,默認都是持久化的
在 RabbitMQ 控制臺創建的交換機、隊列默認是持久化的,而消息默認是存在內存中( 3.12 版本之前默認存放在內存,3.12 版本及之后默認先存放在磁盤,消費者處理消息時才會將消息取出來放到內存中)
2. LazyQueue( 3.12 版本后所有隊列都是 Lazy Queue 模式)
從 RabbitMQ 的 3.6.0
版本開始,增加了 Lazy Queue 的概念,也就是惰性隊列,惰性隊列的特征如下:
- 接收到消息后直接存入磁盤而非內存(內存中只保留最近的消息,默認 2048條 )
- 消費者要處理消息時才會從磁盤中讀取并加載到內存
- 支持數百萬條的消息存儲,在 3.12 版本后,所有隊列都是 Lazy Queue 模式,無法更改
開啟持久化和生產者確認時,RabbitMQ 只有在消息持久化完成后才會給生產者返回 ACK 回執
-
在 RabbitMQ 控制臺中,要創建一個惰性隊列,只需要在聲明隊列時,指定 x-queue-mode 屬性為 lazy 即可
-
在 Java 代碼中,要創建一個惰性隊列,只需要在聲明隊列時,指定 x-queue-mode 屬性為 lazy 即可
//注解式創建 @RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue(name = "lazy.queue2",durable = "true",arguments = @Argument(name = "x-queue-mode",value = "lazy") )) public void listenLazeQueue(String message) {System.out.println("消費者收到了 laze.queue2的消息: " + message); }
//編程式創建@Bean public org.springframework.amqp.core.Queue lazeQueue() {return QueueBuilder.durable("lazy.queue1").lazy().build(); }
4.消費者的可靠性
1. 消費者確認機制
為了確認消費者是否成功處理消息,RabbitMQ 提供了消費者確認機制(Consumer Acknowledgement)。處理消息后,消費者應該向 RabbitMQ 發送一個回執,告知 RabbitMQ 消息的處理狀態,回執有三種可選值:
- ack:成功處理消息,RabbitMQ 從隊列中刪除該消息
- nack:消息處理失敗,RabbitMQ 需要再次投遞消息
- reject:消息處理失敗并拒絕該消息,RabbitMQ 從隊列中刪除該消息
SpringAMQP 已經實現了消息確認功能,并允許我們通過配置文件選擇 ACK 的處理方式,有三種方式:
-
none:不處理,即消息投遞給消費者后立刻 ack,消息會會立刻從 MQ 中刪除,非常不安全,不建議使用
-
manual:手動模式。需要自己在業務代碼中調用 api,發送 ack 或 reject ,存在業務入侵,但更靈活
-
auto:自動模式,SpringAMQP 利用 AOP 對我們的消息處理邏輯做了環繞增強,當業務正常執行時則自動返回 ack,當業務出現異常時,會根據異常的類型返回不同結果:
- 如果是業務異常,會自動返回 nack
- 如果是消息處理或校驗異常,自動返回 reject
開啟消息確認機制,需要在 application.yml
文件中編寫相關的配置
spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: none
2.失敗重試機制
當消費者出現異常后,消息會不斷重新入隊,重新發送給消費者,然后再次發生異常,再次 requeue(重新入隊),陷入 無限循環,給 RabbitMQ 帶來不必要的壓力
我們可以利用 Spring 提供的 retry 機制,在消費者出現異常時利用本地重試,而不是無限制地重新入隊
在 application.yml 配置文件中開啟失敗重試機制
spring:rabbitmq:listener:simple:prefetch: 1acknowledge-mode: autoretry:enabled: true # 開啟消息消費失敗重試機制initial-interval: 1000ms # 消息消費失敗后的初始等待時間multiplier: 1 # 消息消費失敗后的等待時長倍數,下次等待時長 = (initial-interval) * multipliermax-attempts: 3 # 最大重試次數stateless: true # true表示無狀態,false表示有狀態,如果業務中包含事務,需要設置為false
在達到最大重試次數后,消息會丟失!!!怎樣解決?
3. 失敗消息的處理策略
開啟重試模式后,如果重試次數耗盡后消息依然處理失敗,則需要由 MessageRecoverer 接口來處理, MessageRecoverer 有三個實現類:
-
RejectAndDontRequeueRecoverer:重試次數耗盡后,直接 reject,丟棄消息,默認就是這種方式
-
ImmediateRequeueMessageRecoverer:重試次數耗盡后,返回 nack,消息重新入隊
-
RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機
我們來演示一下使用 RepublishMessageRecoverer 類的情況
-
第一步:定義一個名為 blog.error 的交換機、一個名為 error.queue 的隊列,并將隊列和交換機進行綁定
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true") public class ErrorConfiguration {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.direct", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true, false, false);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}
-
第二步:將失敗處理策略改為 RepublishMessageRecoverer (開起了消費者重試機制才會生效)
@Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
-
在控制臺中可以看到,消息的重試次數耗盡后,消息被放入了 error.queue 隊列
-
在 RabbitMQ 的控制塔也可以看到, error.direct 交換機 和 error.queue 隊列成功創建,消息也成功放入了 error.queue 隊列
總結:消費者如何保證消息一定被消費?
- 開啟消費者確認機制為 auto ,由 Spring 幫我們確認,消息處理成功后返回 ack,異常時返回 nack
- 開啟消費者失敗重試機制,并設置
MessageRecoverer
,多次重試失敗后將消息投遞到異常交換機,交由人工處理
4. 業務冪等性
在程序開發中,冪等是指同一個業務,執行一次或多次對業務狀態的影響是一致的
那么有什么方法能夠確保業務的冪等性呢
方案一:為每條消息設置一個唯一的 id
給每個消息都設置一個唯一的 id,利用 id 區分是否是重復消息:
- 為每條消息都生成一個唯一的 id,與消息一起投遞給消費者
- 消費者接收到消息后處理自己的業務,業務處理成功后將消息 id 保存到數據庫
- 如果消費者下次又收到相同消息,先去數據庫查詢該消息對應的 id 是否存在,如果存在則為重復消息,放棄處理
可以在指定 MessageConverter 的具體類型時,同時為 MessageConverter 設置自動創建一個 messageId
@Bean
public MessageConverter jacksonMessageConvertor() {Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
發送消息后,在 RabbitMQ 的控制臺可以看到,消息的 properties 屬性附帶了 messageId 信息
但這種方式對業務有一定的侵入性
方案二:結合業務判斷 – 結合實例
兜底的解決方案
我們可以在交易服務設置定時任務,定期查詢訂單支付狀態,這樣即便 MQ 通知失敗,還可以利用定時任務作為兜底方案,確保訂單支付狀態的最終一致性