谷粒商城篇章9 ---- P248-P261/P292-P294 ---- 消息隊列【分布式高級篇六】

目錄

1 消息隊列(Message Queue)簡介

1.1 概述

1.2 消息服務中兩個重要概念

1.3 消息隊列主要有兩種形式的目的地

1.4 JMS和AMQP對比

1.5 應用場景

1.6? Spring支持

1.7 SpringBoot自動配置

1.7 市面上的MQ產品

2 RabbitMQ

2.1 RabbitMQ簡介

2.1.1 RabbitMQ簡介

2.1.2 核心概念

2.2 docker安裝RabbitMQ

2.3 RabbitMQ管理操作頁面介紹

2.3.1 Overview概述

2.3.2 Connections連接信息

2.3.3 Channels信道信息

2.3.4 Exchanges交換機信息

2.3.5 Queues隊列信息

2.3.6 Admin用戶信息

2.4 RabbitMQ運行機制

2.5 交換機(Exchange)類型

2.5.1 Direct Exchange(直連式)

2.5.2 Fanout Exchange(扇出/廣播式)

2.5.3 Topic Exchange(主題/發布訂閱式)?

2.6 收發消息測試

2.6.1 exchange.direct發送消息

2.6.1.1 exchange.direct綁定隊列

2.6.1.2 exchange.direct發送消息

2.6.2 exchange.fanout發送消息

2.6.2.1?exchange.fanout綁定隊列

2.6.2.2?exchange.fanout發送消息

2.6.3?exchange.topic發送消息

2.6.3.1?exchange.topic綁定隊列

2.6.3.2?exchange.topic發送消息

3 SpringBoot整合RabbitMQ

3.1 引入依賴

3.2 yml配置

3.3 開啟RabbitMQ

3.4 整合測試

3.4.1 AmqpAdmin使用

3.4.1.1 創建交換機

3.4.1.2 創建隊列

3.4.1.3 創建綁定關系

3.4.2?RabbitTemplate使用

3.4.2.1 發送消息

3.4.2.1.1 使用json序列化對象

3.4.3 RabbitListener & RabbitHandler接收消息

3.4.3.1 RabbitListener監聽接收消息

3.4.3.1.1 簡單接收消息

3.4.3.1.2 接收消息內容并反序列化

3.4.3.1.3 完整參數寫法

3.4.3.1.4 驗證多個消費者監聽場景

3.4.3.2 RabbitListener和RabbitHandler監聽接收消息

3.4.3.2.1 @RabbitListener和@RabbitHandler的區別與作用

3.4.3.2.2 測試重載處理不同類型的消息

3.5 消息可靠抵達

3.5.1 發送端確認

3.5.1.1 ConfirmCallback(確認模式)

3.5.1.2 ReturnCallback(回退模式)

3.5.1.3 消息唯一id

3.5.2 消費端確認(Ack確認機制)

3.5.2.1 消費端自動確認

3.5.2.2?消費端手動確認

3.5.2.3 消費端退貨

4 RabbitMQ延時隊列(實現定時任務)

4.1 為什么不使用定時任務?

4.2 使用場景

4.3 消息的存活時間TTL(Time To Live)

4.4 DXL和死信隊列

4.5 延時隊列實現

4.5.1 設置隊列過期時間實現延時隊列

4.5.2 設置消息過期時間實現延時隊列

4.6 延時隊列定時關單模擬

4.6.1 實現方式

4.6.1.1 基礎版

4.6.1.2 升級版

4.6.2 實現

4.6.2.1 創建Queue、Exchange、Binding方式

4.6.2.2 發送消息相關代碼

?4.6.2.3 監聽接收消息相關代碼

4.6.3 測試


1 消息隊列(Message Queue)簡介

1.1 概述

? ? ? ? 大多應用,可以通過消息服務中間件來提升系統異步通信和擴展解耦能力。

1.2 消息服務中兩個重要概念

? ? ? ? 消息代理(message broker)目的地(destination)

1.3 消息隊列主要有兩種形式的目的地

  • 隊列(queue):點對點消息通信(point-to-point);
  • 主題(topic):發布(publish)/訂閱(subscribe)消息通信。

點對點模式和發布訂閱模式區別:

(1)點對點式:

  • 消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取后被移除隊列。
  • 消息只有唯一的發送者和接受者(只能有一個接收者讀取信息) ,但不是說只有一個接收者。

(2)發布訂閱式:

  • 發送者(發布者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那么就會在消息到達時同時收到消息。

1.4 JMS和AMQP對比

(1)JMS

  • 是Java消息服務的規范
  • 基于jvm消息代理的規范
  • 用@JmsListener監聽消息
  • JMS是java提供的一套消息服務API標準,其目的是為所有的java應用程序提供統一的消息通信的標準,類似java的 jdbc,只要遵循jms標準的應用程序之間都可以進行消息通信。
  • ActiveMQ、HornetMQ是JMS實現的。

(2)AMQP(Advacnce Message Queuing Protocol)

  • 高級消息隊列協議,是一個消息代理的規范,兼容JMS
  • RabbitMQ 是AMQP的實現
  • 用@RabbitListener(AMQP)監聽消息
  • 它和AMQP有什么 不同,jms是java語言專屬的消息服務標準,它是在api層定義標準,并且只能用于java應用;而AMQP是在協議層定義的標準,是跨語言的 。
  • RabbitMQ是AMQP的實現。

1.5 應用場景

????????在實際應用中常用的場景:異步處理、應用解耦、流量削峰和消息通訊四個場景。

1.6? Spring支持

  • spring-jms提供了對JMS的支持
  • spring-rabbit提供了對AMQP的支持
  • 需要ConnectionFactory的實現來連接消息代理
  • 提供JmsTemplate、RabbitTemplate來發送消息
  • JmsListener(JMS).@RabbitListener(AMQP)注解在方法上監聽消息代理發布的消息
  • EnableJms、@EnableRabbit開啟支持

1.7 SpringBoot自動配置

  • JmsAutoConfiguration
  • RabbitAutoConfiguration

1.7 市面上的MQ產品

? ? ? ? ActiveMQ、RabbitMQ、RocketMQ、Kafka。

2 RabbitMQ

2.1 RabbitMQ簡介

RabbitMQ官方文檔:Networking and RabbitMQ — RabbitMQ

2.1.1 RabbitMQ簡介

? ? ? ? RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue Protocol)。

