【RabbitMQ實戰】Springboot 整合RabbitMQ組件,多種編碼示例,帶你實踐 看完這一篇就夠了

提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔

文章目錄

  • 前言
  • 一、對RabbitMQ管理界面深入了解
    • 1、在這個界面里面我們可以做些什么?
  • 二、編碼練習
    • (1)使用direct exchange(直連型交換機)
    • (2)使用Topic Exchange 主題交換機。
    • (3)使用Fanout Exchang 扇型交換機。
  • 三、消息確認種類
    • A:消息發送確認
    • B: 消費接收確認
      • 方式一:通過配置類的方式實現
      • 方式二:通過yml配置來完成消費者確認


前言

該篇文章內容較多,包括有RabbitMQ一些理論介紹,provider消息推送實例,consumer消息消費實例,Direct、Topic、Fanout多種交換機的使用,同時簡單介紹對消息回調、手動確認等。


這里面的每一種使用都包含實際編碼示例,供大家理解,共同進步,如有不足。還請指教。

一、對RabbitMQ管理界面深入了解

裝完rabbitMq,啟動MQ后,本地瀏覽器輸入http://ip:15672/ ,看到一個簡單后臺管理界面;
在這里插入圖片描述
對于其中的一些具體指標的解釋:

  • Ready: 待消費的消息總數。
  • Unacked: 待應答的消息總數。
  • Total:總數 Ready+Unacked。
  • Publish: producter pub消息的速率。
  • Publisher confirm: broker確認pub消息的速率。
  • Deliver(manual ack): customer手動確認的速率。
  • Deliver( auto ack): customer自動確認的速率。
  • Consumer ack: customer正在確認的速率。
  • Redelivered: 正在傳遞’redelivered’標志集的消息的速率。
  • Get (manual ack): 響應basic.get而要求確認的消息的傳輸速率。
  • Get (auto ack): 響應于basic.get而發送不需要確認的消息的速率。
  • Return: 將basic.return發送給producter的速率。
  • Disk read: queue從磁盤讀取消息的速率。
  • Disk write: queue從磁盤寫入消息的速率。

Connections:client的tcp連接的總數。
Channels:通道的總數。
Exchange:交換器的總數。
Queues:隊列的總數。
Consumers:消費者的總數。

更詳細的可見:
版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://blog.csdn.net/qq_19343089/article/details/135724659

1、在這個界面里面我們可以做些什么?

可以手動創建虛擬host,創建用戶,分配權限,創建交換機,創建隊列等等,還有查看隊列消息,消費效率,推送效率等等。

以上這些管理界面的操作在這篇暫時不做擴展描述,我想著重介紹后面實例里會使用到的。

首先先介紹一個簡單的一個消息推送到接收的流程,提供一個簡單的圖:
在這里插入圖片描述
黃色的圈圈就是我們的消息推送服務,將消息推送到 中間方框里面也就是 rabbitMq的服務器,然后經過服務器里面的交換機、隊列等各種關系(后面會詳細講)將數據處理入列后,最終右邊的藍色圈圈消費者獲取對應監聽的消息。

常用的交換機有以下三種,因為消費者是從隊列獲取信息的,隊列是綁定交換機的(一般),所以對應的消息推送/接收模式也會有以下幾種:

- Direct Exchange

直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。

大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。
然后當一個消息攜帶著路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。

- Fanout Exchange

扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列。

- Topic Exchange

主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
簡單地介紹下規則:

(星號) * 用來表示一個單詞 (必須出現的)
(井號) # 用來表示任意數量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
隊列Q1 綁定鍵為 .TT. 隊列Q2綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;

主題交換機是非常強大的,為啥這么膨脹?
當一個隊列的綁定鍵為 “#”(井號) 的時候,這個隊列將會無視消息的路由鍵,接收所有的消息。
當 * (星號) 和 # (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行為。
所以主題交換機也就實現了扇形交換機的功能,和直連交換機的功能。

另外還有 Header Exchange 頭交換機 ,Default Exchange 默認交換機,Dead Letter Exchange 死信交換機,這幾個該篇暫不做講述。

好了,一些簡單的介紹到這里為止, 接下來我們來一起編碼。

二、編碼練習

本次實例教程需要創建2個springboot項目,一個 rabbitmq-provider (生產者),一個rabbitmq-consumer(消費者)。【補充說明:我這里模塊名稱創建錯了,其中生產者我創建成了rabbitmq-consumer,消費者我這里叫做 rabbitmq-consumer-true】

首先創建 rabbitmq-provider,

pom.xml里用到的jar依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu.gulimall</groupId><artifactId>rabbitmq-consumer</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-consumer</name><description>RabbitMQ生產者模塊</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>1.8</java.version><!--        <spring-cloud.version>2021.0.4</spring-cloud.version>--><spring-cloud.version>2021.0.1</spring-cloud.version></properties><dependencies><dependency><groupId>com.atguigu.gulimall</groupId><artifactId>gulimall-common</artifactId><version>0.0.1-SNAPSHOT</version><exclusions><exclusion><artifactId>servlet-api</artifactId><groupId>javax.servlet</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

然后application.yml:

server:port: 8021#數據源配置
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://192.168.56.10:3306/gulimall_umsdriver-class-name: com.mysql.cj.jdbc.Driver#注冊到注冊中心cloud:nacos:discovery:server-addr: 127.0.0.1:8848application:name: rabbitmq-consumer#配置rabbitMq 服務器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /
#    publisher-returns: true  #確認消息已發送到隊列(Queue)  這個在生產者模塊配置 這個后期再配置,這會還用不到
#    publisher-confirm-type: correlated   #確認消息已發送到交換機(Exchange) 這個在生產者模塊配置 這個后期再配置,這會還用不到logging:level:com.atguigu.gulimall: debug   #調整product模塊日志的輸出模式是debug級別,這樣就能在控制臺看到dao包下的輸出日志了。

一定要注意 要注意 要注意!!!!!
里面的virtual-host 是指RabbitMQ控制臺中的下面的位置(我理解是指你的隊列和交換機在哪個分組下面,可以為每一個項目創建單獨的分組,但是在此我沒有單獨創建,直接放到了 / 下面)
在這里插入圖片描述
那么怎么建一個單獨的host呢? 假如我就是想給某個項目接入,使用一個單獨host,順便使用一個單獨的賬號,就好像我文中配置的 root 這樣。

其實也很簡便:

virtual-host的創建:
在這里插入圖片描述

賬號user的創建:
在這里插入圖片描述

然后記得給賬號分配權限,指定使用某個virtual host:
指定給自己剛剛為某個項目單獨創建的virtual host。
在這里插入圖片描述

其實還可以特定指定交換機使用權等等:
在這里插入圖片描述

(1)使用direct exchange(直連型交換機)

創建DirectRabbitConfig.java(對于隊列和交換機持久化以及連接使用設置,在注釋里有說明,后面的不同交換機的配置就不做同樣說明了):

