項目代碼:RabbitMQDemo: 學習RabbitMQ的一些整理
基本概念
- RabbitMQ是一種基于AMQP協議的消息隊列實現框架
- RabbitMQ可以用于在系統與系統之間或者微服務節點之間,進行消息緩存,消息廣播,消息分配以及限流消峰處理
- RabbitMQ-Server本身是一個可獨立運行的第三方服務,開發者并不需要單獨維護,相當于一個虛擬主機存儲了RabbitMQ使用過程中所需要的各種變量和對象,同時也提供了UI級別的操作接口
關于消息隊列的特點
形象一點:理解成快遞驛站,快遞員投放,客人取貨,快遞站暫存
- 生產者發起消費者訂閱:生產者主動將消息發送到消息隊列,消費者再從隊列中獲取消息。與HTTP請求消費者向生產者發起請求的數據流向相反。
- 異步而非同步:生產者推送消息到隊列后生產者推邏輯已經結束,不再線程擁塞等待消費者接收消息后傳回處理結果
- 推送而非調用:生產者只負責將信息推出去,消息怎么處理,有怎樣的回應,不是他考慮的事,也不是RabbitMQ中間件需要考慮的事情,類似于廣播機制
RabbitMQ核心概念
- 生產者:產生數據發送消息的程序是生產者。
- 交換機:交換機是 RabbitMQ 非常重要的一個部件,一方面它接收來自生產者的消息,另一方面它將消息推送到隊列中。交換機必須確切知道如何處理它接收到的消息,是將這些消息推送到特定隊列還是推送到多個隊列,亦或者是把消息丟棄,這個是由交換機類型決定的。
- 隊列:隊列是 RabbitMQ 內部使用的一種數據結構,盡管消息流經 RabbitMQ 和應用程序,但它們只能存儲在隊列中。隊列僅受主機的內存和磁盤限制的約束,本質上是一個大的消息緩沖區。許多生產者可以將消息發送到一個隊列,許多消費者可以嘗試從一個隊列接收數據。
- 消費者:消費與接收具有相似的含義。消費者大多時候是一個等待接收消息的程序。請注意生產者,消費者和消息中間件很多時候并不在同一機器上。同一個應用程序既可以是生產者又是可以是消費者。
安裝與控制臺
- 安裝Erlang:http://www.erlang.org/download,目前版本最好要求26 +
- 安裝RabbitMQ服務端:http://www.rabbitmq.com/download.html ,需要梯子
- 啟動可視化界面進行管理:
- 進入安裝目錄,rabbitmq_server-3.4.1\sbin,執行rabbitmq-plugins enable rabbitmq_management
- 重啟WIndows下的RabbitMQ服務
- 登錄網址http://127.0.0.1:15672/,用戶名密碼:guest/guest
開發準備
不一定非要按此結構構建項目這么寫,只是為了方便測試和練習
父項目POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.18</version></parent><packaging>pom</packaging><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency></dependencies><groupId>com</groupId><artifactId>RabbitMQDemo</artifactId><version>0.0.1-SNAPSHOT</version><name>RabbitMQDemo</name><description>RabbitMQDemo</description><properties><java.version>8</java.version><spring-boot.version>2.7.18</spring-boot.version><spring-cloud-alibaba.version>2021.0.6.0</spring-cloud-alibaba.version><spring.cloud.version>2021.0.6</spring.cloud.version><hutol.version>5.5.7</hutol.version><dubbo.version>3.2.14</dubbo.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><!-- 統一管理,配置在此模塊下的,子模塊要引入依賴必須聲明groupId和artifactId,不需要聲明版本--><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring.cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.16</version></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
公共模塊
定義一些消費者和生產者公用的常量或者數據類型
消費者和生產者POM文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><artifactId>實際模塊名</artifactId><modelVersion>4.0.0</modelVersion><version>1.0</version><parent><groupId>com</groupId><artifactId>RabbitMQDemo</artifactId><version>0.0.1-SNAPSHOT</version><relativePath>../../RabbitMQDemo</relativePath></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bootstrap</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.junit.jupiter</groupId><artifactId>junit-jupiter-api</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency><!--公共基礎包 --><dependency><groupId>com</groupId><artifactId>common</artifactId><version>0.0.1-SNAPSHOT</version><scope>compile</scope></dependency></dependencies>
</project>
生產者測試接口
@RestController
@RequestMapping("/rabbitMQDemo/具體調用地址")
public class DirectProviderController {@ResourceDirectProviderService directProviderService;@GetMapping("/send")public String send() {return directProviderService.sendMsg(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss"));}
}
其他代碼
依據不同交換機類型案例專項給出
交換機類型
直連模式-單一消費者
即一個生產者–一個隊列–一個消費者
主要步驟
- 生產者創建隊列,交換機對象,實現交換機與隊列的綁定,并將這些信息登記到RabbitMQ服務中
- 生產者指明目標交換機和交換機-隊列綁定關系后,向RabbitMQ服務推送消息
- 消費者訂閱隊列,并從隊列中獲取消息
可以看到,一般情況下消費者不用和交換機打交道,只需要知道自己要訂閱哪個隊列即可
生產者實現
-
配置文件
server:port: 8881 spring:application:name: RabbitMQ-Direct-Providerrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
-
創建隊列等內容
@Configuration //這里只要創建了SpringBean對象即可,具體調用時后臺會自動將這些對象注冊到RabbitMQ-Sevrver中 public class DirectConfiguration {@Beanpublic Queue rabbitmqDemoDirectQueue() {/*** 1、name: 隊列名稱* 2、durable: 是否持久化* 3、exclusive: 是否獨享、排外的。如果設置為true,定義為排他隊列。則只有創建者可以使用此隊列。也就是private私有的。* 4、autoDelete: 是否自動刪除。也就是臨時隊列。當最后一個消費者斷開連接后,會自動刪除。* */return new Queue(DirectRabbitMQTag.Direct_TOPIC, true, false, false);}@Beanpublic DirectExchange rabbitmqDemoDirectExchange() {//Direct交換機return new DirectExchange(DirectRabbitMQTag.Direct_EXCHANGE, true, false);}@Beanpublic Binding bindDirect() {//鏈式寫法,綁定交換機和隊列,并設置匹配鍵return BindingBuilder//綁定隊列.bind(rabbitmqDemoDirectQueue())//到交換機.to(rabbitmqDemoDirectExchange())//并設置匹配鍵.with(DirectRabbitMQTag.Direct_ROUTING_KEY);} }
-
配置消息轉換器(該步驟目的,請參考常見問題:消息序列化一節內容)
@Configuration public class RestTemplateConfiguration {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//設置開啟Mandatory,才能觸發回調函數rabbitTemplate.setMandatory(true);//這里配置在RabbitMQ傳遞消息時,將請求實體對象先轉為JsonObject在進行投遞rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;} }
-
發送消息邏輯
@Service @Slf4j public class DirectProviderService {@Resourceprivate RabbitTemplate rabbitTemplate;public String sendMsg(String msg) {String result = "OK";try {//采用自定義消息體DirectDto directDto = new DirectDto();directDto.setMsg(msg);directDto.setMsgId(UUID.randomUUID().toString());directDto.setMsgType("test");//利用RabbitTempate對象進行發送,比使用原生RabbitMQ對象更加簡單//注意時異步的,消息發送到交換機后函數就執行完成了,至于消息有沒有被誰解析,無從得知rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto);} catch (Exception e) {log.error(e.getMessage(), e);result = "FAIL";}return result;} }
-
檢查步驟:登錄RabbitMQ-Server查看隊列和交換機是否創建
消費者實現
-
配置文件
server:port: 8882 spring:application:name: RabbitMQ-Direct-Clientrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
-
創建消息轉換器(該步驟目的,請參考常見問題:消息序列化一節內容)
@Configuration public class RabbitMQCustomerConfiguration {@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;} }
-
創建訂閱并解析
@Component @Slf4j public class DirectReceiver {//對于消費者而言并不知道交換機編號或者交換機類型是什么,只關心隊列名稱,以便實現訂閱@RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})public void process(Message message) {//這里采用默認Message對象接收RabbitMQ的消息,//因為消費者和生產者都做了Json正反序列化,因此從message.getBody()獲取的二進制流可以直接轉換為消息體對應的JsonObjectStrlog.info("Received message: {}", new String(message.getBody()));//實際返回結果 Received message: {"msgId":"a6c22848-76db-4f75-b24c-f5c49a335d77","msg":"2025-03-20 10:58:40","msgType":"test"}} }
直連模式-多個消費者
- 即一個生產者–一個隊列–一個消費者
- 對于一個隊列綁定了多個消費者的情況,RabbitMQ會議輪詢的方式,將生產者的消息依次按順尋分別分發到所有的消費者中,其效果非常類似等權值負載均衡
生產者代碼
-
新增一個服務接口,以便測試功能
@Service @Slf4j public class DirectProviderService {@Resourceprivate RabbitTemplate rabbitTemplate;public String sendMsg(String msg) {String result = "OK";try {DirectDto directDto = new DirectDto();directDto.setMsg(msg);directDto.setMsgId(UUID.randomUUID().toString());directDto.setMsgType("test");rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto);} catch (Exception e) {log.error(e.getMessage(), e);result = "FAIL";}return result;}//新增功能,重復投遞十個消息到隊列,Controller層代碼省略,正常調用即可public String sendMsg2() {for (int index = 0; index < 6; index++) {this.sendMsg("index is " + index);}return "OK";} }
消費者代碼
- 復制一份上一節消費者代碼,并讓兩者分別在8882,8883端口同時啟動
- 注意一定要保證都訂閱DirectRabbitMQTag.Direct_TOPIC隊列
執行結果
扇出模式
- 一個生產者–多個隊列-每個隊列一個消費者
- 這種情況下,每個隊列的消費者都會收到相同的消息,類似于廣播機制
- 這種模式,交換機和隊列沒有Key綁定
生產者實現
-
配置文件
server:port: 8883 spring:application:name: RabbitMQ-Fanout-Providerrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
-
創建隊列等內容
package com.provider.configuration;import com.common.config.FanoutRabbitMQTag; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class FanoutConfiguration {@Beanpublic Queue fanoutExchangeQueue01() {//隊列Areturn new Queue(FanoutRabbitMQTag.FANOUT_QUEUE_01, true, false, false);}@Beanpublic Queue fanoutExchangeQueue02() {//隊列Breturn new Queue(FanoutRabbitMQTag.FANOUT_QUEUE_02, true, false, false);}@Beanpublic FanoutExchange rabbitmqDemoFanoutExchange() {//創建FanoutExchange類型交換機return new FanoutExchange(FanoutRabbitMQTag.FANOUT_EXCHANGE, true, false);}@Beanpublic Binding bindFanoutA() {//隊列A綁定到FanoutExchange交換機,這里不需要提供隊列和交換機KEY值return BindingBuilder.bind(this.fanoutExchangeQueue01()).to(this.rabbitmqDemoFanoutExchange());}@Beanpublic Binding bindFanoutB() {//隊列B綁定到FanoutExchange交換機,這里不需要提供隊列和交換機KEY值return BindingBuilder.bind(this.fanoutExchangeQueue02()).to(this.rabbitmqDemoFanoutExchange());}}
-
配置消息轉換器
與直連模式相同,不與累述
-
發送消息邏輯(controller層省略)
@Service @Slf4j public class FanoutProviderService {@Resourceprivate RabbitTemplate rabbitTemplate;public String sendMsg(String msg) {String result = "OK";try {String msgId = UUID.randomUUID() + "\t" + msg;//沒有KEY值,第二個參數傳一個“”即可rabbitTemplate.convertAndSend(FanoutRabbitMQTag.FANOUT_EXCHANGE, "", msgId);} catch (Exception e) {log.error(e.getMessage(), e);result = "FAIL";}return result;} }
消費者實現
-
配置文件
server:port: 8884 spring:application:name: RabbitMQ-Fanout-Clientrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
-
創建消息轉換器
與直連模式相同,不與累述
-
創建訂閱并解析
@Component @Slf4j public class FanoutReceiver {//注意:這里只訂閱了隊列1@RabbitListener(queues = {FanoutRabbitMQTag.FANOUT_QUEUE_01})public void process(String msg) {log.info("FanoutReceiver process message: {}", msg);} }
-
復制上述工程讓其可以同時在8885啟動,并訂閱隊列2
@Component @Slf4j public class FanoutReceiver {//注意:這里只訂閱了隊列2@RabbitListener(queues = {FanoutRabbitMQTag.FANOUT_QUEUE_02})public void process(String msg) {log.info("FanoutReceiver process message: {}", msg);} }
執行結果
-- 8884,8885兩個端口的服務均會收到相同的消息
FanoutReceiver process message: 3fdd6882-97f3-489d-9c45-79bb3c598821 2025-03-24 10:42:47
常見問題:消息序列化
問題描述
- RabbitMQ在推送消息到隊列時,支持傳遞自定義bean對象,但默認會直接轉為二進制流字符數組(btye[])
- 消費者在接收消息時,很有可能無法正確的將byte[]反序列化為bean對象,進而引發
Failed to convert message
異常 - 即便將自定義Bean對象類型抽出到公用模塊,在有些版本中依然不能解決問題
解決策略
- 強制修改RabbitMQ在推送消息過程中的轉換器不轉化為二進制對象,具體步驟包括
- 配置生產者轉化器對象,讓RabbitMQ將Bean轉化為Json在轉化為二進制byte[]傳遞
- 配置消費者解析器對象,讓RabbintMQ將byte[]轉化為JsonStr,再自行按需轉換為Bean類型
實現步驟
生產者配置
@Configuration
public class RestTemplateConfiguration {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){//設置開啟Mandatory,才能觸發回調函數,?論消息推送結果怎么樣都強制調?回調函數RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}
}
生產者推送
@Service
@Slf4j
public class DirectProviderService {@Resourceprivate RabbitTemplate rabbitTemplate;public String sendMsg(String msg) {String result = "OK";try {DirectDto directDto = new DirectDto();directDto.setMsg(msg);directDto.setMsgId(UUID.randomUUID().toString());directDto.setMsgType("test");rabbitTemplate.convertAndSend(DirectRabbitMQTag.Direct_EXCHANGE, DirectRabbitMQTag.Direct_ROUTING_KEY, directDto);} catch (Exception e) {log.error(e.getMessage(), e);result = "FAIL";}return result;}}
消費者配置
@Configuration
public class RabbitMQCustomerConfiguration {@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());return factory;}
}
消費者解析
@Component
@Slf4j
public class DirectReceiver {@RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})public void process(Message message) {log.info("Received message: {}", new String(message.getBody()));}
}
//輸出結果
//Received message: {"msgId":"a6c22848-76db-4f75-b24c-f5c49a335d77","msg":"2025-03-20 10:58:40","msgType":"test"}
其他說明
- 對于String,int,double這樣的原子類型為消息體時
- 消費者和生產者不配置Json轉換器,可以正常傳輸與解析
- 消費者和生產者配置了Json轉換器,可以正常傳輸與解析,不會與Bean傳輸沖突
高級應用:手動確認
問題描述
- 默認情況下RabbitMQ采用自動簽收模式,這種情況下【生產者】并不知道以下內容
- 消息是否正確的從生產者發送到了交換機:可能目標交換機被析構了,或者目標交換機明寫錯了
- 消息是否正確的從交換機投遞到了指定隊列:可能時交換機找不到隊列了
- 因此生產者為了知道投遞消息到交換機以及交換機推送到隊列的情況,可以使用手工確認模式
其他說明
- 手工確認模式分為生產者修改和消費者修改,一般而言
- 生產者開啟了手工確認,則消費者也要使用手工確認
- 生產者沒有開啟手工確認,消費者也可以使用手工確認,比如消費重試,死信隊列等場景,但這種模式下,生產這無法知道投遞狀態
- 手工確認模式不會讓生產者處理消費者在從隊列拿到消息后處理失敗的問題,即生產者最遠只關心到消息有沒有到隊列,一旦到了隊列后續再出錯就不是生產者的職責范圍了。
生產者的修改內容
- 開啟手工確認模式,即允許監聽投遞交換機失敗和推送隊列失敗的回調
- 實現ConfirmCallback回調,處理投遞到交換機失敗的異常
- 實現ReturnsCallback回調,處理推送到隊列失敗的異常
修改Yml文件
server:port: 8881
spring:application:name: RabbitMQ-Direct-Providerrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest# 開啟手工確認模式publisher-confirm-type: correlatedpublisher-returns: truetemplate:mandatory: true # 消息投遞失敗返回客戶端
實現回調
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());//必須開啟此參數,否則Returns回調不會被觸發rabbitTemplate.setMandatory(true);//設置回調rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息投遞成功! {}" , correlationData);} else {log.error("消息投遞失敗! {} ", cause);}});rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("ReturnedMessage: {}", returnedMessage));return rabbitTemplate;
}
消費者的修改內容
-
在手工模式下消費者需要處理以下兩個內容,特別注意這兩個過程和生產者的confim,returns回調沒有關系
永遠不要企圖讓生產者去捕獲或者處理消費者解析消息時的異常!!!!!!
- 從隊列拿到消息后,若正常處理需要告知RabbitMQ(不是生產者)當前消息需要從隊列中出隊
- 若處理異常要告知RabbitMQ(不是生產者),當前消息時回到隊列等待再次拿取,還是丟棄不要了
-
關于處理異常時采用了回到隊列這種模式的后續處理與注意事項,請參看【消費重試】一節內容
解析工程配置
@Configuration
public class RabbitMQCustomerConfiguration {@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());//對于配置了自定義類型轉換的場景,如果想開啟手工確認,必須在這里配置,yml中配置不生效!!!factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}
}
一定要注意,如果注冊了自定義解析工廠,那么在Yml無論怎么配置listener.simple.acknowledge-mode還是listener.direct.acknowledge-mode都是不起效的!!!!
修改接收邏輯
@Component
@Slf4j
public class DirectReceiver {//對于手工簽收,一定要使用Message對象,因此不建議搭配@RabbitHandler強制解析消息體類型,因此直接對解析方法增加@RabbitListener//Channel參數為RabbitMQ自動注入的,不需要額外關心@RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})public void process(Message message, Channel channel) throws IOException {//獲取消息傳遞的標記long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//處理業務邏輯log.info("開始解析消息...");log.info(new JSONObject(new String(message.getBody())).toString());//模擬隨機出錯的可能int temp = 1 / RandomUtil.randomInt(0,2);// 手動簽收:第一個參數:表示收到的標簽, 第二個參數:表示是否執行批處理確認channel.basicAck(deliveryTag, true);log.info("簽收成功...");} catch (Exception e) {log.error("處理異常{},退回隊列 ", e.getMessage());//如果處理業務邏輯的時候發生了錯誤,則要拒絕簽收//參數1 multiple 是否批量處理//參數2 requeue = true 表示將錯誤的消息,重新放回到隊列中并且一般放置在隊列頭部的位置,等待重提取//參數2 requeue = false 表示將錯誤的消息,不放回隊列,如果有死信隊列則放入到死信隊列,沒有則丟棄channel.basicNack(deliveryTag, true,true);}}
}
測試結果
-- 生產者日志輸出
2025-03-21 10:15:54.952 INFO 21220 --- [nectionFactory1] c.p.c.RestTemplateConfiguration : 消息投遞成功! null
-- 消費者輸出
2025-03-21 10:15:02.996 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 開始解析消息...
2025-03-21 10:15:02.997 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:02.997 ERROR 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 處理異常,退回隊列 / by zero
2025-03-21 10:15:03.011 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 開始解析消息...
2025-03-21 10:15:03.011 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:03.012 ERROR 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 處理異常,退回隊列 / by zero
2025-03-21 10:15:03.013 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 開始解析消息...
2025-03-21 10:15:03.013 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"8427d093-5df0-4258-b5fa-ea0e139b3325","msg":"2025-03-21 10:15:02","msgType":"test"}
2025-03-21 10:15:03.014 INFO 14652 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 簽收成功...
測試結果說明
- 只要生產者投遞消息到交換機就會有Confim回調(不論成功與否)
- 交換機若轉發到隊列無異常,則不會觸發Returns回調
- 若客戶端手動確認消息時選擇了【重回隊列】那么該消息又會立刻被消費者重新接收,直到該消息被正確處理并出隊
- 顯然,步驟3這種邏輯存在缺陷,如果消息體自身就是包含問題數據,則重回隊列-重新接收步驟就會無限重復,關于此問題請參考【消費重試】一節內容
高級應用:消費重試
問題描述
- 如果消息體自身可能包含問題數據,或者解析過程中出現IO等運行時異常,導致消費者解析會出錯時,一般有兩種處理方式
- 接收消息,消息出隊,捕獲異常,直接處理:直接,簡單,但容錯率低
- 拒絕消息,消息回隊,重新抓取,再次處理:容錯率搞,但復雜
- 因為消費者處理過程出現異常存在隨機性,因此方式2更為合理,
- 但如果處理過程必然出錯,例如解析邏輯bug,臟數據等,則重回隊列-重新接收步驟就會無限重復,進而擁塞系統造成運行癱瘓
- 所以,消費重試必須要有次數限制,也必須要有兜底策略
其他說明
- 消費重試是【消費者】對解析過程異常的處理,和【生產者】沒有關系
- 不論采用【手工確認】還是【自動確認】都可以接入消費重試,只是接入和實現方式有所不同
- 自動確認實現便利,但不夠靈活
- 手工確認實現復雜,但足夠靈活
- RabbitMQ集成Springboot中有很多中消費重試的實現框架,比如利用retryTemplate,Channel,可以按照需要自由選擇
- 關于兜底策略RabbitMQ推薦將出錯的消息推送到【死信隊列】進而在RabbitMQ中統一管理,實際開發過程中也可以選擇或其他的實現方式
- 關于死信隊列相關內容,可以參考【死信隊列】一節內容
手工確認模式下的重試實現
- 這只是一種很簡單的實現邏輯
- 關于自動模式下的自動處理,可以參考這篇博文,注意處理異常不捕獲和yml配置即可:RabbitMQ重試-自動簽收模式
- 注意:請參考【高級應用:手動確認-消費者的修改內容-解析工廠配置】一節內容,配置解析工廠,確保手工確認模式起效
@Component
@Slf4j
public class DirectReceiver {private final int maxRetryTimes = 3;//對于手工簽收,一定要使用Message對象,因此不建議搭配@RabbitHandler強制解析消息體類型,因此直接對解析方法增加@RabbitListener//Channel參數為RabbitMQ自動注入的,不需要額外關心@RabbitListener(queues = {DirectRabbitMQTag.Direct_TOPIC})public void process(Message message, Channel channel) throws IOException {//獲取消息傳遞的標記long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//處理業務邏輯log.info("開始解析消息...");log.info(new JSONObject(new String(message.getBody())).toString());//必然的解析出錯int temp = 1 / 0;// 手動簽收:第一個參數:表示收到的標簽, 第二個參數:表示是否執行批處理確認channel.basicAck(deliveryTag, true);log.info("簽收成功...");} catch (Exception e) {log.error("處理異常{},退回隊列 ", e.getMessage());//如果處理業務邏輯的時候發生了錯誤,則要拒絕簽收//參數1 multiple 是否批量處理//參數2 requeue = true 表示將錯誤的消息,重新放回到隊列中并且一般放置在隊列頭部的位置,//參數2 requeue = false 表示將錯誤的消息,不放回隊列,如果有死信隊列則放入到死信隊列,沒有則丟棄//channel.basicNack(deliveryTag, true,true);this.handleRetry(message, channel, e);}}private void handleRetry(Message message, Channel channel, Exception processException) {try {Map<String, Object> headers = message.getMessageProperties().getHeaders();Long retryCount = (Long) headers.getOrDefault("retry-count", 0L);if (retryCount < maxRetryTimes) {//手動進行應答, 接收此消息,先讓消息出隊,false表示只確認當前者一條消息出隊channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//增加重試次數標記headers.put("retry-count", retryCount + 1);//重新發送消息發送到MQ中原有隊列的【隊尾】channel.basicPublish(message.getMessageProperties().getReceivedExchange(),message.getMessageProperties().getReceivedRoutingKey(),new AMQP.BasicProperties().builder().contentType(message.getMessageProperties().getContentType()).headers(headers).build(),message.getBody());} else{//兜底方案//參數2 requeue = false 表示將錯誤的消息,不放回隊列,如果有死信隊列則放入到死信隊列,沒有則丟棄channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);//這里也可以存入將錯誤的請求體或者具體處理異常信息,存儲日志,redis或者數據庫中,方便人工查閱log.error("重試次數超限,處理異常原因" , processException);}} catch (Exception e) {log.error("重試處理異常{}, ", e.getMessage());}}
}
測試結果
2025-03-21 15:17:36.489 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 開始解析消息...
2025-03-21 15:17:36.513 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.514 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 處理異常/ by zero,退回隊列
2025-03-21 15:17:36.519 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 開始解析消息...
2025-03-21 15:17:36.520 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.520 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 處理異常/ by zero,退回隊列
2025-03-21 15:17:36.521 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 開始解析消息...
2025-03-21 15:17:36.521 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.521 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 處理異常/ by zero,退回隊列
2025-03-21 15:17:36.524 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 開始解析消息...
2025-03-21 15:17:36.524 INFO 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : {"msgId":"b7c0b46c-3c80-4de9-a06b-d9c2d92142d7","msg":"2025-03-21 15:17:36","msgType":"test"}
2025-03-21 15:17:36.524 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 處理異常/ by zero,退回隊列
2025-03-21 15:17:36.528 ERROR 268 --- [ntContainer#0-1] com.customer.receiver.DirectReceiver : 重試次數超限,處理異常原因java.lang.ArithmeticException: / by zeroat com.customer.receiver.DirectReceiver.process(DirectReceiver.java:35) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_321]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_321]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_321]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_321]
測試結果說明
- 解析邏輯必然出錯,觸發異常后,消費者重試了三次
- 最終觸發兜底方法,拒絕簽收消息,讓消息丟失
高級應用:死信隊列
問題描述
- 在消費者處理信息時,可能因為一些環境原因導致解析過程發生異常,而消費者自身沒有特殊的異常處理機制(自動重試,自動丟棄,本地存儲等)
- 這時可以將引發異常的消息,從原有隊列中遷出,轉投遞到死信隊列,加以緩存,方便運維人員后期查詢或者分析錯誤消息,
補充說明
- 與普通隊列的相同點:
- 死信隊列也是一種隊列,需要搭配交換機才能完成,接收消息
- 死信隊列也可以又消費者監聽,進而實現消息從死信隊列中出隊
- 與普通隊列的不同點:
- 死信隊列不單獨存在,一般需要和原有的業務隊列進行綁定
- 死信隊列的消息,不直接由生產者顯式的投遞,而是在特定條件下由RabbitMQ-Client自動投遞
- 一般死信隊列功能需要和【手工確認】搭配使用
生產者實現
配置文件
server:port: 8881
spring:application:name: RabbitMQ-Direct-Dead-Providerrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
# 不起效 暫時不用配置
# listener:
# type: simple
# simple:
# default-requeue-rejected: false
# acknowledge-mode: manual
創建隊列等內容
重點!!!!!
@Configuration
public class DirectConfiguration {public static final String DIRECT_EXCHANGE = "direct-exchange";public static final String DIRECT_ROUTING_KEY = "direct-routing-key";public static final String DIRECT_QUEUE = "direct-queue";public static final String DEAD_QUEUE = "dead-queue";public static final String DEAD_ROUTING_KEY = "dead-routing-key";public static final String DEAD_EXCHANGE = "dead-exchange";@Beanpublic DirectExchange rabbitmqDeadExchange() {//定義死信交換機,類型為Directreturn new DirectExchange(DEAD_EXCHANGE, true, false);}@Beanpublic DirectExchange rabbitmqDirectExchange() {//定義業務交換機,類型為Directreturn new DirectExchange(DIRECT_EXCHANGE, true, false);}@Bean Queue rabbitmqDeadQueue() {//定義死信隊列return new Queue(DEAD_QUEUE);}@Beanpublic Queue rabbitmqDirectQueue() {//定義業務隊列,并將業務隊列與死信隊列綁定,以便RabbitMQ自動投遞Map<String, Object> args = new HashMap<>(2);//x-dead-letter-exchange 這里聲明當前隊列綁定的死信交換機args.put("x-dead-letter-exchange", DEAD_EXCHANGE);//x-dead-letter-routing-key 這里聲明當前隊列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);//x-message-ttl, 生命TLL,超過5s沒有被消費消息,也會自動投遞到死信隊列args.put("x-message-ttl",5000);return QueueBuilder.durable(DIRECT_QUEUE).withArguments(args).build();}@Beanpublic Binding bindDirect() {//鏈式寫法,綁定交換機和隊列,并設置匹配鍵return BindingBuilder//綁定隊列.bind(rabbitmqDirectQueue())//到交換機.to(rabbitmqDirectExchange())//并設置匹配鍵.with(DIRECT_ROUTING_KEY);}@Beanpublic Binding bindDirectDead() {//鏈式寫法,綁定交換機和隊列,并設置匹配鍵return BindingBuilder//綁定隊列.bind(rabbitmqDeadQueue())//到交換機.to(rabbitmqDeadExchange())//并設置匹配鍵.with(DEAD_ROUTING_KEY);}
}
顯然,死信隊列和業務隊列一樣,都有隊列申明,交換機申明,隊列交換機綁定三個步驟,不同的是,業務隊列申明時需要額外與死信隊列進行綁定
配置消息轉換器
不予累述
發送消息邏輯
@Service
@Slf4j
public class DirectProviderService {@Resourceprivate RabbitTemplate rabbitTemplate;public String sendMsg(String msg) {String result = "OK";try {DirectDto directDto = new DirectDto();directDto.setMsg(msg);directDto.setMsgId(UUID.randomUUID().toString());directDto.setMsgType(""+RandomUtil.randomInt(0,3));rabbitTemplate.convertAndSend(DirectConfiguration.DIRECT_EXCHANGE, DirectConfiguration.DIRECT_ROUTING_KEY, directDto);} catch (Exception e) {log.error(e.getMessage(), e);result = "FAIL";}return result;}}
消費者實現
配置文件
server:port: 8882
spring:application:name: RabbitMQ-Direct-Dead-Clientrabbitmq:addresses: 127.0.0.1port: 5672username: guestpassword: guest
消費者配置
注意:請參考【高級應用:手動確認-消費者的修改內容-解析工廠配置】一節內容,配置解析工廠,確保手工確認模式起效
消費者解析-正常接收消息
@Component
@Slf4j
public class DirectReceiver {//手工模式確認@RabbitListener(queues = "direct-queue")public void receiveA(Message message, Channel channel) throws IOException {DirectDto directDto = new JSONObject(new String(message.getBody())).toBean(DirectDto.class);log.info("收到業務消息A, 準備處理:{}", directDto.toString());try{Thread.sleep(Integer.parseInt(directDto.getMsgType()));} catch (Exception e) {log.error(e.getMessage());}//這里模擬一下發生異常時,將消息投遞到死信隊列if (RandomUtil.randomBoolean()){log.error("消息消費發生異常");//參數2 requeue = false 表示將錯誤的消息,不放回隊列,因為有死信隊列,所以放入到死信隊列channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);} else {log.error("消息消費完成");channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
}
消費者解析-消費死信隊列
@Component
@Slf4j
public class DeadReceiver {@RabbitListener(queues = "dead-queue")public void receiveA(Message message, Channel channel) throws IOException {log.info("收到死信消息:{}", new String(message.getBody()));//這里實際把消息從私信隊列中遷出了,所以運行后RabbitMQ-Server在該私信隊列中并沒有看到記錄channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
運行結果
2025-03-24 14:56:22.939 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到業務消息A, 準備處理:DirectDto(msgId=67742847-101d-4074-a664-1ad0aa41ea41, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.941 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消費完成
2025-03-24 14:56:22.942 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到業務消息A, 準備處理:DirectDto(msgId=4e8236fb-1ae7-4427-b855-474603037c78, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.942 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消費發生異常
2025-03-24 14:56:22.943 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到業務消息A, 準備處理:DirectDto(msgId=428faf5f-107d-4295-abf7-33c6ac5da0bc, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.943 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消費完成
2025-03-24 14:56:22.944 INFO 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 收到業務消息A, 準備處理:DirectDto(msgId=d9c4121a-6e74-4944-80b3-3289a9c2d35d, msg=2025-03-24 14:56:22, msgType=0)
2025-03-24 14:56:22.944 ERROR 428 --- [ntContainer#1-1] com.customer.receiver.DirectReceiver : 消息消費完成
代碼說明
- 因為在正常解析步驟中,采用了channel.basicNack方法并指定了requeue=false,RabbitMQ就會自動去找direct-queue隊列是否有綁定死信隊列
- 因為direct-queue綁定了dead-queue,所以出錯的消息會被自動投遞到dead-queue中,并自動標記為Ready狀態,等待消息投遞與解析
- 而對于死信隊列解析,示例中為了方便演示采用了channel.basicAck方法,讓當前消息從死信隊列中出對了,如果不調用此方法,死信隊列就會將此消息記為Unacked狀態一直保留在死信隊列中,具體采取哪種處理方式,應由實際工作需要決定
- 至于死信隊列中Unacked的狀態的消息,要如何在運維環境下處理或者查閱,另起文注