2.1.2 核心概念

  • Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭是由一些可選屬性組成,這些屬性包含routing-key(路由鍵)、priority(相對于其他消息的優先權)、delivery-mode(指出該消息可能需要持久性存儲)等。
  • Publisher:生產者,也是一個向交換機發布消息的客戶端應用程序。
  • Exchange:交換機,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。Exchange有4種類型:direct(默認)、 fanout、topic、headers,不同類型的Exchange轉發消息的策略有所區別。
  • Queue:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直 在隊列里面,等待消費者連接到這個隊列將其取走。
  • Binding:綁定,用于消息隊列和交換器之間的關聯。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規則,所以可以將交 換器理解成一個由綁定構成的路由表。Exchange 和Queue的綁定可以是多對多的關系。
  • Connection:網絡連接,比如一個TCP連接。
  • Channel:信道多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道 發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都 是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
  • Consumer:信息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  • Virtual Host:虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加 密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁 有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時 指定,RabbitMQ 默認的 vhost 是 / 。
  • Broker:表示消息隊列服務器實體。

拓展: RabbitMQ為什么使用信道而不直接使用TCP連接通信?

原因:TCP連接的創建和銷毀開銷特別大。創建需要 3 次握手,銷毀需要 4 次揮手。高峰時每秒成千上萬條TCP連接的創建會照成資源巨大浪費。而且操作系統每秒處理TCP連接數也是有限制的,會造成性能瓶頸。而如果一條線程使用一條信道,一條TCP連接可以容納無限的信道,即使每秒成千上萬的請求也不會照成性能瓶頸。 ?

2.2 docker安裝RabbitMQ

# 下載影像
docker pull rabbitmq:3.9.11-management# 安裝
docker run -d --name=rabbitmq --restart=always -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:3.9.11-management
  • 4369、25672:Erlang發現和集群端口

  • 5671、5672:AMQP端口

  • 15672:web管理后臺端口

  • 61613、61614:STOMP協議端口

  • 1883、8883:MQTT協議接口

rabbitmq的虛擬主機以路徑來區分,例如:/、/a、/b。虛擬主機之間是相互隔離的。

2.3 RabbitMQ管理操作頁面介紹

2.3.1 Overview概述

2.3.2 Connections連接信息

2.3.3 Channels信道信息

2.3.4 Exchanges交換機信息

2.3.5 Queues隊列信息

2.3.6 Admin用戶信息

2.4 RabbitMQ運行機制

AMQP中的消息路由

  • AMQP中消息的路由過程和Java開發者熟悉的JMS存在一些差別,AMQP中增加了ExchangeBinding的角色。生產者把消息發布到Exchange上,消息最終到達隊列并被消費者接收,Binding決定交換機將消息發送到哪個隊列。

2.5 交換機(Exchange)類型

????????Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型:

2.5.1 Direct Exchange(直連式)

????????消息中的路由鍵(routing key)如果和Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routingkey 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。

2.5.2 Fanout Exchange(扇出/廣播式)

????????每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。

2.5.3 Topic Exchange(主題/發布訂閱式)?

????????topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開它同樣也會識別兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配一個單詞。

2.6 收發消息測試

????????按照如下圖,新建交換機、隊列進行測試。

(1)新建交換機,如下:

(2)新建隊列,如下:

2.6.1 exchange.direct發送消息

2.6.1.1 exchange.direct綁定隊列

2.6.1.2 exchange.direct發送消息

只有atguigu隊列拿到消息,直連型路由鍵需要完全匹配。?

?Ask Mode:

1. Nack message requeue true:接收消息但不做確認,消息會重新加入隊列;

2. Automatic ack:獲取消息,應答確認,消息不重新入隊,將會從隊列中刪除;

3. Reject requeue true:拒絕消息,消息會重新加入隊列;

4.?Reject requeue false:拒絕消息,消息會被從隊列中刪除。

2.6.2 exchange.fanout發送消息

2.6.2.1?exchange.fanout綁定隊列

2.6.2.2?exchange.fanout發送消息

?所有與exchange.fanout交換機綁定的隊列都會收到消息,這就是扇出型交換機。

2.6.3?exchange.topic發送消息

2.6.3.1?exchange.topic綁定隊列

2.6.3.2?exchange.topic發送消息

(1)

(2)?

?atguigu、atguigu.emps、atguigu.news這三個隊列收到了消息。“#”匹配0個或多個單詞。

3 SpringBoot整合RabbitMQ

3.1 引入依賴

gulimall-order/pom.xml

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 yml配置

gulimall-order/src/main/resources/application.yml

spring:rabbitmq:host: 172.1.11.10port: 5672virtual-host: /

3.3 開啟RabbitMQ

相關注解 @EnableRabbit,監聽消息必須使用,收發消息可以不使用該注解。

gulimall-order/src/main/java/com/wen/gulimall/order/GulimallOrderApplication.java

@EnableRabbit
@SpringBootApplication
public class GulimallOrderApplication {public static void main(String[] args) {SpringApplication.run(GulimallOrderApplication.class, args);}}

3.4 整合測試

3.4.1 AmqpAdmin使用

3.4.1.1 創建交換機

(1)交換機類型