package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;/*** 這里使用的是direct exchange(直連型交換機),  也就是交換機和隊列是一對一關系* 模擬 rabbitmq-provider (生產者),這里模塊名字寫錯了。這個是消息生產者** @author: jd* @create: 2024-06-24*/
@Configuration
public class DirectRabbitConfig {// 聲明需要使用的交換機/路由Key/隊列的名稱public static final String DEFAULT_EXCHANGE = "TestDirectExchange";public static final String DEFAULT_ROUTE = "TestDirectRouting";public static final String DEFAULT_QUEUE = "TestDirectQueue";// 聲明交換機,需要幾個聲明幾個,這里就一個@Beanpublic DirectExchange directExchange(){return new DirectExchange(DEFAULT_EXCHANGE);}//創建隊列//隊列 起名:TestDirectQueue@Beanpublic Queue TestDirectQueue(){// durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效// exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后隊列即被刪除。此參考優先級高于durable// autoDelete:是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。//   return new Queue("TestDirectQueue",true,true,false);//一般設置一下隊列的持久化就好,其余兩個就是默認falsereturn new Queue(DEFAULT_QUEUE,true);}//綁定交換機和隊列,并指定路由鍵//綁定  將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRoutingBinding bindingDirect(){return BindingBuilder.bind(TestDirectQueue()).to(directExchange()).with(DEFAULT_ROUTE);}/*** 這個是做什么用的 ,為了后面 生產者確認那,找到交換機,找不到隊列用的,* @return*/@BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}}

然后寫個簡單的接口進行消息推送(根據需求也可以改為定時任務等等,具體看需求),SendMessageController.java:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/***  模擬 rabbitmq-provider (生產者) 這里模塊名字寫錯了。這個是消息生產者,一般消息的生產者會直接在業務層調用,*  不會單獨的搞一個消息生產者,這里因為沒有業務調用,去調用這個MQ的生產者,所以這里直接創建一個模塊模擬消息生產者** 發送消息控制器(MQ入消息的入口)* //原文鏈接:https://blog.csdn.net/qq_35387940/article/details/100514134* @author: jd* @create: 2024-06-24*/
@RestController
public class SendMessageController {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發送等等方法/*** 通過postman發送消息給消息隊列-直流交換機* @return*/@GetMapping("/sendDirectMessage")String sendDirectMessage(){String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("messageId",messageId);map.put("messageData",messageData);
//        map.put("messageData","666666");map.put("createTime",createTime);//將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchangerabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);//        //生產者發送字符串類型消息,則后面的消息消費者,也需要接受字符串類型的入參進行消費
//        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", "77777");System.out.println("調用完畢");return "ok";}}

把rabbitmq-provider項目運行,調用下接口:

在這里插入圖片描述

在這里插入圖片描述
因為我們目前還沒弄消費者 rabbitmq-consumer,消息沒有被消費的,我們去rabbitMq管理頁面看看,是否推送成功:(我這里發送了三次,所以有三個消息積壓了)
在這里插入圖片描述

再看看隊列(界面上的各個英文項代表什么意思,可以自己查查哈,對理解還是有幫助的):
在這里插入圖片描述

很好,消息已經推送到rabbitMq服務器上面了。

接下來,創建rabbitmq-consumer項目:

pom.xml里的jar依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.atguigu.gulimall</groupId><artifactId>rabbitmq-consumer-true</artifactId><version>0.0.1-SNAPSHOT</version><name>rabbitmq-consumer-true</name><description>RabbitMQ消費者模塊</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>1.8</java.version><!--        <spring-cloud.version>2021.0.4</spring-cloud.version>--><spring-cloud.version>2021.0.1</spring-cloud.version></properties><dependencies><dependency><groupId>com.atguigu.gulimall</groupId><artifactId>gulimall-common</artifactId><version>0.0.1-SNAPSHOT</version><exclusions><exclusion><artifactId>servlet-api</artifactId><groupId>javax.servlet</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

然后是 application.yml:

server:port: 8022#數據源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name:  com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1#配置服務名稱application:name: rabbitmq-consumer-true# 配置rabbitMq 服務器#spring.application.name=rabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /
#    listener:  #這個在測試消費多個消息的時候,不能有下面這些配置,否則只能消費一個消息后就不繼續消費了
#      simple:
#        acknowledge-mode: manual  #指定MQ消費者的確認模式是手動確認模式  這個在消費者者模塊配置
#        prefetch: 1 #一次只能消費一條消息   這個在消費者者模塊配置#配置日志輸出級別
logging:level:com.atguigu.gulimall: debug#配置日志級別

然后一樣,創建DirectRabbitConfig.java(消費者單純的使用,其實可以不用添加這個配置,直接建后面的監聽就好,使用注解來讓監聽器監聽對應的隊列即可。配置上了的話,其實消費者也是生成者的身份,也能推送該消息。):

package com.atguigu.gulimall.consumertrue.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/***   消費者配置類**  原文鏈接:https://blog.csdn.net/qq_35387940/article/details/100514134* 創建DirectRabbitConfig.java  關于隊列的配置只是消息的生產者中配置即可。這個消費者不用配置,配置了的話,就也可以當成生產者了* (消費者單純的使用,其實可以不用添加這個配置,直接建后面的監聽就好,* 使用注解來讓監聽器監聽對應的隊列即可。配置上了的話,其實消費者也是生成者的身份,也能推送該消息。):** @author: jd* @create: 2024-06-25*/
@Configuration
public class DirectRabbitConfig {// 聲明需要使用的交換機/路由Key/隊列的名稱public static final String DEFAULT_EXCHANGE = "TestDirectExchange";public static final String DEFAULT_ROUTE = "TestDirectRouting";public static final String DEFAULT_QUEUE = "TestDirectQueue";//隊列 起名:TestDirectQueue@Beanpublic Queue TestDirectQueue() {return new Queue(DEFAULT_QUEUE,true);}//Direct交換機 起名:TestDirectExchange@BeanDirectExchange TestDirectExchange() {return new DirectExchange(DEFAULT_EXCHANGE);}//綁定  將隊列和交換機綁定, 并設置用于匹配鍵:TestDirectRouting@BeanBinding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(DEFAULT_ROUTE);}}

然后是創建消息接收監聽類,RabbitMQListener.java:

package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 消息消費監聽類* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue
public class RabbitMQListener {/*** 當消息發送者發送的是Map的時候,通過這個消息處理器進行處理* @param testMessage*/@RabbitHandler(isDefault = true)public void process(Map testMessage) {System.out.println("RabbitMQListener消費者收到消息  : "+testMessage.toString());}/*** 當消息發送者發送的是String類型的時候,用這個監聽處理器去接受消息并處理* @param testMessage*//* @RabbitHandler(isDefault = true)public void process(String testMessage) {System.out.println("DirectReceiver消費者收到消息  : "+testMessage);//正常開發中,會在消費到消息之后,開始做一些業務處理//模擬業務處理//業務開始String str = testMessage + "--消費成功";System.out.println("業務處理完畢"+str);//業務結束}*/}

然后將rabbitmq-consumer-true項目運行起來,可以看到把之前推送的那條消息消費下來了:
在這里插入圖片描述
然后可以再繼續調用rabbitmq-consumer項目的推送消息接口,可以看到消費者即時消費消息:
在這里插入圖片描述
消費下來了
在這里插入圖片描述
那么直連交換機既然是一對一,那如果咱們配置多臺監聽綁定到同一個直連交互的同一個隊列,會怎么樣?
在這里插入圖片描述
消費的結果如下:
在這里插入圖片描述

可以看到是實現了輪詢的方式對消息進行消費,而且不存在重復消費。

(2)使用Topic Exchange 主題交換機。

在rabbitmq-consume項目里面創建TopicRabbitConfig.java:

package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Topic Exchange 主題交換機。** @author: jd* @create: 2024-06-25*/
@Configuration
public class TopicRabbitConfig {//設置綁定鍵public static final String man = "topic.man";public static final String woman = "topic.woman";public static final String TOPIC_EXCHANGE = "topicExchange";//創建隊列/*** 第一個主題隊列** @return*/@Beanpublic Queue firstQueue() {return new Queue(man);}/*** 第二個主題隊列** @return*/@Beanpublic Queue secondQueue() {return new Queue(woman);}/*** 創建一個主題交換機** @return TopicExchange*/@BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}/*** //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man* //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForFirstQueue() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}/*** //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#* // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForSecondQueue() {return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");}}

然后添加多2個接口,用于推送消息到主題交換機:


//    然后添加多2個接口,用于推送消息到主題交換機找那個,再主題交換機中通過設置的路由鍵來推送到主題為topic.man的隊列中以供消費
//    https://blog.csdn.net/qq_35387940/article/details/100514134/*** 用于向MQ發送攜帶topic.man路由鍵的消息* @return*/@GetMapping("/sendTopicMessageToMan")public String sendTopicMessageToMan(){String messageId = String.valueOf(UUID.randomUUID());String messageData ="send topic message to man";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.man,map);System.out.println("sendTopicMessageToMan() 執行成功");return "sendTopicMessageToMan is ok";}/*** 用于向MQ發送攜帶topic.woman路由鍵的消息。 這樣會在exchange中去找綁定中這個路由鍵綁定的隊列,并向其中進行轉發* topic.# 這個是通用的綁定規則,只要是攜帶著topic.開頭的就會轉發到綁定的這個隊列中* https://blog.csdn.net/qq_35387940/article/details/100514134* @return*/@GetMapping("/sendTopicMessageToTotal")public String sendTopicMessageToTotal(){String messageId = String.valueOf(UUID.randomUUID());String messageData ="send topic message to woman";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);
//        rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,TopicRabbitConfig.woman,map);rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.woman1",map); //測試攜帶路由鍵符合topic.#的是否能轉發到topic.woman的隊列System.out.println("sendTopicMessageToTotal() 執行成功");return "sendTopicMessageToTotal is ok";}

生產者這邊已經完事,先不急著運行,在rabbitmq-consumer-true項目上,創建TopicManListener.java:

package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/** 
主題交換機 監聽topic.man隊列* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = "topic.man")//監聽的隊列名稱 TestDirectQueue
public class TopicManListener {@RabbitHandlerpublic void process(Map testMessage) {System.out.println("TopicManListener主題消費者收到消息  : "+testMessage.toString());}}

再創建一個TopicTotalListener.java:

package com.atguigu.gulimall.consumertrue.listener;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** @author: jd* @create: 2024-06-25*/@Component
@Slf4j
@RabbitListener(queues = "topic.woman")
public class TopicTotalListener {@RabbitHandlerpublic void process(Map testMessage){System.out.println("TopicTotalListener主題消費者收到消息  : "+testMessage.toString());}
}

同樣,加主題交換機的相關配置,TopicRabbitConfig.java(消費者一定要加這個配置嗎? 不需要的其實,理由在前面已經說過了。):

package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Topic Exchange 主題交換機。** @author: jd* @create: 2024-06-25*/
@Configuration
public class TopicRabbitConfig {//設置綁定鍵public static final String man = "topic.man";public static final String woman = "topic.woman";public static final String TOPIC_EXCHANGE = "topicExchange";//創建隊列/*** 第一個主題隊列** @return*/@Beanpublic Queue firstQueue() {return new Queue(man);}/*** 第二個主題隊列** @return*/@Beanpublic Queue secondQueue() {return new Queue(woman);}/*** 創建一個主題交換機** @return TopicExchange*/@BeanTopicExchange exchange() {return new TopicExchange(TOPIC_EXCHANGE);}/*** //將firstQueue和topicExchange綁定,而且綁定的鍵值為topic.man* //這樣只要是消息攜帶的路由鍵是topic.man,才會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForFirstQueue() {return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);}/*** //將secondQueue和topicExchange綁定,而且綁定的鍵值為用上通配路由鍵規則topic.#* // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列** @return*/@BeanBinding bindingExchangeMessageForSecondQueue() {return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");}}

然后把rabbitmq-consumer,rabbitmq-consumer-true兩個項目都跑起來,先調用/sendTopicMessage1 接口:
在這里插入圖片描述

然后看消費者rabbitmq-consumer的控制臺輸出情況:
TopicManReceiver監聽隊列1,綁定鍵為:topic.man
TopicTotalReceiver監聽隊列2,綁定鍵為:topic.#
而當前推送的消息,攜帶的路由鍵為:topic.man

所以可以看到兩個監聽消費者receiver都成功消費到了消息,因為這兩個recevier監聽的隊列的綁定鍵都能與這條消息攜帶的路由鍵匹配上。
在這里插入圖片描述
接下來調用接口/sendTopicMessage2:
在這里插入圖片描述
然后看消費者rabbitmq-consumer的控制臺輸出情況:
TopicManReceiver監聽隊列1,綁定鍵為:topic.man
TopicTotalReceiver監聽隊列2,綁定鍵為:topic.#
而當前推送的消息,攜帶的路由鍵為:topic.woman

所以可以看到兩個監聽消費者只有TopicTotalReceiver成功消費到了消息。
在這里插入圖片描述

(3)使用Fanout Exchang 扇型交換機。

同樣地,先在rabbitmq-provider項目上創建FanoutRabbitConfig.java:

package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Fanout Exchang 扇型交換機* @author: jd* @create: 2024-06-25*/
@Configuration
public class FanoutRabbitConfig {//隊列名稱public static final String  FANOUT_QUEUE_A ="fanout.a";public static final String  FANOUT_QUEUE_B ="fanout.b";public static final String  FANOUT_QUEUE_C ="fanout.c";public static final String  FANOUT_EXCHANGE = "fanout.exchange";//創建隊列 FANOUT_QUEUE_A@Beanpublic Queue queueA(){return new Queue(FANOUT_QUEUE_A,true);}//創建隊列 FANOUT_QUEUE_B@Beanpublic Queue queueB(){return new Queue(FANOUT_QUEUE_B);}//創建隊列 FANOUT_QUEUE_C@Beanpublic Queue queueC(){return new Queue(FANOUT_QUEUE_C);}//創建交換機@Beanpublic FanoutExchange  fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}//綁定將多有的隊列都綁定到這個交換機@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}

然后是寫一個接口用于推送消息,

