提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔
文章目錄
- 前言
- 一、對RabbitMQ管理界面深入了解
- 1、在這個界面里面我們可以做些什么?
- 二、編碼練習
- (1)使用direct exchange(直連型交換機)
- (2)使用Topic Exchange 主題交換機。
- (3)使用Fanout Exchang 扇型交換機。
- 三、消息確認種類
- A:消息發送確認
- B: 消費接收確認
- 方式一:通過配置類的方式實現
- 方式二:通過yml配置來完成消費者確認
前言
該篇文章內容較多,包括有RabbitMQ一些理論介紹,provider消息推送實例,consumer消息消費實例,Direct、Topic、Fanout多種交換機的使用,同時簡單介紹對消息回調、手動確認等。
這里面的每一種使用都包含實際編碼示例,供大家理解,共同進步,如有不足。還請指教。
一、對RabbitMQ管理界面深入了解
裝完rabbitMq,啟動MQ后,本地瀏覽器輸入http://ip:15672/ ,看到一個簡單后臺管理界面;
對于其中的一些具體指標的解釋:
- Ready: 待消費的消息總數。
- Unacked: 待應答的消息總數。
- Total:總數 Ready+Unacked。
- Publish: producter pub消息的速率。
- Publisher confirm: broker確認pub消息的速率。
- Deliver(manual ack): customer手動確認的速率。
- Deliver( auto ack): customer自動確認的速率。
- Consumer ack: customer正在確認的速率。
- Redelivered: 正在傳遞’redelivered’標志集的消息的速率。
- Get (manual ack): 響應basic.get而要求確認的消息的傳輸速率。
- Get (auto ack): 響應于basic.get而發送不需要確認的消息的速率。
- Return: 將basic.return發送給producter的速率。
- Disk read: queue從磁盤讀取消息的速率。
- Disk write: queue從磁盤寫入消息的速率。
Connections:client的tcp連接的總數。
Channels:通道的總數。
Exchange:交換器的總數。
Queues:隊列的總數。
Consumers:消費者的總數。
更詳細的可見:
版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://blog.csdn.net/qq_19343089/article/details/135724659
1、在這個界面里面我們可以做些什么?
可以手動創建虛擬host,創建用戶,分配權限,創建交換機,創建隊列等等,還有查看隊列消息,消費效率,推送效率等等。
以上這些管理界面的操作在這篇暫時不做擴展描述,我想著重介紹后面實例里會使用到的。
首先先介紹一個簡單的一個消息推送到接收的流程,提供一個簡單的圖:
黃色的圈圈就是我們的消息推送服務,將消息推送到 中間方框里面也就是 rabbitMq的服務器,然后經過服務器里面的交換機、隊列等各種關系(后面會詳細講)將數據處理入列后,最終右邊的藍色圈圈消費者獲取對應監聽的消息。
常用的交換機有以下三種,因為消費者是從隊列獲取信息的,隊列是綁定交換機的(一般),所以對應的消息推送/接收模式也會有以下幾種:
- Direct Exchange
直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。
大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶著路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。
- Fanout Exchange
扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。
- Topic Exchange
主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
簡單地介紹下規則:
(星號) * 用來表示一個單詞 (必須出現的)
(井號) # 用來表示任意數量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 .TT. 隊列Q2綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;
主題交換機是非常強大的,為啥這么膨脹?
當一個隊列的綁定鍵為 “#”(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。
另外還有 Header Exchange 頭交換機 ,Default Exchange 默認交換機,Dead Letter Exchange 死信交換機,這幾個該篇暫不做講述。
好了,一些簡單的介紹到這里為止, 接下來我們來一起編碼。
二、編碼練習
本次實例教程需要創建2個springboot項目,一個 rabbitmq-provider (生產者),一個rabbitmq-consumer(消費者)。【補充說明:我這里模塊名稱創建錯了,其中生產者我創建成了rabbitmq-consumer,消費者我這里叫做 rabbitmq-consumer-true】
首先創建 rabbitmq-provider,
pom.xml里用到的jar依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu.gulimall</groupId><artifactId>rabbitmq-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-consumer</name><description>RabbitMQ生產者模塊</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>1.8</java.version><!-- <spring-cloud.version>2021.0.4</spring-cloud.version>--><spring-cloud.version>2021.0.1</spring-cloud.version></properties><dependencies><dependency><groupId>com.atguigu.gulimall</groupId><artifactId>gulimall-common</artifactId><version>0.0.1-SNAPSHOT</version><exclusions><exclusion><artifactId>servlet-api</artifactId><groupId>javax.servlet</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
然后application.yml:
server:port: 8021#數據源配置
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://192.168.56.10:3306/gulimall_umsdriver-class-name: com.mysql.cj.jdbc.Driver#注冊到注冊中心cloud:nacos:discovery:server-addr: 127.0.0.1:8848application:name: rabbitmq-consumer#配置rabbitMq 服務器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /
# publisher-returns: true #確認消息已發送到隊列(Queue) 這個在生產者模塊配置 這個后期再配置,這會還用不到
# publisher-confirm-type: correlated #確認消息已發送到交換機(Exchange) 這個在生產者模塊配置 這個后期再配置,這會還用不到logging:level:com.atguigu.gulimall: debug #調整product模塊日志的輸出模式是debug級別,這樣就能在控制臺看到dao包下的輸出日志了。
一定要注意 要注意 要注意!!!!!
里面的virtual-host 是指RabbitMQ控制臺中的下面的位置(我理解是指你的隊列和交換機在哪個分組下面,可以為每一個項目創建單獨的分組,但是在此我沒有單獨創建,直接放到了 / 下面)
那么怎么建一個單獨的host呢? 假如我就是想給某個項目接入,使用一個單獨host,順便使用一個單獨的賬號,就好像我文中配置的 root 這樣。
其實也很簡便:
virtual-host的創建:
賬號user的創建:
然后記得給賬號分配權限,指定使用某個virtual host:
指定給自己剛剛為某個項目單獨創建的virtual host。
其實還可以特定指定交換機使用權等等:
(1)使用direct exchange(直連型交換機)
創建DirectRabbitConfig.java(對于隊列和交換機持久化以及連接使用設置,在注釋里有說明,后面的不同交換機的配置就不做同樣說明了):
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;/*** 這里使用的是direct exchange(直連型交換機), 也就是交換機和隊列是一對一關系* 模擬 rabbitmq-provider (生產者),這里模塊名字寫錯了。這個是消息生產者** @author: jd* @create: 2024-06-24*/
@Configuration
public class DirectRabbitConfig {// 聲明需要使用的交換機/路由Key/隊列的名稱public static final String DEFAULT_EXCHANGE = "TestDirectExchange";public static final String DEFAULT_ROUTE = "TestDirectRouting";public static final String DEFAULT_QUEUE = "TestDirectQueue";// 聲明交換機,需要幾個聲明幾個,這里就一個@Beanpublic DirectExchange directExchange(){return new DirectExchange(DEFAULT_EXCHANGE);}//創建隊列//隊列 起名:TestDirectQueue@Beanpublic Queue TestDirectQueue(){// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高于durable// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。// return new Queue("TestDirectQueue",true,true,false);//一般設置一下隊列的持久化就好,其余兩個就是默認falsereturn new Queue(DEFAULT_QUEUE,true);}//綁定交換機和隊列,并指定路由鍵//綁定 將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRoutingBinding bindingDirect(){return BindingBuilder.bind(TestDirectQueue()).to(directExchange()).with(DEFAULT_ROUTE);}/*** 這個是做什么用的 ,為了后面 生產者確認那,找到交換機,找不到隊列用的,* @return*/@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}}
然后寫個簡單的接口進行消息推送(根據需求也可以改為定時任務等等,具體看需求),SendMessageController.java:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** 模擬 rabbitmq-provider (生產者) 這里模塊名字寫錯了。這個是消息生產者,一般消息的生產者會直接在業務層調用,* 不會單獨的搞一個消息生產者,這里因為沒有業務調用,去調用這個MQ的生產者,所以這里直接創建一個模塊模擬消息生產者** 發送消息控制器(MQ入消息的入口)* //原文鏈接:https://blog.csdn.net/qq_35387940/article/details/100514134* @author: jd* @create: 2024-06-24*/
@RestController
public class SendMessageController {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法/*** 通過postman發送消息給消息隊列-直流交換機* @return*/@GetMapping("/sendDirectMessage")String sendDirectMessage(){String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("messageId",messageId);map.put("messageData",messageData);
// map.put("messageData","666666");map.put("createTime",createTime);//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchangerabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);// //生產者發送字符串類型消息,則后面的消息消費者,也需要接受字符串類型的入參進行消費
// rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", "77777");System.out.println("調用完畢");return "ok";}}
把rabbitmq-provider項目運行,調用下接口:
因為我們目前還沒弄消費者 rabbitmq-consumer,消息沒有被消費的,我們去rabbitMq管理頁面看看,是否推送成功:(我這里發送了三次,所以有三個消息積壓了)
再看看隊列(界面上的各個英文項代表什么意思,可以自己查查哈,對理解還是有幫助的):
很好,消息已經推送到rabbitMq服務器上面了。
接下來,創建rabbitmq-consumer項目:
pom.xml里的jar依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu.gulimall</groupId><artifactId>rabbitmq-consumer-true</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-consumer-true</name><description>RabbitMQ消費者模塊</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>1.8</java.version><!-- <spring-cloud.version>2021.0.4</spring-cloud.version>--><spring-cloud.version>2021.0.1</spring-cloud.version></properties><dependencies><dependency><groupId>com.atguigu.gulimall</groupId><artifactId>gulimall-common</artifactId><version>0.0.1-SNAPSHOT</version><exclusions><exclusion><artifactId>servlet-api</artifactId><groupId>javax.servlet</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
然后是 application.yml:
server:port: 8022#數據源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1#配置服務名稱application:name: rabbitmq-consumer-true# 配置rabbitMq 服務器#spring.application.name=rabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /
# listener: #這個在測試消費多個消息的時候,不能有下面這些配置,否則只能消費一個消息后就不繼續消費了
# simple:
# acknowledge-mode: manual #指定MQ消費者的確認模式是手動確認模式 這個在消費者者模塊配置
# prefetch: 1 #一次只能消費一條消息 這個在消費者者模塊配置#配置日志輸出級別
logging:level:com.atguigu.gulimall: debug#配置日志級別
然后一樣,創建DirectRabbitConfig.java(消費者單純的使用,其實可以不用添加這個配置,直接建后面的監聽就好,使用注解來讓監聽器監聽對應的隊列即可。配置上了的話,其實消費者也是生成者的身份,也能推送該消息。):
package com.atguigu.gulimall.consumertrue.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 消費者配置類** 原文鏈接:https://blog.csdn.net/qq_35387940/article/details/100514134* 創建DirectRabbitConfig.java 關于隊列的配置只是消息的生產者中配置即可。這個消費者不用配置,配置了的話,就也可以當成生產者了* (消費者單純的使用,其實可以不用添加這個配置,直接建后面的監聽就好,* 使用注解來讓監聽器監聽對應的隊列即可。配置上了的話,其實消費者也是生成者的身份,也能推送該消息。):** @author: jd* @create: 2024-06-25*/
@Configuration
public class DirectRabbitConfig {// 聲明需要使用的交換機/路由Key/隊列的名稱public static final String DEFAULT_EXCHANGE = "TestDirectExchange";public static final String DEFAULT_ROUTE = "TestDirectRouting";public static final String DEFAULT_QUEUE = "TestDirectQueue";//隊列 起名:TestDirectQueue@Beanpublic Queue TestDirectQueue() {return new Queue(DEFAULT_QUEUE,true);}//Direct交換機 起名:TestDirectExchange@BeanDirectExchange TestDirectExchange() {return new DirectExchange(DEFAULT_EXCHANGE);}//綁定 將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRouting@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(DEFAULT_ROUTE);}}
然后是創建消息接收監聽類,RabbitMQListener.java:
package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 消息消費監聽類* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue
public class RabbitMQListener {/*** 當消息發送者發送的是Map的時候,通過這個消息處理器進行處理* @param testMessage*/@RabbitHandler(isDefault = true)public void process(Map testMessage) {System.out.println("RabbitMQListener消費者收到消息 : "+testMessage.toString());}/*** 當消息發送者發送的是String類型的時候,用這個監聽處理器去接受消息并處理* @param testMessage*//* @RabbitHandler(isDefault = true)public void process(String testMessage) {System.out.println("DirectReceiver消費者收到消息 : "+testMessage);//正常開發中,會在消費到消息之后,開始做一些業務處理//模擬業務處理//業務開始String str = testMessage + "--消費成功";System.out.println("業務處理完畢"+str);//業務結束}*/}
然后將rabbitmq-consumer-true項目運行起來,可以看到把之前推送的那條消息消費下來了:
然后可以再繼續調用rabbitmq-consumer項目的推送消息接口,可以看到消費者即時消費消息:
消費下來了
那么直連交換機既然是一對一,那如果咱們配置多臺監聽綁定到同一個直連交互的同一個隊列,會怎么樣?
消費的結果如下:
可以看到是實現了輪詢的方式對消息進行消費,而且不存在重復消費。
(2)使用Topic Exchange 主題交換機。
在rabbitmq-consume項目里面創建TopicRabbitConfig.java:
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Topic Exchange 主題交換機。** @author: jd* @create: 2024-06-25*/
@Configuration
public class TopicRabbitConfig {//設置綁定鍵public static final String man = "topic.man";public static final String woman = "topic.woman";public static final String TOPIC_EXCHANGE = "topicExchange";//創建隊列/*** 第一個主題隊列** @return*/@Beanpublic Queue firstQueue() {return new Queue(man);}/*** 第二個主題隊列** @return*/@Beanpublic Queue secondQueue() {return new Queue(woman);}/*** 創建一個主題交換機** @return TopicExchange*/@BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}/*** //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man* //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForFirstQueue() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}/*** //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#* // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForSecondQueue() {return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");}}
然后添加多2個接口,用于推送消息到主題交換機:
// 然后添加多2個接口,用于推送消息到主題交換機找那個,再主題交換機中通過設置的路由鍵來推送到主題為topic.man的隊列中以供消費
// https://blog.csdn.net/qq_35387940/article/details/100514134/*** 用于向MQ發送攜帶topic.man路由鍵的消息* @return*/@GetMapping("/sendTopicMessageToMan")public String sendTopicMessageToMan(){String messageId = String.valueOf(UUID.randomUUID());String messageData ="send topic message to man";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.man,map);System.out.println("sendTopicMessageToMan() 執行成功");return "sendTopicMessageToMan is ok";}/*** 用于向MQ發送攜帶topic.woman路由鍵的消息。 這樣會在exchange中去找綁定中這個路由鍵綁定的隊列,并向其中進行轉發* topic.# 這個是通用的綁定規則,只要是攜帶著topic.開頭的就會轉發到綁定的這個隊列中* https://blog.csdn.net/qq_35387940/article/details/100514134* @return*/@GetMapping("/sendTopicMessageToTotal")public String sendTopicMessageToTotal(){String messageId = String.valueOf(UUID.randomUUID());String messageData ="send topic message to woman";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);
// rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.woman,map);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.woman1",map); //測試攜帶路由鍵符合topic.#的是否能轉發到topic.woman的隊列System.out.println("sendTopicMessageToTotal() 執行成功");return "sendTopicMessageToTotal is ok";}
生產者這邊已經完事,先不急著運行,在rabbitmq-consumer-true項目上,創建TopicManListener.java:
package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/**
主題交換機 監聽topic.man隊列* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = "topic.man")//監聽的隊列名稱 TestDirectQueue
public class TopicManListener {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("TopicManListener主題消費者收到消息 : "+testMessage.toString());}}
再創建一個TopicTotalListener.java:
package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** @author: jd* @create: 2024-06-25*/@Component
@Slf4j
@RabbitListener(queues = "topic.woman")
public class TopicTotalListener {@RabbitHandlerpublic void process(Map testMessage){System.out.println("TopicTotalListener主題消費者收到消息 : "+testMessage.toString());}
}
同樣,加主題交換機的相關配置,TopicRabbitConfig.java(消費者一定要加這個配置嗎? 不需要的其實,理由在前面已經說過了。):
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Topic Exchange 主題交換機。** @author: jd* @create: 2024-06-25*/
@Configuration
public class TopicRabbitConfig {//設置綁定鍵public static final String man = "topic.man";public static final String woman = "topic.woman";public static final String TOPIC_EXCHANGE = "topicExchange";//創建隊列/*** 第一個主題隊列** @return*/@Beanpublic Queue firstQueue() {return new Queue(man);}/*** 第二個主題隊列** @return*/@Beanpublic Queue secondQueue() {return new Queue(woman);}/*** 創建一個主題交換機** @return TopicExchange*/@BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}/*** //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man* //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForFirstQueue() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}/*** //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#* // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForSecondQueue() {return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");}}
然后把rabbitmq-consumer,rabbitmq-consumer-true兩個項目都跑起來,先調用/sendTopicMessage1 接口:
然后看消費者rabbitmq-consumer的控制臺輸出情況:
TopicManReceiver監聽隊列1,綁定鍵為:topic.man
TopicTotalReceiver監聽隊列2,綁定鍵為:topic.#
而當前推送的消息,攜帶的路由鍵為:topic.man
所以可以看到兩個監聽消費者receiver都成功消費到了消息,因為這兩個recevier監聽的隊列的綁定鍵都能與這條消息攜帶的路由鍵匹配上。
接下來調用接口/sendTopicMessage2:
然后看消費者rabbitmq-consumer的控制臺輸出情況:
TopicManReceiver監聽隊列1,綁定鍵為:topic.man
TopicTotalReceiver監聽隊列2,綁定鍵為:topic.#
而當前推送的消息,攜帶的路由鍵為:topic.woman
所以可以看到兩個監聽消費者只有TopicTotalReceiver成功消費到了消息。
(3)使用Fanout Exchang 扇型交換機。
同樣地,先在rabbitmq-provider項目上創建FanoutRabbitConfig.java:
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Fanout Exchang 扇型交換機* @author: jd* @create: 2024-06-25*/
@Configuration
public class FanoutRabbitConfig {//隊列名稱public static final String FANOUT_QUEUE_A ="fanout.a";public static final String FANOUT_QUEUE_B ="fanout.b";public static final String FANOUT_QUEUE_C ="fanout.c";public static final String FANOUT_EXCHANGE = "fanout.exchange";//創建隊列 FANOUT_QUEUE_A@Beanpublic Queue queueA(){return new Queue(FANOUT_QUEUE_A,true);}//創建隊列 FANOUT_QUEUE_B@Beanpublic Queue queueB(){return new Queue(FANOUT_QUEUE_B);}//創建隊列 FANOUT_QUEUE_C@Beanpublic Queue queueC(){return new Queue(FANOUT_QUEUE_C);}//創建交換機@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}//綁定將多有的隊列都綁定到這個交換機@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
然后是寫一個接口用于推送消息,
/*** 發送消息給扇形交換機 扇型交換機* @return*/@GetMapping("/sendFanoutMessage")public String sendFanoutMessage(){String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: testFanoutMessage ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);rabbitTemplate.convertAndSend(FanoutRabbitConfig.FANOUT_EXCHANGE,null,map);System.out.println("sendFanoutMessage() 執行成功");return "sendFanoutMessage is ok";}
接著在rabbitmq-consumer-true項目里加上消息消費類,
FanoutReceiverA.java:
FanoutReceiverB.java:
FanoutReceiverC.java:
package com.atguigu.gulimall.consumertrue.listener;import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交換機-隊列A的監聽器,及監聽到消息后的處理器* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_A)
public class FanoutReceiverA {@RabbitHandlerpublic void process(Map message){System.out.println("FanoutReceiverA消費者收到消息 : "+message.toString());}}
package com.atguigu.gulimall.consumertrue.listener;import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交換機-隊列B的監聽器,及監聽到消息后的處理器* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_B)
public class FanoutReceiverB {@RabbitHandlerpublic void process(Map message){System.out.println("FanoutReceiverB消費者收到消息 : "+message.toString());}
}
package com.atguigu.gulimall.consumertrue.listener;/*** @author: jd* @create: 2024-06-25*/import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交換機-隊列B的監聽器,及監聽到消息后的處理器* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)
public class FanoutReceiverC {@RabbitHandlerpublic void process(Map message){System.out.println("FanoutReceiverC消費者收到消息 : "+message.toString());}
}
然后加上扇型交換機的配置類,FanoutRabbitConfig.java(消費者真的要加這個配置嗎? 不需要的其實,理由在前面已經說過了)
package com.atguigu.gulimall.consumertrue.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Fanout Exchang 扇型交換機* @author: jd* @create: 2024-06-25*/
@Configuration
public class FanoutRabbitConfig {//隊列名稱public static final String FANOUT_QUEUE_A ="fanout.a";public static final String FANOUT_QUEUE_B ="fanout.b";public static final String FANOUT_QUEUE_C ="fanout.c";public static final String FANOUT_EXCHANGE = "fanout.exchange";//創建隊列 FANOUT_QUEUE_A@Beanpublic Queue queueA(){return new Queue(FANOUT_QUEUE_A,true);}//創建隊列 FANOUT_QUEUE_B@Beanpublic Queue queueB(){return new Queue(FANOUT_QUEUE_B);}//創建隊列 FANOUT_QUEUE_C@Beanpublic Queue queueC(){return new Queue(FANOUT_QUEUE_C);}//創建交換機@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}//綁定將多有的隊列都綁定到這個交換機@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}
最后將rabbitmq-provider和rabbitmq-consumer項目都跑起來,調用下接口/sendFanoutMessage :
可以看到只要發送到 fanoutExchange 這個扇型交換機的消息, 三個隊列都綁定這個交換機,所以三個消息接收類都監聽到了這條消息。
到了這里其實三個常用的交換機的使用我們已經完畢了,那么接下來我們繼續講講消息的回調,其實就是消息確認(生產者推送消息成功,消費者接收消息成功)。
三、消息確認種類
RabbitMQ的消息確認有兩種。
一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。
消息確認的作用是什么?
為了防止消息丟失。消息丟失分為發送丟失和消費者處理丟失,相應的也有兩種確認機制。
先來一起學習一下:
A:消息發送確認
在rabbitmq-consumer項目的application.yml文件上,加上消息確認的配置項后:
server:port: 8021#數據源配置
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://192.168.56.10:3306/gulimall_umsdriver-class-name: com.mysql.cj.jdbc.Driver#注冊到注冊中心cloud:nacos:discovery:server-addr: 127.0.0.1:8848application:name: rabbitmq-consumer#配置rabbitMq 服務器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /publisher-returns: true #確認消息已發送到隊列(Queue) 這個在生產者模塊配置 這個后期再配置,這會還用不到publisher-confirm-type: correlated #確認消息已發送到交換機(Exchange) 這個在生產者模塊配置 這個后期再配置,這會還用不到logging:level:com.atguigu.gulimall: debug #調整product模塊日志的輸出模式是debug級別,這樣就能在控制臺看到dao包下的輸出日志了。
然后是配置相關的消息確認回調函數,RabbitConfig.java:
package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置相關的消息確認回調函數,RabbitConfig.java:* https://blog.csdn.net/qq_35387940/article/details/100514134** 先從總體的情況分析,推送消息存在四種情況:** ①消息推送到server,但是在server里找不到交換機* ②消息推送到server,找到交換機了,但是沒找到隊列* ③消息推送到sever,交換機和隊列啥都沒找到* ④消息推送成功* 具體哪些會觸發回調,分別又會觸發哪個函數,看下面的測試** @author: jd* @create: 2024-06-25*/
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback: "+"相關數據:"+correlationData);System.out.println("ConfirmCallback: "+"確認情況:"+ack);System.out.println("ConfirmCallback: "+"原因:"+cause);}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("ReturnCallback: "+"消息:"+returnedMessage.getMessage());System.out.println("ReturnCallback: "+"回應碼:"+returnedMessage.getReplyCode());System.out.println("ReturnCallback: "+"回應信息:"+returnedMessage.getReplyText());System.out.println("ReturnCallback: "+"交換機:"+returnedMessage.getExchange());System.out.println("ReturnCallback: "+"路由鍵:"+returnedMessage.getRoutingKey());}});return rabbitTemplate;}
}
到這里,生產者推送消息的消息確認調用回調函數已經完畢。
可以看到上面寫了兩個回調函數,一個叫 ConfirmCallback ,一個叫 RetrunCallback;
那么以上這兩種回調函數都是在什么情況會觸發呢?
先從總體的情況分析,推送消息存在四種情況:
①消息推送到server,但是在server里找不到交換機
②消息推送到server,找到交換機了,但是沒找到隊列
③消息推送到sever,交換機和隊列啥都沒找到
④消息推送成功
那么我先寫幾個接口來分別測試和認證下以上4種情況,消息確認觸發回調函數的情況:
①消息推送到server,但是在server里找不到交換機 (是否到達交換機)
寫個測試接口,把消息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有創建沒有配置的):
/*** ①消息推送到server,但是在server里找不到交換機** 寫個測試接口,把消息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有創建沒有配置的)* 調用接口,查看rabbitmq-provuder項目的控制臺輸出情況(原因里面有說,沒有找到交換機'non-existent-exchange'):*在控制臺中* 調用后返回:http://localhost:8021/TestMessageAck*ConfirmCallback: 相關數據:null* ConfirmCallback: 確認情況:false* ConfirmCallback: 原因:channel error; protocol method: #method<channel.close>(reply-code=404,* reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)** 結論: ①這種情況觸發的是 ConfirmCallback 回調函數* @return*/@GetMapping("/TestMessageAck")public String TestMessageAck() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: non-existent-exchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);return "ok";}
調用接口,查看rabbitmq-provuder項目的控制臺輸出情況(原因里面有說,沒有找到交換機’non-existent-exchange’):
結論: ①這種情況觸發的是 ConfirmCallback 回調函數。
②消息推送到server,找到交換機了,但是沒找到隊列 (是否到達隊列)
這種情況就是需要新增一個交換機,但是不給這個交換機綁定隊列,我來簡單地在DirectRabitConfig里面新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何綁定配置操作:
@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}
然后寫個測試接口,把消息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何隊列配置的):
/*** ②消息推送到server,找到交換機了,但是沒找到隊列* 這種情況就是需要新增一個交換機,但是不給這個交換機綁定隊列,* 我來簡單地在DirectRabitConfig里面新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何綁定配置操作:** 然后寫個測試接口,把消息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何隊列配置的):**可以看到這種情況,在控制臺中 兩個函數都被調用了;* 這種情況下,消息是推送成功到服務器了的,所以ConfirmCallback對消息確認情況是true;* 而在RetrunCallback回調函數的打印參數里面可以看到,消息是推送到了交換機成功了,但是在路由分發給隊列的時候,找不到隊列,所以報了錯誤 NO_ROUTE 。** 調用后返回:http://localhost:8021/TestMessageAck2* ReturnCallback: 回應碼:312* ReturnCallback: 回應信息:NO_ROUTE* ReturnCallback: 交換機:lonelyDirectExchange* ReturnCallback: 路由鍵:TestDirectRouting* ConfirmCallback: 相關數據:null* ConfirmCallback: 確認情況:true* ConfirmCallback: 原因:null** 結論:②這種情況觸發的是 ConfirmCallback和RetrunCallback兩個回調函數。* @return*/@GetMapping("/TestMessageAck2")public String TestMessageAck2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: lonelyDirectExchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map); //lonelyDirectExchange這個交換機沒有和任何隊列做綁定,return "ok";}
調用接口,查看rabbitmq-provuder項目的控制臺輸出情況:
ConfirmCallback: 相關數據:null
ConfirmCallback: 確認情況:true
ConfirmCallback: 原因:null
ReturnCallback: 消息:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback: 回應碼:312
ReturnCallback: 回應信息:NO_ROUTE
ReturnCallback: 交換機:lonelyDirectExchange
ReturnCallback: 路由鍵:TestDirectRouting
可以看到這種情況,兩個函數都被調用了;
這種情況下,消息是推送成功到服務器了的,所以ConfirmCallback對消息確認情況是true;
而在RetrunCallback回調函數的打印參數里面可以看到,消息是推送到了交換機成功了,但是在路由分發給隊列的時候,找不到隊列,所以報了錯誤 NO_ROUTE 。
結論:②這種情況觸發的是 ConfirmCallback和RetrunCallback兩個回調函數。
③消息推送到sever,交換機和隊列啥都沒找到
這種情況其實一看就覺得跟①很像,沒錯 ,③和①情況回調是一致的,所以不做結果說明了。
結論: ③這種情況觸發的是 ConfirmCallback 回調函數。
④消息推送成功
那么測試下,按照正常調用之前消息推送的接口就行,就調用下 /sendFanoutMessage接口,可以看到控制臺輸出:
ConfirmCallback: 相關數據:null
ConfirmCallback: 確認情況:true
ConfirmCallback: 原因:null
結論: ④這種情況觸發的是 ConfirmCallback 回調函數。
總結:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){}通過設置這個參數,其中使用內部類進行實現,來記錄消息發送到交換器Exchange后觸發回調。
(使用該功能需要開啟確認, publisher-confirm-type: correlated #確認消息已發送到交換機(Exchange) 這個在生產者模塊配置
)
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){})通過設置這個參數,如果消息從交換器發送到對應隊列失敗時觸發(比如根據發送消息時指定的routingKey找不到隊列時會觸發)
( publisher-returns: true #確認消息已發送到隊列(Queue) 這個在生產者模塊配置 )
以上是生產者推送消息的消息確認 回調函數的使用介紹(可以在回調函數根據需求做對應的擴展或者業務數據處理)。
B: 消費接收確認
接下來我們繼續, 消費者接收到消息的消息確認機制。
(1)確認模式
AcknowledgeMode.NONE:不確認
AcknowledgeMode.AUTO:自動確認
AcknowledgeMode.MANUAL:手動確認
spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
(2)手動確認
未確認的消息數
上圖為channel中未被消費者確認的消息數。
通過RabbitMQ的host地址加上默認端口號15672訪問管理界面。
(2.1)成功確認
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:該消息的index
multiple:是否批量. true:將一次性ack所有小于deliveryTag的消息。
消費者成功處理后,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。
(2.2)失敗確認
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
deliveryTag:該消息的index。
multiple:是否批量. true:將一次性拒絕所有小于deliveryTag的消息。
requeue:被拒絕的是否重新入隊列。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:該消息的index。
requeue:被拒絕的是否重新入隊列。
channel.basicNack 與 channel.basicReject 的區別在于basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。
①自動確認, 這也是默認的消息確認情況。 AcknowledgeMode.NONE
RabbitMQ成功將消息發出(即將消息成功寫入TCP Socket)中立即認為本次投遞已經被正確處理,不管消費者端是否成功處理本次投遞。
所以這種情況如果消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那么就相當于丟失了消息。
一般這種情況我們都是使用try catch捕捉異常后,打印日志用于追蹤數據,這樣找出對應數據再做后續處理。② 根據情況確認, 這個不做介紹
③ 手動確認 , 這個比較關鍵,也是我們配置接收消息確認機制時,多數選擇的模式。
消費者收到消息后,手動調用basic.ack/basic.nack/basic.reject后,RabbitMQ收到這些消息后,才認為本次投遞成功。
basic.ack用于肯定確認
basic.nack用于否定確認(注意:這是AMQP 0-9-1的RabbitMQ擴展)
basic.reject用于否定確認,但與basic.nack相比有一個限制:一次只能拒絕單條消息 消費者端以上的3個方法都表示消息已經被正確投遞,但是basic.ack表示消息已經被正確處理。
而basic.nack,basic.reject表示沒有被正確處理:著重講下reject,因為有時候一些場景是需要重新入列的。channel.basicReject(deliveryTag, true); 拒絕消費當前消息,如果第二參數傳入true,就是將數據重新丟回隊列里,那么下次還會消費這消息。設置false,就是告訴服務器,我已經知道這條消息數據了,因為一些原因拒絕它,而且服務器也把這個消息丟掉就行。 下次不想再消費這條消息了。使用拒絕后重新入列這個確認模式要謹慎,因為一般都是出現異常的時候,catch異常再拒絕入列,選擇是否重入列。但是如果使用不當會導致一些每次都被你重入列的消息一直消費-入列-消費-入列這樣循環,會導致消息積壓。順便也簡單講講 nack,這個也是相當于設置不消費某條消息。channel.basicNack(deliveryTag, false, true);
第一個參數依然是當前消息到的數據的唯一id;
第二個參數是指是否針對多條消息;如果是true,也就是說一次性針對當前通道的消息的tagID小于當前這條消息的,都拒絕確認。
第三個參數是指是否重新入列,也就是指不確認的消息是否重新丟回到隊列里面去。同樣使用不確認后重新入列這個確認模式要謹慎,因為這里也可能因為考慮不周出現消息一直被重新丟回去的情況,導致積壓。
看了上面這么多介紹,接下來我們一起配置下,看看一般的消息接收 手動確認是怎么樣的。
方式一:通過配置類的方式實現
此時還不需要加下面的配置,因為這種方式是通過 配置類注解來配置的手動消費者確認,再下面的方式二則是通過yml的配置來設置的消費者手動確認,我們先來看方式一是怎么實現的
??????
在消費者項目里,
新建MessageListenerConfig.java上添加代碼相關的配置代碼:
package com.atguigu.gulimall.consumertrue.config;import com.atguigu.gulimall.consumertrue.listener.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 一般的消息接收 手動確認是怎么樣的,消費者的手動消息確認,配置類* https://blog.csdn.net/qq_35387940/article/details/100514134* @author: jd* @create: 2024-06-25*/
//@Configuration //注釋掉這個注解,這樣第一種MQ消費者的確認模式就失效了,以為你這個里面配置著對某個隊列的監控呢。 第二種MQ的配置方式的話和這個的區別,不用這種配置類,而是在yml中配置東西
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate MyAckReceiver myAckReceiver;//消息接收處理類@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這里改為手動確認消息//設置一個隊列,在這里設置了隊列,container.setQueueNames("TestDirectQueue");//如果同時設置多個如下: 前提是隊列都是必須已經創建存在的// container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");//另一種設置隊列的方法,如果使用這種情況,那么要設置多個,就使用addQueues//container.setQueues(new Queue("TestDirectQueue",true));//container.addQueues(new Queue("TestDirectQueue2",true));//container.addQueues(new Queue("TestDirectQueue3",true));//這里設置了監聽器,因為上面設置了隊列,所以在監聽器中就不需要用監聽器的注解了 。container.setMessageListener(myAckReceiver);return container;}
}
對應的手動確認消息監聽類,MyAckReceiver.java(手動確認模式需要實現 ChannelAwareMessageListener):
//之前的相關監聽器可以先注釋掉,以免造成多個同類型監聽器都監聽同一個隊列。【比如我之前用的RabbitMQListener 、RabbitMQListener2 為了讓其失效,直接注釋掉其中的//@RabbitListener(queues = “TestDirectQueue”)//監聽的隊列名稱 TestDirectQueue】 這個注解即可,這樣這個監聽器就無法監聽相關隊列了。
MyAckReceiver.java
package com.atguigu.gulimall.consumertrue.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;/*** 對應的手動確認消息監聽類,MyAckReceiver.java(手動確認模式需要實現 ChannelAwareMessageListener):* //之前的相關監聽器可以先注釋掉,以免造成多個同類型監聽器都監聽同一個隊列。** 注意:因為這里是在MessageListenerConfig 類中指定了是要監聽哪個隊列,以及消息的確認機制,所以這里不需要使用* @RabbitListener(queues = "TestDirectQueue") 和 @RabbitHandler(isDefault = true)注解了* @author: jd* @create: 2024-06-25*/@Component
public class MyAckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte[] body = message.getBody();ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(body));Map<String,String> msgMap = (Map<String,String>)objectInputStream.readObject();String messageId = msgMap.get("messageId");String messageData = msgMap.get("messageData");String createTime = msgMap.get("createTime");objectInputStream.close();System.out.println(" MyAckReceiver messageId:"+messageId+" messageData:"+messageData+" createTime:"+createTime);System.out.println("消費的主題隊列來自:"+message.getMessageProperties().getConsumerQueue());
// 消費者成功處理后,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。channel.basicAck(deliveryTag, true); // deliveryTag:該消息的index multiple:是否批量. true:將一次性ack所有小于deliveryTag的消息。 第二個參數,手動確認可以被批處理, 當該參數為 true 時,則可以一次性確認 delivery_tag 小于等于傳入值的所有消息
// channel.basicReject(deliveryTag, true);//第二個參數,true會重新放回隊列,所以需要自己根據業務邏輯判斷什么時候使用拒絕} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}
}
這時,先調用接口/sendDirectMessage, 給直連交換機TestDirectExchange 的隊列TestDirectQueue 推送一條消息,可以看到監聽器正常消費了下來:
第一次驗證我們發現,消費者沒有消費掉直流交換機中的消息,而且也在直流隊列中積壓了起來,
這是由于我們的配置類忘記加了 @Configuration 注解了,所以此時這個不是配置類,也就是這里對MQ的配置不會生效,所以加上之后 ,我們再去試試:
可看到下圖 消費成功
配置類中 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這里改為手動確認消息 是發揮作用的關鍵;
方式二:通過yml配置來完成消費者確認
特別注意:因為這里我們要使用yml配置來實現,所以我們需要關閉配置類的作用,使之失效,我這里直接把@Configuration 給注釋掉 了,這樣配置類不會起作用了!!_
第二種方式正式開始啦 (#.#)
首先我們來在yml中開啟手動確認的配置
server:port: 8022#數據源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1#配置服務名稱application:name: rabbitmq-consumer-true# 配置rabbitMq 服務器#spring.application.name=rabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /listener: #這個在測試消費多個消息的時候,不能有下面這些配置,否則只能消費一個消息后就不繼續消費了simple:acknowledge-mode: manual #指定MQ消費者的確認模式是手動確認模式 這個在消費者者模塊配置prefetch: 1 #一次只能消費一條消息 這個在消費者者模塊配置#配置日志輸出級別
logging:level:com.atguigu.gulimall: debug#配置日志級別
其中的 幾行是開啟的關鍵
listener: #這個在測試消費多個消息的時候,不能有下面這些配置,否則只能消費一個消息后就不繼續消費了
simple:
acknowledge-mode: manual #指定MQ消費者的確認模式是手動確認模式 這個在消費者者模塊配置
prefetch: 1 #一次只能消費一條消息 這個在消費者者模塊配置
此處直接用接口來當生產者了;
然后我們在生產者模塊用于放消息的controller中增加一個放消息的請求方法,用于往隊列里面連續放入5個放消息
SendMessageController.java
/*** 原文鏈接:https://blog.csdn.net/weixin_45724872/article/details/119655638* 將信號放入MQ* @param message* @return*/@PostMapping("/msg/muscle")public String receiveMuscleSign(@RequestBody String message) {//處理業務for (int i = 1; i <= 5; i++) {rabbitTemplate.convertAndSend("muscle_fanout_exchange","",message+i);}return " receiveMuscleSign ok";}
開發消費者
此處用一個類下的兩個方法來模擬2個消費者
package com.atguigu.gulimall.consumertrue.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/****此處用一個類下的兩個方法來模擬2個消費者*原文鏈接:https://blog.csdn.net/weixin_45724872/article/details/119655638原文鏈接:https://blog.csdn.net/weixin_45724872/article/details/119655638* @author: jd* @create: 2024-06-25*/
@Component
public class MyConsumerListener {@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_1"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer1(String msg, Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者1 => " + msg);//channel.basicAck(deliveryTag, false); // 因為 yml中 prefetch 設置為 1(或未設置,因為默認可能是 0,表示無限制,但這不是推薦的做法),RabbitMQ 將只發送一個消息給消費者,并等待該消息的確認。在這種情況下,// 如果你注釋掉了 channel.basicAck,消費者將只能消費一個消息,并且不會收到下一個消息,直到你發送確認或關閉連接。 所以對于消息隊列中的五個消息只能銷費一個,除非你手動確認,否則不會再消費其他的消息} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_2"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer2(String msg,Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者2 => " + msg);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}
注意一點,消費者1的手動ACK我們是注釋掉了
而消費者2的手動ACK我們是開著的
原因是為了對照試驗
我們期望的情況是:一共5條消息,消費者1和2都一一處理;
處理完畢后再取下一條,否則不讓取;
那么按我們代碼這樣寫;
消費者1只能取一條 (只是處理一條的原因,)
而消費者2則能取滿5條(因為消費者1的手動ACK被我們注釋了,此處又不是自動ACK)
消費者1只是處理一條的原因:下圖中的perfetchCount有問題,我們實際上配置的是prefetch: 1 ,我們直接按照這個配置來理解就行
消費者一,就是注釋了對消息消費之后的確認回饋給RabbitMQ的設置,所以消費者對五條消息中消費到第一個之后,因為我們在yml中又配置了每次消費一條,而且也是手動確認的,所以MQ消費到這一條之后,就在那等著手動調用ack方法來完成的確認ack的反饋,結果我們這里注釋了,所以就一直等不到第一條消息的回饋,所以就會一直等待,下面的4條消息也就無法繼續消費了,
相反,消費者二就不一樣了,他有消費完每一條消息之后,都調用了手動ack的回饋,所以可以消費5條消息,都消息完。
以下是實驗截圖
MQ 的初始狀態:
首先用postman發送請求
看下圖,生產者發送了5條消息,并得到了成功推送到了交換機和隊列的回饋
接下來我們步入正題:看消費者里面,消費者1只是消費了一條,消費者2消費了全部的5條消息;
結果和我們預想的是一致的;
我們在看看MQ的管理頁面來確認
可以看到,消費者2已經搞完了,而消費者1那邊卡住了(消費者一消費了一條,但是在等待回饋,還剩余4條都沒被消費,在等待消費)
我在實驗的過程中,因為消費者1中的消息堆積了,如果再次發送5條消息到扇形交換機中,那隊列1中會積累到9條待消費的,1條等待反饋的,10條總共的,我們可以實驗一下子:
結果和我們預想的一樣,那我們如何將這些積壓的消息給去掉呢 ?
我自己試出了兩種方式,最初試的直接重啟服務,這樣是無效的,因為進入隊列的不被消費會一直在隊列里面 。
下面是2種處理方法:
第一種是最直接的方法,直接把確認那行的代碼給放開,這樣這個消費者1 就會把隊列1中積壓的那些給消費掉了
第二種 我們將yml中的手動確認配置注釋掉,這樣就默認是自動確認了,這樣我每次從postman中發送5條消息到扇形交換機,分發到兩個隊列之后,兩個消費者都會一直可以消費,因為沒消費一個都會自動確認回饋,不用等待了,這樣也是可以的
我們實驗如下:
實驗1:
我們先把消費者1中注釋的手動回饋給放開
可見console中 ,對于積壓的消息直接給消費掉了。
實驗2:
我們將消費者1中的手動反饋,給繼續注釋掉,發送2次 postman;
造成積壓
我把yml中的手動消費者確認,改成自動的,也就是注釋掉,可以看到,重啟消費者模塊后,積壓的也被消費了
注釋配置:
重啟后,看控制臺: 很明顯啟動后,積壓的消息也被消費了,
在MQ控制臺中也可以看到,積壓消息被消費啦
關于手動確認的一些方法
細心的小伙伴可能發現了我們在消費者的catch處寫了這樣一行代碼
channel.basicReject(deliveryTag, false);
以下是解釋
一般是有3種確認的,其中1種是正確確認,另外2種是錯誤確認;
reject:只能否定一條消息
nack:可以否定一條或者多條消息
而錯誤確認的這兩個,都有一個屬性
boolean requeue
當它是true的時候,表示重新入隊;
當它是false的時候,則表示拋棄掉;
使用拒絕后重新入列這個確認模式要謹慎,因為觸發錯誤確認一般都是出現異常的時候,那么就可能導致死循環,即不斷的入隊-消費-報錯-重新入隊…;這將導致消息積壓,萬一就炸了…
實驗錯誤確認
我們將上述的消費者代碼加一行代碼;
此處只改動了消費者1,消費者2不變
新增一條拋異常的語句
int num = 1/0;
package com.tubai;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Component
public class MyConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_1"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer1(String msg,Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者1 => " + msg);int num = 1/0;channel.basicAck(deliveryTag, false); //第二個參數,手動確認可以被批處理,當該參數為 true 時} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_2"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer2(String msg,Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者2 => " + msg);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}
}
運行結果
可以看到我們的消費者1也正常了,因為我們是先打印后確認,因此1~5也會被打印出來;
如果重復入隊…那么我們的程序就會死循環了,瘋狂打印,各位可以自己試試;但是容易把內存占滿O。。
本篇文章書寫不易,自己打了好久,大家認可的話,或者開啟了新認知,請給個點贊。收藏哦 (#.#) 謝謝大家!
參考文章也寫的超級好,大家也可都學習學習,一起進步
Springboot 整合RabbitMq ,用心看完這一篇就夠了
RabbitMQ的消息確認機制
SpringBoot集成RabbitMq 手動ACK
RabbitMQ控制界面詳解