(2)創建交換機

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {@Resourceprivate AmqpAdmin amqpAdmin;@Testvoid createExchange(){// 聲明交換機/*** DirectExchange(String name,         【交換機名稱】* boolean durable,                    【是否持久化】* boolean autoDelete,                 【是否自動刪除】* Map<String, Object> arguments)      【自定義參數】*/DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);amqpAdmin.declareExchange(directExchange);log.info("Exchange[{}]創建成功","hello-java-exchange");}
}

3.4.1.2 創建隊列
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {@Resourceprivate AmqpAdmin amqpAdmin;@Testvoid createQueue(){// 創建隊列/*** Queue(String name,                           【隊列名稱】* boolean durable,                             【是否持久化】* boolean exclusive,                           【是否排他(只能被一個consumer連接占用)】* boolean autoDelete,                          【是否自動刪除】* @Nullable Map<String, Object> arguments)     【自定義參數】*/Queue queue = new Queue("hello-java-queue",true,false,false);amqpAdmin.declareQueue(queue);log.info("Queue[{}]創建成功","hello-java-queue");}
}

3.4.1.3 創建綁定關系
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {@Resourceprivate AmqpAdmin amqpAdmin;@Testpublic void createBinding(){// 創建綁定/*** Binding(String destination,                  【目的地,隊列name或交換機name】* Binding.DestinationType destinationType,     【目的地類型,queue還是exchange(路由)】* String exchange,                             【交換機】* String routingKey,                           【路由鍵】* @Nullable Map<String, Object> arguments)     【自定義參數】** 將exchange指定交換機和destination目的地進行綁定,使用routingKey作為路由鍵*/Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello-java",null);amqpAdmin.declareBinding(binding);log.info("Binding[{}]創建成功","hello-java");}
}

3.4.2?RabbitTemplate使用

3.4.2.1 發送消息

如果發送的消息是個對象,我們會使用序列化機制,將對象寫出。對象必須實現Serializable接口,例如:

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage(){/*** convertAndSend(String exchange, 【交換機】* String routingKey,              【路由鍵】* Object object)                  【發送的信息】*/String msg = "hjhajkxdhjashj哈哈哈哈哈";OrderReturnReasonEntity entity = new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName("多多");//1. 發送消息,如果發送的消息是個對象,我們會使用序列化機制,將對象寫出。對象必須實現SerializablerabbitTemplate.convertAndSend("hello-java-exchange","hello-java",entity);log.info("發送消息【{}】成功",entity);}
}

?隊列收到的消息,如下:

3.4.2.1.1 使用json序列化對象

通過源碼分析,如果容器中沒有MessageConverter默認使用使用SimpleMessageConverter

這里自定義RabbitMQ配置使用Jackson2JsonMessageConverter 消息轉換器

gulimall-order/src/main/java/com/wen/gulimall/order/config/MyRabbitConfig.java

@Configuration
public class MyRabbitConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

重新發送消息測試:

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage(){/*** convertAndSend(String exchange, 【交換機】* String routingKey,              【路由鍵】* Object object)                  【發送的信息】*/String msg = "hjhajkxdhjashj哈哈哈哈哈";OrderReturnReasonEntity entity = new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName("多多");//1. 發送消息,如果發送的消息是個對象,我們會使用序列化機制,將對象寫出。對象必須實現SerializablerabbitTemplate.convertAndSend("hello-java-exchange","hello-java",entity);log.info("發送消息【{}】成功",entity);}
}

測試結果如下:

3.4.3 RabbitListener & RabbitHandler接收消息

3.4.3.1 RabbitListener監聽接收消息

監聽消息使用注解@RabbitListener,該注解標注在方法上,使用該注解的前提需要主類上使用注解@EnableRabbit。

3.4.3.1.1 簡單接收消息

使用注解@RabbitListener,并通過屬性queues聲明需要監聽的所有隊列。

gulimall-order/src/main/java/com/wen/gulimall/order/service/impl/OrderServiceImpl.java

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Object msg){System.out.println("接收到消息...內容:"+msg+"==>類型:"+msg.getClass());
}

監聽結果:

接收到消息...內容:(Body:'[B@17ee5a8a(byte[77])' MessageProperties [headers={__TypeId__=com.wen.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello-java, deliveryTag=1, consumerTag=amq.ctag-K2kLpChLRK15TemEJ4XRBQ, consumerQueue=hello-java-queue])==>類型:class org.springframework.amqp.core.Message
3.4.3.1.2 接收消息內容并反序列化

由上一步接收到的消息內容可知消息的類型是org.springframework.amqp.core.Message,將需要反序列化的類放在第二個參數的位置可以將消息內容自動封裝成對象。(消息接收對象類型要和消息發送對象類型保持一致

第一個參數:Message -- 原生消息詳細信息,消息頭+消息體。

第二個參數:T<發送消息的類型>

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage2(Message msg, OrderReturnReasonEntity content){// 消息體byte[] body = msg.getBody();// 消息頭信息MessageProperties messageProperties = msg.getMessageProperties();System.out.println("接收到消息...:"+msg+"==>內容:"+content);
}

監聽結果:

接收到消息...:(Body:'[B@2ec3c5a5(byte[77])' MessageProperties [headers={__TypeId__=com.wen.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello-java, deliveryTag=1, consumerTag=amq.ctag-vuaHxsngkQHP8vWIUy6oGA, consumerQueue=hello-java-queue])==>內容:OrderReturnReasonEntity(id=2, name=多多, sort=null, status=null, createTime=Thu Jan 25 14:32:30 CST 2024)

3.4.3.1.3 完整參數寫法

參數類型:

1) Message message:原生消息詳細信息,消息頭+消息體。

2)T<發送消息的類型> :OrderReturnReasonEntity content。

3) Channel channel:當前傳輸數據的通道。(一個連接可以容納多個通道)

@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel){log.info("信道Channel:{}",channel);// 消息體byte[] body = msg.getBody();// 消息頭信息MessageProperties messageProperties = msg.getMessageProperties();System.out.println("接收到消息...:"+msg+"==>內容:"+content);
}

監聽結果:

3.4.3.1.4 驗證多個消費者監聽場景

Queue可以由多個消費者監聽,只能有一個消費者接收到消息。只要收到消息,隊列刪除該消息。

場景:

1)啟動多個訂單服務,同一個消息,只有一個客戶端接收到;

2)只有一個消息處理完,方法運行結束,才可以接收到下一個消息。

模擬發送10條消息。?

gulimall-order/src/test/java/com/wen/gulimall/order/GulimallOrderApplicationTests.java