 /*** 發送消息給扇形交換機 扇型交換機* @return*/@GetMapping("/sendFanoutMessage")public String sendFanoutMessage(){String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: testFanoutMessage ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put(QueueConstant.MESSAGE_ID,messageId);map.put(QueueConstant.MESSAGE_DATA,messageData);map.put(QueueConstant.MESSAGE_TIME,createTime);rabbitTemplate.convertAndSend(FanoutRabbitConfig.FANOUT_EXCHANGE,null,map);System.out.println("sendFanoutMessage() 執行成功");return "sendFanoutMessage is ok";}

接著在rabbitmq-consumer-true項目里加上消息消費類,
在這里插入圖片描述

FanoutReceiverA.java:
FanoutReceiverB.java:
FanoutReceiverC.java:

package com.atguigu.gulimall.consumertrue.listener;import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交換機-隊列A的監聽器,及監聽到消息后的處理器* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_A)
public class FanoutReceiverA {@RabbitHandlerpublic void process(Map message){System.out.println("FanoutReceiverA消費者收到消息  : "+message.toString());}}
package com.atguigu.gulimall.consumertrue.listener;import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交換機-隊列B的監聽器,及監聽到消息后的處理器* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_B)
public class FanoutReceiverB {@RabbitHandlerpublic void process(Map message){System.out.println("FanoutReceiverB消費者收到消息  : "+message.toString());}
}
package com.atguigu.gulimall.consumertrue.listener;/*** @author: jd* @create: 2024-06-25*/import com.atguigu.gulimall.consumertrue.config.FanoutRabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;/*** 扇形交換機-隊列B的監聽器,及監聽到消息后的處理器* @author: jd* @create: 2024-06-25*/
@Component
@Slf4j
@RabbitListener(queues = FanoutRabbitConfig.FANOUT_QUEUE_C)
public class FanoutReceiverC {@RabbitHandlerpublic void process(Map message){System.out.println("FanoutReceiverC消費者收到消息  : "+message.toString());}
}

然后加上扇型交換機的配置類,FanoutRabbitConfig.java(消費者真的要加這個配置嗎? 不需要的其實,理由在前面已經說過了)

package com.atguigu.gulimall.consumertrue.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 使用Fanout Exchang 扇型交換機* @author: jd* @create: 2024-06-25*/
@Configuration
public class FanoutRabbitConfig {//隊列名稱public static final String  FANOUT_QUEUE_A ="fanout.a";public static final String  FANOUT_QUEUE_B ="fanout.b";public static final String  FANOUT_QUEUE_C ="fanout.c";public static final String  FANOUT_EXCHANGE = "fanout.exchange";//創建隊列 FANOUT_QUEUE_A@Beanpublic Queue queueA(){return new Queue(FANOUT_QUEUE_A,true);}//創建隊列 FANOUT_QUEUE_B@Beanpublic Queue queueB(){return new Queue(FANOUT_QUEUE_B);}//創建隊列 FANOUT_QUEUE_C@Beanpublic Queue queueC(){return new Queue(FANOUT_QUEUE_C);}//創建交換機@Beanpublic FanoutExchange  fanoutExchange(){return new FanoutExchange(FANOUT_EXCHANGE);}//綁定將多有的隊列都綁定到這個交換機@BeanBinding bindingExchangeA() {return BindingBuilder.bind(queueA()).to(fanoutExchange());}@BeanBinding bindingExchangeB() {return BindingBuilder.bind(queueB()).to(fanoutExchange());}@BeanBinding bindingExchangeC() {return BindingBuilder.bind(queueC()).to(fanoutExchange());}}

最后將rabbitmq-provider和rabbitmq-consumer項目都跑起來,調用下接口/sendFanoutMessage :
在這里插入圖片描述
在這里插入圖片描述
可以看到只要發送到 fanoutExchange 這個扇型交換機的消息, 三個隊列都綁定這個交換機,所以三個消息接收類都監聽到了這條消息。

在這里插入圖片描述

到了這里其實三個常用的交換機的使用我們已經完畢了,那么接下來我們繼續講講消息的回調,其實就是消息確認(生產者推送消息成功,消費者接收消息成功)。

三、消息確認種類

RabbitMQ的消息確認有兩種。

一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。

第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。

消息確認的作用是什么?

為了防止消息丟失。消息丟失分為發送丟失和消費者處理丟失,相應的也有兩種確認機制。

先來一起學習一下:

A:消息發送確認

在rabbitmq-consumer項目的application.yml文件上,加上消息確認的配置項后:

server:port: 8021#數據源配置
spring:datasource:username: rootpassword: rooturl: jdbc:mysql://192.168.56.10:3306/gulimall_umsdriver-class-name: com.mysql.cj.jdbc.Driver#注冊到注冊中心cloud:nacos:discovery:server-addr: 127.0.0.1:8848application:name: rabbitmq-consumer#配置rabbitMq 服務器rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /publisher-returns: true  #確認消息已發送到隊列(Queue)  這個在生產者模塊配置 這個后期再配置,這會還用不到publisher-confirm-type: correlated   #確認消息已發送到交換機(Exchange) 這個在生產者模塊配置 這個后期再配置,這會還用不到logging:level:com.atguigu.gulimall: debug   #調整product模塊日志的輸出模式是debug級別,這樣就能在控制臺看到dao包下的輸出日志了。

然后是配置相關的消息確認回調函數,RabbitConfig.java:

package com.atguigu.gulimall.rabbitmqconsumer.config;import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 配置相關的消息確認回調函數,RabbitConfig.java:* https://blog.csdn.net/qq_35387940/article/details/100514134** 先從總體的情況分析,推送消息存在四種情況:** ①消息推送到server,但是在server里找不到交換機* ②消息推送到server,找到交換機了,但是沒找到隊列* ③消息推送到sever,交換機和隊列啥都沒找到* ④消息推送成功* 具體哪些會觸發回調,分別又會觸發哪個函數,看下面的測試** @author: jd* @create: 2024-06-25*/
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//設置開啟Mandatory,才能觸發回調函數,無論消息推送結果怎么樣都強制調用回調函數rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("ConfirmCallback:     "+"相關數據:"+correlationData);System.out.println("ConfirmCallback:     "+"確認情況:"+ack);System.out.println("ConfirmCallback:     "+"原因:"+cause);}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("ReturnCallback:     "+"消息:"+returnedMessage.getMessage());System.out.println("ReturnCallback:     "+"回應碼:"+returnedMessage.getReplyCode());System.out.println("ReturnCallback:     "+"回應信息:"+returnedMessage.getReplyText());System.out.println("ReturnCallback:     "+"交換機:"+returnedMessage.getExchange());System.out.println("ReturnCallback:     "+"路由鍵:"+returnedMessage.getRoutingKey());}});return rabbitTemplate;}
}

到這里,生產者推送消息的消息確認調用回調函數已經完畢。
可以看到上面寫了兩個回調函數,一個叫 ConfirmCallback ,一個叫 RetrunCallback;
那么以上這兩種回調函數都是在什么情況會觸發呢?

先從總體的情況分析,推送消息存在四種情況:

①消息推送到server,但是在server里找不到交換機
②消息推送到server,找到交換機了,但是沒找到隊列
③消息推送到sever,交換機和隊列啥都沒找到
④消息推送成功

那么我先寫幾個接口來分別測試和認證下以上4種情況,消息確認觸發回調函數的情況:

①消息推送到server,但是在server里找不到交換機 (是否到達交換機)
寫個測試接口,把消息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有創建沒有配置的):

/*** ①消息推送到server,但是在server里找不到交換機** 寫個測試接口,把消息推送到名為‘non-existent-exchange’的交換機上(這個交換機是沒有創建沒有配置的)* 調用接口,查看rabbitmq-provuder項目的控制臺輸出情況(原因里面有說,沒有找到交換機'non-existent-exchange'):*在控制臺中* 調用后返回:http://localhost:8021/TestMessageAck*ConfirmCallback:     相關數據:null* ConfirmCallback:     確認情況:false* ConfirmCallback:     原因:channel error; protocol method: #method<channel.close>(reply-code=404,* reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)**  結論: ①這種情況觸發的是 ConfirmCallback 回調函數* @return*/@GetMapping("/TestMessageAck")public String TestMessageAck() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: non-existent-exchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", map);return "ok";}

調用接口,查看rabbitmq-provuder項目的控制臺輸出情況(原因里面有說,沒有找到交換機’non-existent-exchange’):
在這里插入圖片描述
結論: ①這種情況觸發的是 ConfirmCallback 回調函數。

②消息推送到server,找到交換機了,但是沒找到隊列 (是否到達隊列)
這種情況就是需要新增一個交換機,但是不給這個交換機綁定隊列,我來簡單地在DirectRabitConfig里面新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何綁定配置操作:

