持久性
我們在前面說了消息端處理消息時,消息如何不丟失,但是如何保證當 RabbitMQ
服務器停掉之后,生產者發送的消息不丟失呢?
默認情況下,RabbitMQ
退出或者由于某種原因崩潰時,會忽視隊列和消息,除非告知他不要這么做
RabbitMQ
的持久化分為三個部分:
- 交換器的持久化
- 隊列的持久化
- 消息的持久化
交換機持久化
交換機的持久化,是通過在聲明交換機時將 durable
參數置為 true
實現的
- 相當于將交換機的屬性在服務器內部保存,當
MQ
的服務器發生意外或關閉之后,重啟RabbitMQ
時不需要重新去建立交換機,交換機會自動建立,相當于一直存在 - 如果交換器不設置持久化,那么在
RabbitMQ
服務器重啟之后,相關的交換機元數據會丟失,對一個長期使用的交換機來說,建議將其置為持久化的
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build();
隊列持久化
隊列的持久化是通過在聲明隊列時將 durable
參數置為 true
實現的
- 如果隊列不設置持久化,那么在
RabbitMQ
服務器重啟之后,該隊列就會被刪除掉,此時數據也會丟失(隊列沒有了,消息也無處可存了) - 隊列的持久化能保證該隊列本身的元數據不會因異常情況而丟失,但是并不能保證內部存儲的消息不會丟失
- 要確保消息不會丟失,需要將消息設置為持久化
咱們前面用的創建隊列的方式都是持久化的
QueueBuilder.durable(COnstant.ACK_QUEUE).build();
點進去看源碼會發現,該方法默認 durable
是 true
public static QueueBuilder durable() { return durable(namingStrategy.generateName());
} private QueueBuilder setDurable() { this.durable = true; return this;
}
通過下面代碼,可以創建非持久化的隊列
QueueBuilder.noDurable(Constant.ACK_QUEUE).build();
消息持久化
消息實現持久化,需要把消息的投遞模式 (MessageProperties
中的 deliveryMode
)設置為 2,也就是 MessageDeliveryMode.PERSISTENT
public enum MessageDeliveryMode { NON_PERSISTENT, PERSISTENT;
設置了隊列和消息的持久化,當 RabbitMQ
服務器重啟之后,消息依舊存在。
- 如果只設置隊列持久化,重啟之后消息會丟失
- 如果只設置消息持久化,重啟之后隊列消息,繼而消息也丟失
所以單單設置消息持久化而不設置隊列的持久化顯得毫無意義
// 非持久化信息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());// 持久化信息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAN, msg.getBytes());
MessageProperties.PERSISTENT_TEXT_PLAIN
實際就是封裝了這個屬性
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null);
}
如果使用 RabbitTemplate
發送持久化消息,代碼如下:
// 要發送的消息內容
String message = "This is a persistent message";// 創建一個 Message 對象,設置為持久化
Message messagePbject = new Message(message.getBytes(), new MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 使用 RabbitTemplate 發送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", messageObject);
RabbitMQ
默認情況下會將消息視為持久化,除非隊列被聲明為非持久化,或者消息在發送時被標記為非持久化- 我們也可以通過打印
Message
這個對象,來觀察消息是否持久化
(Body:'consumer ack test...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=ack_exchange, receivedRoutingKey=ack, deliveryTag=2, consumerTag=amq.ctag-mtd-2Mec9zH2fXizRqVAqg, consumerQueue=ack_queue])
- 將所有的消息都設置為持久化,會嚴重影響
RabbitMQ
的性能(隨機)- 寫入磁盤的速度比寫入內存的速度慢得不只一點點,對于可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量
- 在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做一個權衡
數據丟失
將交換器、隊列、消息都設置了持久化之后就能百分之百保證數據不丟失了嗎?答案是否定的
-
從消費者來說,如果在訂閱消息隊列時將
autoAck
參數設置為true
,那么當消費者接收到相關消息之后,還沒來得及處理就宕機了,這樣也算數據丟失。這種情況很好解決,將autoAck
參數設置為false
,并進行手動確認,詳細可以參考(消息確認章節) -
在持久化的消息正確存入
RabbitMQ
之后,還需要有一段時間(雖然很短,但是不可忽視)才能存入磁盤中。RabbitMQ
并不會為每條信息都進行同步存盤(調用內核的fsync
方法)的處理,可能僅僅保存到操作系統緩存之中而不是物理磁盤之中。如果在這段時間內RabbitMQ
服務器節點發生了宕機、重啟等異常情況,消息保存還沒來得及落盤,那么這些消息將會丟失
這個問題如何解決呢?
-
引入
RabbitMQ
的仲裁隊列(后面會說),如果主節點(master
)在此特殊時間內掛掉,可以自動切換到從節點(slave
),這樣有效地保證了高可用性,除非整個集群都掛掉- 此方法也不能保證
100%
可靠,但是配置了仲裁隊列要比沒有配置的可靠性要高很多,實際生產環境中的關鍵業務隊列一般都會設置仲裁隊列
- 此方法也不能保證
-
還可以在發送端引入事務機制或者發送方確認機制來保證消息已經正確地發送病存儲至
RabbitMQ
中(詳情參考后面發送方確認)
重試機制
在消息傳遞過程中,可能會遇到各種問題,如網絡故障,服務不可用,資源不足等,這些問題可能導致消息處理失敗
為了解決這些問題,RabbitMQ
提供了重試機制,允許消息在處理失敗后重新發送
但如果是程序邏輯引起的錯誤,那么多次重試也是沒有用的,可以設置重試次數
1. 重試配置
spring: rabbitmq: addresses: amqp://guest:guest@127.0.0.1:5672/coding listener: simple: acknowledge-mode: manual # 消息接收確認 retry: enabled: true # 開啟消費者失敗重試 initial-interval: 5000ms # 初始失敗等待時長為 5s max-attempts: 5 # 最大重試次數(包括自身消費的一次)
2. 配置交換機&隊列
public static final String RETRY_EXCHANGE_NAME = "retry_exchange";
public static final String RETRY_QUEUE = "retry_queue";
// 1. 交換機
@Bean("retryExchange")
public FanoutExchange retryExchange() { return ExchangeBuilder.fanoutExchange(Constant.RETRY_EXCHANGE_NAME).durable(true).build();
} // 2. 隊列
@Bean("retryQueue")
public Queue retryQueue() { return QueueBuilder.durable(Constant.RETRY_QUEUE).build();
} // 3. 隊列和交換機綁定 Binding@Bean("retryBinding")
public Binding retryBinding(@Qualifier("retryExchange") FanoutExchange exchange, @Qualifier("retryQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange);
}
3. 發送消息
@RequestMapping("/retry")
public String retry() { rabbitTemplate.convertAndSend(Constant.RETRY_EXCHANGE_NAME, "", "retry test..."); return "發送成功";
}
4. 消費消息
@Component
public class RetryQueueListener { // 指定監聽隊列的名稱 @RabbitListener(queues = Constant.RETRY_QUEUE) public void Listener(Message message) throws Exception { System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); // 模擬處理失敗 int num = 3 / 0; System.out.println("處理完成"); }
5. 運行程序
運行程序,調用接口,發送消息: http://127.0.0.1:8080/producer/retry
[外鏈圖片轉存中…(img-7OSM9aK6-1752140853006)]
異常捕獲
如果對異常進行捕獲,那么就不會進行重試
- 代碼修改如下
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag());
// 模擬處理失敗
try { int num = 3 / 0; System.out.println("處理完成");
} catch (Exception e) { System.out.println("處理失敗");
}
重新運行程序,結果如下:[外鏈圖片轉存中…(img-7hevkq3m-1752140853007)]
6. 手動確認
改為手動確認
@RabbitListener(queues = Constant.RETRY_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception{ long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"), message.getMessageProperties().getDeliveryTag()); // 模擬處理失敗 int num = 3 / 0; System.out.println("處理完成"); // 3. 手動簽收 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 4. 異常了就拒絕簽收 Thread.sleep(1000); // 第三個參數 requeue,是否重新發送。若為 true,則重發;若為 false,則直接丟棄 channel.basicNack(deliveryTag, true, true); }
}
運行結果:[外鏈圖片轉存中…(img-ne4pJWMZ-1752140853008)]
- 可以看到,手動確認模式時,重試次數的限制不會像在自動確認模式下那樣直接生效,因為是否重試以及何時重試更多的取決于應用程序的邏輯和消費者的實現
自動確認模式下,RabbitMQ
會在消息被投遞給消費者后自動確認消息。
- 如果消費者處理消息時拋出異常,
RabbitMQ
根據配置的重試參數自動將消息重新入隊,從而實現重試 - 重試次數和重試間隔等參數可以直接在
RabbitMQ
的配置中設定,并且RabbitMQ
會負責執行這些重試策略
手動確認模式下,消費者需要顯式地對消息進行確認。如果消費者在處理消息時遇到異常,可以選擇不確認消息,使消息重新入隊
- 重試的控制權在于應用程序本身,而不是
RabbitMQ
的內部機制 - 應用程序可以通過自己的邏輯和利用
RabbitMQ
的高級特性來實現有效的重試策略
使用重試機制時需要注意:
- 自動確認模式下:程序邏輯異常,多次重試還是失敗,消息就會被自動確認,那么消息就丟失了
- 手動確認模式下:程序邏輯異常,多次重試消息依然處理失敗,無法被確認,就一直是
unacked
的狀態,導致消息積壓