消息重復消費的原因
- 生產者重試:網絡波動導致生產者未收到 Broker 確認,重復發送消息。
- 消費者失敗:消費者處理消息后未發送 ACK,消息重新入隊。
- 集群故障轉移:主節點宕機,未確認消息被重新投遞。
解決方案
1. 消費者冪等性設計
原理:確保同一消息多次處理的結果與一次處理相同。
實現方式
-
數據庫唯一約束
利用業務字段(如訂單號)的唯一性約束,避免重復插入數據。CREATE TABLE orders (id VARCHAR(64) PRIMARY KEY, -- 唯一訂單號amount DECIMAL(10,2) );
// Java 示例(使用 MyBatis) public void processOrder(Order order) {try {orderMapper.insert(order); // 唯一約束沖突時會拋出異常// 業務邏輯...} catch (DuplicateKeyException e) {// 已處理過該訂單,直接跳過log.warn("訂單已存在: {}", order.getId());} }
-
Redis 原子操作
使用 Redis 記錄已處理消息的 ID,通過SETNX
命令實現原子性檢查。// Java 示例(使用 Spring Data Redis) public boolean isMessageProcessed(String messageId) {Boolean result = redisTemplate.opsForValue().setIfAbsent("msg:" + messageId, "1", Duration.ofMinutes(30));return Boolean.TRUE.equals(result); }public void consumeMessage(Message message) {String messageId = message.getMessageId();if (!isMessageProcessed(messageId)) {// 已處理過,直接返回return;}// 業務邏輯... }
2. 消息全局唯一 ID
原理:為每條消息分配唯一 ID,消費者記錄已處理 ID。
實現步驟
-
生產者端:發送消息時附加唯一 ID。
// Java 示例(使用 RabbitTemplate) public void sendOrder(Order order) {String messageId = UUID.randomUUID().toString();Message message = MessageBuilder.withBody(order.toJson().getBytes()).setHeader("messageId", messageId).build();rabbitTemplate.send("order.exchange", "order.key", message); }
-
消費者端:處理前檢查 ID 是否已存在。
// Java 示例(使用 @RabbitListener) @RabbitListener(queues = "order.queue") public void handleOrder(Message message, Channel channel) throws IOException {String messageId = message.getMessageProperties().getHeader("messageId");if (redisTemplate.hasKey("processed:" + messageId)) {channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);return;}// 業務邏輯...redisTemplate.opsForValue().set("processed:" + messageId, "1", Duration.ofDays(1));channel.basicAck(deliveryTag, false); }
3. 手動確認模式(Manual ACK)
原理:消費者處理完消息后手動發送 ACK,避免消息因異常重新入隊。
配置與代碼
-
配置手動 ACK(Spring Boot):
spring:rabbitmq:listener:simple:acknowledge-mode: manual
-
消費者邏輯:
@RabbitListener(queues = "order.queue") public void handleOrder(Order order, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {try {// 業務邏輯...channel.basicAck(deliveryTag, false); // 確認消息} catch (Exception e) {channel.basicNack(deliveryTag, false, true); // 重入隊列} }
4. 消息去重表
原理:在數據庫中維護一張去重表,記錄已處理的消息 ID。
表結構
CREATE TABLE message_dedup (message_id VARCHAR(128) PRIMARY KEY,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
消費者邏輯
public void consumeMessage(Message message) {String messageId = extractMessageId(message);try {jdbcTemplate.update("INSERT INTO message_dedup (message_id) VALUES (?)", messageId);// 業務邏輯...} catch (DuplicateKeyException e) {// 消息已處理,直接ACKchannel.basicAck(deliveryTag, false);}
}
5. 消息過期與死信隊列
原理:設置消息 TTL,超時未處理則轉入死信隊列,避免無限重試。
配置隊列 TTL 和死信交換
// Java 配置示例
@Bean
public Queue orderQueue() {Map<String, Object> args = new HashMap<>();args.put("x-message-ttl", 60000); // 消息60秒過期args.put("x-dead-letter-exchange", "dlx.exchange");args.put("x-dead-letter-routing-key", "dlx.key");return new Queue("order.queue", true, false, false, args);
}@Bean
public DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange");
}@Bean
public Queue dlxQueue() {return new Queue("dlx.queue");
}@Bean
public Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.key");
}
方案對比與選型
方案 | 優點 | 缺點 | 適用場景 |
---|---|---|---|
數據庫唯一約束 | 無需額外組件 | 高并發下數據庫壓力大 | 低頻業務(如訂單創建) |
Redis 原子操作 | 高性能 | 需維護 Redis 高可用 | 高頻業務(如支付回調) |
手動ACK | 避免消息丟失 | 需處理ACK異常 | 所有需要可靠消費的場景 |
消息去重表 | 數據持久化 | 增加數據庫寫入壓力 | 數據一致性要求高的場景 |
死信隊列 | 避免消息堆積 | 需額外處理死信消息 | 需要異常消息兜底的場景 |
總結
- 冪等性設計是核心:無論消息重復多少次,業務結果保持一致。
- 組合使用多種方案:例如“手動ACK + Redis去重”兼顧可靠性與性能。
- 監控與告警:通過 RabbitMQ 管理界面監控消息積壓情況,設置閾值告警。