    @BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}

然后寫個測試接口,把消息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何隊列配置的):

/*** ②消息推送到server,找到交換機了,但是沒找到隊列* 這種情況就是需要新增一個交換機,但是不給這個交換機綁定隊列,* 我來簡單地在DirectRabitConfig里面新增一個直連交換機,名叫‘lonelyDirectExchange’,但沒給它做任何綁定配置操作:** 然后寫個測試接口,把消息推送到名為‘lonelyDirectExchange’的交換機上(這個交換機是沒有任何隊列配置的):**可以看到這種情況,在控制臺中 兩個函數都被調用了;* 這種情況下,消息是推送成功到服務器了的,所以ConfirmCallback對消息確認情況是true;* 而在RetrunCallback回調函數的打印參數里面可以看到,消息是推送到了交換機成功了,但是在路由分發給隊列的時候,找不到隊列,所以報了錯誤 NO_ROUTE 。** 調用后返回:http://localhost:8021/TestMessageAck2* ReturnCallback:     回應碼:312* ReturnCallback:     回應信息:NO_ROUTE* ReturnCallback:     交換機:lonelyDirectExchange* ReturnCallback:     路由鍵:TestDirectRouting* ConfirmCallback:     相關數據:null* ConfirmCallback:     確認情況:true* ConfirmCallback:     原因:null**   結論:②這種情況觸發的是 ConfirmCallback和RetrunCallback兩個回調函數。* @return*/@GetMapping("/TestMessageAck2")public String TestMessageAck2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: lonelyDirectExchange test message ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", map);  //lonelyDirectExchange這個交換機沒有和任何隊列做綁定,return "ok";}

調用接口,查看rabbitmq-provuder項目的控制臺輸出情況:
在這里插入圖片描述
在這里插入圖片描述

ConfirmCallback:     相關數據:null
ConfirmCallback:     確認情況:true
ConfirmCallback:     原因:null
ReturnCallback:     消息:(Body:'[serialized object]' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
ReturnCallback:     回應碼:312
ReturnCallback:     回應信息:NO_ROUTE
ReturnCallback:     交換機:lonelyDirectExchange
ReturnCallback:     路由鍵:TestDirectRouting

可以看到這種情況,兩個函數都被調用了;
這種情況下,消息是推送成功到服務器了的,所以ConfirmCallback對消息確認情況是true;
而在RetrunCallback回調函數的打印參數里面可以看到,消息是推送到了交換機成功了,但是在路由分發給隊列的時候,找不到隊列,所以報了錯誤 NO_ROUTE 。
結論:②這種情況觸發的是 ConfirmCallback和RetrunCallback兩個回調函數。

③消息推送到sever,交換機和隊列啥都沒找到
這種情況其實一看就覺得跟①很像,沒錯 ,③和①情況回調是一致的,所以不做結果說明了。
結論: ③這種情況觸發的是 ConfirmCallback 回調函數。

④消息推送成功
那么測試下,按照正常調用之前消息推送的接口就行,就調用下 /sendFanoutMessage接口,可以看到控制臺輸出:

ConfirmCallback:     相關數據:null
ConfirmCallback:     確認情況:true
ConfirmCallback:     原因:null

結論: ④這種情況觸發的是 ConfirmCallback 回調函數。

總結:
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){}通過設置這個參數,其中使用內部類進行實現,來記錄消息發送到交換器Exchange后觸發回調。
使用該功能需要開啟確認, publisher-confirm-type: correlated #確認消息已發送到交換機(Exchange) 這個在生產者模塊配置

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){})通過設置這個參數,如果消息從交換器發送到對應隊列失敗時觸發(比如根據發送消息時指定的routingKey找不到隊列時會觸發)
publisher-returns: true #確認消息已發送到隊列(Queue) 這個在生產者模塊配置

