Rabbitmq的五種消息類型介紹,以及集成springboot的使用

交換機類型

Fanout Exchange

扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列

Direct Exchange

直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。
大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。然后當一個消息攜帶著路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。

Topic Exchange

主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。

搭建基本環境

1、pom.xml引入的java包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${springboot-version}</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.57</version></dependency></dependencies>

2、yaml配置文件

# 8004是zookeeper服務器的支付服務提供者端口號
server:port: 8004
spring:application:name: cloud-mqrabbitmq:addresses: 192.168.96.133port: 5672username: guestpassword: guestvirtual-host: /#消費者配置listener:#todo 切記,設置了重拾機制,要拋出異常,不可try catch 后不拋異常,否則重試機制失效simple:#開啟ack 手動確認消息是否被消費成功acknowledge-mode: manualretry:enabled: true# 消費失敗后,繼續消費,然后最多消費5次就不再消費。max-attempts: 5# 消費失敗后 ,重試初始間隔時間 2秒initial-interval: 2000# 重試最大間隔時間5秒max-interval: 5000# 間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設置的最大間隔時間multiplier: 2direct:#開啟ack 手動確認消息是否被消費成功acknowledge-mode: manual#todo 切記,設置了重拾機制,要拋出異常,不可try catch 后不拋異常,否則重試機制失效retry:enabled: true# 消費失敗后,繼續消費,然后最多消費3次就不再消費。max-attempts: 3# 消費失敗后 ,重試初始間隔時間 3秒initial-interval: 3000# 重試最大間隔時間max-interval: 7000# 間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設置的最大間隔時間multiplier: 2# 生產者配置template:retry:# 開啟消息發送失敗重試機制enabled: true# 生產者 true-開啟消息抵達隊列的確認publisher-returns: false#simple 配置用于設置 RabbitMQ 消息生產者的消息確認類型為“簡單確認”。這意味著當消息被發送到 RabbitMQ 之后,只有在消息成功投遞到隊列中后,RabbitMQ 才會向生產者發送一個確認(ack)通知。如果消息未能成功投遞,則不會收到確認。#該配置通常與 publisher-returns: true 一起使用以啟用消息返回機制,但在此配置中 publisher-returns 被設置為 false,表示不啟用消息返回功能publisher-confirm-type: simple

3、主啟動類


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 10564*/
@SpringBootApplication
public class ApplicationRabbitmq {public static void main(String[] args) {SpringApplication.run(ApplicationRabbitmq.class, args);}
}

五種消息模式

基本模式(Basic Model)工作隊列消息類型、,(扇形fanout , 路由direct , 廣播主題topic)都屬于發布訂閱模型

1、RabbitMQ簡單模式

也稱為基本模式(Basic Model),是RabbitMQ的最簡單的消息傳遞模式,僅涉及到一個生產者和一個消費者。在這個模式中,當我們啟動一個程序作為生產者并向RabbitMQ發出消息時,我們希望它直接進入隊列中,然后消費者會從隊列中獲取這個消息并進行處理。簡單模式在RabbitMQ中是一個單隊列單生產者單消費者的模式,主要適用于單純的任務處理,消息的生產者和消費者的削峰填谷能力非常高。

1、定義消息隊列Queue名稱


package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 簡單消息隊列名稱*/public static final String SIMPLE_MQ_NAME = "simpleQueue";
}

