RabbitMQ 是什么?
RabbitMQ是一個遵循AMQP協議的消息中間件,它從生產者接收消息并傳遞給消費者,在這個過程中,根據路由規則進行消息的路由、緩存和持久化。
AMQP,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件而設計的。基于此協議的客戶端與消息中間件可以傳遞消息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制。RabbitMQ就是通過Erlang語言實現的一種消息中間件。
具備下面的核心功能:
- 異步通信:允許應用程序通過消息隊列解耦,生產者發送消息后無需等待消費者處理。
- 消息路由:通過靈活的交換器(Exchange)機制,支持多種消息分發模式。
- 可靠性保障:提供消息持久化、確認機制(ACK)和重試策略,確保消息不丟失。
- 負載均衡:通過輪詢或權重分配方式將消息分發給多個消費者,提升系統吞吐量。
為什么需要消息隊列?
在分布式系統中,直接調用(如 HTTP 請求)可能導致以下問題:
- 耦合性高:服務之間依賴性強,一個服務故障可能引發雪崩效應。
- 性能瓶頸:同步調用會阻塞線程,影響系統響應速度。
- 擴展困難:高并發場景下難以動態調整消費者數量。
消息隊列通過異步通信和緩沖機制解決了這些問題:生產者發送消息到隊列后即可返回,消費者按自身能力處理消息。即使消費者暫時不可用,消息仍能存儲在隊列中,避免數據丟失。
核心概念 :
名稱 | 說明 |
Producer | 生產者,發送消息的一方 |
Consumer | 消費者,接收消息的一方 |
Queue | 隊列,存儲消息的緩沖區 |
Exchange | 交換機,負責轉發消息到隊列 |
Routing Key | 路由鍵,決定消息如何路由 |
Binding | 綁定,連接交換機與隊列的規則 |
Message | 消息,最終傳輸的數據 |
工作模型 :
模型類型(英文) | 中文名稱 | 簡介 |
Simple | 簡單隊列模型 | 一個生產者對應一個隊列和一個消費者,最基礎的模型,適合入門學習或簡單通信。 |
Work Queue | 工作隊列模型 | 一個生產者將任務發送到隊列,由多個消費者競爭消費,常用于任務分發和后臺處理。 |
Publish/Subscribe | 發布/訂閱模型 | 通過 fanout 類型交換機,生產者發送的消息會廣播到所有綁定的隊列,適合通知、廣播類場景。 |
Routing | 路由模型 | 使用 direct 類型交換機,生產者根據路由鍵將消息精確投遞到指定隊列,適合日志分級處理等場景。 |
Topics | 主題(通配路由)模型 | 使用 topic 類型交換機,支持模糊匹配路由鍵(如 |
RPC 模式 | 遠程調用模型 | 實現遠程服務調用,生產者發送請求并等待消費者返回響應,適合系統之間的異步調用場景。 |
Docker安裝RabbitMQ
1、拉取RabbitMQ鏡像
命令:
docker pull rabbitmq
2、啟動
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
3、進入容器內部
docker exec -it rabbit /bin/bash
4、安裝插件
rabbitmq-plugins enable rabbitmq_management
5、查看插件情況
rabbitmq-plugins list
6、訪問RabbitMQ
http://192.168.142.3:15672/
賬號:guest
密碼:guest
RabbitMQ安裝方式解壓安裝
1、下載RabbitMQ
Installing RabbitMQ | RabbitMQ
2、下載Erlang
RabbitMQ是采用 Erlang語言開發的,所以系統環境必須提供 Erlang環境,需要是安裝 Erlang
Erlang
和RabbitMQ
版本對照:https://www.rabbitmq.com/which-erlang.html
這里安裝最新版本3.8.14的RabbitMQ
,對應的Erlang
版本推薦23.x
下載地址:el/7/erlang-23.2.7-2.el7.x86_64.rpm - rabbitmq/erlang · packagecloud
3、將下載好的文件上傳到服務器
# 創建文件
mkdir -p /opt/rabbitmq
將安裝包上傳到/opt/rabbitmq
4、安裝Erlang
cd /opt/rabbitmq
# 解壓
rpm -Uvh erlang-23.2.7-2.el7.x86_64.rpm# 安裝
yum install -y erlang
如果yum無法使用:
使用國內鏡像源(適用于中國用戶)
- 備份原有 repo 文件:
sudo mkdir /etc/yum.repos.d/backup
sudo mv /etc/yum.repos.d/CentOS-* /etc/yum.repos.d/backup/
- 下載阿里云鏡像源:
sudo curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
- 清理并重建緩存:
sudo yum clean all
sudo yum makecache
安裝完成后輸入如下指令查看版本號輸入兩次ctrl+c退出
5、安裝RabbitMQ
在RabiitMQ
安裝過程中需要依賴socat
插件,首先安裝該插件
yum install -y socat
解壓安裝RabbitMQ
的安裝包
# 解壓
rpm -Uvh rabbitmq-server-3.8.14-1.el7.noarch.rpm
# 安裝
yum install -y rabbitmq-server
6、啟動RabbitMQ服務
# 啟動rabbitmq
systemctl start rabbitmq-server
# 查看rabbitmq狀態
systemctl status rabbitmq-server# 其他命令
# 設置rabbitmq服務開機自啟動
systemctl enable rabbitmq-server
# 關閉rabbitmq服務
systemctl stop rabbitmq-server
# 重啟rabbitmq服務
systemctl restart rabbitmq-server
7、RabbitMQWeb管理界面及授權操作
systemctl stop firewalld
# 打開RabbitMQWeb管理界面插件
rabbitmq-plugins enable rabbitmq_management
打開瀏覽器,訪問服務器公網ip:15672
http://192.168.142.131:15672/
rabbitmq
有一個默認的賬號密碼guest
添加遠程用戶
# 添加用戶
rabbitmqctl add_user 用戶名 密碼
rabbitmqctl add_user admin 123456
# 設置用戶角色,分配操作權限
rabbitmqctl set_user_tags 用戶名 角色
rabbitmqctl set_user_tags admin administrator
# 為用戶添加資源權限(授予訪問虛擬機根節點的所有權限)
rabbitmqctl set_permissions -p / 用戶名 ".*" ".*" ".*"
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"#其他指令
# 修改密碼
rabbitmqctl change_ password 用戶名 新密碼
# 刪除用戶
rabbitmqctl delete_user 用戶名
# 查看用戶清單
rabbitmqctl list_users
角色有四種:
administrator
:可以登錄控制臺、查看所有信息、并對rabbitmq進行管理monToring
:監控者;登錄控制臺,查看所有信息policymaker
:策略制定者;登錄控制臺指定策略managment
:普通管理員;登錄控制
創建用戶admin
,密碼123456
,設置administrator
角色,賦予所有權限
然后訪問 http://192.168.142.131:15672/ 用戶名:admin 密碼:123456
8、延時隊列插件安裝
Community Plugins | RabbitMQ(RabbitMQ是什么版本的,下載的插件就得是什么版本的)
將插件上傳到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.14/plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后重啟rabbitMQ
SpringBoot如何使用RabbitMQ
1、創建SpringBoot項目引入依賴
<!-- Spring Boot Starter for RabbitMQ -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、在 application.yml 中添加 RabbitMQ 配置
spring:# RabbitMQ 相關配置rabbitmq:# RabbitMQ 服務器地址host: 192.168.142.131# RabbitMQ 服務器端口 客戶端應用程序(生產者/消費者)通過這個端口與 RabbitMQ 服務器交互port: 5672# RabbitMQ 用戶名username: admin# RabbitMQ 密碼password: 123456# RabbitMQ 虛擬主機virtual-host: /# 開啟發送方確認機制publisher-confirm-type: correlated# 開啟發送方退回機制publisher-returns: true# RabbitMQ 模板配置template:# 設置為 true 時,RabbitMQ 將確認消息是否成功投遞到隊列mandatory: true# 消息監聽器配置listener:simple:# 消費者最小數量concurrency: 5# 消費者最大數量max-concurrency: 10# 每次從隊列中獲取的消息數量prefetch: 1# 消費者手動 ackacknowledge-mode: manual
3、創建 RabbitMQ 配置類
package com.lw.mqdemo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** RabbitMQ配置類,用于聲明隊列、交換機、綁定關系以及配置消息轉換器和RabbitTemplate*/
@Slf4j
@Configuration
public class RabbitMQConfig {// 定義隊列、交換機和路由鍵名稱// 定義直連交換機相關的常量// 隊列名稱public static final String DIRECT_QUEUE = "test.direct.queue";// 交換機名稱public static final String DIRECT_EXCHANGE = "test.direct.exchange";// 路由鍵名稱public static final String DIRECT_ROUTING_KEY = "test.direct.routingkey";// 定義主題交換機相關的常量// 隊列1名稱public static final String TOPIC_QUEUE_1 = "test.topic.queue1";// 隊列2名稱public static final String TOPIC_QUEUE_2 = "test.topic.queue2";// 交換機名稱public static final String TOPIC_EXCHANGE = "test.topic.exchange";// 路由鍵名稱public static final String TOPIC_ROUTING_KEY_1 = "test.topic.routingkey1";// 路由鍵名稱public static final String TOPIC_ROUTING_KEY_2 = "test.topic.#";// 定義扇形交換機相關的常量// 隊列1名稱public static final String FANOUT_QUEUE_1 = "test.fanout.queue1";// 隊列2名稱public static final String FANOUT_QUEUE_2 = "test.fanout.queue2";// 交換機名稱public static final String FANOUT_EXCHANGE = "test.fanout.exchange";// 1. 直連型交換機隊列/*** 聲明直連型交換機的隊列** @return 隊列對象*/@Beanpublic Queue directQueue() {log.info("創建隊列: " + DIRECT_QUEUE);return new Queue(DIRECT_QUEUE, true);}// 2. 主題型交換機隊列/*** 聲明主題型交換機的第一個隊列** @return 隊列對象*/@Beanpublic Queue topicQueue1() {log.info("創建隊列: " + TOPIC_QUEUE_1);return new Queue(TOPIC_QUEUE_1, true);}/*** 聲明主題型交換機的第二個隊列** @return 隊列對象*/@Beanpublic Queue topicQueue2() {log.info("創建隊列: " + TOPIC_QUEUE_2);return new Queue(TOPIC_QUEUE_2, true);}// 3. 扇形交換機隊列/*** 聲明扇形交換機的第一個隊列** @return 隊列對象*/@Beanpublic Queue fanoutQueue1() {log.info("創建隊列: " + FANOUT_QUEUE_1);return new Queue(FANOUT_QUEUE_1, true);}/*** 聲明扇形交換機的第二個隊列** @return 隊列對象*/@Beanpublic Queue fanoutQueue2() {log.info("創建隊列: " + FANOUT_QUEUE_2);return new Queue(FANOUT_QUEUE_2, true);}// 1. 直連型交換機/*** 聲明直連型交換機** @return 交換機對象*/@Beanpublic DirectExchange directExchange() {log.info("創建交換機: " + DIRECT_EXCHANGE);return new DirectExchange(DIRECT_EXCHANGE, true, false);}// 2. 主題型交換機/*** 聲明主題型交換機** @return 交換機對象*/@Beanpublic TopicExchange topicExchange() {log.info("創建交換機: " + TOPIC_EXCHANGE);return new TopicExchange(TOPIC_EXCHANGE, true, false);}// 3. 扇形交換機/*** 聲明扇形交換機** @return 交換機對象*/@Beanpublic FanoutExchange fanoutExchange() {log.info("創建交換機: " + FANOUT_EXCHANGE);return new FanoutExchange(FANOUT_EXCHANGE, true, false);}// 綁定直連型交換機和隊列/*** 綁定直連型交換機和隊列** @return 綁定對象*/@Beanpublic Binding bindingDirect() {log.info("綁定隊列: " + DIRECT_QUEUE + " 到交換機: " + DIRECT_EXCHANGE + ",路由鍵: " + DIRECT_ROUTING_KEY);return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);}// 綁定主題型交換機和隊列/*** 綁定主題型交換機的第一個隊列** @return 綁定對象*/@Beanpublic Binding bindingTopic1() {log.info("綁定隊列: " + TOPIC_QUEUE_1 + " 到交換機: " + TOPIC_EXCHANGE + ",路由鍵: " + TOPIC_ROUTING_KEY_1);return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY_1);}/*** 綁定主題型交換機的第二個隊列** @return 綁定對象*/@Beanpublic Binding bindingTopic2() {log.info("綁定隊列: " + TOPIC_QUEUE_2 + " 到交換機: " + TOPIC_EXCHANGE + ",路由鍵: " + TOPIC_ROUTING_KEY_2);return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY_2);}// 綁定扇形交換機和隊列/*** 綁定扇形交換機的第一個隊列** @return 綁定對象*/@Beanpublic Binding bindingFanout1() {log.info("綁定隊列: " + FANOUT_QUEUE_1 + " 到交換機: " + FANOUT_EXCHANGE);return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/*** 綁定扇形交換機的第二個隊列** @return 綁定對象*/@Beanpublic Binding bindingFanout2() {log.info("綁定隊列: " + FANOUT_QUEUE_2 + " 到交換機: " + FANOUT_EXCHANGE);return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}/*** 創建延遲交換機*/@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed.exchange","x-delayed-message", true, false, args);}/*** 綁定延遲交換機* @return*/@Beanpublic Binding bindingDelayed() {return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(DIRECT_ROUTING_KEY).noargs();}// 使用JSON序列化消息/*** 配置JSON消息轉換器** @return 消息轉換器對象*/@Beanpublic MessageConverter jsonMessageConverter() {log.info("配置JSON消息轉換器");return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate/*** 配置RabbitTemplate,設置消息轉換器以及消息發送確認和返回回調** @param connectionFactory 連接工廠* @return 配置好的RabbitTemplate對象*/@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(jsonMessageConverter());// 消息發送到交換器后確認回調rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {log.info("消息成功發送到Exchange: " + correlationData);} else {log.error("消息發送到Exchange失敗: " + correlationData);}});// 消息從交換器發送到隊列失敗回調rabbitTemplate.setReturnsCallback(returned -> {log.info("消息從Exchange路由到Queue失敗: " + returned.getMessage());log.info("交換機: " + returned.getExchange());log.info("路由鍵: " + returned.getRoutingKey());log.info("返回碼: " + returned.getReplyCode());log.info("返回信息: " + returned.getReplyText());});return rabbitTemplate;}
}
4、創建消息生產者
package com.lw.mqdemo.mq;import com.lw.mqdemo.config.RabbitMQConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;/*** RabbitMQ生產者類,用于發送不同類型的交換機消息*/
@Component
public class RabbitMQProducer {/*** 注入RabbitTemplate模板,用于發送消息*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 注入MessageConverter轉換器,用于將對象轉換為消息*/@Autowiredprivate MessageConverter messageConverter;/*** 發送直連型交換機消息** @param message 要發送的消息對象*/public void sendDirectMessage(Object message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, // 交換機名稱RabbitMQConfig.DIRECT_ROUTING_KEY, // 路由鍵 用于消息路由message, // 要發送的消息對象correlationData // 消息的唯一標識);}/*** 發送主題型交換機消息1** @param message 要發送的消息對象*/public void sendTopicMessage1(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, // 交換機名稱RabbitMQConfig.TOPIC_ROUTING_KEY_1, // 路由鍵 用于消息路由message // 要發送的消息對象);}/*** 發送主題型交換機消息2** @param message 要發送的消息對象*/public void sendTopicMessage2(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, // 交換機名稱"test.topic.routingkey2.test", // 路由鍵 用于消息路由message);}/*** 發送扇形交換機消息** @param message 要發送的消息對象*/public void sendFanoutMessage(Object message) {rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, // 交換機名稱"", // 扇形交換機不需要路由鍵message);}/*** 發送延遲消息** @param message 要發送的消息對象* @param delayMillis 消息延遲的時間(毫秒)*/public void sendDelayedMessage(Object message, int delayMillis) {MessageProperties props = new MessageProperties();props.setDelay(delayMillis);Message msg = messageConverter.toMessage(message, props);rabbitTemplate.convertAndSend("delayed.exchange", // 關鍵修改點RabbitMQConfig.DIRECT_ROUTING_KEY,msg);}
}
5、創建消息消費者
package com.lw.mqdemo.mq;import com.lw.mqdemo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;/*** RabbitMQ消費者類* 該類包含了對不同類型隊列(直連型、主題型、扇形)的消息消費方法*/
@Component
@Slf4j
public class RabbitMQConsumer {/*** 直連型隊列消費者* 監聽直連型隊列并處理收到的消息** @param message 消息內容* @param channel 消息通道* @throws IOException 當消息處理失敗時拋出異常*/@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)public void processDirectMessage(Message message, Channel channel) throws IOException {try {// 打印收到的消息內容log.info("直連型隊列收到消息: " + new String(message.getBody()));// 手動ACK確認 如果不確認會消息Unacked狀態channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 處理失敗,拒絕消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 主題型隊列1消費者* 監聽主題型隊列1并處理收到的消息** @param message 消息內容* @param channel 消息通道* @param msg 消息對象* @throws IOException 當消息處理失敗時拋出異常*/@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_1)public void processTopicMessage1(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息內容log.info("主題型隊列1收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 處理失敗,拒絕消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 主題型隊列2消費者* 監聽主題型隊列2并處理收到的消息** @param message 消息內容* @param channel 消息通道* @param msg 消息對象* @throws IOException 當消息處理失敗時拋出異常*/@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_2)public void processTopicMessage2(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息內容log.info("主題型隊列2收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 處理失敗,拒絕消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 扇形隊列1消費者* 監聽扇形隊列1并處理收到的消息** @param message 消息內容* @param channel 消息通道* @param msg 消息對象* @throws IOException 當消息處理失敗時拋出異1常*/@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1)public void processFanoutMessage1(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息內容log.info("扇形隊列1收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 處理失敗,拒絕消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}/*** 扇形隊列2消費者* 監聽扇形隊列2并處理收到的消息** @param message 消息內容* @param channel 消息通道* @param msg 消息對象* @throws IOException 當消息處理失敗時拋出異常*/@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2)public void processFanoutMessage2(String message, Channel channel, Message msg) throws IOException {try {// 打印收到的消息內容log.info("扇形隊列2收到消息: " + message);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 處理失敗,拒絕消息channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, true);e.printStackTrace();}}
}
6、創建測試控制器
package com.lw.mqdemo.controller;import com.lw.mqdemo.mq.RabbitMQProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** RabitMQ控制器* @author lw*/
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {// 注入RabbitMQ生產者@Autowiredprivate RabbitMQProducer rabbitMQProducer;/*** 發送直連型消息* @return*/@GetMapping("/direct")public String sendDirectMessage() {log.info("發送直連型消息");rabbitMQProducer.sendDirectMessage("這是一條直連型交換機消息");return "直連型消息發送成功";}/*** 發送主題型消息1* @return*/@GetMapping("/topic1")public String sendTopicMessage1() {log.info("發送主題型消息1");rabbitMQProducer.sendTopicMessage1("這是一條主題型交換機消息1");return "主題型消息1發送成功";}/*** 發送主題型消息2* @return*/@GetMapping("/topic2")public String sendTopicMessage2() {log.info("發送主題型消息2");rabbitMQProducer.sendTopicMessage2("這是一條主題型交換機消息2");return "主題型消息2發送成功";}@GetMapping("/fanout")public String sendFanoutMessage() {log.info("發送扇形消息");rabbitMQProducer.sendFanoutMessage("這是一條扇形交換機消息");return "扇形消息發送成功";}/*** 發送延遲消息* @return*/@GetMapping("/delay")public String sendDelayedMessage() {log.info("發送延遲消息");// 延遲5秒rabbitMQProducer.sendDelayedMessage("這是一條延遲消息", 5000);return "延遲消息發送成功,5秒后消費";}
}