以上是生產者推送消息的消息確認 回調函數的使用介紹(可以在回調函數根據需求做對應的擴展或者業務數據處理)。

B: 消費接收確認

接下來我們繼續, 消費者接收到消息的消息確認機制。

(1)確認模式

AcknowledgeMode.NONE:不確認
AcknowledgeMode.AUTO:自動確認
AcknowledgeMode.MANUAL:手動確認
spring-boot中配置方法:

spring.rabbitmq.listener.simple.acknowledge-mode = manual

(2)手動確認
在這里插入圖片描述
未確認的消息數

上圖為channel中未被消費者確認的消息數。

通過RabbitMQ的host地址加上默認端口號15672訪問管理界面。

(2.1)成功確認

void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:該消息的index

multiple:是否批量. true:將一次性ack所有小于deliveryTag的消息。

消費者成功處理后,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。

(2.2)失敗確認

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

throws IOException;

deliveryTag:該消息的index。

multiple:是否批量. true:將一次性拒絕所有小于deliveryTag的消息。

requeue:被拒絕的是否重新入隊列。

void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:該消息的index。

requeue:被拒絕的是否重新入隊列。

channel.basicNack 與 channel.basicReject 的區別在于basicNack可以批量拒絕多條消息,而basicReject一次只能拒絕一條消息。

①自動確認, 這也是默認的消息確認情況。  AcknowledgeMode.NONE
RabbitMQ成功將消息發出(即將消息成功寫入TCP Socket)中立即認為本次投遞已經被正確處理,不管消費者端是否成功處理本次投遞。
所以這種情況如果消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那么就相當于丟失了消息。
一般這種情況我們都是使用try catch捕捉異常后,打印日志用于追蹤數據,這樣找出對應數據再做后續處理。② 根據情況確認, 這個不做介紹
③ 手動確認 , 這個比較關鍵,也是我們配置接收消息確認機制時,多數選擇的模式。
消費者收到消息后,手動調用basic.ack/basic.nack/basic.reject后,RabbitMQ收到這些消息后,才認為本次投遞成功。
basic.ack用于肯定確認 
basic.nack用于否定確認(注意:這是AMQP 0-9-1RabbitMQ擴展) 
basic.reject用于否定確認,但與basic.nack相比有一個限制:一次只能拒絕單條消息 消費者端以上的3個方法都表示消息已經被正確投遞,但是basic.ack表示消息已經被正確處理。
而basic.nack,basic.reject表示沒有被正確處理:著重講下reject,因為有時候一些場景是需要重新入列的。channel.basicReject(deliveryTag, true);  拒絕消費當前消息,如果第二參數傳入true,就是將數據重新丟回隊列里,那么下次還會消費這消息。設置false,就是告訴服務器,我已經知道這條消息數據了,因為一些原因拒絕它,而且服務器也把這個消息丟掉就行。 下次不想再消費這條消息了。使用拒絕后重新入列這個確認模式要謹慎,因為一般都是出現異常的時候,catch異常再拒絕入列,選擇是否重入列。但是如果使用不當會導致一些每次都被你重入列的消息一直消費-入列-消費-入列這樣循環,會導致消息積壓。順便也簡單講講 nack,這個也是相當于設置不消費某條消息。channel.basicNack(deliveryTag, false, true);
第一個參數依然是當前消息到的數據的唯一id;
第二個參數是指是否針對多條消息;如果是true,也就是說一次性針對當前通道的消息的tagID小于當前這條消息的,都拒絕確認。
第三個參數是指是否重新入列,也就是指不確認的消息是否重新丟回到隊列里面去。同樣使用不確認后重新入列這個確認模式要謹慎,因為這里也可能因為考慮不周出現消息一直被重新丟回去的情況,導致積壓。