@Slf4j
@SpringBootTest
class GulimallOrderApplicationTests {@Resourceprivate AmqpAdmin amqpAdmin;@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid sendMessage(){for(int i=0;i<10;i++) {OrderReturnReasonEntity entity = new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName("多多"+i);//1. 發送消息,如果發送的消息是個對象,我們會使用序列化機制,將對象寫出。對象必須實現SerializablerabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", entity);log.info("發送消息【{}】成功", entity);}}
}

復制一個訂單模塊,并啟用。

監聽結果:

每個客戶端都是接收同一個隊列,一條消息只能由一個客戶端接收,沒有重復消息。

在此期間有兩個客戶端有兩個連接,一個客戶端只能有一個連接,一個連接可以有多個信道(channel)。

3.4.3.2 RabbitListener和RabbitHandler監聽接收消息
3.4.3.2.1 @RabbitListener和@RabbitHandler的區別與作用
  • @RabbitListener:標注在類或方法上。【作用:監聽從哪些隊列接收消息】
  • @RabbitHandler:標注在方法上。【作用:重載區分不同類型的消息】
3.4.3.2.2 測試重載處理不同類型的消息

發送消息接口,發送兩種不同類型的消息

gulimall-order/src/main/java/com/wen/gulimall/order/controller/RabbitController.java

@RestController
public class RabbitController {@Resourceprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMq")public String sendMq(Integer num){for(int i=0;i<num;i++){if(i%2==0){OrderReturnReasonEntity entity = new OrderReturnReasonEntity();entity.setCreateTime(new Date());entity.setId(2L);entity.setName("多多"+i);//1. 發送消息,如果發送的消息是個對象,我們會使用序列化機制,將對象寫出。對象必須實現SerializablerabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", entity);}else {OrderEntity orderEntity = new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", orderEntity);}}return "ok";}
}

重載方法監聽接收不同類型的消息

gulimall-order/src/main/java/com/wen/gulimall/order/service/impl/OrderServiceImpl.java

@RabbitListener(queues = {"hello-java-queue"})
@Slf4j
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {//@RabbitListener(queues = {"hello-java-queue"})@RabbitHandlerpublic void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {//log.info("信道Channel:{}",channel);log.info("接收到消息..."+content);// 消息體//byte[] body = msg.getBody();消息頭信息//MessageProperties messageProperties = msg.getMessageProperties();Thread.sleep(3000);//System.out.println("消息處理完成=>"+content.getName());}@RabbitHandlerpublic void receiveMessage3(Message msg,OrderEntity content,Channel channel) {log.info("接收到消息..."+content);}
}

?重啟訂單服務,請求http://localhost:9000/sendMq?num=10發送消息測試,通過@RabbitListener(queues={"hello-java-queue"})聲明隊列,@RabbitHandler重載用于接收同一個隊列不同類型的數據。

注意:接收同一個隊列不同類型的數據并自動封裝成相應的對象,單使用@RabbitListener是無法實現的,需要@RabbitListener和@RabbitHandler搭配使用。如上。

測試結果:?

3.5 消息可靠抵達

官網參考文檔:https://www.rabbitmq.com/=> Docs => Server Documentation => Reliable Delivery

1. 為什么不適用事務消息:保證消息不丟失,可靠抵達,因為使用事務消息會使性能下降250倍,為此引入確認機制。

2. 發送端確認

  • publisher:confirmCallback 確認模式;
  • publisher:returnCallback 未投遞到 queue 退回模式。

3. 消費端確認

  • consumer:ack機制。

3.5.1 發送端確認

https://www.rabbitmq.com/ => Docs => Server Documentation => Reliable Delivery =>
Acknowledgements and Confirms => Publisher confirms
3.5.1.1 ConfirmCallback(確認模式)

1. 開啟 p->e 的發送確認配置,我這里使用的是SpringBoot2.7.8之前的配置已經廢棄,這里使用spring.rabbitmq.publisher-confirm-type=correlated替換spring.rabbitmq.publisher-confirms=true

2. 發送端確認類型spring.rabbitmq.publisher-confirm-type有3種,如下

  • none值:是禁用發布確認模式,是默認值。
  • correlated值:是發布消息成功到交換器后會觸發回調方法。
  • simple值:經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在發布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker。

開啟配置

gulimall-order/src/main/resources/application.yml

spring:rabbitmq:host: 172.xx.xx.xxport: 5672virtual-host: /publisher-confirm-type: correlated

自定義RabbitTemplate,設置確認回調

gulimall-order/src/main/java/com/wen/gulimall/order/config/MyRabbitConfig.java

@Configuration
public class MyRabbitConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 定制RabbitTemplate* 1.服務收到消息就回調*      1)spring.rabbitmq.publisher-confirms=true(這里使用spring.rabbitmq.publisher-confirm-type=correlated)*      2)設置確認回調*  注意:springboot rabbitmq 中 spring.rabbitmq.publisher-confirms 已經失效。需要使用 spring.rabbitmq.publisher-confirm-type 替代*  publisher-confirm-type有3種:*  1)NONE值是禁用發布確認模式,是默認值*  2)CORRELATED值是發布消息成功到交換器后會觸發回調方法*  3)SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在發布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker;*/@PostConstruct //MyRabbitConfig對象創建完成之后,執行這個方法public void initRabbitTemplate(){// 設置確認回調rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/***  只要消息抵達Broker就ack=true* @param correlationData 當前消息的唯一關聯數據(這個是消息的唯一id)* @param b (ack) 消息是否成功收到* @param s (cause) 失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm...correlationData["+correlationData+"]==>ack["+b+"]==>cause["+s+"]");}});}
}

測試

請求http://localhost:9000/sendMq?num=10?,看測試結果,只要消息抵達Broker就ack=true,如下:

3.5.1.2 ReturnCallback(回退模式)

開啟?e->q?的確認配置,消息不能由交換機投遞到目標隊列將調用returnCallback。

gulimall-order/src/main/resources/application.yml

server:port: 9000
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://172.xx.xx.xx:9906/gulimall_omsusername: rootpassword: rootrabbitmq:host: 172.xx.xx.xxport: 5672virtual-host: /# 開啟發送端確認publisher-confirm-type: correlated# 開啟發送端消息抵達隊列的確認,默認是falsepublisher-returns: true# 只要消息抵達隊列,以異步發送優先回調我們的returnConfirmtemplate:mandatory: trueredis:host: 172.xx.xx.xxcloud:nacos:discovery:server-addr: 172.xx.xx.xx:8848main:allow-circular-references: true
mybatis-plus:mapper-locations: classpath:/mapper/**/*.xmlglobal-config: # 全局配置db-config:id-type: auto # 主鍵自增

