文章目錄
- 一、消息丟失問題的解決方案
- (一)發送端丟失
- (二)存儲端丟失
- 1. 同步刷盤
- 2. Broker 集群
- (三)消費端丟失
- 二、消息重復問題的解決方案
- (一)唯一鍵約束
- (二)保存消費記錄
- 三、總結
在消息隊列的使用過程中, 消息丟失和消息重復是兩個常見且令開發人員困擾的問題。
因為從生產者發送消息,到 Broker 保存消息,再到消費者消費消息,每個環節都暗藏著消息丟失的風險;而消息重復的產生,往往源于生產者的重復發送或消費者的重復接收。
那么接下來,我們深入剖析一下這兩個問題及其對應的策略。
一、消息丟失問題的解決方案
(一)發送端丟失
生產者發送消息時,處理不當極易造成消息丟失。目前,主流消息隊列普遍支持同步發送和異步發送兩種模式。
同步發送時,生產者發送消息后會同步等待 Broker 返回的 ACK 確認消息,只有收到 ACK 才認定消息發送成功;若長時間未收到,則判定發送失敗并進行重試。這種方式雖能確保消息不丟失,但會帶來性能瓶頸,因此在實際應用中,異步發送更為常用。
以 Kafka 為例,主流消息隊列(如 Kafka 和 RocketMQ)通常采用回調函數來保障異步發送時消息不丟失,具體代碼如下:
// 配置Kafka生產者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 確保所有副本都收到消息才確認
props.put("retries", 3); // 重試次數Producer<String, String> producer = new KafkaProducer<>(props);// 創建消息記錄
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "message");// 異步發送消息并添加回調處理
producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception != null) {// 處理發送失敗的情況logger.error("消息發送失敗,topic: {}, partition: {}, 異常信息: {}", metadata.topic(), metadata.partition(), exception.getMessage());// 可在此處添加重試邏輯或告警機制} else {// 處理發送成功的情況logger.info("消息發送成功,topic: {}, partition: {}, offset: {}",metadata.topic(), metadata.partition(), metadata.offset());}}
});// 關閉生產者
producer.close();
(二)存儲端丟失
即便生產者成功發送消息,也無法保證消息絕對不丟失。因為若消息發送到 Broker 后,在消費者拉取之前,Broker 突然宕機且消息尚未落盤,同樣會導致消息丟失。為避免存儲階段的消息丟失,可從以下方面著手:
1. 同步刷盤
異步刷盤存在消息未落盤 Broker 就宕機的風險,而同步刷盤則是在消息成功落盤后,才向 Sender 返回發送成功的確認,從而從消息發送環節保障消息不丟失。在 RocketMQ 中,只需將flushDiskType
參數配置為SYNC_FLUSH
,即可開啟同步刷盤功能。
以下是兩種刷盤機制的對比示意圖:
2. Broker 集群
若 Broker 集群僅有一個節點,即便消息成功落盤,一旦 Broker 發生故障,在恢復前消費者將無法拉取消息;若出現磁盤故障且無法恢復,消息更是會永久丟失。
采用Broker 集群可有效解決該問題。在 Broker 集群環境下,可設置等待 2 個以上節點同步完消息后,再向 Producer 返回成功確認。如此一來,即便某個 Broker 節點掛掉,也能迅速找到替代節點,確保消息的可用性。
以下是 Broker 集群架構圖:
(三)消費端丟失
消費者要確保消息不丟失,需在消費完成后再向 Broker 返回 ACK 確認。主流消息隊列中,若 Broker 未收到 ACK,會重新向消費者發送消息。
有時為了解決消息積壓問題,消費者會在拉取消息后直接返回 ACK,再異步執行消息處理邏輯。此時,為保證消息不丟失,需在返回 ACK 前將消息持久化到本地,例如保存至數據庫,后續可從數據庫讀取消息進行處理。
以下是消費者消息處理流程圖:
二、消息重復問題的解決方案
消息重復產生的原因主要有兩點:
- 一是生產者發送消息后未收到 ACK,進而進行重復發送;
- 二是消費者消費完成后,Broker 未收到 ACK,導致消息被重復推送給消費者。
消息重復會對業務產生嚴重影響,比如電商場景中的重復支付、賬務場景中的重復記賬等。
以下是消息重復產生原因的分析圖:
從當前主流消息隊列來看,尚無一款能夠直接解決消息重復的消費問題,所以通常需要在消費端進行冪等處理。
以下是幾種常見的冪等處理思路:
(一)唯一鍵約束
若消息會存儲到本地數據庫,可將消息 ID 設為唯一鍵;若消息不存入數據庫,也可選取消息 ID 或消息中其他具有唯一性的屬性,作為唯一鍵存儲到業務數據表中,以此避免重復消費。
(二)保存消費記錄
借助 Redis 保存消息 ID 也是一種有效方式。在消費消息前,先判斷 Redis 中是否已存在該消息 ID,示例代碼如下:
@Service
public class MessageConsumerService {private static final Logger logger = LoggerFactory.getLogger(MessageConsumerService.class);@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate BusinessService businessService; // 業務處理服務// 消費消息的方法public void consumeMessage(String messageId, String messageBody) {try {// 1. 檢查消息是否已消費(利用Redis的原子性操作)Boolean isConsumed = redisTemplate.opsForValue().setIfAbsent("message:consumed:" + messageId, // Redis鍵名,格式為 message:consumed:{消息ID}"1", // 值設為1表示已消費30, TimeUnit.DAYS); // 設置過期時間,防止內存泄漏if (isConsumed != null && isConsumed) {// 2. 消息未被消費,執行具體業務邏輯try {businessService.processMessage(messageBody);logger.info("消息處理成功,messageId: {}", messageId);} catch (Exception e) {// 業務處理失敗,刪除Redis標記以便重新消費redisTemplate.delete("message:consumed:" + messageId);logger.error("消息處理失敗,已刪除消費標記,messageId: {}", messageId, e);throw e; // 向上拋出異常,觸發重試機制}} else {// 3. 消息已被消費,直接跳過logger.info("消息已被消費,跳過處理,messageId: {}", messageId);}} catch (Exception e) {// 處理異常情況,可根據業務需求添加告警或補償邏輯logger.error("消息消費過程中發生異常,messageId: {}", messageId, e);// 可添加額外的重試邏輯或告警通知}}
}
需要注意的是,若消費失敗,需及時刪除 Redis 中保存的消息 ID,防止后續消息無法正常消費。
三、總結
最后我們用一張圖總結一下這篇文章:
消息不丟失、不重復是消息隊列的核心需求,但在實際應用中,滿足這一要求并非易事。
對于消息丟失問題,主流消息隊列可通過消息重試和消息持久化等手段有效解決;然而,消息重試機制又不可避免地帶來了消息重復的風險。
目前,主流消息隊列在處理消息重復問題上缺乏現成解決方案,對于不允許重復消費的業務場景,開發人員需在 消費端實現冪等處理邏輯,以保障業務的準確性和穩定性。