看了上面這么多介紹,接下來我們一起配置下,看看一般的消息接收 手動確認是怎么樣的。

方式一:通過配置類的方式實現

此時還不需要加下面的配置,因為這種方式是通過 配置類注解來配置的手動消費者確認,再下面的方式二則是通過yml的配置來設置的消費者手動確認,我們先來看方式一是怎么實現的
在這里插入圖片描述

??????
在消費者項目里,
新建MessageListenerConfig.java上添加代碼相關的配置代碼:

package com.atguigu.gulimall.consumertrue.config;import com.atguigu.gulimall.consumertrue.listener.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 一般的消息接收 手動確認是怎么樣的,消費者的手動消息確認,配置類* https://blog.csdn.net/qq_35387940/article/details/100514134* @author: jd* @create: 2024-06-25*/
//@Configuration     //注釋掉這個注解,這樣第一種MQ消費者的確認模式就失效了,以為你這個里面配置著對某個隊列的監控呢。 第二種MQ的配置方式的話和這個的區別,不用這種配置類,而是在yml中配置東西
public class MessageListenerConfig {@Autowiredprivate CachingConnectionFactory connectionFactory;@Autowiredprivate MyAckReceiver myAckReceiver;//消息接收處理類@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(){SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);container.setConcurrentConsumers(1);container.setMaxConcurrentConsumers(1);container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這里改為手動確認消息//設置一個隊列,在這里設置了隊列,container.setQueueNames("TestDirectQueue");//如果同時設置多個如下: 前提是隊列都是必須已經創建存在的//  container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");//另一種設置隊列的方法,如果使用這種情況,那么要設置多個,就使用addQueues//container.setQueues(new Queue("TestDirectQueue",true));//container.addQueues(new Queue("TestDirectQueue2",true));//container.addQueues(new Queue("TestDirectQueue3",true));//這里設置了監聽器,因為上面設置了隊列,所以在監聽器中就不需要用監聽器的注解了 。container.setMessageListener(myAckReceiver);return container;}
}

對應的手動確認消息監聽類,MyAckReceiver.java(手動確認模式需要實現 ChannelAwareMessageListener):
//之前的相關監聽器可以先注釋掉,以免造成多個同類型監聽器都監聽同一個隊列。【比如我之前用的RabbitMQListener 、RabbitMQListener2 為了讓其失效,直接注釋掉其中的//@RabbitListener(queues = “TestDirectQueue”)//監聽的隊列名稱 TestDirectQueue】 這個注解即可,這樣這個監聽器就無法監聽相關隊列了。

在這里插入圖片描述
MyAckReceiver.java

package com.atguigu.gulimall.consumertrue.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;/*** 對應的手動確認消息監聽類,MyAckReceiver.java(手動確認模式需要實現 ChannelAwareMessageListener):* //之前的相關監聽器可以先注釋掉,以免造成多個同類型監聽器都監聽同一個隊列。** 注意:因為這里是在MessageListenerConfig 類中指定了是要監聽哪個隊列,以及消息的確認機制,所以這里不需要使用* @RabbitListener(queues = "TestDirectQueue")  和 @RabbitHandler(isDefault = true)注解了* @author: jd* @create: 2024-06-25*/@Component
public class MyAckReceiver implements ChannelAwareMessageListener {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {byte[] body = message.getBody();ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(body));Map<String,String> msgMap  = (Map<String,String>)objectInputStream.readObject();String messageId = msgMap.get("messageId");String messageData = msgMap.get("messageData");String createTime = msgMap.get("createTime");objectInputStream.close();System.out.println("  MyAckReceiver  messageId:"+messageId+"  messageData:"+messageData+"  createTime:"+createTime);System.out.println("消費的主題隊列來自:"+message.getMessageProperties().getConsumerQueue());
//        消費者成功處理后,調用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法對消息進行確認。channel.basicAck(deliveryTag, true); // deliveryTag:該消息的index   multiple:是否批量. true:將一次性ack所有小于deliveryTag的消息。    第二個參數,手動確認可以被批處理, 當該參數為 true 時,則可以一次性確認 delivery_tag 小于等于傳入值的所有消息
//		channel.basicReject(deliveryTag, true);//第二個參數,true會重新放回隊列,所以需要自己根據業務邏輯判斷什么時候使用拒絕} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}
}

這時,先調用接口/sendDirectMessage, 給直連交換機TestDirectExchange 的隊列TestDirectQueue 推送一條消息,可以看到監聽器正常消費了下來:
在這里插入圖片描述
在這里插入圖片描述
第一次驗證我們發現,消費者沒有消費掉直流交換機中的消息,而且也在直流隊列中積壓了起來,
在這里插入圖片描述
這是由于我們的配置類忘記加了 @Configuration 注解了,所以此時這個不是配置類,也就是這里對MQ的配置不會生效,所以加上之后 ,我們再去試試:
在這里插入圖片描述
可看到下圖 消費成功
在這里插入圖片描述
配置類中 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默認是自動確認,這里改為手動確認消息 是發揮作用的關鍵;

方式二:通過yml配置來完成消費者確認