?設置消息抵達隊列的確認回調

@Configuration
public class MyRabbitConfig {@Resourceprivate RabbitTemplate rabbitTemplate;@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}/*** 定制RabbitTemplate* 1.服務收到消息就回調*      1)spring.rabbitmq.publisher-confirms=true(這里使用spring.rabbitmq.publisher-confirm-type=correlated)*      2)設置確認回調 ConfirmCallback* 2. 消息抵達隊列就回調*      1)spring.rabbitmq.publisher-returns=true*          spring.rabbitmq.template.mandatory=true*      2)設置消息抵達隊列的確認回調 ReturnCallback*  注意:springboot rabbitmq 中 spring.rabbitmq.publisher-confirms 已經失效。需要使用 spring.rabbitmq.publisher-confirm-type 替代*  publisher-confirm-type有3種:*  1)NONE值是禁用發布確認模式,是默認值*  2)CORRELATED值是發布消息成功到交換器后會觸發回調方法*  3)SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在發布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker;*/@PostConstruct //MyRabbitConfig對象創建完成之后,執行這個方法public void initRabbitTemplate(){// 設置確認回調rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/***  只要消息抵達Broker就ack=true* @param correlationData 當前消息的唯一關聯數據(這個是消息的唯一id)* @param b (ack) 消息是否成功收到* @param s (cause) 失敗的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {System.out.println("confirm...correlationData["+correlationData+"]==>ack["+b+"]==>cause["+s+"]");}});// 設置消息抵達隊列的確認回調rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/*** 只要消息沒有投遞給指定的隊列,就觸發這個失敗回調* @param message the returned message. 投遞失敗的消息詳細信息* @param replyCode the reply code.     回復的狀態碼* @param replyText the reply text.     回復的文本內容* @param exchange the exchange.        當時這個消息接收的交換機* @param routingKey the routing key.   當時這個消息用的哪個路由鍵*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");}});}
}

測試:

該模式需要消息投遞到指定隊列失敗才會觸發,這里將路由鍵故意改錯便于測試,測試結束后將錯誤改回來。

錯誤消息:

Fail Message[(Body:'[B@f04a8cf(byte[855])' MessageProperties [headers={__TypeId__=com.wen.gulimall.order.entity.OrderEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]==>replyCode[312]==>replyText[NO_ROUTE]==>exchange[hello-java-exchange]==>routingKey[hello-java333]

3.5.1.3 消息唯一id

在發送消息時參數CorrelationData就是消息的唯一id。

?這個id在發送端確認時是可以拿到的,可以用于排查哪些消息為成功抵達(與消費端接收到存放在數據庫中的消息唯一id進行對比)。

3.5.2 消費端確認(Ack確認機制)

3.5.2.1 消費端自動確認

1)消費端確認,默認是自動確認的,只要消息收到,客戶端會自動確認,服務端就會移除這個消息。

2)消費端自動確認存在問題:

????????在接收消息這里打上斷點,在處理完第一個消息后,關掉服務器,模擬突發狀況,服務器宕機,如下:

關掉服務器后,剩余未處理的4個消息也消失不見,未經過消費端處理,相當于消息丟失。?

3.5.2.2?消費端手動確認

????????為了解決消息自動確認,遇到突發狀況造成消息丟失問題,可以使用手動確認。 只有手動確認的消息才能被隊列移除。

?開啟消費端手動確認配置

gulimall-order/src/main/resources/application.yml

spring:rabbitmq:host: 172.1.11.10port: 5672virtual-host: /# 開啟發送端確認publisher-confirm-type: correlated# 開啟發送端消息抵達隊列的確認,默認是falsepublisher-returns: true# 只要消息抵達隊列,以異步發送優先回調我們的returnConfirmtemplate:mandatory: true# 開啟消費端手動確認listener:simple:acknowledge-mode: manual

1. 消息抵達客戶端,不手動ack測試?

發送五個消息進行測試,客戶端就算拿到消息不做消息抵達確認,隊列不能移除未確認的消息,只是消息的狀態有ready變為unacked,關閉客戶端服務器后消息的狀態又由unacked變為ready,下次重啟客戶端服務器又可以接收消息。

關閉客戶端服務器,觀察消息的狀態:

(由unacked變為ready)

2. 消息抵達客戶端,模擬部分消息手動ack測試

客戶端確認,debug模式下無法模擬真實情況下宕機,關閉了也會繼續執行,這里斷點放開,根據投遞標簽deliveryTag(channel內按順序自增)取余模擬客戶端突然宕機未接收到消息,如下:?

@RabbitListener(queues = {"hello-java-queue"})
@Slf4j
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {...//@RabbitListener(queues = {"hello-java-queue"})@RabbitHandlerpublic void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {//log.info("信道Channel:{}",channel);System.out.println("接收到消息..."+content);// 消息體byte[] body = msg.getBody();// 消息頭信息MessageProperties messageProperties = msg.getMessageProperties();Thread.sleep(3000);System.out.println("消息處理完成=>"+content.getName());// channel內按順序自增的long deliveryTag = msg.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag=>"+deliveryTag);// basicAck(long deliveryTag, // channel內按順序自增// boolean multiple)          // 是否批量確認// debug下無法模擬真實情況下的宕機,關閉了也會繼續執行,// 這里根據投遞標簽deliveryTag模擬客戶端突然宕機未接收到消息try {if(deliveryTag%2==0) {channel.basicAck(deliveryTag,false);// 手動ack確認接收到消息System.out.println("簽收了貨物..."+deliveryTag);}} catch (IOException e) {throw new RuntimeException(e);}}}

?此時只確認簽收了貨物-2和貨物-4,還有3個未確認,如下:

關閉客戶端服務器,消息狀態由unacked變成ready,如下:?

重啟客戶端服務器,剩余的消息可以繼續簽收,因為deliveryTag是根據信道channel里的消息順序自增,客戶端重啟后消息又重新排序。之前發送的5個消息(1,2,3,4,5)只確認了偶數2和4,客戶端重啟后channel內的消息順序變為(1,2,3),消費端可以再次確認接收一個消息。

3.5.2.3 消費端退貨
@RabbitListener(queues = {"hello-java-queue"})
@Slf4j
@Service("orderService")
public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> implements OrderService {...//@RabbitListener(queues = {"hello-java-queue"})@RabbitHandlerpublic void receiveMessage3(Message msg,OrderReturnReasonEntity content,Channel channel) throws InterruptedException {//log.info("信道Channel:{}",channel);System.out.println("接收到消息..."+content);// 消息體byte[] body = msg.getBody();// 消息頭信息MessageProperties messageProperties = msg.getMessageProperties();Thread.sleep(3000);System.out.println("消息處理完成=>"+content.getName());// channel內按順序自增的long deliveryTag = msg.getMessageProperties().getDeliveryTag();System.out.println("deliveryTag=>"+deliveryTag);// basicAck(long deliveryTag, // channel內按順序自增// boolean multiple)          // 是否批量確認// debug下無法模擬真實情況下的宕機,關閉了也會繼續執行,// 這里根據投遞標簽deliveryTag模擬客戶端突然宕機未接收到消息try {// 簽收貨物非批量模式if(deliveryTag%2==0) {// 收貨channel.basicAck(deliveryTag,false);// 手動ack確認接收到消息System.out.println("簽收了貨物..."+deliveryTag);}else {// 退貨// requeue=false 丟棄 ;requeue=true 重回隊列// basicNack(long deliveryTag, boolean multiple, boolean requeue)channel.basicNack(deliveryTag,false,false);System.out.println("沒有簽收貨物..."+deliveryTag);}} catch (IOException e) {throw new RuntimeException(e);}}
}

channel.basicNack(deliveryTag,false,false);中requeue=false丟棄掉未被簽收的貨物,清空隊列,如下:

?當requeue=true時,deliveryTag為奇數消息被拒收重新入隊deliveryTag變成偶數,被簽收。

4 RabbitMQ延時隊列(實現定時任務)

4.1 為什么不使用定時任務?

定時任務的時效性問題

場景:訂單30min中未支付,關閉訂單

出現問題:定時任務在0min中掃描的時候沒有訂單未支付,1min中后下訂單,定時任務第二次執行的時候(30min)這時下訂單29min中未支付訂單不會被關閉,到定時任務第三次執行的時候(60min)才能關閉未支付的訂單,訂單從創建到關閉花費了30+29=59min,不滿足訂單30min中未支付關閉訂單。

4.2 使用場景

4.3 消息的存活時間TTL(Time To Live)

  • 消息的TTL就是消息的存活時間
  • RabbitMQ可以對隊列消息分別設置TTL。
    • 對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信
    • 如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現延時任務的關鍵。可以通過設置消息的expiration字段或者x-message-ttl屬性來設置時間,兩者是一樣的效果。

4.4 DXL和死信隊列

  • DXL(Dead Letter Exchanges)即死信交換機,它其實就是一個正常的交換機,能夠與任何隊列綁定。
  • 死信隊列是指隊列(正常)上的消息(過期)變成死信后,能夠發送到另外一個交換機(DLX),然后被路由到一個隊列上,這個隊列就是死信隊列。
  • 成為死信一般有以下幾種情況:

? ? ? ? 1)消息被拒收(basic.reject/basic.nack)requeue=false;

? ? ? ? 2)消息的TTL到了,消息過期;

