一.發送者的可靠性
1.生產者重試機制
修改publisher
模塊的application.yaml
文件
spring:rabbitmq:connection-timeout: 1s # 設置MQ的連接超時時間template:retry:enabled: true # 開啟超時重試機制initial-interval: 1000ms # 失敗后的初始等待時間multiplier: 1 # 失敗后下次的等待時長倍數,下次等待時長 = initial-interval * multipliermax-attempts: 3 # 最大重試次數
注意:
①當網絡不穩定的時候,利用重試機制可以有效提高消息發送的成功率。不過SpringAMQP提供的重試機制是 阻塞式 的重試,也就是說多次重試等待的過程中,當前線程是被阻塞的。
②如果對于業務性能有要求,建議禁用重試機制。如果一定要使用,請合理配置等待時長和重試次數,當然也可以考慮使用異步線程來執行發送消息的代碼。?
2.生產者確認機制
RabbitMQ提供了生產者消息確認機制,包括 Publisher Confirm 和 Publisher Return 兩種。在開啟確認機制的情況下,當生產者發送消息給MQ后,MQ會根據消息處理的情況返回不同的 回執。
如何返回基本內容如下:
① 當消息投遞到MQ,但是路由失敗時,通過 Publisher Return 返回異常信息,同時通過 Publisher Confirm 返回ACK 的確認信息,代表投遞成功。
② 臨時消息投遞到了MQ,并且入隊成功,返回ACK,告知投遞成功。
③ 持久消息投遞到了MQ,并且入隊完成持久化,返回ACK ,告知投遞成功。
④ 其它情況都會返回NACK,告知投遞失敗。其中 ack 和 nack 屬于 Publisher Confirm 機制,ack 是投遞成功;nack 是投遞失敗。而return 則屬于?Publisher Return?機制。默認兩種機制都是關閉狀態,需要通過配置文件來開啟。
①開啟生產者確認機制
在publisher模塊的 application.yaml 中添加配置:
spring:rabbitmq:publisher-confirm-type: correlated # 開啟publisher confirm機制,并設置confirm類型 publisher-returns: true # 開啟publisher return機制
這里 publisher-confirm-type 有三種模式可選:
① none:關閉confirm機制
② simple:同步阻塞等待MQ的回執
③ correlated:MQ異步回調返回回執
②定義ReturnCallback
每個 RabbitTemplate 只能配置一個 ReturnCallback,因此我們可以在配置類中統一設置。我們在publisher模塊定義一個配置類:
@Slf4j @AllArgsConstructor @Configuration public class MqConfig {private final RabbitTemplate rabbitTemplate;@PostConstructpublic void init(){rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("觸發return callback,");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});} }
③定義ConfirmCallback
由于每個消息發送時的處理邏輯不一定相同,因此ConfirmCallback需要在每次發消息時定義。具體來說,是在調用RabbitTemplate中的convertAndSend方法時,多傳遞一個參數:
@Test void testPublisherConfirm() {// 1.創建CorrelationDataCorrelationData cd = new CorrelationData(UUID.randomUUID().toString());// 2.給Future添加ConfirmCallbackcd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 2.1.Future發生異常時的處理邏輯,基本不會觸發log.error("send message fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 2.2.Future接收到回執的處理邏輯,參數中的result就是回執內容if(result.isAck()){ // result.isAck(),boolean類型,true代表ack回執,false 代表 nack回執log.debug("發送消息成功,收到 ack!");}else{ // result.getReason(),String類型,返回nack時的異常描述log.error("發送消息失敗,收到 nack, reason : {}", result.getReason());}}});// 3.發送消息rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd); }
總結:Publisher Confirm?用來確認消息是否發送到MQ,而Publish Return 用來通知生產者哪些消息由于路由失敗沒有被接收。
注意:
開啟生產者確認比較消耗MQ性能,一般不建議開啟。?
二.MQ的可靠性
1.數據持久化
交換機持久化,隊列持久化,消息持久化(先保存到內存在寫入磁盤),這三個持久化都是默認開啟的。如果消息類型是非持久化的,只有在消息隊列滿了后會被迫寫入磁盤。
總結:
持久化是持續將消息寫入磁盤,非持久化是當mq內存被使用完畢后才將消息寫入磁盤,因此性能較差。
2.LazyQueue
① 接收到(不論臨時還是持久的消息)消息后直接存入磁盤而非內存
② 消費者要消費消息時才會從磁盤中讀取并加載到內存(也就是懶加載)(如果消費者的速度很快也會把消息提前緩存到內存)
③ 支持數百萬條的消息存儲這種模式屬于數據持久化的升級版。
從RabbitMQ的3.6.0版本開始,就增加了Lazy Queues的模式,而在3.12版本之后,LazyQueue已經成為所有隊列的默認格式。
三.消費者的可靠性
1.消費者確認機制
為了確認消費者是否成功處理消息,RabbitMQ提供了消費者確認機制(Consumer Acknowledgement)。即:當消費者處理消息結束后,應該向RabbitMQ發送一個回執,告知RabbitMQ自己消息處理狀態。回執有三種可選值:
? - ack:成功處理消息,RabbitMQ從隊列中刪除該消息
? - nack:消息處理失敗,RabbitMQ需要再次投遞消息
? - reject:消息處理失敗并拒絕該消息,RabbitMQ從隊列中刪除該消息由于消息回執的處理代碼比較統一,因此SpringAMQP幫我們實現了消息確認。并允許我們通過配置文件設置ACK處理方式,有三種模式:
? ? ?①none:不處理。即消息投遞給消費者后立刻ack,消息會立刻從MQ刪除。非常不安全,不建議使用
? ? ?②manual:手動模式。需要自己在業務代碼中調用api,發送 ack 或 reject ,存在業務入侵,但更靈活。
? ? ?③auto:自動模式。SpringAMQP利用AOP對我們的消息處理邏輯做了環繞增強,當業務正常執行時則自動返回 ack. ?當業務出現異常時,根據異常判斷返回不同結果:
- 如果是 業務異常,會自動返回 nack,消息處理失敗后,會回到RabbitMQ,并重新投遞到消費者。
- 如果是 消息處理或校驗異常,自動返回 reject。
?通過下面的配置可以修改SpringAMQP的ACK處理方式:
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做處理
2.失敗重試機制(對消費者確認機制的增強)
如果上面的代碼一直返回nack會導致無線循環。
所以我們配置消費者自己重試,如果超過了配置重試的次數,就會返回reject
修改consumer服務的application.yml文件,添加內容:
spring:rabbitmq:listener:simple:retry:enabled: true # 開啟消費者失敗重試initial-interval: 1000ms # 初識的失敗等待時長為1秒multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-intervalmax-attempts: 3 # 最大重試次數stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false
配置之后出現的現象:
- 消費者在失敗后消息沒有重新回到MQ無限重新投遞,而是在本地重試了3次
- 本地重試3次以后,拋出了 AmqpRejectAndDontRequeueException 異常。查看RabbitMQ控制臺,發現消息被刪除了,說明最后SpringAMQP返回的是 reject?
3.自定義失敗處理策略(對消費者重試機制的增強)
有上面失敗之后是直接返回的reject,這可能并不是我們想要返回的結果,于是我們有了失敗處理策略。
因此Spring允許我們自定義重試次數耗盡后的消息處理策略,這個策略是由MessageRecovery 接口來定義的,它有3個不同實現:
? ①RejectAndDontRequeueRecoverer:重試耗盡后,直接 reject ,丟棄消息。默認就是這種方式 (黑馬說不可選,下面兩個可選)
? ②ImmediateRequeueMessageRecoverer:重試耗盡后,返回 nack ,消息重新入隊?
? ③RepublishMessageRecoverer :重試耗盡后,將失敗消息投遞到指定的交換機,然后轉入到指定的隊列,后續由人工集中處理。代碼實現:
@Configuration @ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true") public class ErrorMessageConfig {@Beanpublic DirectExchange errorMessageExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue", true);}@Beanpublic Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); //這里的with是routingkey}// 這個是配置消息處理失敗之后投入到那個交換機@Beanpublic MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");} }
4.確保業務冪等性(解決消費者重復消費的問題)
冪等性:指同一個業務,執行一次或多次對業務狀態的影響是一致的。
數據的刪除,查詢一般是冪等的,但是修改和新增不是冪等的,案例如下:
所以,我們要盡可能避免業務被重復執行。
為了解決上面圖中的問題我們有了如下的解決方案:
①使用唯一消息id
給每一個消息都設置一個唯一ID,用ID區分是否被消費過。當我們消費一個消息的時候,先在數據庫中查詢是否存在這個數據的ID,如果不存在就消費。如果存在就說明這個消息之前被消費過。
給消息設置唯一id:這就是在配置消息轉換器的時候添加了一點代碼
@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jjmc.setCreateMessageIds(true);return jjmc;
}
?獲取消息的id:
@RabbitListener(queues = "simple.queue")
public void listensimpleQueue(Message message) {log.info("監聽到simple.queue的消息:ID:【{}】", message.getMessageProperties().getMessageId());log.info("監聽到simple.queue的消息:【{}】", new String(message.getBody()));
}
缺點:
?但是這種方法也有自己的缺點,也就是對業務邏輯有侵入性,而且還有額外的數據庫操作。?
②根據業務判斷
根據上面出現非冪等性問題,我們可以把業務邏輯改成這樣就可以保證業務的冪等性了。