特別注意:因為這里我們要使用yml配置來實現,所以我們需要關閉配置類的作用,使之失效,我這里直接把@Configuration 給注釋掉 了,這樣配置類不會起作用了!!_
在這里插入圖片描述
第二種方式正式開始啦 (#.#)
首先我們來在yml中開啟手動確認的配置

server:port: 8022#數據源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name:  com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1#配置服務名稱application:name: rabbitmq-consumer-true# 配置rabbitMq 服務器#spring.application.name=rabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虛擬host 可以不設置,使用server默認hostvirtual-host: /listener:  #這個在測試消費多個消息的時候,不能有下面這些配置,否則只能消費一個消息后就不繼續消費了simple:acknowledge-mode: manual  #指定MQ消費者的確認模式是手動確認模式  這個在消費者者模塊配置prefetch: 1 #一次只能消費一條消息   這個在消費者者模塊配置#配置日志輸出級別
logging:level:com.atguigu.gulimall: debug#配置日志級別

其中的 幾行是開啟的關鍵
listener: #這個在測試消費多個消息的時候,不能有下面這些配置,否則只能消費一個消息后就不繼續消費了
simple:
acknowledge-mode: manual #指定MQ消費者的確認模式是手動確認模式 這個在消費者者模塊配置
prefetch: 1 #一次只能消費一條消息 這個在消費者者模塊配置

此處直接用接口來當生產者了;
然后我們在生產者模塊用于放消息的controller中增加一個放消息的請求方法,用于往隊列里面連續放入5個放消息
SendMessageController.java

    /*** 原文鏈接:https://blog.csdn.net/weixin_45724872/article/details/119655638* 將信號放入MQ* @param message* @return*/@PostMapping("/msg/muscle")public String receiveMuscleSign(@RequestBody String message) {//處理業務for (int i = 1; i <= 5; i++) {rabbitTemplate.convertAndSend("muscle_fanout_exchange","",message+i);}return " receiveMuscleSign ok";}

開發消費者
此處用一個類下的兩個方法來模擬2個消費者

package com.atguigu.gulimall.consumertrue.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/****此處用一個類下的兩個方法來模擬2個消費者*原文鏈接:https://blog.csdn.net/weixin_45724872/article/details/119655638原文鏈接:https://blog.csdn.net/weixin_45724872/article/details/119655638* @author: jd* @create: 2024-06-25*/
@Component
public class MyConsumerListener {@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_1"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer1(String msg, Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者1 => " + msg);//channel.basicAck(deliveryTag, false); // 因為 yml中 prefetch 設置為 1(或未設置,因為默認可能是 0,表示無限制,但這不是推薦的做法),RabbitMQ 將只發送一個消息給消費者,并等待該消息的確認。在這種情況下,// 如果你注釋掉了 channel.basicAck,消費者將只能消費一個消息,并且不會收到下一個消息,直到你發送確認或關閉連接。 所以對于消息隊列中的五個消息只能銷費一個,除非你手動確認,否則不會再消費其他的消息} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_2"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer2(String msg,Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者2 => " + msg);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}}

注意一點,消費者1的手動ACK我們是注釋掉了

而消費者2的手動ACK我們是開著的

原因是為了對照試驗

我們期望的情況是:一共5條消息,消費者1和2都一一處理;

處理完畢后再取下一條,否則不讓取;

那么按我們代碼這樣寫;

消費者1只能取一條 (只是處理一條的原因,)
而消費者2則能取滿5條(因為消費者1的手動ACK被我們注釋了,此處又不是自動ACK)

消費者1只是處理一條的原因:下圖中的perfetchCount有問題,我們實際上配置的是prefetch: 1 ,我們直接按照這個配置來理解就行
在這里插入圖片描述
消費者一,就是注釋了對消息消費之后的確認回饋給RabbitMQ的設置,所以消費者對五條消息中消費到第一個之后,因為我們在yml中又配置了每次消費一條,而且也是手動確認的,所以MQ消費到這一條之后,就在那等著手動調用ack方法來完成的確認ack的反饋,結果我們這里注釋了,所以就一直等不到第一條消息的回饋,所以就會一直等待,下面的4條消息也就無法繼續消費了,

相反,消費者二就不一樣了,他有消費完每一條消息之后,都調用了手動ack的回饋,所以可以消費5條消息,都消息完。

以下是實驗截圖
MQ 的初始狀態:
在這里插入圖片描述

首先用postman發送請求
在這里插入圖片描述

看下圖,生產者發送了5條消息,并得到了成功推送到了交換機和隊列的回饋
在這里插入圖片描述

接下來我們步入正題:看消費者里面,消費者1只是消費了一條,消費者2消費了全部的5條消息;
在這里插入圖片描述
結果和我們預想的是一致的;

我們在看看MQ的管理頁面來確認
在這里插入圖片描述
可以看到,消費者2已經搞完了,而消費者1那邊卡住了(消費者一消費了一條,但是在等待回饋,還剩余4條都沒被消費,在等待消費)

我在實驗的過程中,因為消費者1中的消息堆積了,如果再次發送5條消息到扇形交換機中,那隊列1中會積累到9條待消費的,1條等待反饋的,10條總共的,我們可以實驗一下子:
在這里插入圖片描述
結果和我們預想的一樣,那我們如何將這些積壓的消息給去掉呢 ?
我自己試出了兩種方式,最初試的直接重啟服務,這樣是無效的,因為進入隊列的不被消費會一直在隊列里面 。
下面是2種處理方法:
第一種是最直接的方法,直接把確認那行的代碼給放開,這樣這個消費者1 就會把隊列1中積壓的那些給消費掉了
第二種 我們將yml中的手動確認配置注釋掉,這樣就默認是自動確認了,這樣我每次從postman中發送5條消息到扇形交換機,分發到兩個隊列之后,兩個消費者都會一直可以消費,因為沒消費一個都會自動確認回饋,不用等待了,這樣也是可以的

我們實驗如下:
實驗1:
我們先把消費者1中注釋的手動回饋給放開
在這里插入圖片描述
可見console中 ,對于積壓的消息直接給消費掉了。

實驗2:
我們將消費者1中的手動反饋,給繼續注釋掉,發送2次 postman;

在這里插入圖片描述
造成積壓
在這里插入圖片描述
我把yml中的手動消費者確認,改成自動的,也就是注釋掉,可以看到,重啟消費者模塊后,積壓的也被消費了
注釋配置:
在這里插入圖片描述
重啟后,看控制臺: 很明顯啟動后,積壓的消息也被消費了,
在這里插入圖片描述
在MQ控制臺中也可以看到,積壓消息被消費啦
在這里插入圖片描述

關于手動確認的一些方法
細心的小伙伴可能發現了我們在消費者的catch處寫了這樣一行代碼

channel.basicReject(deliveryTag, false);

以下是解釋

一般是有3種確認的,其中1種是正確確認,另外2種是錯誤確認;

reject:只能否定一條消息
nack:可以否定一條或者多條消息

而錯誤確認的這兩個,都有一個屬性
boolean requeue

當它是true的時候,表示重新入隊;
當它是false的時候,則表示拋棄掉;

使用拒絕后重新入列這個確認模式要謹慎,因為觸發錯誤確認一般都是出現異常的時候,那么就可能導致死循環,即不斷的入隊-消費-報錯-重新入隊…;這將導致消息積壓,萬一就炸了…

實驗錯誤確認
我們將上述的消費者代碼加一行代碼;

此處只改動了消費者1,消費者2不變

新增一條拋異常的語句
int num = 1/0;
package com.tubai;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;@Component
public class MyConsumer {@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_1"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer1(String msg,Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者1 => " + msg);int num = 1/0;channel.basicAck(deliveryTag, false); //第二個參數,手動確認可以被批處理,當該參數為 true 時} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}@RabbitListener(bindings = {@QueueBinding(value = @Queue("consumer_queue_2"),//綁定交換機exchange = @Exchange(value = "muscle_fanout_exchange", type = "fanout"))})public void consumer2(String msg,Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消費者2 => " + msg);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicReject(deliveryTag, false);e.printStackTrace();}}
}

運行結果
在這里插入圖片描述
可以看到我們的消費者1也正常了,因為我們是先打印后確認,因此1~5也會被打印出來;

如果重復入隊…那么我們的程序就會死循環了,瘋狂打印,各位可以自己試試;但是容易把內存占滿O。。

本篇文章書寫不易,自己打了好久,大家認可的話,或者開啟了新認知,請給個點贊。收藏哦 (#.#) 謝謝大家!
參考文章也寫的超級好,大家也可都學習學習,一起進步
Springboot 整合RabbitMq ,用心看完這一篇就夠了
RabbitMQ的消息確認機制
SpringBoot集成RabbitMq 手動ACK
RabbitMQ控制界面詳解

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/39058.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/39058.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/39058.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

2024 年的 13 個 AI 趨勢

2024 年的 13 個 AI 趨勢 人工智能對環境的影響和平人工智能人工智能支持的問題解決和決策針對人工智能公司的訴訟2024 年美國總統大選與人工智能威脅人工智能、網絡犯罪和社會工程威脅人工智能治療孤獨與對人工智能的情感依賴人工智能影響者中國爭奪人工智能霸主地位人工智能…

Java中的機器學習模型集成與訓練

Java中的機器學習模型集成與訓練 大家好&#xff0c;我是免費搭建查券返利機器人省錢賺傭金就用微賺淘客系統3.0的小編&#xff0c;也是冬天不穿秋褲&#xff0c;天冷也要風度的程序猿&#xff01;今天我們將探討在Java中如何進行機器學習模型的集成與訓練。隨著人工智能和機器…

【Lua小知識】Vscode中Emmylua插件大量報錯的解決方法

起因 Vscode寫Lua用的好好的&#xff0c;最近突然出現了大量報錯。 看報錯是有未定義的全局變量&#xff0c;這里查日志才發現是由于0.7.5版本新增診斷啟用配置&#xff0c;所以導致了原先好的代碼&#xff0c;現在出現了大量的報錯。 解決方案一 最直接的方法當然是在配置中直…

用攝像頭實現識別道路中的車道線、行人與車輛檢測(級聯分類器、HOG+SVM、行人檢測)

基于樹莓派的智能小車&#xff0c;用攝像頭實現識別道路中的車道線識別、行人檢測與車輛檢測。 本項目旨在開發一套基于攝像頭的智能道路環境感知系統&#xff0c;該系統能夠實時識別道路中的車道線、行人與車輛&#xff0c;為自動駕駛汽車、智能交通管理以及輔助駕駛系統提供關…

LeetCode熱題100刷題3:3. 無重復字符的最長子串、438. 找到字符串中所有字母異位詞、560. 和為 K 的子數組

3. 無重復字符的最長子串 滑動窗口、雙指針 class Solution { public:int lengthOfLongestSubstring(string s) {//滑動窗口試一下//英文字母、數字、符號、空格,ascii 一共包含128個字符vector<int> pos(128,-1);int ans 0;for(int i0,j0 ; i<s.size();i) {//s[i]…

python 中的生成器

目錄 生成器示例基本生成器示例無限序列生成器使用生成器表達式實用示例&#xff1a;按行讀取大文件生成器的 send、throw 和 close 方法 生成器和迭代器迭代器&#xff08;Iterator&#xff09;定義創建使用示例 生成器&#xff08;Generator&#xff09;定義創建使用示例 主要…

【python學習】自定義函數的一些高級用法-2

8. 生成器函數 生成器函數允許你定義一個可以“記住”其當前執行狀態的函數&#xff0c;并在下次調用時從上次離開的位置繼續執行。生成器函數使用yield關鍵字而不是return。 def simple_generator(): yield 1 yield 2 yield 3 gen simple_generator() print(next(gen)) # …

隱私計算實訓營第二期第十課:基于SPU機器學習建模實踐

隱私計算實訓營第二期-第十課 第十課&#xff1a;基于SPU機器學習建模實踐1 隱私保護機器學習背景1.1 機器學習中隱私保護的需求1.2 PPML提供的技術解決方案 2 SPU架構2.1 SPU前端2.2 SPU編譯器2.3 SPU運行時2.4 SPU目標 3 密態訓練與推理3.1 四個基本問題3.2 解決數據來源問題…

全新升級!中央集中式架構功能測試為新車型保駕護航

“軟件定義汽車”新時代下&#xff0c;整車電氣電氣架構向中央-區域集中式發展已成為行業共識&#xff0c;車型架構的變革帶來更復雜的整車功能定義、更多的新技術的應用&#xff08;如SOA服務化、智能配電等&#xff09;和更短的車型研發周期&#xff0c;對整車和新產品研發的…

OkHttp的源碼解讀1

介紹 OkHttp 是 Square 公司開源的一款高效的 HTTP 客戶端&#xff0c;用于與服務器進行 HTTP 請求和響應。它具有高效的連接池、透明的 GZIP 壓縮和響應緩存等功能&#xff0c;是 Android 開發中廣泛使用的網絡庫。 本文將詳細解讀 OkHttp 的源碼&#xff0c;包括其主要組件…

Qt實現手動切換多種布局

引言 之前寫了一個手動切換多個布局的程序&#xff0c;下面來記錄一下。 程序運行效果如下&#xff1a; 示例 需求 通過點擊程序界面上不同的布局按鈕&#xff0c;使主工作區呈現出不同的頁面布局&#xff0c;多個布局之間可以通過點擊不同布局按鈕切換。支持的最多的窗口…

如何使用 AppML

如何使用 AppML AppML(Application Markup Language)是一種輕量級的標記語言,旨在簡化Web應用的創建和部署過程。它允許開發者通過XML或JSON格式的配置文件來定義應用的結構和行為,從而實現快速開發和靈活擴展。AppML特別適用于構建數據驅動的企業級應用,它可以與各種后端…

pytorch跑手寫體實驗

目錄 1、環境條件 2、代碼實現 3、總結 1、環境條件 pycharm編譯器pytorch依賴matplotlib依賴numpy依賴等等 2、代碼實現 import torch import torch.nn as nn import torch.optim as optim import torchvision import torchvision.transforms as transforms import matpl…

burpsuite 設置監聽窗口 火狐利用插件快速切換代理狀態

一、修改burpsuite監聽端口 1、首先打開burpsuite&#xff0c;點擊Proxy下的Options選項&#xff1a; 2、可以看到默認的監聽端口為8080&#xff0c;首先選中我們想要修改的監聽&#xff0c;點擊Edit進行編輯 3、將端口改為9876&#xff0c;并保存 4、可以看到監聽端口修改成功…

typescript學習回顧(五)

今天來分享一下ts的泛型&#xff0c;最后來做一個練習 泛型 有時候&#xff0c;我們在書寫某些函數的時候&#xff0c;會丟失一些類型信息&#xff0c;比如我下面有一個例子&#xff0c;我想提取一個數組的某個索引之前的所有數據 function getArraySomeData(newArr, n:numb…

JVM原理(十):JVM虛擬機調優分析與實戰

1. 大內存硬件上的程序部署策略 這是筆者很久之前處理過的一個案例&#xff0c;但今天仍然具有代表性。一個15萬PV/日左右的在線文檔類型網站最近更換了硬件系統&#xff0c;服務器的硬件為四路志強處理器、16GB物理內存&#xff0c;操作系統為64位CentOS5.4&#xff0c;Resin…

js數組方法歸納——concat、join、reverse

1、concat( ) 用途&#xff1a;可以連接兩個或多個數組&#xff0c;并將新的數組返回該方法不會對原數組產生影響 var arr ["孫悟空","豬八戒","沙和尚"];var arr2 ["白骨精","玉兔精","蜘蛛精"];var arr3 [&…

Vue Router的深度解析

引言 在現代Web應用開發中&#xff0c;客戶端路由已成為實現流暢用戶體驗的關鍵技術。與傳統的服務器端路由不同&#xff0c;客戶端路由通過JavaScript在瀏覽器中控制頁面內容的更新&#xff0c;避免了頁面的全量刷新。Vue Router作為Vue.js官方的路由解決方案&#xff0c;以其…

阿里云centos 取消硬盤掛載并重建數據盤信息再次掛載

一、取消掛載 umount [掛載點或設備] 如果要取消掛載/dev/sdb1分區&#xff0c;可以使用以下命令&#xff1a; umount /dev/sdb1 如果要取消掛載在/mnt/mydisk的掛載點&#xff0c;可以使用以下命令&#xff1a; umount /mnt/mydisk 如果設備正忙&#xff0c;無法立即取消…

【Spring Boot】簡單了解spring boot支持的三種服務器

Tomcat 概述&#xff1a;Tomcat 是 Apache 軟件基金會&#xff08;Apache Software Foundation&#xff09;的 Jakarta EE 項目中的一個核心項目&#xff0c;由 Apache、Sun 和其他一些公司及個人共同開發而成。它作為 Java Servlet、JSP、JavaServer Pages Expression Languag…