文章目錄
- 死信隊列
- RabbitMQ 配置類 RabbitMQConfig.java
- 生產者 OrderTimeoutProducer.java
- 消費者 OrderTimeoutConsumer.java
- 應用配置 application.yml
- pom.xml 依賴
- 實體類 Order.java(不變)
- Mapper 接口 OrderMapper.java(不變)
- 服務層 OrderService.java(不變)
- 緩存配置 CacheConfig.java(不變)
- 對賬服務 ReconciliationTask.java(不變)
- 控制器 OrderController.java(不變)
死信隊列
在代碼中,processedMsgCache的類型是Cache<String, Boolean>,鍵是消息ID,值是Boolean。當處理成功時,會執行processedMsgCache.put(msgId, true),將消息ID存入緩存,并設置有效期1小時,最大容量10000條。這樣,在1小時內,如果同樣的消息再次被處理,緩存中已經有記錄,就會直接ACK并跳過處理。
不過,這里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默認可能不是唯一的,生產者發送消息時可以設置messageId。如果生產者沒有明確設置,可能無法保證唯一性,這會導致冪等性檢查失效。因此,需要確保生產者發送消息時設置了唯一的messageId,或者在消費者端使用其他唯一標識,比如消息內容中的orderId結合其他屬性。
另外,緩存的過期時間設置為1小時,這可能與業務場景有關。如果消息的存活時間超過1小時,可能會有重復處理的風險。需要根據實際消息的存活時間來調整緩存的過期時間,確保覆蓋消息可能被重新投遞的時間窗口。
還有一個問題是,緩存是本地緩存,如果消費者有多個實例,每個實例的緩存是獨立的。這可能導致不同的實例處理同一條消息,因為一個實例處理過,但另一個實例的緩存中沒有記錄。這種情況下,本地緩存的冪等性檢查可能不夠,需要考慮分布式緩存,比如Redis,來保證全局唯一性。但根據當前代碼,在單實例或允許短暫重復的場景下使用本地緩存。
總結來說,冪等性檢查的邏輯是通過緩存已處理消息的ID,在消息處理前檢查是否已存在,存在則跳過處理,避免重復執行。這適用于消息隊列保證至少一次投遞,但業務需要確保冪等的場景。
+---------------------+| RabbitMQ Message || (攜帶唯一messageId) |+----------+----------+|v
+----------------+ +-------+-------+ +-----------------+
| 消息到達消費者 | ----> | 檢查緩存是否存在 | ----> | 存在:直接ACK丟棄消息 |
+----------------+ +-------+-------+ +-----------------+|| 不存在v+-------+-------+ +-----------------+| 執行業務邏輯處理 | ----> | 成功:存入緩存并ACK |+---------------+ +-----------------+
緩存過期時間(1小時)> 消息最大存活時間(30分鐘+重試時間)
計算公式:緩存過期時間 = 消息TTL + 最大重試時間 * 重試次數 + 緩沖時間
緩存擊穿 | 空值緩存 | 對不存在的key也進行緩存(需設置較短過期時間) |
---|---|---|
緩存穿透 | 布隆過濾器 | 在緩存前增加過濾層 |
消費者重啟 | 持久化存儲 | 配合數據庫記錄處理狀態 |
網絡分區 | 最終一致性 | 依賴對賬服務修正狀態 |
組件 | 類型 | 作用說明 |
---|---|---|
processedMsgCache | Caffeine緩存 | 存儲已處理消息的唯一標識 |
messageId | 字符串 | 消息唯一標識(需生產者保證唯一性) |
deliveryTag | 長整型 | RabbitMQ消息投遞標識 |
sequenceDiagramparticipant RabbitMQparticipant Consumerparticipant Cacheparticipant DBRabbitMQ->>Consumer: 投遞消息(messageId=123)Consumer->>Cache: 查詢messageId=123alt 存在緩存Cache-->>Consumer: 返回trueConsumer->>RabbitMQ: 發送ACKelse 無緩存Consumer->>DB: 執行取消操作alt 操作成功Consumer->>Cache: 寫入messageId=123Consumer->>RabbitMQ: 發送ACKelse 操作失敗Consumer->>RabbitMQ: 發送NACK(requeue=true)endend
RabbitMQ 配置類 RabbitMQConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 訂單超時相關配置public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";public static final String ORDER_DELAY_QUEUE = "order.delay.queue";public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";// 死信隊列配置public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue";public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter";// 聲明延遲隊列(設置死信參數)@Beanpublic Queue orderDelayQueue() {return QueueBuilder.durable(ORDER_DELAY_QUEUE).withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY).build();}// 聲明延遲交換機@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);}// 綁定延遲隊列到交換機@Beanpublic Binding delayBinding() {return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);}// 聲明死信隊列@Beanpublic Queue deadLetterQueue() {return new Queue(ORDER_DEAD_LETTER_QUEUE, true);}// 聲明死信交換機@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE);}// 綁定死信隊列到交換機@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ORDER_DEAD_LETTER_ROUTING_KEY);}// JSON 消息轉換器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}
生產者 OrderTimeoutProducer.java
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class OrderTimeoutProducer {private final RabbitTemplate rabbitTemplate;public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendTimeoutMessage(String orderId) {// 設置消息過期時間為30分鐘(單位:毫秒)MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1800000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_DELAY_EXCHANGE,RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,orderId,messagePostProcessor);}
}
消費者 OrderTimeoutConsumer.java
import com.github.benmanes.caffeine.cache.Cache;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Component
public class OrderTimeoutConsumer {private final OrderService orderService;private final Cache<String, Boolean> processedMsgCache;public OrderTimeoutConsumer(OrderService orderService, Cache<String, Boolean> processedMsgCache) {this.orderService = orderService;this.processedMsgCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).maximumSize(10000).build();}@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)public void processMessage(Message message, Channel channel) throws IOException {String orderId = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 冪等性檢查if (processedMsgCache.getIfPresent(messageId) != null) {channel.basicAck(deliveryTag, false);return;}boolean success = orderService.safeCancel(orderId);if (success) {processedMsgCache.put(messageId, true);System.out.println("訂單超時取消成功: " + orderId);}channel.basicAck(deliveryTag, false);} catch (Exception e) {// 記錄錯誤日志,重新放回隊列channel.basicNack(deliveryTag, false, true);System.err.println("處理訂單超時取消失敗: " + orderId);e.printStackTrace();}}
}
應用配置 application.yml
spring:rabbitmq:host: ${RABBITMQ_HOST:localhost}port: 5672username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASSWORD:guest}virtual-host: /connection-timeout: 5000template:retry:enabled: truemax-attempts: 3initial-interval: 1000mslistener:simple:acknowledge-mode: manual # 手動確認模式prefetch: 10 # 每次預取數量retry:enabled: truemax-attempts: 3initial-interval: 1000ms
pom.xml 依賴
<!-- 移除 RocketMQ 依賴 -->
<!-- 添加 RabbitMQ 依賴 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
實體類 Order.java(不變)
public class Order {// 保持原有實現
}
Mapper 接口 OrderMapper.java(不變)
@Mapper
public interface OrderMapper {// 保持原有SQL操作
}
服務層 OrderService.java(不變)
@Service
public class OrderService {// 保持原有業務邏輯
}
緩存配置 CacheConfig.java(不變)
@Configuration
public class CacheConfig {// 保持原有緩存配置
}
對賬服務 ReconciliationTask.java(不變)
@Component
public class ReconciliationTask {// 保持原有定時任務邏輯
}
控制器 OrderController.java(不變)
@RestController
@RequestMapping("/orders")
public class OrderController {// 保持原有API接口
}
關鍵差異對比
功能點 | RocketMQ 實現 | RabbitMQ 實現 |
---|---|---|
延遲機制 | 內置延遲級別 | TTL+死信隊列 |
消息存儲 | 持久化到CommitLog | 內存+磁盤持久化 |
消費確認 | 自動ACK | 手動ACK+重試機制 |
消息追蹤 | 原生支持消息軌跡 | 需要額外實現 |
集群方案 | 主從復制 | 鏡像隊列 |
部署注意事項
隊列初始化:確保首次啟動時自動創建所需交換機和隊列
消息持久化:所有隊列聲明時設置durable=true
監控配置:需要監控以下指標:
死信隊列消息堆積量
消費者處理耗時
消息TTL過期情況
集群配置:建議至少3節點RabbitMQ集群
故障轉移:配置鏡像隊列實現高可用
性能優化建議
批量處理:
@RabbitListener(queues = QUEUE, containerFactory = "batchFactory")
public void processMessages(List<Message> messages) {// 批量查詢緩存Map<String, Boolean> existMap = cache.getAllPresent(messageIds);// 過濾已處理消息// 批量執行數據庫操作
}
異步更新:
CompletableFuture.runAsync(() -> {processedMsgCache.put(messageId, true);
});
分級緩存:
Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build();
Cache<String, Boolean> L2 = RedisClient.getCache();
首先看CacheConfig.java,里面有一個orderStatusCache的Bean,使用Caffeine配置,最大大小10000,過期時間10分鐘。然后在OrderService.java中,有一個注入的Cache<String, OrderStatus> orderCache。看起來這里應該用的是這個Bean,因為在OrderService里用了@RequiredArgsConstructor,所以orderCache這個字段會被構造器注入,而對應的Bean應該就是CacheConfig里的orderStatusCache。所以雖然用戶可能沒有顯式地用@Autowired,但是因為用了Lombok的@RequiredArgsConstructor,如果orderCache是final的話,Spring會自動注入同類型的Bean。這時候需要檢查OrderService中的orderCache的類型是否匹配。
看OrderService的定義:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是這個類型,所以應該會被正確注入。所以這個Bean是有被使用的,可能沒注意到OrderService里的注入,導致認為沒有用到。
另外,在OrderTimeoutConsumer.java中,還有一個processedMsgCache,這個是用Caffeine直接new的,并沒有使用Spring管理的Bean。所以這個緩存是消費者自己創建的,和CacheConfig無關。而orderStatusCache是被OrderService使用的,所以這個配置是必須的,否則OrderService中的orderCache會找不到對應的Bean,導致啟動失敗。
所以可能在于誤解了哪里用到了這個Bean。可能在代碼中沒有顯式地看到@Autowired或者@Resource,但實際上因為Lombok的@RequiredArgsConstructor和Spring的自動注入機制,這個Bean是被OrderService使用的。如果沒有CacheConfig中的Bean,Spring啟動時會報錯,提示找不到符合條件的Cache類型的Bean來注入到OrderService中。