持續學習&持續更新中…
守破離
【雷豐陽-谷粒商城 】【分布式高級篇-微服務架構篇】【22】【RabbitMQ】
- Message Queue 消息隊列
- 異步處理
- 應用解耦
- 流量控制
- 消息中間件概念
- RabbitMQ概念
- Message
- Publisher
- Exchange
- Queue
- Binding
- Connection
- Channel
- Consumer
- Virtual Host
- Broker
- 圖示
- 安裝
- RabbitMQ運行機制
- AMQP 中的消息路由
- Exchange類型
- 練習
- RabbitMQ整合
- AmqpAdmin 管理組件
- RabbitTemplate 消息發送處理組件
- 監聽消息
- 注意
- RabbitMQ消息確認機制-可靠抵達
- 發送端—ConfirmCallback
- 發送端—ReturnCallback
- 發送端—代碼配置
- 消費端
- 消息的TTL(Time To Live)
- 什么是死信
- 延時隊列
- 延時隊列的實現
- 如何保證消息可靠性
- 消息丟失
- 消息重復
- 消息積壓
- MQ對比
- 參考
Message Queue 消息隊列
異步處理
應用解耦
這樣不管庫存系統的接口會不會發生改變,訂單系統都不關心
流量控制
把用戶請求流量存到消息隊列中,后臺服務根據它自身的處理能力去來進行消費處理,不會導致后臺服務宕機
消息中間件概念
大多應用中,可通過消息服務中間件來提升系統異步通信、擴展解耦能力,消息服務中兩個重要概念:消息代理(message broker)和目的地(destination)
當消息發送者發送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。
消息隊列主要有兩種形式的目的地
- 隊列(queue):點對點消息通信(point-to-point)
- 主題(topic):發布(publish)/訂閱(subscribe)消息通信
點對點式:
- 消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取后被移出隊列
- 消息只有唯一的發送者和接收者
- 很多人都可以監聽隊列,但是,消息誰搶到就是誰的
發布訂閱式:
- 發送者發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那么就會在消息到達時,同時收到消息
- (訂閱者都會收到消息)
JMS(Java Message Service)JAVA消息服務:基于JVM消息代理的規范。ActiveMQ、HornetMQ是JMS實現
AMQP(Advanced Message Queuing Protocol)高級消息隊列協議,也是一個消息代理的規范,兼容JMS
RabbitMQ是AMQP的實現
RabbitMQ概念
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成, 這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
Exchange
交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。
Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別
Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直 在隊列里面,等待消費者連接到這個隊列將其取走。
Binding
綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交 換器理解成一個由綁定構成的路由表。
Exchange 和Queue的綁定可以是多對多的關系。
Connection
網絡連接,比如一個TCP連接。
客戶端和消息中間件之間,一直保持一個長連接。1個客戶端只會建立1條連接
長連接的好處:客戶端宕機或下線,該長連接就會斷開,RabbitMQ就會感知到,就不會再繼續派發消息,可以防止大面積消息丟失問題。
Channel
(1條Connection上建立多個Channel,收發數據通過Channel進行)
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道 發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。
因為對于操作系統來說建立和銷毀 TCP 都 是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個Vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定、配置和權限機制。VHost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 /
。
虛擬主機跟虛擬主機之間是相互隔離的。可以用于生產環境和開發環境隔離等
類比,RabbitMQ類似一個多租戶系統,各個用戶相互隔離。每一個 RabbitMQ 服務器都能創建 虛擬的消息服務器,稱之為 虛擬主機(virtual host),簡稱 vhost。vhost 本質上是一個獨立的小型 RabbitMQ 服務器,vhost 可避免隊列和交換器等命名沖突,vhost 之間是絕對隔離的
Broker
表示消息隊列服務器實體
圖示
安裝
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
docker update rabbitmq --restart=always
- https://www.rabbitmq.com/networking.html
- 4369, 25672 (Erlang發現&集群端口)
- 5672, 5671 (AMQP端口)
- 15672 (web管理后臺端口)
- 61613, 61614 (STOMP協議端口)
- 1883, 8883 (MQTT協議端口)
RabbitMQ運行機制
AMQP 中的消息路由
AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和Binding 的角色。生產者把消息發布到 Exchange 上,消息最終到達隊列并被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。
Exchange類型
Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 的消息 header 而不是路由鍵,headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:
消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。
路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard” 等等。它是完全匹配的模式。
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。
fanout 類型轉發消息是最快的。
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。 它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。
它同樣也會識別兩個通配符:符號“#”和符號“*”,#匹配0個或多個單詞,*匹配1個單詞。
練習
RabbitMQ整合
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=192.168.56.10
# 都有默認配置
#spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=/
#spring.rabbitmq.username=guest
#spring.rabbitmq.password=guest
/*** 使用RabbitMQ* 1、引入amqp場景啟動器;RabbitAutoConfiguration 就會自動生效** 2、給容器中自動配置了* RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate;* 所有的屬性都是 spring.rabbitmq* @ConfigurationProperties(prefix = "spring.rabbitmq")* public class RabbitProperties** 3、給配置文件中配置 spring.rabbitmq 信息** 4、@EnableRabbit: @EnableXxxxx;開啟功能** 5、監聽消息:使用@RabbitListener;必須有@EnableRabbit* @RabbitListener: 類+方法上(監聽哪些隊列即可)* @RabbitHandler:配合@RabbitListener標在方法上(重載區分不同的消息)*/@EnableRabbit
@Configuration
public class MyRabbitConfig {/*** 使用JSON序列化機制,進行消息轉換*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
AmqpAdmin 管理組件
@Testpublic void createExchange() {/*** DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)*/Exchange directExchange = new DirectExchange("hello-java-exchange", true, false);amqpAdmin.declareExchange(directExchange);log.info("Exchange[{}]創建成功", "hello-java-exchange");}@Testpublic void createQueue() {/*** public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*/Queue queue = new Queue("hello-java-queue", true, false, false);amqpAdmin.declareQueue(queue);log.info("Queue[{}]創建成功", "hello-java-queue");}@Testpublic void createBinding() {/*** String destination【目的地】,* DestinationType destinationType【目的地類型】,* String exchange【交換機】,* String routingKey【路由鍵】,* Map<String, Object> arguments【自定義參數】** 將exchange指定的交換機和destination目的地進行綁定,使用routingKey作為指定的路由鍵*/Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);amqpAdmin.declareBinding(binding);log.info("Binding[{}]創建成功", "hello-java-binding");}
RabbitTemplate 消息發送處理組件
@Testpublic void sendMessageTest() {//1、發送消息,如果發送的消息是個對象,我們會使用序列化機制,將對象寫出去。對象必須實現SerializableString msg = "Hello World!";rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", msg);// 2、發送的對象類型的消息,可以自動轉成一個json【自定義MessageConverter即可】
// OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
// reasonEntity.setId(1L);
// reasonEntity.setCreateTime(new Date());
// reasonEntity.setName("哈哈-"+ 666);
// rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java",
// reasonEntity);log.info("消息發送完成{}");}
CorrelationData:消息的唯一ID【可以放在數據庫或者Redis中,方便之后判斷哪些消息沒有可靠抵達】
@RestController
public class RabbitController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/sendOneMq")public String sendOneMq(@RequestParam(value = "num", defaultValue = "10") Integer num) {OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName("哈哈");
// CorrelationData:消息的唯一ID【可以放在數據庫或者Redis中,方便之后判斷哪些消息沒有可靠抵達】rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity, new CorrelationData(UUID.randomUUID().toString()));return "ok";}
監聽消息
@RabbitListener + @RabbitHandler (重載區分不同的消息)
@GetMapping("/sendMq")public String sendMq(@RequestParam(value = "num", defaultValue = "10") Integer num) {for (int i = 0; i < num; i++) {if (i % 2 == 0) {OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName("哈哈-" + i);rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity, new CorrelationData(UUID.randomUUID().toString()));} else {OrderEntity entity = new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.java", entity, new CorrelationData(UUID.randomUUID().toString()));}}return "ok";}
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@RabbitHandlerpublic void receiveMessage(Message message,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {
// 接收到消息...(Body:'{"id":1,"name":"哈哈-666","sort":null,"status":null,"createTime":1720438632794}' MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-ltQ3IQ4H1lvLGqYNyEH3nQ, consumerQueue=hello-java-queue])System.out.println("@RabbitListener接收到消息..." + message);
// System.out.println("接收到消息...content:"+content);
// byte[] body = message.getBody();
// //消息頭屬性信息
// MessageProperties properties = message.getMessageProperties();
// Thread.sleep(3000);
// System.out.println("消息處理完成=>"+content.getName());}@RabbitHandlerpublic void receiveMessage2(OrderEntity content) throws InterruptedException {System.out.println("@RabbitHandler接收到消息..." + content);}}
@RabbitListener寫在方法上
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@RabbitListener(queues = {"hello-java-queue"})public void receiveMessage(OrderEntity content) throws InterruptedException {System.out.println("接收到消息..." + content);}}
注意
- Queue、Exchange、Binding可以 @Bean 注入進去
- 監聽消息的方法可以有三種參數(不分數量,順序):Object content, Message message, Channel channel
- channel可以用來拒絕或者簽收消息
RabbitMQ消息確認機制-可靠抵達
保證消息不丟失,可靠抵達,可以使用事務消息,這樣性能會下降250倍,為此引入確認機制:
- publisher confirmCallback :確認模式
- publisher returnCallback :未投遞到 queue 退回模式
- consumer 手動 ack 機制
發送端—ConfirmCallback
spring.rabbitmq.publisher-confirms=true
在創建 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啟confirmcallback 。
CorrelationData:用來表示當前消息唯一性。
消息只要被 broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調用confirmCallback。
被 broker 接收到只能表示 message 已經到達服務器,并不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的 returnCallback 。
發送端—ReturnCallback
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有些業務場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到return 退回模式。
這樣如果消息未能投遞到目標 queue 里將調用 returnCallback ,可以記錄下詳細到投遞數據,定期的巡檢或者自動糾錯都需要這些數據。
發送端—代碼配置
#服務端確認
# 開啟消息正確抵達RabbitMQ確認
spring.rabbitmq.publisher-confirms=true
# 開啟RabbitMQ中的消息正確的從交換機投遞到隊列確認
spring.rabbitmq.publisher-returns=true
# 只要消息抵達隊列,以異步方式優先回調ReturnCallback
spring.rabbitmq.template.mandatory=true
@Configuration
public class MyRabbitConfig {/*** 使用JSON序列化機制,進行消息轉換*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}private RabbitTemplate rabbitTemplate;@Primary@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}/*** 定制RabbitTemplate* 做好消息確認機制(publisher,consumer【手動ack】)* 每一個發送的消息都在數據庫做好記錄。定期將失敗的消息再次發送一遍** 發送端確認:* 1、Broker收到消息就回調【P——>B】* 1、spring.rabbitmq.publisher-confirms=true* 2、設置確認回調ConfirmCallback* 2、消息沒有正確抵達隊列進行回調【E——>Q】* 1、spring.rabbitmq.publisher-returns=true* spring.rabbitmq.template.mandatory=true* 2、設置確認回調ReturnCallback*** 消費端確認(保證每個消息被正確消費,此時broker才可以刪除這個消息)。* spring.rabbitmq.listener.simple.acknowledge-mode=manual #手動簽收* 默認是自動確認的:只要消息接收到,客戶端會自動確認,服務端就會移除這個消息* 問題:* 我們收到很多消息,如果自動回復給服務器ack,當消息由于各種原因沒有成功處理完成,就會發生消息丟失;* 消費者手動確認模式:* 只要我們沒有明確告訴MQ消息被簽收,消息就一直是unacked狀態。* 即使Consumer宕機,消息也不會丟失,會重新變為Ready狀態,下一次有新的Consumer連接進來就發給他* 如何簽收:* channel.basicAck(deliveryTag,false);業務成功,簽收* channel.basicNack(deliveryTag,false,true);業務失敗,拒簽,并讓消息重新入隊;*/
// @PostConstruct //MyRabbitConfig對象創建完成以后,也就是構造器執行完成后,執行這個方法public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** P ——> B [Publisher ——> Broker]** 只要消息抵達Broker這里的ack就為true* @param correlationData 當前消息的唯一關聯數據(這個是消息的唯一id,發布者發消息的時候傳遞的)* @param ack 消息是否成功抵達Broker* @param cause 失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//Broker收到了:修改消息的狀態——>Broker接收到消息
// System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]==>cause[" + cause + "]");}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** E ——> Q [Exchange ——> Queue]** 只要消息沒有正確投遞給指定的隊列,就會觸發這個失敗回調* @param message 投遞失敗的消息的詳細信息* @param replyCode 回復的狀態碼* @param replyText 回復的文本內容* @param exchange 當時這個消息發給了哪個交換機* @param routingKey 當時這個消息指定的路由鍵*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//報錯了:修改數據庫當前消息的狀態——>隊列接收消息發生錯誤
// System.out.println("Fail Message[" + message + "]==>replyCode[" + replyCode + "]==>replyText[" + replyText + "]===>exchange[" + exchange + "]===>routingKey[" + routingKey + "]");}});}}
消費端
消費者獲取到消息,成功處理,可以回復Ack給Broker
- basic.ack用于肯定確認;broker將移除此消息
- basic.nack用于否定確認;可以指定broker是否丟棄此消息,可以批量
- basic.reject用于否定確認;可以指定broker是否丟棄此消息,但不能批量
默認自動ack,消息被消費者收到,就會從broker的queue中移除
queue無消費者,消息依然會被存儲,直到消費者消費。消費者收到消息,默認會自動ack。但是如果無法確定此消息是否被處理完成, 或者成功處理。我們可以開啟手動ack模式spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 消息處理成功,ack(),接受下一個消息,此消息broker就會移除
- 消息處理失敗,nack()/reject(),重新發送給其他人進行處理,或者容錯處理后ack,或者丟棄
- 消息一直沒有調用ack/nack方法(比如宕機/程序出異常),broker認為此消息正在被處理,不會投遞給別人,此時客戶端斷開,MQ感知到后,消息不會被broker移除,會重新入隊并投遞給別人
# 客戶端確認:開啟手動ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
/*** 消費端確認(保證每個消息被正確消費,此時broker才可以刪除這個消息)。* spring.rabbitmq.listener.simple.acknowledge-mode=manual #手動簽收* 默認是自動確認的:只要消息接收到,客戶端會自動確認,服務端就會移除這個消息* 問題:* 我們收到很多消息,如果自動回復給服務器ack,當消息由于各種原因沒有成功處理完成,就會發生消息丟失;* 消費者手動確認模式:* 只要我們沒有明確告訴MQ消息被簽收,消息就一直是unacked狀態。* 即使Consumer宕機,消息也不會丟失,會重新變為Ready狀態,下一次有新的Consumer連接進來就發給他* 如何簽收:* channel.basicAck(deliveryTag,false);業務成功,簽收* channel.basicNack(deliveryTag,false,true);業務失敗,拒簽,并讓消息重新入隊;*/
@RabbitListener(queues = {"hello-java-queue"})public void receiveMessage0(Message message,Channel channel) {
// multiple:是否批量
// requeue=false 丟棄 requeue=true 發回服務器,讓服務器重新入隊該消息。// long deliveryTag, boolean multiple
// channel.basicAck( deliveryTag, false); 只簽收當前貨物,不批量簽收;// long deliveryTag, boolean multiple, boolean requeue
// channel.basicNack( deliveryTag, false, true); 拒簽當前貨物【是否將該消息讓MQ重新放入隊列,看自己的業務需求】// long deliveryTag, boolean requeue
// channel.basicReject( deliveryTag, true); 拒簽當前貨物【是否將該消息重新放回MQ看自己的業務需求】//DeliveryTag在channel內按順序自增long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag==>" + deliveryTag);//簽收貨物(消息),非批量模式try {if (deliveryTag % 2 == 0) {//收貨channel.basicAck(deliveryTag, false);System.out.println("簽收了貨物"+ message +"...deliveryTag..." + deliveryTag);} else {//退貨 requeue=false 丟棄 requeue=true 發回服務器,服務器重新入隊。channel.basicNack(deliveryTag, false, true);
// channel.basicReject(deliveryTag, true);// channel.basicNack(deliveryTag, false, false);System.out.println("不簽收貨物"+ message +"...deliveryTag..." + deliveryTag);}} catch (Exception e) {//網絡中斷,簽收信息未成功發送給Broker}}
消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。
RabbitMQ給消息設置 TTL
- 通過隊列設置:隊列中的消息都有相同的過期時間
- 給消息本身設置:每條消息的 TTL 可以不同
如果隊列設置了,消息也設置了,則最小的 TTL 生效。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。
消息在隊列中生存時間一旦超過 TTL,就會變成死信(Dead Message)
什么是死信
- 一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。(
basic.reject/ basic.nack)requeue=false
- 消息的TTL到了,消息過期了。(沒有人消費它)
- 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上
延時隊列
當消息在一個隊列中變成死信之后,它能被發送到一個指定的交換機中,這個交換機就是 DLX (Dead Letter Exchange),可稱為死信交換機。
綁定在 DLX 上的隊列就稱為死信隊列。
我們既可以控制消息在一段時間后變成死信,又可以控制變成死信的消息被路由到某一個指定的交換機,這個交換機又可以綁定隊列去消費死信,結合它們就可以實現一個延時隊列
不推薦使用給消息設置過期時間這種方式實現延時隊列
延時隊列的實現
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MyMQConfig {//@Bean Binding,Queue,Exchange/*** 容器中的 Binding,Queue,Exchange 都會自動創建(RabbitMQ沒有的情況)* RabbitMQ中已有的話 @Bean中聲明屬性發生了變化也不會覆蓋*/@Beanpublic Queue orderDelayQueue() {Map<String,Object> arguments = new HashMap<>();/*** x-dead-letter-exchange: order-event-exchange* x-dead-letter-routing-key: order.release.order* x-message-ttl: 60000*/arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",60000);//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsreturn new Queue("order.delay.queue", true, false, false,arguments);}@Beanpublic Queue orderReleaseOrderQueue() {return new Queue("order.release.order.queue", true, false, false);}@Beanpublic Exchange orderEventExchange() {//String name, boolean durable, boolean autoDelete, Map<String, Object> argumentsreturn new TopicExchange("order-event-exchange",true,false);}@Beanpublic Binding orderCreateOrderBinding() {//String destination, DestinationType destinationType, String exchange, String routingKey,// Map<String, Object> argumentsreturn new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseOrderBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;import java.io.IOException;
import java.util.Date;
import java.util.UUID;@Controller
public class HelloController {@AutowiredRabbitTemplate rabbitTemplate;@RabbitListener(queues = {"order.release.order.queue"})public void testListenOrderRelease(OrderEntity order, Message message, Channel channel) throws IOException {System.out.println("listenOrderRelease order : " + order);System.out.println("listenOrderRelease message : " + message);System.out.println("listenOrderRelease channel : " + channel);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@ResponseBody@GetMapping("/test/createOrder")public String createOrderTest() {//訂單下單成功OrderEntity entity = new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());entity.setModifyTime(new Date());//給MQ發送消息。rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", entity);return "ok";}}
如何保證消息可靠性
消息丟失
消息丟失出現的原因:
-
消息發送出去,由于網絡問題沒有抵達MQ服務器(Broker)
- 發送者做好容錯方法(try-catch),發送消息可能會網絡失敗,失敗后要有重試機制;
- 做好記錄,每個消息發送出去后都應該記錄到數據庫
-
消息抵達Broker,Broker要將消息寫入磁盤(持久化)才算成功。此時Broker尚未持久化完成就宕機。
- publisher必須加入確認回調機制,回調后,根據是否成功,修改數據庫消息的發送狀態。
- 做好定期重發,定期去數據庫掃描未成功的消息進行重發
-
自動ACK的狀態下。消費者收到消息,但沒來得及處理完成消息宕機
- 一定開啟手動ACK,消費成功才移除,失敗或者沒來得及處理就noAck并重新入隊
防止消息丟失:
- 做好消息確認機制(publisher,consumer【手動ack】)
- publisher將發送的消息都在數據庫做好記錄。定期從數據庫掃描發送失敗的消息,將它們再次發送
比如給數據庫創建如下表,用來記錄消息的發送狀態:
CREATE TABLE `mq_message`
(`message_id` char(32) NOT NULL,`content` text,`to_exchange` varchar(255) DEFAULT NULL,`routing_key` varchar(255) DEFAULT NULL,`class_type` varchar(255) DEFAULT NULL,`message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已發送 2-錯誤抵達 3-已抵達',`create_time` datetime DEFAULT NULL,`update_time` datetime DEFAULT NULL,PRIMARY KEY (`message_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4
消息重復
- 消息消費失敗,由于重試機制,自動又將消息發送出去。【這種情況允許重復】
- 成功消費,手動ack時宕機或者網絡原因等等,消息由unack變為ready,Broker又重新發送
- 將消費者的業務消費接口設計為冪等的即可。比如要解鎖庫存先判斷狀態
- 使用防重表(redis/mysql),發送消息每一個都有業務的唯 一標識,處理過就不用處理
- rabbitMQ的每一個消息都有redelivered字段,可以獲取是否是被重新投遞過來的,而不是第一次投遞過來的
Boolean redelivered = message.getMessageProperties().getRedelivered();
消息積壓
- 消費者宕機積壓
- 消費者消費能力不足積壓
- 發送者發送流量太大
- 上線更多的消費者,進行正常消費
- 上線專門的消費服務,將消息先批量取出來,記錄數據庫,離線慢慢處理
MQ對比
參考
雷豐陽: Java項目《谷粒商城》Java架構師 | 微服務 | 大型電商項目.
本文完,感謝您的關注支持!