2、配置類Configuration


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類* @author 10564*/
@Configuration
public class RabbitmqSimpleConfig {/*** 簡單消息隊列*/@Beanpublic Queue simpleQueue() {//名字(name):隊列的名字,用來區分不同的隊列。//是否持久化(durable):如果設置為 true,表示即使服務器重啟了,這個隊列依然存在。//是否獨占(exclusive):如果設置為 true,表示只有創建它的連接才能使用這個隊列。//是否自動刪除(autoDelete):如果設置為 true,表示當不再有消費者使用這個隊列時,服務器會自動刪除它。return new Queue(MqConstant.SIMPLE_MQ_NAME,true,false,false);}
}

3、生產者Producer

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class SimpleProducer {private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderSimple(String userModel) {log.info("\n簡單生產者發送消息:{}\n", JSONObject.toJSONString(userModel));rabbitTemplate.convertAndSend(MqConstant.SIMPLE_MQ_NAME,userModel);}
}

4、消費者Consumer

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class SimpleConsumer {private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);@RabbitListener(queues = MqConstant.SIMPLE_MQ_NAME)public void receiveHelloQueueMessage(String userModel, Channel channel) {log.info("\n簡單消費者接收消息:{}\n", JSONObject.toJSONString(userModel));}
}

5、測試Test

package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.simple.SimpleProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate SimpleProducer simpleProducer;@GetMapping("/simple")public void simpleProducerTest(String message) {simpleProducer.senderSimple(message);}
}

6、測試結果

### simple
GET http://localhost:8004/mq/simple?message=simple111## 結果
2025-06-21 15:19:54.769  INFO 8236 --- [nio-8004-exec-2] o.x.s.messagetype.simple.SimpleProducer  : 
簡單生產者發送消息:"simple111"2025-06-21 15:19:54.772  INFO 8236 --- [ntContainer#7-1] o.x.s.messagetype.simple.SimpleConsumer  : 
簡單消費者接收消息:"simple111"

2、RabbitMQ工作模式

當消費者處理消息較為耗時的情況下,可能會導致生產消息的速度遠大于消費速度,從而造成消息堆積、無法及時處理的問題。為了解決這一問題,可以采用工作隊列模型,即讓多個消費者綁定到同一個隊列上,共同消費該隊列中的消息。在這種模型中,隊列中的消息一旦被某個消費者成功消費,就會從隊列中移除,因此任務不會被重復執行,且同一個消息只會被一個消費者消費

1、定義消息隊列Queue名稱

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 工作隊列消息隊列名稱*/public static final String WORK_QUEUE_NAME = "workQueue";
}

2、配置類Configuration


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類* @author 10564*/
@Configuration
public class RabbitmqWorkQueueConfig {/*** 工作隊列*/@Beanpublic Queue workQueue() {return new Queue(MqConstant.WORK_QUEUE_NAME);}
}

