今日完成記錄
Time | Plan | 完成情況 |
---|---|---|
7:00 - 7:40 | 爬坡 | √ |
8:30 - 11:30 | Rabbit MQ | √ |
17:30 - 18:30 | 羽毛球 | √ |
RabbitMQ
消費者端如何保證可靠性?
- 消息投遞過程出現網絡故障
- 消費者接收到消息但是突然宕機未消費消息
- 消費者接收到消息后處理不當拋出異常
- 。。。
消費者確認機制
Consumer Acknowledgement:消費者處理消息結束應該給MQ發送一個回執,告知自己的消息處理狀態:ack【成功處理消息,MQ從隊列中刪除消息】nack【消息處理失敗,MQ需要重新推送消息】reject【消息處理失敗并拒絕該消息,MQ從隊列刪除消息】
springAMQP提供了三種ACK處理方式:
- none:不處理,消息投遞給消費者后直接返回ack【不安全,不建議】
- manual:手動處理,自己在業務代碼中調用api發送ack或者reject【存在業務入侵但是更靈活】
- auto:自動處理,利用aop自動對業務代碼進行增強,正常執行則返回ack,出現異常則根據異常類型處理【業務異常返回nack, 消息處理或者校驗異常返回reject】
返回reject常見異常:MessageConversionException、MethodArgumentNotValidException、MethodArgumentTypeMissmatchException、NoSuchMethodException、ClassCastException
基本上就是消息校驗異常以及不匹配處理方法或者參數的異常
通過如下配置可以設置ack處理方法:
spring:rabbitmq:listener:simple:acknowledge-mode: none # 不做處理
1、測試none處理方式,修改消費者端代碼,使其拋出觸發reject的異常。在拋出異常前打斷點,并觀察發現rabbitmq的客戶端發現消息在發送到消費者則觸發了自動ack并且刪除了消息 ,觸發異常后客戶端并沒有做任何處理。
2、修改acknowledge-mode為auto,再觀察發現阻塞異常觸發前消息處于uack狀態,但同時觀察到收到了一個manual ack。
(1)當代碼繼續執行,拋出MessageConversionException,會向MQ發送reject,刪除消息。
這里發生了一個有意思的現象,因為我消息阻塞了太久觸發了MQ消息重新投遞,因此又出現了一個manual ack以及交替出現的ready和unack。
(2)當拋出異常是RuntimeException,可以觀察到unack一直是1,且一直嘗試重新投遞。(重新投遞沒有觸發那個自動的manual ack)
這里留兩個小問題:為什么會自動發送了一個manual ack?這個重傳是否是超時重傳還是什么其他機制?
3、設置acknowledge-mode為manual,修改消費者端代碼手動調用api返回消息回執
@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {System.out.println("這是第" + (cnt++) + "條消息");channel.basicAck(deliveryTag, false);
}
(1)測試ack
首先是接著2的調試代碼繼續調試【也就是此時消息隊列中有一個消息沒有被接收】,所以啟動測試代碼后這個消息會被重新投遞,消息被消費者接收后手動回復確定,整個過程如下圖
接下來重新投遞一條消息觀察正常的手動ack全過程,圖中上面的圖藍色線(unacked)被紅色線遮擋,它們其實是同樣的走勢。也就是當消息成功投遞到消費者,會觸發一次自動的ack(Deliver manual ack),但是消息處于uack,等到業務代碼完成手動進行ack后該消息被ack并且刪除。
(2)測試nack
@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("這是第" + (cnt++) + "條消息");throw new BusinessException();
// channel.basicAck(deliveryTag, false);}catch (BusinessException e){// nack且重新入隊 重新推送channel.basicNack(deliveryTag, false, true);}catch (MessageConversionException e){// reject 并且不重新入隊channel.basicReject(deliveryTag, false);}
}
上面的代碼拋出了自定義的業務異常,這個異常會被捕獲并且返回nack,然后重新推送,如下圖
(3)測試reject
@RabbitListener(bindings = @QueueBinding(key="*.top",value = @Queue(value="df.topic.queue1"),exchange = @Exchange(value = "df.topic1", type = ExchangeTypes.TOPIC)
))
public void listenDirectQueue1(Object msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {try {System.out.println("這是第" + (cnt++) + "條消息");throw new MessageConversionException("just a test for msg reject");
// channel.basicAck(deliveryTag, false);}catch (MessageConversionException e){// reject 并且不重新入隊channel.basicReject(deliveryTag, false);}}
這里拋出MessageConversionException,捕獲后手動返回拒絕并且不重新投遞,過程如下
**總結:**實際上SpringAMQP只是提供了三個接口basicAck
、basicNack
、basicReject
,這三個接口何時觸發,基于何種規則觸發都是可以自定義的,上面的三個實現是基本與acknowledge-mode: auto
一樣的邏輯:業務異常nack且重新投遞、消息異常reject且不重新投遞、正常接收和消費則ack
消息失敗重試機制
當
上面提到的兩個小問題
為什么會觸發一次自動的Deliver(Manual ACK)
原來這個Deliver(Manual ACK)是一個投遞事件ACK,當消息進入消息隊列未被消費,其狀態為ready,當其被投遞到消費者,狀態會更新為unacked,如果被成功消費并且確認,則會被刪除。
當首次投遞,則會觸發一個投遞事件(ready變為unacked)
當消息被重新投遞,不會再觸發投遞事件
這是因為:
- 性能優化:重復發送投遞事件會導致網絡帶寬浪費、Broker的CPU浪費、監控系統負載
- 語義精確性:RabbitMQ的事件新系統旨在“報告狀態變化的邊界,而非狀態本身”,重復投遞的狀態變化是首次投遞的重復,因此沒有必要重復報告
- 避免誤導性監控:重復報告投遞事件會導致消息計數錯誤,無法區分實際新消息以及重新投遞消息
這個重傳是否是超時重傳還是什么其他機制?
明天再補了