【雷豐陽-谷粒商城 】【分布式高級篇-微服務架構篇】【22】【RabbitMQ】


持續學習&持續更新中…

守破離


【雷豐陽-谷粒商城 】【分布式高級篇-微服務架構篇】【22】【RabbitMQ】

  • Message Queue 消息隊列
    • 異步處理
    • 應用解耦
    • 流量控制
  • 消息中間件概念
  • RabbitMQ概念
    • Message
    • Publisher
    • Exchange
    • Queue
    • Binding
    • Connection
    • Channel
    • Consumer
    • Virtual Host
    • Broker
    • 圖示
  • 安裝
  • RabbitMQ運行機制
    • AMQP 中的消息路由
    • Exchange類型
    • 練習
  • RabbitMQ整合
    • AmqpAdmin 管理組件
    • RabbitTemplate 消息發送處理組件
    • 監聽消息
    • 注意
  • RabbitMQ消息確認機制-可靠抵達
    • 發送端—ConfirmCallback
    • 發送端—ReturnCallback
    • 發送端—代碼配置
    • 消費端
  • 消息的TTL(Time To Live)
  • 什么是死信
  • 延時隊列
  • 延時隊列的實現
  • 如何保證消息可靠性
    • 消息丟失
    • 消息重復
    • 消息積壓
  • MQ對比
  • 參考

Message Queue 消息隊列

異步處理

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

應用解耦

在這里插入圖片描述

在這里插入圖片描述

這樣不管庫存系統的接口會不會發生改變,訂單系統都不關心

流量控制

在這里插入圖片描述

把用戶請求流量存到消息隊列中,后臺服務根據它自身的處理能力去來進行消費處理,不會導致后臺服務宕機

消息中間件概念

大多應用中,可通過消息服務中間件來提升系統異步通信、擴展解耦能力,消息服務中兩個重要概念:消息代理(message broker)和目的地(destination)

當消息發送者發送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目的地。

消息隊列主要有兩種形式的目的地

  • 隊列(queue):點對點消息通信(point-to-point)
  • 主題(topic):發布(publish)/訂閱(subscribe)消息通信

點對點式:

  • 消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取后被移出隊列
  • 消息只有唯一的發送者和接收者
  • 很多人都可以監聽隊列,但是,消息誰搶到就是誰的

發布訂閱式:

  • 發送者發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那么就會在消息到達時,同時收到消息
  • (訂閱者都會收到消息)

JMS(Java Message Service)JAVA消息服務:基于JVM消息代理的規范。ActiveMQ、HornetMQ是JMS實現

AMQP(Advanced Message Queuing Protocol)高級消息隊列協議,也是一個消息代理的規范,兼容JMS
RabbitMQ是AMQP的實現

在這里插入圖片描述

RabbitMQ概念

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。

Message

消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成, 這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。

Publisher

消息的生產者,也是一個向交換器發布消息的客戶端應用程序。

Exchange

交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。

Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別

Queue

消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直 在隊列里面,等待消費者連接到這個隊列將其取走。

Binding

綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交 換器理解成一個由綁定構成的路由表。

Exchange 和Queue的綁定可以是多對多的關系。

Connection

網絡連接,比如一個TCP連接。

客戶端和消息中間件之間,一直保持一個長連接。1個客戶端只會建立1條連接

長連接的好處:客戶端宕機或下線,該長連接就會斷開,RabbitMQ就會感知到,就不會再繼續派發消息,可以防止大面積消息丟失問題。

Channel

(1條Connection上建立多個Channel,收發數據通過Channel進行)

信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道 發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。

因為對于操作系統來說建立和銷毀 TCP 都 是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。

Consumer

消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

Virtual Host

虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個Vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定、配置和權限機制。VHost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 /

虛擬主機跟虛擬主機之間是相互隔離的。可以用于生產環境和開發環境隔離等

類比,RabbitMQ類似一個多租戶系統,各個用戶相互隔離。每一個 RabbitMQ 服務器都能創建 虛擬的消息服務器,稱之為 虛擬主機(virtual host),簡稱 vhost。vhost 本質上是一個獨立的小型 RabbitMQ 服務器,vhost 可避免隊列和交換器等命名沖突,vhost 之間是絕對隔離的

