RabbitMQ
客戶端整合Spring Boot
添加相關的依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
編寫配置文件,配置RabbitMQ的服務信息
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /
logging:level:com.atguigu.mq.listener.MyMessageListener: info
監聽消息與發送消息(這里如果想要演示的明白的話需要創建兩個Boot工程分別作為生產端與服務端來進行演示操作)
//=================================監聽消息使用注解的方式來完成=============================
@Component
@Slf4j
public class MyMessageListener {public static final String EXCHANGE_DIRECT = "exchange.direct.order"; ?public static final String ROUTING_KEY = "order"; ?public static final String QUEUE_NAME ?= "queue.order"; ?@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}))public void processMessage(String dateString,Message message,Channel channel) {log.info(dateString);}}
?
//=================================發送消息使用組件的方式來完成=============================
@SpringBootTest ?
public class RabbitMQTest { ?public static final String EXCHANGE_DIRECT = "exchange.direct.order"; ?public static final String ROUTING_KEY = "order";@Autowired ?private RabbitTemplate rabbitTemplate;@Test ?public void testSendMessage() { ?rabbitTemplate.convertAndSend( ?EXCHANGE_DIRECT, ? ROUTING_KEY, ? "Hello atguigu"); ?} ?}
注解
示例1:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),exchange = @Exchange(value = EXCHANGE_DIRECT),key = {ROUTING_KEY}
))
-
@RabbitListener注解
-
作用:自動綁定指定隊列進行消息監聽,支持直接隊列名、綁定表達式、占位符配置等方式,自動反序列化消息體到方法參數,支持多種參數類型(POJO/Message/byte[]),管理消息確認模式(自動/手動ACK),配置并發消費者線程池,實現消息重試和死信處理
-
binding屬性
-
作用:指定交換機和隊列之間的綁定關系,指定當前方法要監聽的隊列;如果RabbitMQ服務器上沒有這里指定的交換機和隊列,那么框架底層的代碼會創建它們
-
@QueueBinding注解:
-
作用:聲明需要綁定的隊列與交換機的信息
-
value屬性:使用@Queue注解來指定隊列信息
-
exchange屬性:使用@Exchange注解來指定交換機信息
-
key屬性:使用{}來指定路由鍵信息
-
-
-
示例2:
@RabbitListener(queues = {QUEUE_ATGUIGU})
直接使用queues這個屬性值,通過給這個屬性進行賦值來直接指定需要監聽的隊列,優點是編寫模式簡潔精短,缺點是不能像示例1所寫的那樣在未創建交換機與隊列以及設置其綁定關系時可以系統幫忙設置以防發生報錯。需要在RabbitMQ的可視化界面自己手動進行創建與綁定的過程。
消息可靠性投遞
故障及其解決方案
-
故障情況1:消息沒有發送到消息隊列上面
-
解決方案:
-
故障情況2:消息成功發送到了消息隊列,但是消息隊列服務宕機了。原本保存在內存里面的消息也丟失了
-
解決方案:將消息持久化到硬盤上面,哪怕服務器重啟也不會導致消息丟失
-
故障情況3:消息成功存入了消息隊列,但是消費端出現了問題(宕機,拋出異常等)
-
解決方案:
業務代碼實現
生產者故障
-
導入依賴,編寫配置文件
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
注意:publisher-confirm-type和publisher-returns是兩個必須要增加的配置,如果沒有則本節功能不生效
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /publisher-confirm-type: CORRELATED # 交換機的確認publisher-returns: true # 隊列的確認
logging:level:com.atguigu.mq.config.MQProducerAckConfig: info
-
創建配置類
在創建完配置類之后,重寫相應的可提升消息可靠性的方法后,我們還需要對RabbitTemplate的功能進行增強,因為回調函數所在對象必須設置到RabbitTemplate對象中才能生效。原本RabbitTemplate對象并沒有生產者端消息確認的功能,要給它設置對應的組件才可以。
方法名 | 方法功能 | 所屬接口 | 接口所屬類 |
---|---|---|---|
confirm() | 確認消息是否發送到交換機 | ConfirmCallback | RabbitTemplate |
returnedMessage() | 確認消息是否發送到隊列 | ReturnsCallback | RabbitTemplate |
設置組件調用的方法 | 所需對象類型 |
---|---|
setConfirmCallback() | ConfirmCallback接口類型 |
setReturnCallback() | ReturnCallback接口類型 |
//================================配置類的邏輯,確保消息到達交換機、到達隊列=====================================
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
?@Autowiredprivate RabbitTemplate rabbitTemplate;
?@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnsCallback(this);//初始化RabbitMQ模板,將重寫的方法重新加入到rabbitTemplate中}
?@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息發送到交換機成功!數據:" + correlationData);} else {log.info("消息發送到交換機失敗!數據:" + correlationData + " 原因:" + cause);}}
?@Overridepublic void returnedMessage(ReturnedMessage returned) {log.info("消息主體: " + new String(returned.getMessage().getBody()));log.info("應答碼: " + returned.getReplyCode());log.info("描述:" + returned.getReplyText());log.info("消息使用的交換器 exchange : " + returned.getExchange());log.info("消息使用的路由鍵 routing : " + returned.getRoutingKey());}
}
注解說明:
-
發送消息
@SpringBootTest ?
public class RabbitMQTest { ?public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";@Autowired ?private RabbitTemplate rabbitTemplate;@Test ?public void testSendMessage() { ?rabbitTemplate.convertAndSend( ?EXCHANGE_DIRECT, ? ROUTING_KEY, ? "Hello atguigu"); ?} ?}
通過調整代碼,測試如下三種情況:
-
交換機正確、路由鍵正確
-
交換機正確、路由鍵不正確,無法發送到隊列
-
交換機不正確,無法發送到交換機
RabbitMQ服務故障
我們直接重啟docker容器就會發現,我們明明沒有向隊列中發送消息但是隊列的消息速率圖仍然顯示有消息到達。這是因為剛才我們做實驗時放入隊列中的消息在docker重啟完畢之后就從硬盤里面被移動到了MQ中。實際上RabbitMQ默認是配置了持久化的。表現在交換機是默認持久化的、隊列是默認持久化的、消息是默認持久化的。在一些配置交換機與隊列的注解里面就會看見底層是默認了持久化的。
消費者故障
-
配置文件
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manual # 把消息確認模式改為手動確認
-
接收消息
@Component
@Slf4j
public class MyMessageListener {
?public static final String EXCHANGE_DIRECT = "exchange.direct.order";public static final String ROUTING_KEY = "order";public static final String QUEUE_NAME ?= "queue.order";
?// 修飾監聽方法@RabbitListener(// 設置綁定關系bindings = @QueueBinding(
?// 配置隊列信息:durable 設置為 true 表示隊列持久化;autoDelete 設置為 false 表示關閉自動刪除value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
?// 配置交換機信息:durable 設置為 true 表示隊列持久化;autoDelete 設置為 false 表示關閉自動刪除exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
?// 配置路由鍵信息key = {ROUTING_KEY}))public void processMessage(String dataString, Message message, Channel channel) throws IOException {
?// 1、獲取當前消息的 deliveryTag 值備用long deliveryTag = message.getMessageProperties().getDeliveryTag();
?try {// 2、正常業務操作log.info("消費端接收到消息內容:" + dataString);// System.out.println(10 / 0);
?// 3、給 RabbitMQ 服務器返回 ACK 確認信息channel.basicAck(deliveryTag, false);} catch (Exception e) {
?// 4、獲取信息,看當前消息是否曾經被投遞過Boolean redelivered = message.getMessageProperties().getRedelivered();
?if (!redelivered) {// 5、如果沒有被投遞過,那就重新放回隊列,重新投遞,再試一次channel.basicNack(deliveryTag, false, true);} else {// 6、如果已經被投遞過,且這一次仍然進入了 catch 塊,那么返回拒絕且不再放回隊列channel.basicReject(deliveryTag, false);}
?}}
?
}
ACK與NACK機制:ACK(成功)消費者成功處理消息后,顯式或隱式地向RabbitMQ服務器發送確認信號,告知消息已被正確處理,服務器可以安全移除該消息。在沒有手動的在配置文件中聲明時就是默認返回ACK數據;NACK(失敗)消費者明確告知服務器消息處理失敗,服務器可根據策略重新投遞或丟棄消息(如進入死信隊列)。
-
ACK是消息處理成功的“綠燈”,確保可靠性。
-
NACK是處理失敗的“黃燈”,提供容錯和重試機制。
-
兩者結合使用時,需通過重試策略、死信隊列和冪等性設計,構建高可靠的分布式消息系統。
API說明:
①basicAck()方法:
-
作用:給Broker返回ACK確認信息,表示消息已經在消費端成功消費,這樣Broker就可以把消息刪除了
-
參數列表:
參數名稱 | 含義 |
---|---|
long deliveryTag | Broker給每一條進入隊列的消息都設定一個唯一標識 |
boolean multiple | 取值為true:為小于、等于deliveryTag的消息批量返回ACK信息 取值為false:僅為指定的deliveryTag返回ACK信息 |
②basicNack()方法:
-
作用:給Broker返回NACK信息,表示消息在消費端消費失敗,此時Broker的后續操作取決于參數requeue的值
-
參數列表:
參數名稱 | 含義 |
---|---|
long deliveryTag | Broker給每一條進入隊列的消息都設定一個唯一標識 |
boolean multiple | 取值為true:為小于、等于deliveryTag的消息批量返回ACK信息 取值為false:僅為指定的deliveryTag返回ACK信息 |
boolean requeue | 取值為true:Broker將消息重新放回隊列,接下來會重新投遞給消費端 取值為false:Broker將消息標記為已消費,不會放回隊列 |
③basicReject()方法:
-
作用:根據指定的deliveryTag,對該消息表示拒絕
-
參數列表:
參數名稱 | 含義 |
---|---|
long deliveryTag | Broker給每一條進入隊列的消息都設定一個唯一標識 |
boolean requeue | 取值為true:Broker將消息重新放回隊列,接下來會重新投遞給消費端 取值為false:Broker將消息標記為已消費,不會放回隊列 |
消費端限流
作用
-
防止資源耗盡:避免消費者因處理速度跟不上消息到達速度,導致內存或線程資源耗盡。
-
保護下游服務:防止突發流量壓垮數據庫等依賴服務。
-
平衡負載:確保消費者處理能力與消息生產速率匹配。
實現
①yml文件配置,設置消費端的限流閾值為多少(prefetch參數)
spring:rabbitmq:host: 192.168.200.100port: 5672username: guestpassword: 123456virtual-host: /listener:simple:acknowledge-mode: manualprefetch: 1 # 設置每次最多從消息隊列服務器取回多少消息
-
Prefetch Count(QoS設置):
-
通過設置
prefetch count
參數,限制消費者未確認(unacknowledged)的最大消息數量。 -
例如,若
prefetch=5
,消費者最多同時處理5條消息,處理并確認后才會獲取新消息。 -
手動確認模式(Manual Acknowledgement):必須啟用手動ACK(關閉自動確認),否則prefetch機制失效。
-
②消費與生產端的測試代碼
//=====================生產端===============================
@Test ?
public void testSendMessage() {for (int i = 0; i < 100; i++) {rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"Hello atguigu" + i);}
}
//=========================消費端===========================
// 2、正常業務操作
log.info("消費端接收到消息內容:" + dataString);
?
// System.out.println(10 / 0);
TimeUnit.SECONDS.sleep(1);
?
// 3、給 RabbitMQ 服務器返回 ACK 確認信息
channel.basicAck(deliveryTag, false);
③結果分析
在未配置消費端限流時,RabbitMQ的監視界面顯示消息涌入100條瞬間就被在監視的消費端一掃而空,總消息數為0,但是在ACK的確認機制下Raabbit服務還在慢慢處理消息的確認信息并反饋給監視界面
在配置了消息限流之后,RabbitMQ的監視界面顯示任然是消息涌入100條但是消息并非是一口氣被消費端全部消費完成而是隨著時間的推移慢慢的被消費取出,總消息數隨著時間的變化而在變化,同樣在ACK的機制下Rabbit服務還在慢慢處理消息的確認信息并反饋給監視界面
消息超時
RabbitMQ 的消息超時機制主要通過 TTL(Time-To-Live) 實現,允許為消息或隊列設置存活時間,過期后消息會被自動刪除或轉移到死信隊列。
TTL的兩種設置方式
-
隊列級TTL
-
定義:為整個隊列設置統一的過期時間,所有進入該隊列的消息共享此 TTL(超時時間)。
-
特點:
-
全局生效,優先級低于消息級 TTL。
-
隊列中消息的過期時間從入隊時開始計算。
-
-
? ? ? ? ? ?// 設置隊列參數:消息1分鐘后過期Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 60000); // 單位:毫秒// 聲明隊列channel.queueDeclare("my_queue", true, false, false, args);
-
消息級 TTL
-
定義:為單條消息設置獨立的過期時間,優先級高于隊列級 TTL。
-
特點:
-
每條消息的 TTL 獨立計算。
-
過期時間從消息入隊時開始計算(若消息被重新投遞到其他隊列,時間重新計算)。
-
-
? ?// 1、創建消息后置處理器對象 ?MessagePostProcessor messagePostProcessor = (Message message) -> { ?// 設定 TTL 時間,以毫秒為單位message.getMessageProperties().setExpiration("5000"); ?return message;};// 2、發送消息 ?rabbitTemplate.convertAndSend( ? ?EXCHANGE_DIRECT, ? ? ROUTING_KEY, ? ? "Hello atguigu", messagePostProcessor); ?
消息超時的行為
-
自動刪除:當消息過期后,默認會直接從隊列中刪除。
-
死信隊列(Dead Letter Exchange): 若隊列配置了死信交換機(DLX),過期消息會轉移到 DLX 關聯的隊列,而非直接刪除。
總結
-
支持隊列級和消息級 TTL,后者優先級更高。
-
過期消息可自動刪除或路由到死信隊列。
-
實際應用中需注意過期檢查機制的性能影響及潛在阻塞問題。 合理使用 TTL 可有效解決業務超時邏輯、資源釋放等常見問題。
死信和死信隊列
死信(Dead Letter)定義:在消息隊列中無法被正常消費的消息稱為死信(Dead Letter);通常產生的原因如下:
-
消息被拒絕(Rejected)且未設置重新入隊(
requeue=false
) -
消息過期(TTL 超時)
-
隊列達到最大長度(超過
x-max-length
限制時,頭部或尾部消息被丟棄)
死信隊列(Dead Letter Queue, DLQ)定義:專門用于接收死信的隊列,需通過死信交換機(DLX) 路由。
核心作用:
-
收集異常消息,防止消息丟失
-
提供消息審計和重試機制
-
解耦主業務邏輯與錯誤處理邏輯
注意:一定要注意正常隊列有諸多限定和設置,這樣才能讓無法處理的消息進入死信交換機
延遲隊列
延遲隊列(Delayed Queue)是一種特殊類型的消息隊列,允許消息在指定的延遲時間后才被消費者獲取和處理。RabbitMQ 本身不直接支持延遲隊列
1. 基于 TTL + 死信隊列的延遲隊列(傳統方案)
實現原理
-
消息設置 TTL:為消息或隊列設置存活時間(TTL)。
-
綁定死信交換機:當消息過期后,通過死信交換機(DLX)路由到目標隊列。
-
消費者監聽目標隊列:實現延遲消費效果。
2. 使用插件 rabbitmq-delayed-message-exchange
(推薦方案)
RabbitMQ 官方提供的插件支持精確延遲消息投遞,消息在交換機層暫存,到期后路由到目標隊列。
下載與啟用插件:
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
# 登錄進入容器內部
docker exec -it rabbitmq /bin/bash
?
# rabbitmq-plugins命令所在目錄已經配置到$PATH環境變量中了,可以直接調用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
?
# 退出Docker容器
exit
?
# 重啟Docker容器
docker restart rabbitmq
//=====================生產端代碼============================
@Test
public void testSendDelayMessage() {rabbitTemplate.convertAndSend(EXCHANGE_DELAY,ROUTING_KEY_DELAY,"測試基于插件的延遲消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",messageProcessor -> {
?// 設置延遲時間:以毫秒為單位(只有安裝好插件之后才可以使參數生效)messageProcessor.getMessageProperties().setHeader("x-delay", "10000");
?return messageProcessor;});
}
//====================消費端代碼============================
@Component ?
@Slf4j
public class MyDelayMessageListener {public static final String QUEUE_DELAY = "queue.delay.video";@RabbitListener(queues = {QUEUE_DELAY})public void process(String dataString, Message message, Channel channel) throws IOException { ?log.info("[生產者]" + dataString);log.info("[消費者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
?
}
插件特性
-
精確延遲:消息按設定的延遲時間精確投遞。
-
靈活配置:每條消息可獨立設置延遲時間。
-
資源高效:消息存儲在交換機層,不占用隊列資源。
效果展示:
3. 兩種方案對比
特性 | TTL + 死信隊列 | 插件方案 |
---|---|---|
延遲精度 | 低(依賴隊列處理速度) | 高(精確到毫秒) |
靈活性 | 需固定或逐條設置 TTL | 每條消息獨立設置延遲 |
資源占用 | 可能占用隊列存儲 | 交換機層存儲,更高效 |
部署復雜度 | 無需額外插件 | 需安裝插件 |
適用場景 | 短延遲(<1分鐘)、簡單場景 | 長延遲、高精度、復雜需求 |
事務消息
RabbitMQ 的事務消息機制用于確保一組消息操作的原子性——要么全部成功提交到 Broker,要么全部回滾
配置類
//========================事務隊列的相關配置====================
@Configuration
@Data
public class RabbitConfig {
?@Bean//添加事務消息的組件public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);}
?@Bean//增強RabbitTemplate開啟事務消息模式public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}
}
//====================消息生產端無事務版本==============================
@SpringBootTest
@Slf4j
public class RabbitMQTest {
?public static final String EXCHANGE_NAME = "exchange.tx.dragon";public static final String ROUTING_KEY = "routing.key.tx.dragon";
?@Resourceprivate RabbitTemplate rabbitTemplate;
?@Testpublic void testSendMessageInTx() {// 1、發送第一條消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
?// 2、拋出異常log.info("do bad:" + 10 / 0);
?// 3、發送第二條消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");}
?
}
?
//=======================消息生產端有事務版本===============================
@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {// 1、發送第一條消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");
?// 2、發送第二條消息rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}
注意:請注意區分事務消息與消息可靠性投遞這兩者之間的關系。首先事務消息是不確保消息的可靠性投遞的。他只限制相應的Java代碼,在邏輯過程中不拋出異常就將消息發送到Broker上反之就會將消息回滾。
惰性隊列
惰性隊列(Lazy Queue)是 RabbitMQ 中一種特殊的隊列類型,核心目標是通過優先將消息存儲到磁盤而非內存,來優化系統資源使用,尤其適用于高吞吐、大消息量且允許一定延遲的場景。
特性 | 普通隊列 | 惰性隊列 |
---|---|---|
消息存儲優先級 | 內存優先(除非消息標記為持久化) | 磁盤優先(無論消息是否持久化) |
內存占用 | 高(消息積壓時易引發 OOM) | 低(消息直接寫入磁盤) |
消費延遲 | 低(內存中直接讀取) | 較高(需從磁盤加載到內存) |
適用場景 | 實時性要求高、消息量小 | 消息量大、允許延遲、需避免內存耗盡 |
工作機制:
①消息到達隊列時,直接持久化到磁盤(即使未顯式標記為持久化消息),僅在需要傳遞給消費者時加載到內存。
②默認僅緩存少量消息(如消費者預取值 prefetch
范圍內的消息)。
③消費者消費后,消息從磁盤刪除。
優先級隊列
優先級隊列(Priority Queue)是 RabbitMQ 中一種特殊隊列類型,允許消息按照優先級高低被消費,高優先級的消息會被優先處理。適用于需要差異化處理消息的場景(如 VIP 用戶請求優先處理)。
特性 | 說明 |
---|---|
優先級范圍 | 支持 0-255 的優先級等級(實際受 Erlang 虛擬機限制,通常建議 0-10) |
消費順序 | 高優先級消息先被消費,同優先級按 FIFO 順序處理 |
實現原理 | 隊列內部維護多級子隊列,按優先級排序消息 |
資源消耗 | 略高于普通隊列(需維護優先級索引) |
在創建隊列的時候添加參數x-max-priority,用來設置消息參數的最大優先等級。在后面創建的消息在設置等級參數的時候不能超過這個參數值。
//=======================生產端產生消息并為它設置優先等級================================
@SpringBootTest
public class RabbitMQTest {
?public static final String EXCHANGE_PRIORITY = "exchange.test.priority";public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
?@Resourceprivate RabbitTemplate rabbitTemplate;
?@Testpublic void testSendMessage() {rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{message.getMessageProperties().setPriority(1);return message;});}
?
}
?
//==========================消費端接收消息===============================
@Slf4j
@Component
public class MyMessageProcessor {
?public static final String QUEUE_PRIORITY = "queue.test.priority";
?@RabbitListener(queues = {QUEUE_PRIORITY})public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {log.info(data);
?channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
?
}