交換機類型
Fanout Exchange
扇型交換機,這個交換機沒有路由鍵概念,就算你綁了路由鍵也是無視的。 這個交換機在接收到消息后,會直接轉發到綁定到它上面的所有隊列
Direct Exchange
直連型交換機,根據消息攜帶的路由鍵將消息投遞給對應隊列。
大致流程,有一個隊列綁定到一個直連交換機上,同時賦予一個路由鍵 routing key 。然后當一個消息攜帶著路由值為X,這個消息通過生產者發送給交換機時,交換機就會根據這個路由值X去尋找綁定值也是X的隊列。
Topic Exchange
主題交換機,這個交換機其實跟直連交換機流程差不多,但是它的特點就是在它的路由鍵和綁定鍵之間是有規則的。
搭建基本環境
1、pom.xml引入的java包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${springboot-version}</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.57</version></dependency></dependencies>
2、yaml配置文件
# 8004是zookeeper服務器的支付服務提供者端口號
server:port: 8004
spring:application:name: cloud-mqrabbitmq:addresses: 192.168.96.133port: 5672username: guestpassword: guestvirtual-host: /#消費者配置listener:#todo 切記,設置了重拾機制,要拋出異常,不可try catch 后不拋異常,否則重試機制失效simple:#開啟ack 手動確認消息是否被消費成功acknowledge-mode: manualretry:enabled: true# 消費失敗后,繼續消費,然后最多消費5次就不再消費。max-attempts: 5# 消費失敗后 ,重試初始間隔時間 2秒initial-interval: 2000# 重試最大間隔時間5秒max-interval: 5000# 間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設置的最大間隔時間multiplier: 2direct:#開啟ack 手動確認消息是否被消費成功acknowledge-mode: manual#todo 切記,設置了重拾機制,要拋出異常,不可try catch 后不拋異常,否則重試機制失效retry:enabled: true# 消費失敗后,繼續消費,然后最多消費3次就不再消費。max-attempts: 3# 消費失敗后 ,重試初始間隔時間 3秒initial-interval: 3000# 重試最大間隔時間max-interval: 7000# 間隔時間乘子,間隔時間*乘子=下一次的間隔時間,最大不能超過設置的最大間隔時間multiplier: 2# 生產者配置template:retry:# 開啟消息發送失敗重試機制enabled: true# 生產者 true-開啟消息抵達隊列的確認publisher-returns: false#simple 配置用于設置 RabbitMQ 消息生產者的消息確認類型為“簡單確認”。這意味著當消息被發送到 RabbitMQ 之后,只有在消息成功投遞到隊列中后,RabbitMQ 才會向生產者發送一個確認(ack)通知。如果消息未能成功投遞,則不會收到確認。#該配置通常與 publisher-returns: true 一起使用以啟用消息返回機制,但在此配置中 publisher-returns 被設置為 false,表示不啟用消息返回功能publisher-confirm-type: simple
3、主啟動類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 10564*/
@SpringBootApplication
public class ApplicationRabbitmq {public static void main(String[] args) {SpringApplication.run(ApplicationRabbitmq.class, args);}
}
五種消息模式
基本模式(Basic Model)
、工作隊列消息類型
、,(扇形fanout
, 路由direct
, 廣播主題topic
)都屬于發布訂閱模型
1、RabbitMQ簡單模式
也稱為基本模式(Basic Model),是RabbitMQ的最簡單的消息傳遞模式,僅涉及到一個生產者和一個消費者。在這個模式中,當我們啟動一個程序作為生產者并向RabbitMQ發出消息時,我們希望它直接進入隊列中,然后消費者會從隊列中獲取這個消息并進行處理。簡單模式在RabbitMQ中是一個單隊列單生產者單消費者的模式,主要適用于單純的任務處理,消息的生產者和消費者的削峰填谷能力非常高。
1、定義消息隊列Queue名稱
package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 簡單消息隊列名稱*/public static final String SIMPLE_MQ_NAME = "simpleQueue";
}
2、配置類Configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類* @author 10564*/
@Configuration
public class RabbitmqSimpleConfig {/*** 簡單消息隊列*/@Beanpublic Queue simpleQueue() {//名字(name):隊列的名字,用來區分不同的隊列。//是否持久化(durable):如果設置為 true,表示即使服務器重啟了,這個隊列依然存在。//是否獨占(exclusive):如果設置為 true,表示只有創建它的連接才能使用這個隊列。//是否自動刪除(autoDelete):如果設置為 true,表示當不再有消費者使用這個隊列時,服務器會自動刪除它。return new Queue(MqConstant.SIMPLE_MQ_NAME,true,false,false);}
}
3、生產者Producer
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class SimpleProducer {private static final Logger log = LoggerFactory.getLogger(SimpleProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderSimple(String userModel) {log.info("\n簡單生產者發送消息:{}\n", JSONObject.toJSONString(userModel));rabbitTemplate.convertAndSend(MqConstant.SIMPLE_MQ_NAME,userModel);}
}
4、消費者Consumer
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class SimpleConsumer {private static final Logger log = LoggerFactory.getLogger(SimpleConsumer.class);@RabbitListener(queues = MqConstant.SIMPLE_MQ_NAME)public void receiveHelloQueueMessage(String userModel, Channel channel) {log.info("\n簡單消費者接收消息:{}\n", JSONObject.toJSONString(userModel));}
}
5、測試Test
package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.simple.SimpleProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate SimpleProducer simpleProducer;@GetMapping("/simple")public void simpleProducerTest(String message) {simpleProducer.senderSimple(message);}
}
6、測試結果
### simple
GET http://localhost:8004/mq/simple?message=simple111## 結果
2025-06-21 15:19:54.769 INFO 8236 --- [nio-8004-exec-2] o.x.s.messagetype.simple.SimpleProducer :
簡單生產者發送消息:"simple111"2025-06-21 15:19:54.772 INFO 8236 --- [ntContainer#7-1] o.x.s.messagetype.simple.SimpleConsumer :
簡單消費者接收消息:"simple111"
2、RabbitMQ工作模式
當消費者處理消息較為耗時的情況下,可能會導致生產消息的速度遠大于消費速度,從而造成消息堆積、無法及時處理的問題。為了解決這一問題,可以采用工作隊列模型,即讓多個消費者綁定到同一個隊列上,共同消費該隊列中的消息。在這種模型中,隊列中的消息一旦被某個消費者成功消費,就會從隊列中移除,因此任務不會被重復執行,且同一個消息只會被一個消費者消費。
1、定義消息隊列Queue名稱
package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 工作隊列消息隊列名稱*/public static final String WORK_QUEUE_NAME = "workQueue";
}
2、配置類Configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類* @author 10564*/
@Configuration
public class RabbitmqWorkQueueConfig {/*** 工作隊列*/@Beanpublic Queue workQueue() {return new Queue(MqConstant.WORK_QUEUE_NAME);}
}
3、生產者Producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class WorkQueueProducer {private static final Logger log = LoggerFactory.getLogger(WorkQueueProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderWorkQueueMessage(String message) {log.info("\n工作隊列生產者發送消息:{}\n",message);rabbitTemplate.convertAndSend(MqConstant.WORK_QUEUE_NAME,message);}
}
4、消費者Consumer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** 多個消費者綁定到一個隊列,共同消費隊列中的消息* @author 10564*/
@Component
public class WorkQueueConsumer {private static final Logger log = LoggerFactory.getLogger(WorkQueueConsumer.class);/*** 消費者one監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.WORK_QUEUE_NAME)public void receiveWorkQueueMessageOne(String message) {log.info("\n工作隊列消費者One接收消息:{}\n",message);}/*** 消費者two監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.WORK_QUEUE_NAME)public void receiveWorkQueueMessageTwo(String message) {log.info("\n工作隊列消費者Two接收消息:{}\n",message);}
}
5、測試Test
6、測試結果
### workQueue 連續請求三次達到以下結果
GET http://localhost:8004/mq/workQueue?message=workQueue2## 結果
2025-06-21 15:31:07.726 INFO 8236 --- [nio-8004-exec-3] o.x.s.m.workqueue.WorkQueueProducer :
工作隊列生產者發送消息:workQueue workQueue22025-06-21 15:31:07.729 INFO 8236 --- [tContainer#11-1] o.x.s.m.workqueue.WorkQueueConsumer :
工作隊列消費者One接收消息:workQueue workQueue22025-06-21 15:31:22.640 INFO 8236 --- [nio-8004-exec-7] o.x.s.m.workqueue.WorkQueueProducer :
工作隊列生產者發送消息:workQueue workQueue22025-06-21 15:31:22.643 INFO 8236 --- [tContainer#10-1] o.x.s.m.workqueue.WorkQueueConsumer :
工作隊列消費者Two接收消息:workQueue workQueue22025-06-21 15:31:24.122 INFO 8236 --- [nio-8004-exec-8] o.x.s.m.workqueue.WorkQueueProducer :
工作隊列生產者發送消息:workQueue workQueue22025-06-21 15:31:24.124 INFO 8236 --- [tContainer#11-1] o.x.s.m.workqueue.WorkQueueConsumer :
工作隊列消費者One接收消息:workQueue workQueue2
3、RabbitMQ 訂閱發布Pub/Sub(fanout)
發布訂閱模式,也叫做“廣播(Broadcast)模式。生產者將消息發送到Exchange(交換機)上,
沒有RoutingKey以及BindingKey的概念,Bindings只是簡單的將消息與交換機進行了綁定,如果消息進入了交換機中,那么這個消息會被轉發到所有與當前交換機進行綁定的所有隊列中。
發布訂閱模型:允許一個消息向多個消費者投遞,fanout , direct , topics都屬于發布訂閱模型。交換機使用Fanout-廣播類型
1、定義消息隊列Queue名稱
package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//訂閱發布消息隊列1public static final String FANOUT_QUEUE_ONE = "fanoutQueueOne";//訂閱發布消息隊列2public static final String FANOUT_QUEUE_TWO = "fanoutQueueTwo";//訂閱發布消息隊列 - 交換機public static final String FANOUT_EXCHANGE_NAME = "fanoutExchange";
}
2、配置類Configuration
package org.xwb.springcloud.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類** @author 10564*/
@Configuration
public class RabbitmqFanoutConfig {@Beanpublic Queue fanoutQueueOne() {return new Queue(MqConstant.FANOUT_QUEUE_ONE);}@Beanpublic Queue fanoutQueueTwo() {return new Queue(MqConstant.FANOUT_QUEUE_TWO);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(MqConstant.FANOUT_EXCHANGE_NAME);}//將訂閱發布隊列one 與該交換機綁定@Beanpublic Binding fanoutQueueOneBinding() {return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());}//將訂閱發布隊列two 與該交換機綁定@Beanpublic Binding fanoutQueueTwobinding() {return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());}
}
3、生產者Producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class FanoutProducer {private static final Logger log = LoggerFactory.getLogger(FanoutProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderFanoutQueue(String message) {log.info("\n訂閱發布生產者發送消息:{}\n",message);//參數1:交換機名稱//參數2:路由key//參數3:消息// fanout_exchange 廣播類型的交換機 不需要指定路由key 所有綁定到該交換機的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.FANOUT_EXCHANGE_NAME,"",message);}
}
4、消費者Consumer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class FanoutConsumer {private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);//監聽queue1隊列@RabbitListener(queues = MqConstant.FANOUT_QUEUE_ONE)public void receiveFanoutQueueOne(String msg) {log.info("\n訂閱發布消費者One接收消息:{}\n",msg);}//監聽queue2隊列@RabbitListener(queues = MqConstant.FANOUT_QUEUE_TWO)public void receiveFanoutQueueTwo(String msg) {log.info("\n訂閱發布消費者two接收消息:{}\n",msg);}
}
5、測試Test
package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.fanout.FanoutProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate FanoutProducer fanoutProducer;@GetMapping("/fanout")public void fanoutProducerTest(String message) {fanoutProducer.senderFanoutQueue("fanout " + message);}
}
6、測試結果
### fanout
GET http://localhost:8004/mq/fanout?message=fanout## 結果
2025-06-21 15:39:33.247 INFO 8236 --- [nio-8004-exec-4] o.x.s.messagetype.fanout.FanoutProducer :
訂閱發布生產者發送消息:fanout fanout2025-06-21 15:39:33.250 INFO 8236 --- [ntContainer#6-1] o.x.s.messagetype.fanout.FanoutConsumer :
訂閱發布消費者two接收消息:fanout fanout2025-06-21 15:39:33.250 INFO 8236 --- [ntContainer#5-1] o.x.s.messagetype.fanout.FanoutConsumer :
訂閱發布消費者One接收消息:fanout fanout
4、RabbitMQ 路由(direct)
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費,我們就要用的routing路由模式,這種模式是通過一個routingkey來收發消息。交換機的類型使用direct。
(與發布訂閱模式相比,交換機并不是發送給所有綁定的隊列,而是在這些綁定隊列中,符合routingkey的隊列)
1、定義消息隊列Queue名稱
package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//路由隊列1public static final String DIRECT_QUEUE_ONE = "directQueueOne";//路由隊列2public static final String DIRECT_QUEUE_TWO = "directQueueTwo";//路由交換機public static final String DIRECT_EXCHANGE_NAME = "directExchangeName";//路由鍵onepublic static final String DIRECT_ROUTING_KEY_ONE = "directRoutingKeyOne";//路由鍵twopublic static final String DIRECT_ROUTING_KEY_TWO = "directRoutingKeyTwo";
}
2、配置類Configuration
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類** @author 10564*/
@Configuration
public class RabbitmqDirectConfig {@Beanpublic Queue directQueueOne() {return new Queue(MqConstant.DIRECT_QUEUE_ONE);}@Beanpublic Queue directQueueTwo() {return new Queue(MqConstant.DIRECT_QUEUE_TWO);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(MqConstant.DIRECT_EXCHANGE_NAME);}@Beanpublic Binding directExchangeBindOne() {return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(MqConstant.DIRECT_ROUTING_KEY_ONE);}@Beanpublic Binding directExchangeBindTwo() {return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(MqConstant.DIRECT_ROUTING_KEY_TWO);}
}
3、生產者Producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564* @description: 直連交換機生產者*/
@Component
public class DirectProducer {private static final Logger log = LoggerFactory.getLogger(DirectProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderDirectOneMessage(String message) {log.info("\n路由模式生產者one發送消息:{}\n",message);//參數1:交換機名稱//參數2:路由key//參數3:消息//topic_exchange交換機 需要指定路由key 綁定到該交換機且符合路由key的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE_NAME,MqConstant.DIRECT_ROUTING_KEY_ONE,message+"ONE");}public void senderDirectTwoMessage(String message) {log.info("\n路由模式生產者two發送消息:{}\n",message);//參數1:交換機名稱//參數2:路由key//參數3:消息//topic_exchange交換機 需要指定路由key 綁定到該交換機且符合路由key的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.DIRECT_EXCHANGE_NAME,MqConstant.DIRECT_ROUTING_KEY_TWO,message+"TWO");}
}
4、消費者Consumer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/*** @author 10564*/
@Component
public class DirectConsumer {private static final Logger log = LoggerFactory.getLogger(DirectConsumer.class);//監聽queue1隊列@RabbitListener(queues = MqConstant.DIRECT_QUEUE_ONE)public void receiveHelloQueueMessage1(String msg) {log.info("\n路由模式消費者one-監聽隊列one-路由鍵one 收到消息:{}\n",msg);}//監聽queue2隊列@RabbitListener(queues = MqConstant.DIRECT_QUEUE_TWO)public void receiveHelloQueueMessage2(String msg) {log.info("\n路由模式消費者two-監聽隊列two-路由鍵two 收到消息:{}\n",msg);}
}
5、測試Test
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.direct.DirectProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate DirectProducer directProducer;@GetMapping("/direct")public void directProducerTest(String message) {directProducer.senderDirectOneMessage("direct one " + message);directProducer.senderDirectTwoMessage("direct two " + message);}
}
6、測試結果
### direct
GET http://localhost:8004/mq/direct?message=direct## 結果
2025-06-21 15:44:34.354 INFO 8236 --- [nio-8004-exec-8] o.x.s.messagetype.direct.DirectProducer :
路由模式生產者one發送消息:direct one direct2025-06-21 15:44:34.354 INFO 8236 --- [nio-8004-exec-8] o.x.s.messagetype.direct.DirectProducer :
路由模式生產者two發送消息:direct two direct2025-06-21 15:44:34.357 INFO 8236 --- [ntContainer#3-1] o.x.s.messagetype.direct.DirectConsumer :
路由模式消費者one-監聽隊列one-路由鍵one 收到消息:direct one directONE2025-06-21 15:44:34.357 INFO 8236 --- [ntContainer#4-1] o.x.s.messagetype.direct.DirectConsumer :
路由模式消費者two-監聽隊列two-路由鍵two 收到消息:direct two directTWO
5、RabbitMQ 主題(topic)
Topic類型的交換機與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列,沒有本質區別,只不過Topic類型交換機可以讓隊列在綁定Routing
key 的時候使用通配符!即不像Direct寫的那么死。
Routing key:可以由一個或多個單詞組成,每個單詞之間用“.”來分隔,例如:topicExchange.job
通配符規則:
#:匹配一個或多個詞
*:匹配不多不少恰好1個詞
Routing key = topicExchange.# 代表后面可以匹配多個,topicExchange.job topicExchange.lucky.day topicExchange.luck.for.you
Routing key = topicExchange.* 代表后面只能跟一個,topicExchange.job topicExchange.luck 但topicExchange.lucky.day(此處后面是兩個單詞所以不匹配)就不可以
1、定義消息隊列Queue名稱
package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {//隊列01public static final String TOPIC_QUEUE_ONE = "topicQueueOne";//隊列02public static final String TOPIC_QUEUE_TWO = "topicQueueTwo";//交換機public static final String TOPIC_EXCHANGE_NAME = "topicExchange";/*** # 路由鍵匹配一個或多個單詞 routingKey. 開頭的都可以匹配到 eg: routingKey.one routingKey.two等等*/public static final String TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE = "topicRoutingKey.#";// * 路由鍵匹配一個單詞 topicRoutingKey. 開頭的,如果多個,只匹配一個public static final String TOPIC_ROUTING_KEY_MATCHING_ONE = "topicRoutingKey.*";
}
2、配置類Configuration
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 創建RabbitMQ的配置類** @author 10564*/
@Configuration
public class RabbitmqTopicConfig {@Beanpublic Queue topicQueueOne() {return new Queue(MqConstant.TOPIC_QUEUE_ONE);}@Beanpublic Queue topicQueueTwo() {return new Queue(MqConstant.TOPIC_QUEUE_TWO);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(MqConstant.TOPIC_EXCHANGE_NAME);}@Beanpublic Binding topicBindingMatchingOneOrMore() {return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE);}@Beanpublic Binding topicBindingMatchingOne() {return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE);}
}
3、生產者Producer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class TopicQueueProducer {private static final Logger log = LoggerFactory.getLogger(TopicQueueProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;/*** 發送topic主體消息隊列,匹配一個 或者匹配多個* @param message 消息* @param routingKey 路由key*/public void senderTopicQueueRoutingMatching(String message,String routingKey) {log.info("\n主題模式 生產者發送消息 message:{}\n",message);log.info("\n主題模式 Routing:【{},{}】,當前的消息routing:{}\n",MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE,MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE,routingKey);//參數1:交換機名稱//TODO 參數2:路由key * 匹配一個 # 匹配一個或者多個【此處的key是匹配MqConstant.TOPIC_ROUTING_KEY_MATCHING_ONE或者是TOPIC_ROUTING_KEY_MATCHING_ONE_OR_MORE】//參數3:消息// topic_exchange交換機 需要指定路由key 綁定到該交換機且符合路由key的隊列都會收到消息rabbitTemplate.convertAndSend(MqConstant.TOPIC_EXCHANGE_NAME,routingKey,message);}
}
4、消費者Consumer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;/**** @author 10564*/
@Component
public class TopicQueueConsumer {private static final Logger log = LoggerFactory.getLogger(TopicQueueConsumer.class);/*** 消費者one監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.TOPIC_QUEUE_ONE)public void receiveTopicQueueMessageOne(String message) {log.info("\n主題模式消費者01監聽隊列01-》路由鍵01 收到消息:{}\n",message);}/*** 消費者two監聽隊列,并接收消息* @param message 消息內容*/@RabbitListener(queues = MqConstant.TOPIC_QUEUE_TWO)public void receiveTopicQueueMessageTwo(String message) {log.info("\n主題模式消費者02監聽隊列02-》路由鍵02 收到消息:{}\n",message);}
}
5、測試Test
package org.xwb.springcloud.controller;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.topic.TopicQueueProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate TopicQueueProducer topicQueueProducer;@GetMapping("/topic")public void topicQueueProducerTest(String message, String routingKey) {topicQueueProducer.senderTopicQueueRoutingMatching("topic one " + message, routingKey);}
}
6、測試結果
### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one## 結果
2025-06-21 15:49:03.545 INFO 8236 --- [nio-8004-exec-2] o.x.s.m.topic.TopicQueueProducer :
主題模式 生產者發送消息 message:topic one topic2025-06-21 15:49:03.545 INFO 8236 --- [nio-8004-exec-2] o.x.s.m.topic.TopicQueueProducer :
主題模式 Routing:【topicRoutingKey.*,topicRoutingKey.#】,當前的消息routing:topicRoutingKey.one2025-06-21 15:49:03.547 INFO 8236 --- [ntContainer#9-1] o.x.s.m.topic.TopicQueueConsumer :
主題模式消費者01監聽隊列01-》路由鍵01 收到消息:topic one topic2025-06-21 15:49:03.547 INFO 8236 --- [ntContainer#8-1] o.x.s.m.topic.TopicQueueConsumer :
主題模式消費者02監聽隊列02-》路由鍵02 收到消息:topic one topic### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one.two## 結果
2025-06-21 15:49:28.181 INFO 8236 --- [nio-8004-exec-6] o.x.s.m.topic.TopicQueueProducer :
主題模式 生產者發送消息 message:topic one topic2025-06-21 15:49:28.182 INFO 8236 --- [nio-8004-exec-6] o.x.s.m.topic.TopicQueueProducer :
主題模式 Routing:【topicRoutingKey.*,topicRoutingKey.#】,當前的消息routing:topicRoutingKey.one.two2025-06-21 15:49:28.184 INFO 8236 --- [ntContainer#9-1] o.x.s.m.topic.TopicQueueConsumer :
主題模式消費者01監聽隊列01-》路由鍵01 收到消息:topic one topic
測試工具類 request.http文件
(IDEA搜索HTTP Client插件工具可以直接使用)
### simple
GET http://localhost:8004/mq/simple?message=simple111### workQueue
GET http://localhost:8004/mq/workQueue?message=workQueue2### fanout
GET http://localhost:8004/mq/fanout?message=fanout### direct
GET http://localhost:8004/mq/direct?message=direct### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one### topic
GET http://localhost:8004/mq/topic?message=topic&routingKey=topicRoutingKey.one.two