目錄
- 一、 交換機持久化 (Exchange Persistence)
- 二、 隊列持久化 (Queue Persistence)
- 三、 消息持久化 (Message Persistence)
- 四、 持久化的“黃金三角” 🔱:三者缺一不可!
- 五、 來,完整的代碼示例(整合持久化和確認機制)
🌟我的其他文章也講解的比較有趣😁,如果喜歡博主的講解方式,可以多多支持一下,感謝🤗!
🌟了解 MQ 請看 : 【MQ篇】初識MQ!
其他優質專欄: 【🎇SpringBoot】【🎉多線程】【🎨Redis】【?設計模式專欄(已完結)】…等
如果喜歡作者的講解方式,可以點贊收藏加關注,你的支持就是我的動力
?更多文章請看個人主頁: 碼熔burning
咱們接著聊 RabbitMQ 的“靠譜”修煉之路!上一篇文章講了生產者確認機制,現在聊聊它的“好記性”——持久化!💾??
你想想,咱們電腦里的數據,要么放在內存里(速度快,但關機就沒),要么存在硬盤上(速度慢點,但斷電還在)。RabbitMQ 也是一樣!它處理消息得快,所以很多東西默認是放在內存里的。但如果 RabbitMQ 服務器突然“打個盹”😴 或者“哎呀,摔了一跤”💥(宕機或重啟),內存里的東西就“唰”地一下都沒了!😱 這可不行,重要的消息和設置怎么能說沒就沒呢?!
這時候,“持久化”就登場了!它的作用就是把 RabbitMQ 里那些重要的“記憶”寫到硬盤上,這樣即使服務器重啟了,它也能從硬盤上把這些“記憶”讀取回來,恢復到宕機前的狀態。🐂
RabbitMQ 的持久化,主要涉及三個“記憶點”:
- 交換機持久化 (Exchange Persistence) - “分揀大廳的名字得刻石頭上!” 🏛?
- 隊列持久化 (Queue Persistence) - “每個收件人的信箱要釘牢!” 📬
- 消息持久化 (Message Persistence) - “信箱里的信要用不褪色的墨水寫!” ????
來,咱們一個個看,如何在代碼里設置它們,以及為啥要這么做!
了解RabbitMQ生產者確認機制請看:【MQ篇】RabbitMQ的生產者消息確認實戰!
一、 交換機持久化 (Exchange Persistence)
- 啥是交換機? 它就像郵局的分揀大廳入口,負責接收信件(消息)并根據地址(路由鍵)扔到不同的傳送帶(隊列)上。
- 為啥要持久化? 如果交換機不持久化,RabbitMQ 重啟后,這個交換機就“失憶”了,不存在了!😳 生產者再往這個名字的交換機發消息就會失敗,因為“入口牌子”都沒了。
- 怎么設置? 在聲明(創建)交換機的時候,把
durable
屬性設為true
就行了。 - 代碼怎么寫? 在你的
RabbitConfig
里定義 Exchange 的 Bean 時:
// 交換機持久化設置
@Bean
public TopicExchange myDurableExchange() {// 參數1: 交換機名稱// 參數2: durable 是否持久化,設為 true!?// 參數3: autoDelete 是否自動刪除,設為 false (通常不自動刪除持久化交換機)System.out.println("🛠? 正在創建持久化交換機: my.durable.exchange");return new TopicExchange("my.durable.exchange", true, false);
}
通過設置第二個參數為 true
,你就告訴 RabbitMQ:“喂,這個叫 my.durable.exchange
的交換機很重要,給我記到小本本上!” ??
注意:默認情況下,由SpringAMQP聲明的交換機都是持久化的。
二、 隊列持久化 (Queue Persistence)
- 啥是隊列? 它就像收件人專屬的信箱,消息最終會待在這里,等著被消費者取走。
- 為啥要持久化? 如果隊列不持久化,RabbitMQ 重啟后,這個隊列也“失憶”了,不見了!😵?💫 不僅隊列本身沒了,更要命的是,如果這個隊列里當時還存著沒被消費的消息,它們也會跟著一起消失! 🗑? 這絕對是消息丟失的重大風險點!
- 怎么設置? 在聲明(創建)隊列的時候,把
durable
屬性設為true
。 - 代碼怎么寫? 在你的
RabbitConfig
里定義 Queue 的 Bean 時:
// 隊列持久化設置
@Bean
public Queue myDurableQueue() {// 參數1: 隊列名稱// 參數2: durable 是否持久化,設為 true!?// 參數3: exclusive 是否獨占 (通常不獨占)// 參數4: autoDelete 是否自動刪除System.out.println("🛠? 正在創建持久化隊列: my.durable.queue");return new Queue("my.durable.queue", true, false, false);
}
把第二個參數設為 true
,你就給這個隊列上了把“鎖” 🔒,告訴 RabbitMQ:“這個信箱要釘死在這兒,重啟了也得給我留著!”
注意:默認情況下,由SpringAMQP聲明的隊列都是持久化的。
三、 消息持久化 (Message Persistence)
- 啥是消息? 就是你在隊列里放著的那些“信件”本身的內容。
- 為啥要持久化? 前面說了,隊列持久化能保證隊列這個“信箱”不消失。但是! 如果信箱里的“信件”本身沒有做持久化處理(默認是放在內存里),那么即使信箱(隊列)還在,里面的信件也會在 RabbitMQ 重啟時“蒸發”!🌫? 所以,為了讓消息在持久化隊列中真正“活下來”,消息本身也必須是持久化的!
- 怎么設置? 在發送消息的時候,設置消息的
deliveryMode
屬性為persistent
(模式 2)。 - 代碼怎么寫? 在 Spring AMQP 中,你可以通過
MessagePostProcessor
來修改消息屬性,或者通常情況下,如果你往一個持久化的隊列發送消息,convertAndSend
方法在內部可能會幫你處理這個細節。但最明確的方式是手動設置消息屬性:
// 消息持久化設置 (在發送消息時)
public void sendPersistentMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());System.out.println("📨 正在發送持久化消息: '" + message + "', ID: " + correlationData.getId());rabbitTemplate.convertAndSend("my.durable.exchange", "my.routing.key", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// ? 設置消息的投遞模式為持久化 (DeliveryMode.PERSISTENT) ?message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println("?? 消息已標記為持久化!");return message;}}, correlationData); //別忘了帶上 correlationData 給 ConfirmCallback 用!System.out.println("📬 持久化消息已提交到 RabbitTemplate,等待 RabbitMQ ACK...");
}
MessagePostProcessor
允許你在消息發送前“攔截”并修改消息的屬性。這里我們就把它設置為 DeliveryMode.PERSISTENT
。這就像在你寫好的信封上蓋個大紅章 💌:“此件重要!請務必保存!”
注意:默認情況下,SpringAMQP發出的任何消息都是持久化的,不用特意指定。
四、 持久化的“黃金三角” 🔱:三者缺一不可!
理解持久化,最關鍵的是記住這個“黃金三角”:
只有當 交換機是持久化的 (durable exchange) + 隊列是持久化的 (durable queue) + 消息是持久化的 (persistent message),消息才能在 RabbitMQ 重啟后依然保存在隊列中!
- 如果隊列不持久化,交換機和消息再怎么持久化也沒用,隊列沒了,一切皆空。??
- 如果隊列持久化了,但消息不持久化,重啟后隊列還在,但里面的消息都沒了。💧
- 如果消息持久化了,但隊列不持久化,同樣隊列沒了消息也無法保存。
- 交換機持久化主要是為了確保 Exchange 本身重啟后還在,否則生產者都找不到它發消息了。
所以,為了確保消息在 RabbitMQ 宕機重啟時不丟失,這三兄弟(持久化交換機、持久化隊列、持久化消息)得協同工作才行!🤝
五、 來,完整的代碼示例(整合持久化和確認機制)
咱們在之前的代碼基礎上,把持久化設置加進去,并且依然保留發送者確認機制,這倆功能是保護消息的左膀右臂!💪
1. pom.xml
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. application.properties
# 配置Spring應用中的RabbitMQ連接參數
spring:rabbitmq:# RabbitMQ服務器的主機地址host: localhost# RabbitMQ服務器的端口號port: 5672# 訪問RabbitMQ服務器的用戶名username: guest# 訪問RabbitMQ服務器的密碼password: guest# 配置發布確認的類型為correlated,以便在消息發送后收到確認publisher-confirm-type: correlated# 啟動返回機制,當消息無法投遞時返回給發送者publisher-returns: true# 配置RabbitMQ模板的參數template:# 設置所有消息都是必須投遞的mandatory: true# 設置等待回復的超時時間為60000毫秒reply-timeout: 60000# 配置日志級別
logging:level:# 設置org.springframework.amqp包下的日志級別為DEBUG,以便捕獲AMQP相關的調試信息org:springframework:amqp: DEBUG
3. RabbitConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;@Configuration
public class RabbitConfig {// 定義一個 ?持久化? 主題交換機@Beanpublic TopicExchange myDurableExchange() {// 參數2: durable = true ?System.out.println("🛠? 正在創建持久化交換機: my.durable.exchange");return new TopicExchange("my.durable.exchange", true, false);}// 定義一個 ?持久化? 隊列@Beanpublic Queue myDurableQueue() {// 參數2: durable = true ?System.out.println("🛠? 正在創建持久化隊列: my.durable.queue");return new Queue("my.durable.queue", true, false, false);}// 定義一個綁定,將持久化隊列綁定到持久化交換機@Beanpublic Binding binding(Queue myDurableQueue, TopicExchange myDurableExchange) {// 使用固定的路由鍵 "my.routing.key"return BindingBuilder.bind(myDurableQueue).to(myDurableExchange).with("my.routing.key");}// 配置 RabbitTemplate 并設置 ConfirmCallback@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);// ? 設置發送者確認回調 (用于確認消息是否到達 Exchange 并被接受路由) ?rabbitTemplate.setConfirmCallback(new ConfirmCallback() {@Overridepublic void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) {String messageId = correlationData != null ? correlationData.getId() : "N/A";if (ack) {System.out.println("? RabbitMQ 確認收到消息并處理成功!Message ID: " + messageId + " (Ack: " + ack + ")");} else {System.err.println("💔 RabbitMQ 拒絕或未能處理消息!Message ID: " + messageId + ", 原因: " + cause + " (Ack: " + ack + ")");}}});// 這里省略了 ReturnCallback,它用于處理路由不到隊列的消息 (需要 mandatory=true)// rabbitTemplate.setReturnsCallback(...)return rabbitTemplate;}// 簡單的消費者(非持久化核心,用于演示消息能到達)// ? 注意:這個消費者不會做消費者確認,消息消費后可能還在隊列里,直到被 RabbitMQ 刪除 ?// ? 真正的持久化測試需要消費者確認并手動 ack ?@Beanpublic org.springframework.amqp.rabbit.listener.MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, Queue myDurableQueue) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueues(myDurableQueue);container.setMessageListener(message -> {System.out.println("👂 消費者收到消息: '" + new String(message.getBody()) + "'");// 在實際應用中,這里處理完消息后需要手動發送 ACK 給 RabbitMQ// 如果不發 ACK 且設置了手動確認模式,消息會一直留在隊列直到連接斷開并重發// 這里為了簡單演示持久化隊列里有消息,設置為 NO_ACK 模式});// ? 注意:AcknowledgeMode.NONE (不確認) 模式下,RabbitMQ 會立即刪除消息,不利于演示重啟后消息還在 ?// ? 為了演示消息在隊列中持久化,建議消費者代碼暫時注釋掉或者設置為手動確認且不手動 ack ?// ? 本例代碼中暫時保留消費者,但請注意其行為,真正的持久化測試需要手動停止消費者、重啟RabbitMQ、再啟動消費者查看 ?container.setAcknowledgeMode(AcknowledgeMode.NONE); // 簡單演示,不處理消費者確認return container;}
}
4. MessageSender.java
(發送持久化消息)
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.UUID;@Component
public class MessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;// 使用持久化的交換機和隊列名稱private static final String EXCHANGE_NAME = "my.durable.exchange";private static final String QUEUE_NAME = "my.durable.queue"; // 雖然發送不需要隊列名,這里記一下private static final String ROUTING_KEY = "my.routing.key";private static final String NON_EXISTENT_EXCHANGE = "non.existent.exchange";/*** 發送一條 ?持久化? 消息到 ?持久化? 交換機和隊列* @param message 需要發送的消息內容*/public void sendPersistentMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());System.out.println("📨 正在發送持久化消息: '" + message + "', ID: " + correlationData.getId());rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// ? 核心:設置消息的投遞模式為持久化 ?message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println("?? 消息屬性已設置為持久化!");return message;}}, correlationData);System.out.println("📬 持久化消息已提交到 RabbitTemplate,等待 RabbitMQ ACK...");}/*** 演示發送失敗的情況,發送到一個不存在的 Exchange,預期會收到 NACK* 依然使用持久化屬性,但因為 Exchange 不存在,消息無法到達 Exchange* @param message 需要發送的消息內容*/public void sendFailedPersistentMessage(String message) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());System.out.println("😠 嘗試發送持久化消息到不存在的 Exchange: '" + NON_EXISTENT_EXCHANGE + "'");System.out.println("📨 正在發送失敗消息: '" + message + "', ID: " + correlationData.getId());// 發送消息到不存在的交換機,并嘗試標記為持久化(但消息根本到不了Exchange)rabbitTemplate.convertAndSend(NON_EXISTENT_EXCHANGE, ROUTING_KEY, message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 雖然設置了持久化,但這消息因為 Exchange 不存在,根本不會被 RabbitMQ 接收并存盤message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}}, correlationData);System.out.println("📬 失敗消息已提交到 RabbitTemplate,等待 RabbitMQ NACK...");}
}
5. Application.java
(主應用類)
import com.gewb.produce_confire.MessageSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import java.util.concurrent.TimeUnit;@SpringBootApplication
public class Application implements CommandLineRunner {@Autowiredprivate MessageSender messageSender;public static void main(String[] args) {SpringApplication.run(Application.class, args);// ? 重要:為了讓異步回調和持久化演示效果有時間展現,主線程不要馬上退出 ?// 在非 Web 應用中,需要手動保持應用運行一段時間try {System.out.println("😴 應用正在運行,等待回調和可能的消費者消費。請手動停止 RabbitMQ 后再啟動進行持久化測試。");TimeUnit.SECONDS.sleep(20); // 延長等待時間,以便觀察日志并有時間手動操作 RabbitMQ} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("😴 應用被中斷喚醒.");}System.out.println("👋 應用演示結束.");}@Overridepublic void run(String... args) throws Exception {System.out.println("🚀 應用啟動,開始發送測試持久化消息...");// 1. 發送一條會成功投遞到持久化隊列的持久化消息messageSender.sendPersistentMessage("Important Persistent Message!");// 等待一下,讓第一條消息的確認回調先執行TimeUnit.SECONDS.sleep(2);// 2. 發送一條會失敗的消息(發送到不存在的 Exchange),看 NACK 回調messageSender.sendFailedPersistentMessage("This persistent message should be NACKED!");System.out.println("? 所有測試消息已發送提交。請觀察日志輸出中的 ACK/NACK 回調結果。");System.out.println(">>> 要測試持久化效果,請在看到 ACK 日志后,手動停止 RabbitMQ 服務器,然后重新啟動,檢查隊列中是否有 'Important Persistent Message!' 這條消息。");}
}
運行結果:
注意,在 main
方法里,我加了更長的延時 (20 秒),并提示用戶手動停止和啟動 RabbitMQ 來測試持久化效果。
如何測試持久化效果?
- 確保 RabbitMQ 運行中。
- 運行上面的 Spring Boot 應用。
- 在控制臺看到
? RabbitMQ 確認收到消息并處理成功!Message ID: ...
(ACK) 日志后,手動停止 RabbitMQ 服務器。 - 檢查 RabbitMQ 的數據目錄(如果你知道在哪兒),可以看到有一些文件生成了(這些就是持久化數據)。
- 重新啟動 RabbitMQ 服務器。
- 使用 RabbitMQ Management Plugin (如果安裝了) 或其他客戶端工具,查看
my.durable.queue
這個隊列。你會發現之前發送的Important Persistent Message!
這條消息仍然在隊列里!🥳 而發送到不存在 Exchange 的消息,因為根本沒被 RabbitMQ 接受,當然不會出現在任何隊列里。 - 如果你的 Spring Boot 應用還沒退出(因為設置了 20 秒延時),RabbitMQ 重啟后,它會自動嘗試重連。如果連接成功,并且消費者容器也配置正確且連接成功,它可能會把隊列里的消息消費掉(取決于你的消費者配置)。
通過這個實驗,你就能親眼看到“持久化”的神奇力量:它讓你的重要消息熬過了服務器的重啟!💪
小提示: 在實際生產環境中,為了確保消息不丟失,發送方確認 + 消費者確認 + 消息持久化 往往是同時使用的,它們從不同環節保障消息的安全。