? ? ? ? 3)隊列長度限制被超越(隊列滿了)。

4.5 延時隊列實現

1. 延時隊列實現有兩種方式設置隊列過期時間設置消息過期時間實現延時隊列。

2. 建議使用設置隊列過期時間實現延時隊列,因為設置消息過期時間,MQ是惰性檢查。比如:第一個進來的消息過期時間是5min,第二個進來的消息過期時間是2min,MQ會先檢查第一個消息,等第一個消息進入死信隊列,再去檢查第二個消息,這時第二個消息已經死3min了。

4.5.1 設置隊列過期時間實現延時隊列

4.5.2 設置消息過期時間實現延時隊列

4.6 延時隊列定時關單模擬

以下單為例,設置隊列過期時間實現延時隊列

4.6.1 實現方式

4.6.1.1 基礎版

交換機與隊列一一對應,一臺路由器路由一個隊列。

給隊列user.order.delay.queue(死信隊列)設置了三個參數:

  • x-dead-letter-exchange:user.order.exchange (死信交換機)
  • x-dead-letter-rounting-key:order(死信路由鍵)
  • x-message-ttl:60000(隊列里面所有消息存活時間60000ms)
4.6.1.2 升級版

一個微服務模塊配置一個交換機,一個交換機綁定多個隊列。

4.6.2 實現

4.6.2.1 創建Queue、Exchange、Binding方式

容器中的 Binding、Queue、Exchange都會自動創建(RabbitMQ中沒有的情況)

  • 第一次發送消息【使用隊列】的時候創建交換機、隊列、綁定關系
  • RabbitMQ中沒有交換機、隊列才會創建;RabbitMQ中只要有,屬性發生變化也不會覆蓋。