3、生產者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class WorkQueueProducer {private static final Logger log = LoggerFactory.getLogger(WorkQueueProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderWorkQueueMessage(String message) {log.info("\n工作隊列生產者發送消息:{}\n",message);rabbitTemplate.convertAndSend(MqConstant.WORK_QUEUE_NAME,message);}
}

4、消費者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** 多個消費者綁定到一個隊列,共同消費隊列中的消息* @author 10564*/
@Component
public class WorkQueueConsumer {private static final Logger log = LoggerFactory.getLogger(WorkQueueConsumer.class);/*** 消費者one監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.WORK_QUEUE_NAME)public void receiveWorkQueueMessageOne(String message) {log.info("\n工作隊列消費者One接收消息:{}\n",message);}/*** 消費者two監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.WORK_QUEUE_NAME)public void receiveWorkQueueMessageTwo(String message) {log.info("\n工作隊列消費者Two接收消息:{}\n",message);}
}

5、測試Test

6、測試結果

### workQueue 連續請求三次達到以下結果
GET http://localhost:8004/mq/workQueue?message=workQueue2## 結果
2025-06-21 15:31:07.726  INFO 8236 --- [nio-8004-exec-3] o.x.s.m.workqueue.WorkQueueProducer      : 
工作隊列生產者發送消息:workQueue workQueue22025-06-21 15:31:07.729  INFO 8236 --- [tContainer#11-1] o.x.s.m.workqueue.WorkQueueConsumer      : 
工作隊列消費者One接收消息:workQueue workQueue22025-06-21 15:31:22.640  INFO 8236 --- [nio-8004-exec-7] o.x.s.m.workqueue.WorkQueueProducer      : 
工作隊列生產者發送消息:workQueue workQueue22025-06-21 15:31:22.643  INFO 8236 --- [tContainer#10-1] o.x.s.m.workqueue.WorkQueueConsumer      : 
工作隊列消費者Two接收消息:workQueue workQueue22025-06-21 15:31:24.122  INFO 8236 --- [nio-8004-exec-8] o.x.s.m.workqueue.WorkQueueProducer      : 
工作隊列生產者發送消息:workQueue workQueue22025-06-21 15:31:24.124  INFO 8236 --- [tContainer#11-1] o.x.s.m.workqueue.WorkQueueConsumer      : 
工作隊列消費者One接收消息:workQueue workQueue2

3、RabbitMQ 訂閱發布Pub/Sub(fanout)

發布訂閱模式,也叫做“廣播(Broadcast)模式。生產者將消息發送到Exchange(交換機)上,
沒有RoutingKey以及BindingKey的概念
,Bindings只是簡單的將消息與交換機進行了綁定,如果消息進入了交換機中,那么這個消息會被轉發到所有與當前交換機進行綁定的所有隊列中。

發布訂閱模型:允許一個消息向多個消費者投遞,fanout , direct , topics都屬于發布訂閱模型。交換機使用Fanout-廣播類型

1、定義消息隊列Queue名稱

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//訂閱發布消息隊列1public static final String FANOUT_QUEUE_ONE = "fanoutQueueOne";//訂閱發布消息隊列2public static final String FANOUT_QUEUE_TWO = "fanoutQueueTwo";//訂閱發布消息隊列 - 交換機public static final String FANOUT_EXCHANGE_NAME = "fanoutExchange";
}

2、配置類Configuration


package org.xwb.springcloud.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類** @author 10564*/
@Configuration
public class RabbitmqFanoutConfig {@Beanpublic Queue fanoutQueueOne() {return new Queue(MqConstant.FANOUT_QUEUE_ONE);}@Beanpublic Queue fanoutQueueTwo() {return new Queue(MqConstant.FANOUT_QUEUE_TWO);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(MqConstant.FANOUT_EXCHANGE_NAME);}//將訂閱發布隊列one 與該交換機綁定@Beanpublic Binding fanoutQueueOneBinding() {return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());}//將訂閱發布隊列two 與該交換機綁定@Beanpublic Binding fanoutQueueTwobinding() {return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());}
}

3、生產者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class FanoutProducer {private static final Logger log = LoggerFactory.getLogger(FanoutProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderFanoutQueue(String message) {log.info("\n訂閱發布生產者發送消息:{}\n",message);//參數1:交換機名稱//參數2:路由key//參數3:消息// fanout_exchange 廣播類型的交換機  不需要指定路由key  所有綁定到該交換機的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.FANOUT_EXCHANGE_NAME,"",message);}
}

4、消費者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class FanoutConsumer {private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);//監聽queue1隊列@RabbitListener(queues = MqConstant.FANOUT_QUEUE_ONE)public void receiveFanoutQueueOne(String msg) {log.info("\n訂閱發布消費者One接收消息:{}\n",msg);}//監聽queue2隊列@RabbitListener(queues = MqConstant.FANOUT_QUEUE_TWO)public void receiveFanoutQueueTwo(String msg) {log.info("\n訂閱發布消費者two接收消息:{}\n",msg);}
}

5、測試Test

package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.fanout.FanoutProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate FanoutProducer fanoutProducer;@GetMapping("/fanout")public void fanoutProducerTest(String message) {fanoutProducer.senderFanoutQueue("fanout " + message);}
}

6、測試結果

### fanout
GET http://localhost:8004/mq/fanout?message=fanout## 結果
2025-06-21 15:39:33.247  INFO 8236 --- [nio-8004-exec-4] o.x.s.messagetype.fanout.FanoutProducer  : 
訂閱發布生產者發送消息:fanout fanout2025-06-21 15:39:33.250  INFO 8236 --- [ntContainer#6-1] o.x.s.messagetype.fanout.FanoutConsumer  : 
訂閱發布消費者two接收消息:fanout fanout2025-06-21 15:39:33.250  INFO 8236 --- [ntContainer#5-1] o.x.s.messagetype.fanout.FanoutConsumer  : 
訂閱發布消費者One接收消息:fanout fanout

4、RabbitMQ 路由(direct)

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費,我們就要用的routing路由模式,這種模式是通過一個routingkey來收發消息。交換機的類型使用direct。

(與發布訂閱模式相比,交換機并不是發送給所有綁定的隊列,而是在這些綁定隊列中,符合routingkey的隊列)

1、定義消息隊列Queue名稱

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//路由隊列1public static final String DIRECT_QUEUE_ONE = "directQueueOne";//路由隊列2public static final String DIRECT_QUEUE_TWO = "directQueueTwo";//路由交換機public static final String DIRECT_EXCHANGE_NAME = "directExchangeName";//路由鍵onepublic static final String DIRECT_ROUTING_KEY_ONE = "directRoutingKeyOne";//路由鍵twopublic static final String DIRECT_ROUTING_KEY_TWO = "directRoutingKeyTwo";
}

2、配置類Configuration


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類** @author 10564*/
@Configuration
public class RabbitmqDirectConfig {@Beanpublic Queue directQueueOne() {return new Queue(MqConstant.DIRECT_QUEUE_ONE);}@Beanpublic Queue directQueueTwo() {return new Queue(MqConstant.DIRECT_QUEUE_TWO);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(MqConstant.DIRECT_EXCHANGE_NAME);}@Beanpublic Binding directExchangeBindOne() {return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(MqConstant.DIRECT_ROUTING_KEY_ONE);}@Beanpublic Binding directExchangeBindTwo() {return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(MqConstant.DIRECT_ROUTING_KEY_TWO);}
}

3、生產者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564* @description: 直連交換機生產者*/
@Component
public class DirectProducer {private static final Logger log = LoggerFactory.getLogger(DirectProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderDirectOneMessage(String message) {log.info("\n路由模式生產者one發送消息:{}\n",message);//參數1:交換機名稱//參數2:路由key//參數3:消息//topic_exchange交換機  需要指定路由key  綁定到該交換機且符合路由key的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE_NAME,MqConstant.DIRECT_ROUTING_KEY_ONE,message+"ONE");}public void senderDirectTwoMessage(String message) {log.info("\n路由模式生產者two發送消息:{}\n",message);//參數1:交換機名稱//參數2:路由key//參數3:消息//topic_exchange交換機  需要指定路由key  綁定到該交換機且符合路由key的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE_NAME,MqConstant.DIRECT_ROUTING_KEY_TWO,message+"TWO");}
}

4、消費者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class DirectConsumer {private static final Logger log = LoggerFactory.getLogger(DirectConsumer.class);//監聽queue1隊列@RabbitListener(queues = MqConstant.DIRECT_QUEUE_ONE)public void receiveHelloQueueMessage1(String msg) {log.info("\n路由模式消費者one-監聽隊列one-路由鍵one 收到消息:{}\n",msg);}//監聽queue2隊列@RabbitListener(queues = MqConstant.DIRECT_QUEUE_TWO)public void receiveHelloQueueMessage2(String msg) {log.info("\n路由模式消費者two-監聽隊列two-路由鍵two 收到消息:{}\n",msg);}
}

5、測試Test


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.direct.DirectProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate DirectProducer directProducer;@GetMapping("/direct")public void directProducerTest(String message) {directProducer.senderDirectOneMessage("direct one " + message);directProducer.senderDirectTwoMessage("direct two " + message);}
}

6、測試結果

### direct
GET http://localhost:8004/mq/direct?message=direct## 結果
2025-06-21 15:44:34.354  INFO 8236 --- [nio-8004-exec-8] o.x.s.messagetype.direct.DirectProducer  : 
路由模式生產者one發送消息:direct one direct2025-06-21 15:44:34.354  INFO 8236 --- [nio-8004-exec-8] o.x.s.messagetype.direct.DirectProducer  : 
路由模式生產者two發送消息:direct two direct2025-06-21 15:44:34.357  INFO 8236 --- [ntContainer#3-1] o.x.s.messagetype.direct.DirectConsumer  : 
路由模式消費者one-監聽隊列one-路由鍵one 收到消息:direct one directONE2025-06-21 15:44:34.357  INFO 8236 --- [ntContainer#4-1] o.x.s.messagetype.direct.DirectConsumer  : 
路由模式消費者two-監聽隊列two-路由鍵two 收到消息:direct two directTWO

5、RabbitMQ 主題(topic)

Topic類型的交換機與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列,沒有本質區別,只不過Topic類型交換機可以讓隊列在綁定Routing
key 的時候使用通配符!即不像Direct寫的那么死。

Routing key:可以由一個或多個單詞組成,每個單詞之間用“.”來分隔,例如:topicExchange.job

通配符規則:

#:匹配一個或多個詞

*:匹配不多不少恰好1個詞

Routing key = topicExchange.# 代表后面可以匹配多個,topicExchange.job topicExchange.lucky.day topicExchange.luck.for.you

Routing key = topicExchange.* 代表后面只能跟一個,topicExchange.job topicExchange.luck 但topicExchange.lucky.day(此處后面是兩個單詞所以不匹配)就不可以

1、定義消息隊列Queue名稱

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//隊列01public static final String TOPIC_QUEUE_ONE = "topicQueueOne";//隊列02public static final String TOPIC_QUEUE_TWO = "topicQueueTwo";//交換機public static final String TOPIC_EXCHANGE_NAME = "topicExchange";/*** # 路由鍵匹配一個或多個單詞  routingKey. 開頭的都可以匹配到 eg: routingKey.one routingKey.two等等*/public static final String TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE = "topicRoutingKey.#";// * 路由鍵匹配一個單詞   topicRoutingKey. 開頭的,如果多個,只匹配一個public static final String TOPIC_ROUTING_KEY_MATCHING_ONE = "topicRoutingKey.*";
}

2、配置類Configuration


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類** @author 10564*/
@Configuration
public class RabbitmqTopicConfig {@Beanpublic Queue topicQueueOne() {return new Queue(MqConstant.TOPIC_QUEUE_ONE);}@Beanpublic Queue topicQueueTwo() {return new Queue(MqConstant.TOPIC_QUEUE_TWO);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(MqConstant.TOPIC_EXCHANGE_NAME);}@Beanpublic Binding topicBindingMatchingOneOrMore() {return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE);}@Beanpublic Binding topicBindingMatchingOne() {return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE);}
}

3、生產者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class TopicQueueProducer {private static final Logger log = LoggerFactory.getLogger(TopicQueueProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;/*** 發送topic主體消息隊列,匹配一個 或者匹配多個* @param message 消息* @param routingKey 路由key*/public void senderTopicQueueRoutingMatching(String message,String routingKey) {log.info("\n主題模式 生產者發送消息 message:{}\n",message);log.info("\n主題模式 Routing:【{},{}】,當前的消息routing:{}\n",MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE,MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE,routingKey);//參數1:交換機名稱//TODO 參數2:路由key * 匹配一個 # 匹配一個或者多個【此處的key是匹配MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE或者是TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE】//參數3:消息// topic_exchange交換機  需要指定路由key  綁定到該交換機且符合路由key的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.TOPIC_EXCHANGE_NAME,routingKey,message);}
}

4、消費者Consumer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/**** @author 10564*/
@Component
public class TopicQueueConsumer {private static final Logger log = LoggerFactory.getLogger(TopicQueueConsumer.class);/*** 消費者one監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.TOPIC_QUEUE_ONE)public void receiveTopicQueueMessageOne(String message) {log.info("\n主題模式消費者01監聽隊列01-》路由鍵01 收到消息:{}\n",message);}/*** 消費者two監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.TOPIC_QUEUE_TWO)public void receiveTopicQueueMessageTwo(String message) {log.info("\n主題模式消費者02監聽隊列02-》路由鍵02 收到消息:{}\n",message);}
}

5、測試Test

package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.topic.TopicQueueProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate TopicQueueProducer topicQueueProducer;@GetMapping("/topic")public void topicQueueProducerTest(String message, String routingKey) {topicQueueProducer.senderTopicQueueRoutingMatching("topic one " + message, routingKey);}
}

6、測試結果


### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one## 結果
2025-06-21 15:49:03.545  INFO 8236 --- [nio-8004-exec-2] o.x.s.m.topic.TopicQueueProducer         : 
主題模式 生產者發送消息 message:topic one topic2025-06-21 15:49:03.545  INFO 8236 --- [nio-8004-exec-2] o.x.s.m.topic.TopicQueueProducer         : 
主題模式 Routing:【topicRoutingKey.*,topicRoutingKey.#】,當前的消息routing:topicRoutingKey.one2025-06-21 15:49:03.547  INFO 8236 --- [ntContainer#9-1] o.x.s.m.topic.TopicQueueConsumer         : 
主題模式消費者01監聽隊列01-》路由鍵01 收到消息:topic one topic2025-06-21 15:49:03.547  INFO 8236 --- [ntContainer#8-1] o.x.s.m.topic.TopicQueueConsumer         : 
主題模式消費者02監聽隊列02-》路由鍵02 收到消息:topic one topic### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one.two## 結果
2025-06-21 15:49:28.181  INFO 8236 --- [nio-8004-exec-6] o.x.s.m.topic.TopicQueueProducer         : 
主題模式 生產者發送消息 message:topic one topic2025-06-21 15:49:28.182  INFO 8236 --- [nio-8004-exec-6] o.x.s.m.topic.TopicQueueProducer         : 
主題模式 Routing:【topicRoutingKey.*,topicRoutingKey.#】,當前的消息routing:topicRoutingKey.one.two2025-06-21 15:49:28.184  INFO 8236 --- [ntContainer#9-1] o.x.s.m.topic.TopicQueueConsumer         : 
主題模式消費者01監聽隊列01-》路由鍵01 收到消息:topic one topic

測試工具類 request.http文件

(IDEA搜索HTTP Client插件工具可以直接使用)


### simple
GET http://localhost:8004/mq/simple?message=simple111### workQueue
GET http://localhost:8004/mq/workQueue?message=workQueue2### fanout
GET http://localhost:8004/mq/fanout?message=fanout### direct
GET http://localhost:8004/mq/direct?message=direct### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one.two

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/86453.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/86453.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/86453.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

日語學習-日語知識點小記-進階-JLPT-真題訓練-N2階段(4):2022年12月2023年12月

日語學習-日語知識點小記-進階-JLPT-真題訓練-N2階段&#xff08;4&#xff09;&#xff1a;2022年12月&2023年12月 1、前言&#xff08;1&#xff09;情況說明&#xff08;2&#xff09;工程師的信仰&#xff08;3&#xff09;真題訓練 2、2個卷的單詞部分1、 真題-2023年…

從代碼學習深度強化學習 - Actor-Critic 算法 PyTorch版

文章目錄 前言算法原理1. 從策略梯度到Actor-Critic2. Actor 和 Critic 的角色3. Critic 的學習方式:時序差分 (TD)4. Actor 的學習方式:策略梯度5. 算法流程代碼實現1. 環境與工具函數2. 構建Actor-Critic智能體3. 組織訓練流程4. 主程序:啟動訓練5. 實驗結果總結前言 在深…

Python 數據分析與可視化 Day 8 - Pandas 高級操作技巧

? 今日目標 掌握 Pandas 的索引體系&#xff08;Index / MultiIndex&#xff09;使用 set_index() 和 reset_index() 管理數據索引理解 pivot_table 與 melt、stack/unstack 重塑數據形態初步理解“寬表”與“長表”在數據分析與可視化中的應用場景 &#x1f4da; 一、深入理…

Spring Boot整合百度AI人臉比對實戰

目錄 一、簡述 二、依賴 三、代碼步驟 3.1 實體注入 3.2 服務實現 3.3 其它實現 四、小結 歡迎來到 盹貓(>^ω^<)的博客 本篇文章主要介紹了 [Spring Boot整合百度AI人臉比對實戰] ?博主廣交技術好友&#xff0c;喜歡文章的可以關注一下? 一、簡述 人臉識別在日…

使用 pip 安裝 numpy 包卡在 Preparing metadata 階段問題解決

TOC 1 問題描述 使用 pip 安裝numpy卡在下面最后一行的階段&#xff1a; Collecting numpy1.26.4 (from -r requirements.txt (line 2))Using cached https://mirrors.aliyun.com/pypi/packages/65/6e/09db70a523a96d25e115e71cc56a6f9031e7b8cd166c1ac8438307c14058/numpy-…

新手向:Anaconda3的安裝與使用方法

我們在剛開始接觸Python時使用的是Python的直接編譯器,如果我們需要進行其他的項目編寫往往需要使用另一個版本的Python ,這樣反復的下載很是麻煩并且還會造成系統變量的紊亂.這次我們引入Anaconda3,可創建虛擬的Python環境,滿足不同項目的需要,當不用的時候可以直接放心刪除不…

C#中的設計時構造函數

以下是關于設計時構造函數的詳細整理&#xff0c;包括定義、適用場景、相關概念和實際應用&#xff1a; 一、設計時構造函數的定義 設計時構造函數&#xff08;Design-time Constructor&#xff09;是專門為開發工具&#xff08;如Visual Studio、Blazor Designer等&#xff0…

Spring Boot 2.x 項目搭建 (一)

以下是基于Spring Boot 2.x&#xff08;兼容JDK 1.8&#xff09;的項目搭建指南及Markdown文檔生成方案&#xff0c;整合了多個搜索結果中的最佳實踐&#xff1a; 一、項目初始化 1. 使用Spring Initializr創建項目 步驟&#xff1a; 訪問 start.spring.io 或通過IDE&#x…

Kotlin作用域函數:掌握apply/let/run/with/also精髓

一、作用域函數詳解 1. apply&#xff1a;對調用對象進行配置或操作&#xff0c;并返回該對象本身。 接收者引用&#xff1a;this&#xff08;可省略&#xff0c;直接調用接收者成員&#xff09;返回值&#xff1a;接收者對象本身&#xff08;T&#xff09;核心用途&#xff…

Spring Boot監視器:應用監控終極指南

Spring Boot 監視器詳解 Spring Boot 監視器(Monitor)是用于監控和管理 Spring Boot 應用程序運行狀態的核心組件,主要通過 Spring Boot Actuator 和 Spring Boot Admin 兩大工具實現。 一、核心監視器組件 1. Spring Boot Actuator 功能定位:提供應用程序內部運行狀態的原…

SpringBoot 中 @Transactional 的使用

SpringBoot 中 Transactional 的使用 一、Transactional 的基本使用二、Transactional 的核心屬性三、使用避坑&#xff08;失效場景&#xff09;3.1 自調用問題3.2 異常處理不當3.3 類未被 Spring 管理3.4 異步方法內使用失效 四、工作實踐4.1 事務提交之后執行一些操作4.2 事…

6.26_JAVA_微服務_Elasticsearch

1、ES文檔中keyword意思是&#xff1a;字符串&#xff0c;但不需要分詞 2、ES細節CreateIndexRequest request new CreateIndexRequest("items");會讓你導包&#xff0c;會有兩個選擇&#xff1a; import org.elasticsearch.action.admin.indices.create.CreateInd…

Java 大視界 -- 基于 Java 的大數據可視化在智慧城市能源消耗動態監測與優化決策中的應用(324)

Java 大視界 -- 基于 Java 的大數據可視化在智慧城市能源消耗動態監測與優化決策中的應用&#xff08;324&#xff09; 引言&#xff1a;正文&#xff1a;一、Java 驅動的能源數據采集與預處理基建1.1 多源異構數據合規接入層&#xff08;ISO 50001IEC 61850 雙標準適配&#x…

C++ 快速回顧(二)

C 快速回顧&#xff08;二&#xff09; 前言一、友元類二、友元函數三、深淺拷貝淺拷貝深拷貝 前言 用于快速回顧之前遺漏或者補充C知識 一、友元類 友元的優點是可以快速的輕松的訪問的原本由于私有保護的字段和函數&#xff0c;同時這也是它的缺點這樣破壞了原本封裝性。 …

ldl-DeserializationViewer一款強大的序列化數據可視化工具

ldl-DeserializationViewer 一款強大的序列化數據可視化工具&#xff0c;能夠將Java序列化的緩存數據轉換為可讀的JSON格式&#xff0c;無需原始DTO類定義。 A powerful visualization tool for serialized data that converts Java serialized cache data to readable JSON f…

NetworkSecurity SIG成立,助力國產操作系統安全生態發展

近期&#xff0c;ZeroOnes實驗室團隊成員在OpenAtom openKylin&#xff08;簡稱“openKylin”&#xff09;社區發起成立NetworkSecurity SIG&#xff0c;負責基于openKylin系統開展網絡安全工具的研發與適配&#xff0c;助力國產操作系統安全生態發展。 ZeroOnes實驗室專注于網…

回歸任務與分類任務的區別

回歸任務&#xff08;Regression&#xff09;與分類任務&#xff08;Classification&#xff09;是機器學習的兩大核心任務類型&#xff0c;其根本區別在于輸出變量的性質和任務目標。以下是系統性對比&#xff1a; 1. 本質區別&#xff1a;輸出變量類型 任務類型輸出&#xf…

Webshell工具的流量特征分析(菜刀,蟻劍,冰蝎,哥斯拉)

Webshell工具的流量特征分析&#xff08;菜刀&#xff0c;蟻劍&#xff0c;冰蝎&#xff0c;哥斯拉&#xff09; 0x00 前言 使用各種的shell工具獲取到目標權限&#xff0c;即可進行數據操作&#xff0c;今天來簡要分析一下目前常使用的各類shell管理工具的流量特診&#xff…

【linux】全志Tina配置swupdate工具進行分區打包

一、文件路徑 1、描述文件&#xff1a; .\build\swupdate\sw-description-ab 2、鏡像打包文件&#xff1a; .\build\swupdate\sw-subimgs-ab.cfg 二、文件作用 1、sw-description-ab 用于描述版本信息和ab區中要打包的分區信息以及掛載點。 2、sw-subimgs-ab.cfg 用于…

MicroPython網絡編程:AP模式與STA模式詳解

文章目錄 1. MicroPython網絡模塊概述2. 熱點AP模式詳解2.1 什么是AP模式?2.2 AP模式特點2.3 AP模式設置代碼2.4 AP模式適用場景3. 客戶端STA模式詳解3.1 什么是STA模式?3.2 STA模式特點3.3 STA模式設置代碼3.4 STA模式適用場景4. AP與STA模式對比分析5. 實際應用場景與選擇建…