Broker

表示消息隊列服務器實體

圖示

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

安裝

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
docker update rabbitmq --restart=always 

在這里插入圖片描述

在這里插入圖片描述

  • https://www.rabbitmq.com/networking.html
  • 4369, 25672 (Erlang發現&集群端口)
  • 5672, 5671 (AMQP端口)
  • 15672 (web管理后臺端口)
  • 61613, 61614 (STOMP協議端口)
  • 1883, 8883 (MQTT協議端口)

在這里插入圖片描述

RabbitMQ運行機制

AMQP 中的消息路由

AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和Binding 的角色。生產者把消息發布到 Exchange 上,消息最終到達隊列并被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。

在這里插入圖片描述

Exchange類型

Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 的消息 header 而不是路由鍵,headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:

在這里插入圖片描述

消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。

路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard” 等等。它是完全匹配的模式。

在這里插入圖片描述

每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。

fanout 類型轉發消息是最快的。

在這里插入圖片描述

topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。 它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。

它同樣也會識別兩個通配符:符號“#”和符號“*”,#匹配0個或多個單詞,*匹配1個單詞。

練習

在這里插入圖片描述

RabbitMQ整合

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=192.168.56.10
# 都有默認配置
#spring.rabbitmq.port=5672
#spring.rabbitmq.virtual-host=/
#spring.rabbitmq.username=guest
#spring.rabbitmq.password=guest
/*** 使用RabbitMQ* 1、引入amqp場景啟動器;RabbitAutoConfiguration 就會自動生效** 2、給容器中自動配置了*      RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate;*      所有的屬性都是 spring.rabbitmq*      @ConfigurationProperties(prefix = "spring.rabbitmq")*      public class RabbitProperties** 3、給配置文件中配置 spring.rabbitmq 信息** 4、@EnableRabbit: @EnableXxxxx;開啟功能** 5、監聽消息:使用@RabbitListener;必須有@EnableRabbit*    @RabbitListener: 類+方法上(監聽哪些隊列即可)*    @RabbitHandler:配合@RabbitListener標在方法上(重載區分不同的消息)*/@EnableRabbit
@Configuration
public class MyRabbitConfig {/*** 使用JSON序列化機制,進行消息轉換*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}

AmqpAdmin 管理組件

    @Testpublic void createExchange() {/*** DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)*/Exchange directExchange = new DirectExchange("hello-java-exchange", true, false);amqpAdmin.declareExchange(directExchange);log.info("Exchange[{}]創建成功", "hello-java-exchange");}@Testpublic void createQueue() {/*** public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)*/Queue queue = new Queue("hello-java-queue", true, false, false);amqpAdmin.declareQueue(queue);log.info("Queue[{}]創建成功", "hello-java-queue");}@Testpublic void createBinding() {/*** String destination【目的地】,* DestinationType destinationType【目的地類型】,* String exchange【交換機】,* String routingKey【路由鍵】,* Map<String, Object> arguments【自定義參數】** 將exchange指定的交換機和destination目的地進行綁定,使用routingKey作為指定的路由鍵*/Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);amqpAdmin.declareBinding(binding);log.info("Binding[{}]創建成功", "hello-java-binding");}