gulimall-order/src/main/java/com/wen/gulimall/order/config/MyMQConfig.java

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author W* @createDate 2024/02/21 14:39* @description: 創建 Exchange 、 Queue、 Binding*/
@Configuration
public class MyMQConfig {//@RabbitListener(queues = "order.release.order.queue")//public void listen(Message message, Channel channel, OrderEntity orderEntity) throws IOException {//    System.out.println("收到過期訂單消息,準備關閉訂單:------->"+orderEntity);//    // 確認收到消息//    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//}/*** 容器中的 Binding、Queue、Exchange都會自動創建(RabbitMQ沒有的情況)*  延時隊列*  RabbitMQ中只要有。屬性發生變化也不會覆蓋。* @return*/@Beanpublic Queue orderDelayQueue(){Map<String, Object> arguments = new HashMap<>();//x-dead-letter-exchange: order-event-exchange//x-dead-letter-routing-key: order.release.order//x-message-ttl: 60000arguments.put("x-dead-letter-exchange","order-event-exchange");arguments.put("x-dead-letter-routing-key","order.release.order");arguments.put("x-message-ttl",60000);// String name,       【隊列名稱】// boolean durable,   【是否持久化】// boolean exclusive, 【是否排它】// boolean autoDelete,【是否自動刪除】// @Nullable Map<String, Object> arguments 【自定義參數,死信路由、死信路由鍵、消息存活時間】Queue queue = new Queue("order.delay.queue",true,false,false,arguments);return queue;}/*** 死信隊列* @return*/@Beanpublic Queue orderReleaseOrderQueue(){Queue queue = new Queue("order.release.order.queue",true,false,false);return queue;}/*** 普通路由/死信路由* @return*/@Beanpublic Exchange orderEventExchange(){// String name, boolean durable, boolean autoDelete, Map<String, Object> argumentsTopicExchange topicExchange = new TopicExchange("order-event-exchange",true,false);return topicExchange;}/*** 交換機與延時隊列的綁定* @return*/@Beanpublic Binding orderCreateOrder(){// String destination, 【目的地】// DestinationType destinationType, 【目的地類型,queue、exchange】// String exchange,// String routingKey,// @Nullable Map<String, Object> argumentsreturn new Binding("order.delay.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.create.order",null);}/*** 死信交換機與死信隊列的綁定* @return*/@Beanpublic Binding orderReleaseOrder(){return new Binding("order.release.order.queue",Binding.DestinationType.QUEUE,"order-event-exchange","order.release.order",null);}
}

啟動訂單服務后,給延時隊列發送消息后(相關代碼如下),創建隊列和交換機以及綁定關系,如下:

4.6.2.2 發送消息相關代碼

gulimall-order/src/main/java/com/wen/gulimall/order/web/HelloController.java

@Controller
public class HelloController {@Resourceprivate RabbitTemplate rabbitTemplate;@ResponseBody@GetMapping("/test/createOrder")public String createOrderTest(){OrderEntity orderEntity = new OrderEntity();orderEntity.setOrderSn(UUID.randomUUID().toString());orderEntity.setCreateTime(new Date());rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);return "ok";}
}
?4.6.2.3 監聽接收消息相關代碼

gulimall-order/src/main/java/com/wen/gulimall/order/config/MyMQConfig.java

@Configuration
public class MyMQConfig {@RabbitListener(queues = "order.release.order.queue")public void listen(Message message, Channel channel, OrderEntity orderEntity) throws IOException {System.out.println("當前時間"+new Date() +"收到過期訂單消息,準備關閉訂單:------->"+orderEntity);// 確認收到消息channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}...// 創建交換機 隊列 綁定
}

4.6.3 測試

  • 啟動訂單服務,請求?http://order.gulimall.com/test/createOrder?給延時隊列(order.delay.queue) 發送消息。

???????????????????????????????????

  • 一分鐘后,死信隊列(order.release.order.queue) 接收到延時隊列(order.delay.queue)中過期的消息(即死信),消費者監聽隊列(order.release.order.queue)消費信息,控制臺打印如下:?

當前時間Thu Feb 22 15:16:20 CST 2024收到過期訂單消息,準備關閉訂單:------->OrderEntity(id=null, memberId=null, orderSn=0d784fd2-6c82-48cc-aa2c-e573bf8de1d1, couponId=null, createTime=Thu Feb 22 15:15:20 CST 2024, memberUsername=null, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)

  • 一分鐘后,死信隊列(order.release.order.queue) 接收到延時隊列(order.delay.queue)中過期的消息(即死信),如果沒有消費者監聽接收消息,消息在死信隊列(order.release.order.queue)中,如下:

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/696943.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/696943.shtml
英文地址,請注明出處:http://en.pswp.cn/news/696943.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

什么是Elasticsearch SQL

什么是Elasticsearch SQL 一. 介紹二. SQL 入門 前言 這是我在這個網站整理的筆記,有錯誤的地方請指出&#xff0c;關注我&#xff0c;接下來還會持續更新。 作者&#xff1a;神的孩子都在歌唱 一. 介紹 Elasticsearch SQL 是一個 X-Pack 組件&#xff0c;允許針對 Elasticsea…

通俗易懂理解G-GhostNet輕量級神經網絡模型

一、參考資料 原始論文&#xff1a;[1] IJCV22 | 已開源 | 華為GhostNet再升級&#xff0c;全系列硬件上最優極簡AI網絡 二、G-GhostNet相關介紹 G-GhostNet 又稱為 GhostNetV1 的升級版&#xff0c;是針對GPU優化的輕量級神經網絡。 1. 摘要 GhostNetV1 作為近年來最流行…

Leetcode 611.有效三角形的個數

題目 給定一個包含非負整數的數組 nums &#xff0c;返回其中可以組成三角形三條邊的三元組個數。 示例 1: 輸入: nums [2,2,3,4] 輸出: 3 解釋:有效的組合是: 2,3,4 (使用第一個 2) 2,3,4 (使用第二個 2) 2,2,3示例 2: 輸入: nums [4,2,3,4] 輸出: 4提示: 1 < nums…

Android的LiveData

LiveData 是一種可觀察的數據存儲器類。與常規的可觀察類不同&#xff0c;LiveData 具有生命周期感知能力&#xff0c;意指它遵循其他應用組件&#xff08;如 activity、fragment 或 service&#xff09;的生命周期。這種感知能力可確保 LiveData 僅更新處于活躍生命周期狀態的…

ChatGPT在醫學領域的應用與前景

標題&#xff1a; ChatGPT在醫學領域的應用與前景 正文&#xff1a; 隨著人工智能技術的不斷進步&#xff0c;ChatGPT等語言模型在醫學領域的應用逐漸深入&#xff0c;展現出其巨大的潛力和廣闊的發展前景。作為一個高級的自然語言處理工具&#xff0c;ChatGPT能夠理解和生成…

WPF 開發調試比較:Visual Studio 原生和Snoop調試控制臺

文章目錄 前言運行環境簡單的WPF代碼實現一個簡單的ListBoxVisual Studio自帶代碼調試熱重置功能測試實時可視化樹查找窗口元素顯示屬性 Snoop調試使用Snoop簡單使用調試控制臺元素追蹤結構樹Visual/可視化結構樹Logical/本地代碼可視化樹AutoMation/自動識別結構樹 WPF元素控制…

基于springboot+vue的房屋租賃管理系統(前后端分離)

博主主頁&#xff1a;貓頭鷹源碼 博主簡介&#xff1a;Java領域優質創作者、CSDN博客專家、阿里云專家博主、公司架構師、全網粉絲5萬、專注Java技術領域和畢業設計項目實戰&#xff0c;歡迎高校老師\講師\同行交流合作 ?主要內容&#xff1a;畢業設計(Javaweb項目|小程序|Pyt…

【OpenAI官方課程】第四課:ChatGPT文本推斷Summarizing

歡迎來到ChatGPT 開發人員提示工程課程&#xff08;ChatGPT Prompt Engineering for Developers&#xff09;&#xff01;本課程將教您如何通過OpenAI API有效地利用大型語言模型&#xff08;LLM&#xff09;來創建強大的應用程序。 本課程由OpenAI 的Isa Fulford和 DeepLearn…

手拉手Vite+Vue3+TinyVue+Echarts+TailwindCSS

技術棧springboot3hutool-alloshi-coreVue3viteTinyVueEchartsTailwindCSS軟件版本IDEAIntelliJ IDEA 2022.2.1JDK17Spring Boot3.1hutool-all5.8.18oshi-core6.4.1Vue35.0.10vite5.0.10axios1.6.7echarts5.4.3 ECharts是一個使用 JavaScript 實現的開源可視化庫&#xff0c;可…

快速搭建ARM64實驗平臺(QEMU虛擬機+Debian)

文章目錄 前言一、實驗平臺介紹二、安裝步驟2.1 安裝工具2.2 下載倉庫2.3 編譯內核并制作根文件系統2.4 運行剛才編譯好的ARM64版本的Debian系統2.5 在線安裝軟件包2.6 在QEMU虛擬機和主機之間共享文件 三、單步調試ARM64 Linux內核參考資料 前言 最近翻閱笨叔的《奔跑吧Linux…

go-zero微服務入門教程

go-zero微服務入門教程 本教程主要模擬實現用戶注冊和用戶信息查詢兩個接口。 準備工作 安裝基礎環境 安裝etcd&#xff0c; mysql&#xff0c;redis&#xff0c;建議采用docker安裝。 MySQL安裝好之后&#xff0c;新建數據庫dsms_admin&#xff0c;并新建表sys_user&#…

【Git】 刪除遠程分支

Git 刪除遠程分支有以下幾種方法 服務端UI工具 Git 的服務端圖形化工具主要是 web 端。常用的有 GitHub、Gitea、Gutlab 等。 這些工具都提供了分支管理&#xff0c;可以直接在各服務端找到相關功能&#xff0c;謹慎刪除。 客戶端UI工具 Git 擁有諸多客戶端 UI 工具&#x…

詳細分析Python中的unittest測試框架

目錄 1. 基本知識2. API2.1 斷言2.2 setUp() 和 tearDown() 3. Demo 1. 基本知識 unittest 是 Python 標準庫中的一個單元測試框架&#xff0c;用于編寫和執行測試用例以驗證代碼的正確性 提供了一種結構化的方法來編寫測試&#xff0c;使得測試代碼更加模塊化和易于維護 以…

【ACW 服務端】頁面操作Java增刪改查代碼生成

版本: 1.2.2-JDK17-SNAPSHOT 項目地址&#xff1a;wu-smart-acw 演示地址&#xff1a;演示地址 admin/admin Java增刪改查代碼生成 找到對應菜單 選擇你需要的數據實例 選擇數據庫 選擇數據庫表 選擇客戶端&#xff08;如果是本地ACW服務代碼啟動默認注冊上的客戶端ID是…

騰訊云主機Ubuntu22.04安裝Odoo17

一、安裝PostgreSQL16 參見之前的文章 Ubuntu22.04安裝PostgreSQL-CSDN博客 二、安裝Odoo17 本方案使用的nightly版的odoo&#xff0c;安裝的都是最新版odoo wget -O - https://nightly.odoo.com/odoo.key | apt-key add - echo "deb http://nightly.odoo.com/17.0/n…

Maven【1】(命令行操作)

文章目錄 一丶創建maven工程二、理解pom.xml三、maven的構建命令1.編譯操作2.清理操作3.測試操作4.打包操作5.安裝操作 一丶創建maven工程 首先創建這樣一個目錄&#xff0c;然后從命令行里進入這個目錄&#xff1a; 然后接下來就在這個命令行里進行操作了。 這個命令是&…

Python學習筆記——PySide6設計GUI應用之UI與邏輯分離

1、打開PySide6的UI設計工具pyside6-designer&#xff0c;設計一個主窗口&#xff0c;保存文件名為testwindow.ui 2、使用PySide6的RCC工具把testwindow.ui文件轉換為testwindow_rc.py文件&#xff0c;此文件中有一個類Ui_MainWindow&#xff08;包含各種控件對象&#xff09;…

設計模式淺析(八) ·外觀模式

設計模式淺析(八) 外觀模式 日常叨逼叨 java設計模式淺析&#xff0c;如果覺得對你有幫助&#xff0c;記得一鍵三連&#xff0c;謝謝各位觀眾老爺&#x1f601;&#x1f601; 外觀模式 概念 外觀模式&#xff08;Facade Pattern&#xff09;是一種設計模式&#xff0c;它為…

深度學習發展里程碑事件2006-2024

2006-2024年&#xff0c;深度學習發展經歷眾多的里程碑事件&#xff0c;一次次地刺激著人們的神經&#xff0c;帶來巨大的興奮。電影還在繼續&#xff0c;好戲在后面&#xff0c;期待…… 2006年 深度信念網絡&#xff08;DBNs&#xff09;&#xff1a;Geoffrey Hinton與他的學…

備戰藍橋杯 Day10(背包dp)

01背包問題 1267&#xff1a;【例9.11】01背包問題 【題目描述】 一個旅行者有一個最多能裝 M&#xfffd; 公斤的背包&#xff0c;現在有 n&#xfffd; 件物品&#xff0c;它們的重量分別是W1&#xff0c;W2&#xff0c;...,Wn&#xfffd;1&#xff0c;&#xfffd;2&#…