RabbitTemplate 消息發送處理組件

    @Testpublic void sendMessageTest() {//1、發送消息,如果發送的消息是個對象,我們會使用序列化機制,將對象寫出去。對象必須實現SerializableString msg = "Hello World!";rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", msg);//      2、發送的對象類型的消息,可以自動轉成一個json【自定義MessageConverter即可】
//        OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
//        reasonEntity.setId(1L);
//        reasonEntity.setCreateTime(new Date());
//        reasonEntity.setName("哈哈-"+ 666);
//        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java",
//                reasonEntity);log.info("消息發送完成{}");}

CorrelationData:消息的唯一ID【可以放在數據庫或者Redis中,方便之后判斷哪些消息沒有可靠抵達】

@RestController
public class RabbitController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/sendOneMq")public String sendOneMq(@RequestParam(value = "num", defaultValue = "10") Integer num) {OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName("哈哈");
//        CorrelationData:消息的唯一ID【可以放在數據庫或者Redis中,方便之后判斷哪些消息沒有可靠抵達】rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity, new CorrelationData(UUID.randomUUID().toString()));return "ok";}

監聽消息

@RabbitListener + @RabbitHandler (重載區分不同的消息)

    @GetMapping("/sendMq")public String sendMq(@RequestParam(value = "num", defaultValue = "10") Integer num) {for (int i = 0; i < num; i++) {if (i % 2 == 0) {OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName("哈哈-" + i);rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", reasonEntity, new CorrelationData(UUID.randomUUID().toString()));} else {OrderEntity entity = new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("hello-java-exchange", "hello22.java", entity, new CorrelationData(UUID.randomUUID().toString()));}}return "ok";}
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@RabbitHandlerpublic void receiveMessage(Message message,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {
//        接收到消息...(Body:'{"id":1,"name":"哈哈-666","sort":null,"status":null,"createTime":1720438632794}' MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-ltQ3IQ4H1lvLGqYNyEH3nQ, consumerQueue=hello-java-queue])System.out.println("@RabbitListener接收到消息..." + message);
//        System.out.println("接收到消息...content:"+content);
//        byte[] body = message.getBody();
//        //消息頭屬性信息
//        MessageProperties properties = message.getMessageProperties();
//        Thread.sleep(3000);
//        System.out.println("消息處理完成=>"+content.getName());}@RabbitHandlerpublic void receiveMessage2(OrderEntity content) throws InterruptedException {System.out.println("@RabbitHandler接收到消息..." + content);}}

@RabbitListener寫在方法上

@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {@RabbitListener(queues = {"hello-java-queue"})public void receiveMessage(OrderEntity content) throws InterruptedException {System.out.println("接收到消息..." + content);}}

注意

  1. Queue、Exchange、Binding可以 @Bean 注入進去
  2. 監聽消息的方法可以有三種參數(不分數量,順序):Object content, Message message, Channel channel
  3. channel可以用來拒絕或者簽收消息

RabbitMQ消息確認機制-可靠抵達

保證消息不丟失,可靠抵達,可以使用事務消息,這樣性能會下降250倍,為此引入確認機制:

  • publisher confirmCallback :確認模式
  • publisher returnCallback :未投遞到 queue 退回模式
  • consumer 手動 ack 機制

在這里插入圖片描述

發送端—ConfirmCallback

spring.rabbitmq.publisher-confirms=true

在創建 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啟confirmcallback 。

CorrelationData:用來表示當前消息唯一性。

消息只要被 broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調用confirmCallback。

被 broker 接收到只能表示 message 已經到達服務器,并不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的 returnCallback 。

發送端—ReturnCallback

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.mandatory=true

confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有些業務場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到return 退回模式。

這樣如果消息未能投遞到目標 queue 里將調用 returnCallback ,可以記錄下詳細到投遞數據,定期的巡檢或者自動糾錯都需要這些數據。

發送端—代碼配置

#服務端確認
# 開啟消息正確抵達RabbitMQ確認
spring.rabbitmq.publisher-confirms=true
# 開啟RabbitMQ中的消息正確的從交換機投遞到隊列確認
spring.rabbitmq.publisher-returns=true
# 只要消息抵達隊列,以異步方式優先回調ReturnCallback
spring.rabbitmq.template.mandatory=true
@Configuration
public class MyRabbitConfig {/*** 使用JSON序列化機制,進行消息轉換*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}private RabbitTemplate rabbitTemplate;@Primary@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);this.rabbitTemplate = rabbitTemplate;rabbitTemplate.setMessageConverter(messageConverter());initRabbitTemplate();return rabbitTemplate;}/*** 定制RabbitTemplate*      做好消息確認機制(publisher,consumer【手動ack】)*      每一個發送的消息都在數據庫做好記錄。定期將失敗的消息再次發送一遍** 發送端確認:* 1、Broker收到消息就回調【P——>B】* 1、spring.rabbitmq.publisher-confirms=true* 2、設置確認回調ConfirmCallback* 2、消息沒有正確抵達隊列進行回調【E——>Q】* 1、spring.rabbitmq.publisher-returns=true*    spring.rabbitmq.template.mandatory=true* 2、設置確認回調ReturnCallback*** 消費端確認(保證每個消息被正確消費,此時broker才可以刪除這個消息)。*          spring.rabbitmq.listener.simple.acknowledge-mode=manual #手動簽收*          默認是自動確認的:只要消息接收到,客戶端會自動確認,服務端就會移除這個消息* 問題:*          我們收到很多消息,如果自動回復給服務器ack,當消息由于各種原因沒有成功處理完成,就會發生消息丟失;* 消費者手動確認模式:*          只要我們沒有明確告訴MQ消息被簽收,消息就一直是unacked狀態。*          即使Consumer宕機,消息也不會丟失,會重新變為Ready狀態,下一次有新的Consumer連接進來就發給他* 如何簽收:*          channel.basicAck(deliveryTag,false);業務成功,簽收*          channel.basicNack(deliveryTag,false,true);業務失敗,拒簽,并讓消息重新入隊;*/
//    @PostConstruct //MyRabbitConfig對象創建完成以后,也就是構造器執行完成后,執行這個方法public void initRabbitTemplate() {rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** P ——> B [Publisher ——> Broker]** 只要消息抵達Broker這里的ack就為true* @param correlationData 當前消息的唯一關聯數據(這個是消息的唯一id,發布者發消息的時候傳遞的)* @param ack  消息是否成功抵達Broker* @param cause 失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {//Broker收到了:修改消息的狀態——>Broker接收到消息
//                System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]==>cause[" + cause + "]");}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** E ——> Q [Exchange ——> Queue]** 只要消息沒有正確投遞給指定的隊列,就會觸發這個失敗回調* @param message   投遞失敗的消息的詳細信息* @param replyCode 回復的狀態碼* @param replyText 回復的文本內容* @param exchange  當時這個消息發給了哪個交換機* @param routingKey 當時這個消息指定的路由鍵*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {//報錯了:修改數據庫當前消息的狀態——>隊列接收消息發生錯誤
//                System.out.println("Fail Message[" + message + "]==>replyCode[" + replyCode + "]==>replyText[" + replyText + "]===>exchange[" + exchange + "]===>routingKey[" + routingKey + "]");}});}}

消費端

消費者獲取到消息,成功處理,可以回復Ack給Broker

  • basic.ack用于肯定確認;broker將移除此消息
  • basic.nack用于否定確認;可以指定broker是否丟棄此消息,可以批量
  • basic.reject用于否定確認;可以指定broker是否丟棄此消息,但不能批量

默認自動ack,消息被消費者收到,就會從broker的queue中移除

queue無消費者,消息依然會被存儲,直到消費者消費。消費者收到消息,默認會自動ack。但是如果無法確定此消息是否被處理完成, 或者成功處理。我們可以開啟手動ack模式spring.rabbitmq.listener.simple.acknowledge-mode=manual

  • 消息處理成功,ack(),接受下一個消息,此消息broker就會移除
  • 消息處理失敗,nack()/reject(),重新發送給其他人進行處理,或者容錯處理后ack,或者丟棄
  • 消息一直沒有調用ack/nack方法(比如宕機/程序出異常),broker認為此消息正在被處理,不會投遞給別人,此時客戶端斷開,MQ感知到后,消息不會被broker移除,會重新入隊并投遞給別人
# 客戶端確認:開啟手動ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
    /*** 消費端確認(保證每個消息被正確消費,此時broker才可以刪除這個消息)。*          spring.rabbitmq.listener.simple.acknowledge-mode=manual #手動簽收*          默認是自動確認的:只要消息接收到,客戶端會自動確認,服務端就會移除這個消息* 問題:*          我們收到很多消息,如果自動回復給服務器ack,當消息由于各種原因沒有成功處理完成,就會發生消息丟失;* 消費者手動確認模式:*          只要我們沒有明確告訴MQ消息被簽收,消息就一直是unacked狀態。*          即使Consumer宕機,消息也不會丟失,會重新變為Ready狀態,下一次有新的Consumer連接進來就發給他* 如何簽收:*          channel.basicAck(deliveryTag,false);業務成功,簽收*          channel.basicNack(deliveryTag,false,true);業務失敗,拒簽,并讓消息重新入隊;*/
    @RabbitListener(queues = {"hello-java-queue"})public void receiveMessage0(Message message,Channel channel) {
//         multiple:是否批量
//         requeue=false 丟棄  requeue=true 發回服務器,讓服務器重新入隊該消息。//                          long deliveryTag, boolean multiple
//         channel.basicAck(     deliveryTag,         false); 只簽收當前貨物,不批量簽收;//                          long deliveryTag, boolean multiple, boolean requeue
//         channel.basicNack(    deliveryTag,         false,            true); 拒簽當前貨物【是否將該消息讓MQ重新放入隊列,看自己的業務需求】//                          long deliveryTag, boolean requeue
//         channel.basicReject(  deliveryTag,         true); 拒簽當前貨物【是否將該消息重新放回MQ看自己的業務需求】//DeliveryTag在channel內按順序自增long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag==>" + deliveryTag);//簽收貨物(消息),非批量模式try {if (deliveryTag % 2 == 0) {//收貨channel.basicAck(deliveryTag, false);System.out.println("簽收了貨物"+ message +"...deliveryTag..." + deliveryTag);} else {//退貨 requeue=false 丟棄  requeue=true 發回服務器,服務器重新入隊。channel.basicNack(deliveryTag, false, true);
//                channel.basicReject(deliveryTag, true);//                channel.basicNack(deliveryTag, false, false);System.out.println("不簽收貨物"+ message +"...deliveryTag..." + deliveryTag);}} catch (Exception e) {//網絡中斷,簽收信息未成功發送給Broker}}

消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。

RabbitMQ給消息設置 TTL

  1. 通過隊列設置:隊列中的消息都有相同的過期時間
  2. 給消息本身設置:每條消息的 TTL 可以不同

如果隊列設置了,消息也設置了,則最小的 TTL 生效。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。

消息在隊列中生存時間一旦超過 TTL,就會變成死信(Dead Message)

什么是死信

  1. 一個消息被Consumer拒收了,并且reject方法的參數里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。(basic.reject/ basic.nack)requeue=false
  2. 消息的TTL到了,消息過期了。(沒有人消費它)
  3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上

延時隊列

當消息在一個隊列中變成死信之后,它能被發送到一個指定的交換機中,這個交換機就是 DLX (Dead Letter Exchange),可稱為死信交換機。

綁定在 DLX 上的隊列就稱為死信隊列。

我們既可以控制消息在一段時間后變成死信,又可以控制變成死信的消息被路由到某一個指定的交換機,這個交換機又可以綁定隊列去消費死信,結合它們就可以實現一個延時隊列

不推薦使用給消息設置過期時間這種方式實現延時隊列

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

延時隊列的實現

在這里插入圖片描述

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class MyMQConfig {//@Bean Binding,Queue,Exchange/*** 容器中的 Binding,Queue,Exchange 都會自動創建(RabbitMQ沒有的情況)* RabbitMQ中已有的話 @Bean中聲明屬性發生了變化也不會覆蓋*/@Beanpublic Queue orderDelayQueue() {Map<String,Object> arguments = new HashMap<>();/*** x-dead-letter-exchange: order-event-exchange* x-dead-letter-routing-key: order.release.order* x-message-ttl: 60000*/arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",60000);//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentsreturn new Queue("order.delay.queue", true, false, false,arguments);}@Beanpublic Queue orderReleaseOrderQueue() {return new Queue("order.release.order.queue", true, false, false);}@Beanpublic Exchange orderEventExchange() {//String name, boolean durable, boolean autoDelete, Map<String, Object> argumentsreturn new TopicExchange("order-event-exchange",true,false);}@Beanpublic Binding orderCreateOrderBinding() {//String destination, DestinationType destinationType, String exchange, String routingKey,//			Map<String, Object> argumentsreturn new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}@Beanpublic Binding orderReleaseOrderBinding() {return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}}
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;import java.io.IOException;
import java.util.Date;
import java.util.UUID;@Controller
public class HelloController {@AutowiredRabbitTemplate rabbitTemplate;@RabbitListener(queues = {"order.release.order.queue"})public void testListenOrderRelease(OrderEntity order, Message message, Channel channel) throws IOException {System.out.println("listenOrderRelease order : " + order);System.out.println("listenOrderRelease message : " + message);System.out.println("listenOrderRelease channel : " + channel);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@ResponseBody@GetMapping("/test/createOrder")public String createOrderTest() {//訂單下單成功OrderEntity entity = new OrderEntity();entity.setOrderSn(UUID.randomUUID().toString());entity.setModifyTime(new Date());//給MQ發送消息。rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", entity);return "ok";}}

如何保證消息可靠性

消息丟失

消息丟失出現的原因:

  1. 消息發送出去,由于網絡問題沒有抵達MQ服務器(Broker)

    • 發送者做好容錯方法(try-catch),發送消息可能會網絡失敗,失敗后要有重試機制;
    • 做好記錄,每個消息發送出去后都應該記錄到數據庫
  2. 消息抵達Broker,Broker要將消息寫入磁盤(持久化)才算成功。此時Broker尚未持久化完成就宕機。

    • publisher必須加入確認回調機制,回調后,根據是否成功,修改數據庫消息的發送狀態。
    • 做好定期重發,定期去數據庫掃描未成功的消息進行重發
  3. 自動ACK的狀態下。消費者收到消息,但沒來得及處理完成消息宕機

    • 一定開啟手動ACK,消費成功才移除,失敗或者沒來得及處理就noAck并重新入隊

防止消息丟失:

  1. 做好消息確認機制(publisher,consumer【手動ack】)
  2. publisher將發送的消息都在數據庫做好記錄。定期從數據庫掃描發送失敗的消息,將它們再次發送

比如給數據庫創建如下表,用來記錄消息的發送狀態:

CREATE TABLE `mq_message`
(`message_id`     char(32) NOT NULL,`content`        text,`to_exchange`    varchar(255) DEFAULT NULL,`routing_key`    varchar(255) DEFAULT NULL,`class_type`     varchar(255) DEFAULT NULL,`message_status` int(1)       DEFAULT '0' COMMENT '0-新建  1-已發送 2-錯誤抵達  3-已抵達',`create_time`    datetime     DEFAULT NULL,`update_time`    datetime     DEFAULT NULL,PRIMARY KEY (`message_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8mb4

消息重復

  1. 消息消費失敗,由于重試機制,自動又將消息發送出去。【這種情況允許重復】
  2. 成功消費,手動ack時宕機或者網絡原因等等,消息由unack變為ready,Broker又重新發送
    • 將消費者的業務消費接口設計為冪等的即可。比如要解鎖庫存先判斷狀態
    • 使用防重表(redis/mysql),發送消息每一個都有業務的唯 一標識,處理過就不用處理
    • rabbitMQ的每一個消息都有redelivered字段,可以獲取是否是被重新投遞過來的,而不是第一次投遞過來的Boolean redelivered = message.getMessageProperties().getRedelivered();

消息積壓

  1. 消費者宕機積壓
  2. 消費者消費能力不足積壓
  3. 發送者發送流量太大
    • 上線更多的消費者,進行正常消費
    • 上線專門的消費服務,將消息先批量取出來,記錄數據庫,離線慢慢處理

MQ對比

在這里插入圖片描述

參考

雷豐陽: Java項目《谷粒商城》Java架構師 | 微服務 | 大型電商項目.


本文完,感謝您的關注支持!


本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/45500.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/45500.shtml
英文地址,請注明出處:http://en.pswp.cn/web/45500.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Django prefetch_related()方法

prefetch_related的作用 prefetch_related()是 Django ORM 中用于優化查詢性能的另一個重要方法&#xff0c;尤其在處理多對多&#xff08;ManyToMany&#xff09;關系和反向關系時非常有用。它允許你預加載相關對象&#xff0c;從而減少數據庫查詢次數。 1&#xff0c;創建應…

【香橙派】Orange pi AIpro開發板使用之一鍵部署springboot項目

前言 最近有幸收到一份新款 OrangePi AIpro 開發板&#xff0c;之前手里也搗鼓過一些板子&#xff0c;這次嘗試從零開始部署一個簡單的后端服務。OrangePi AIpro 采用昇騰AI技術路線&#xff0c;具體為4核64位處理器AI處理器&#xff0c;可配16GB內存容量&#xff0c;各種復雜應…

數字化賦能,加油小程序讓出行更便捷高效

在快節奏的現代生活中&#xff0c;每一次加油不僅是車輛續航的必要步驟&#xff0c;也成為了人們日常生活中不可或缺的一環。隨著科技的飛速發展&#xff0c;傳統加油模式正逐步向智能化、便捷化轉型&#xff0c;其中&#xff0c;加油小程序作為這股浪潮中的佼佼者&#xff0c;…

el-date-picker手動輸入日期,通過設置開始時間和階段自動填寫結束時間

需求&#xff1a;根據開始時間&#xff0c;通過填寫階段時長&#xff0c;自動填寫結束時間&#xff0c;同時開始時間和節數時間可以手動輸入 代碼如下&#xff1a; <el-form ref"ruleForm2" :rules"rules2" :model"formData" inline label-po…

B樹與B+樹的區別

B樹和B樹都是用于數據庫和文件系統的平衡樹數據結構&#xff0c;但它們有一些顯著的區別&#xff1a; 節點結構&#xff1a; B樹&#xff1a;每個節點存儲數據和指向子節點的指針。葉子節點也包含數據。 B樹&#xff1a;內部節點只存儲索引值&#xff0c;不存儲實際數據。所有…

yolov5 上手

0 介紹 YOLO(You Only Look Once)是一種流行的物體檢測和圖像分割模型&#xff0c;由華盛頓大學的約瑟夫-雷德蒙&#xff08;Joseph Redmon&#xff09;和阿里-法哈迪&#xff08;Ali Farhadi&#xff09;開發。YOLO 于 2015 年推出&#xff0c;因其高速度和高精確度而迅速受到…

人工智能算法工程師(中級)課程13-神經網絡的優化與設計之梯度問題及優化與代碼詳解

大家好&#xff0c;我是微學AI&#xff0c;今天給大家介紹一下人工智能算法工程師(中級)課程13-神經網絡的優化與設計之梯度問題及優化與代碼詳解。 文章目錄 一、引言二、梯度問題1. 梯度爆炸梯度爆炸的概念梯度爆炸的原因梯度爆炸的解決方案 2. 梯度消失梯度消失的概念梯度…

vue2中父組件向子組件傳值不更新視圖問題解決

1. 由于父組件更新了props里面的值, 但是子組件第一次接收后再修改沒有監聽到. 父組件修改值的時候使用this$set解決問題. 在 Vue 2 中&#xff0c;this.$set 通常用于更新數組中的特定元素。如果你想更新整個數組&#xff0c;可以直接賦值一個新的數組&#xff0c;或者你可以…

powerdesigner導出表數據庫設計文檔excel

1、連接數據庫&#xff0c;導出表結構的sql腳本 2、打開powerdesigner&#xff0c;生成項目空間表 sql腳本用第一步的腳本 3、用script腳本生成excel 腳本信息 Option Explicit Dim rowsNum rowsNum 0 -------------------------------------------------------------…

CV12_ONNX轉RKNN模型(諦聽盒子)

暫時簡單整理一下&#xff1a; 1.在邊緣設備上配置相關環境。 2.配置完成后&#xff0c;獲取模型中間的輸入輸出結果&#xff0c;保存為npy格式。 3.將onnx格式的模型&#xff0c;以及中間輸入輸出文件傳送到邊緣設備上。 4.編寫一個python文件用于轉換模型格式&#xff0c…

Git---git本地配置commit_template提交模板,規范開發

如何在Git中配置Commit Template以規范開發 在軟件開發過程中&#xff0c;規范化的提交信息&#xff08;commit messages&#xff09;對于項目的可維護性和協作效率至關重要。Git 提供了配置 commit template 的功能&#xff0c;允許開發者預設一個模板&#xff0c;用于在提交…

[iOS]內存分區

[iOS]內存分區 文章目錄 [iOS]內存分區五大分區棧區堆區全局區常量區代碼區驗證內存使用注意事項總結 函數棧堆棧溢出棧的作用 參考博客 在iOS中&#xff0c;內存主要分為棧區、堆區、全局區、常量區、代碼區五大區域 還記得OC是C的超類 所以C的內存分區也是一樣的 iOS系統中&a…

51單片機STC89C52RC——19.1 SG90舵機(伺服電機)

目的/效果 獨立按鍵K1&#xff0c;K2 實現加舵機減角度增減&#xff0c;LCD1602顯示舵機轉角度數&#xff08;上電默認90度&#xff09; 一&#xff0c;STC單片機模塊 二&#xff0c;SG90舵機 2.1 簡介 舵機只是我們通俗的叫法&#xff0c;它的本質是一個伺服電機&#xf…

react 案例的實現

先看一下如下效果 效果 這是一個 簡單的 效果 左邊是用戶名進行登錄 右邊是一個答題還有遮罩 相信大家還有剛剛創建好的 react 腳手架了&#xff0c;沒有的話可以運行以下命令 creact-react-app 項目名稱 把項目名稱四個字 改成 自己想要的一個名字 最好是英文的在 App.js中去…

python xpath常用代碼功能

1、從文件中讀取html內容&#xff0c;然后xpath加載 with open(FilePath, r,encodingutf8) as file:html file.read() tree etree.HTML(html) 2、基本定位語法 / 從根節點開始選取 /html/div/span // 從任意節點開始選取 //input . 選取當前節點 .…

Web開發:<br>標簽的作用

br作用 介紹基本用法常見用途注意事項使用CSS替代 介紹 在Web開發中&#xff0c;<br> 標簽是一個用于插入換行符的HTML標簽。它是“break”的縮寫&#xff0c;常用于需要在文本中強制換行的地方。<br> 標簽是一個空標簽&#xff0c;這意味著它沒有結束標簽。 基本…

Python小工具—txt轉excel和word

1.txt轉excel import openpyxl# 創建一個新的Excel工作簿 wb = openpyxl.Workbook() sheet = wb.active# 題干和答案的標題 sheet[A1] = 題干 sheet[B1] = 答案# 打開txt文件并讀取內容 with open(xiti.txt, r, encoding=utf-8) as file:lines = file.readlines()# 初始變量 c…

VisualTreeHelper.GetChildrenCount

在WPF&#xff08;Windows Presentation Foundation&#xff09;中&#xff0c;VisualTreeHelper.GetChildrenCount 是一個非常有用的方法&#xff0c;用于獲取指定視覺對象的子元素數量。這對于遍歷復雜的用戶界面樹結構以進行查找、操作或檢查特定元素是非常有幫助的。 Visu…

【java深入學習第7章】用 Spring Boot 和 Java Mail 輕松實現郵件發送功能

引言 在現代的企業應用中&#xff0c;郵件發送是一個非常常見的功能。無論是用戶注冊后的驗證郵件&#xff0c;還是系統通知郵件&#xff0c;郵件服務都扮演著重要的角色。本文將介紹如何在Spring Boot項目中整合Java Mail&#xff0c;實現發送郵件的功能。 一、準備工作 在…

【Ubuntu】安裝使用pyenv - Python版本管理

當我們在Ubuntu上使用Python進行開發的時候&#xff0c;可能會遇到版本不兼容的問題&#xff0c;當然你可以選擇使用apt的方式安裝不同版本的python環境 但是存在一定的問題&#xff1a;安裝不同版本的Python通常不會改變默認的python3命令指向的版本&#xff0c;而且就算你進行…