目錄
- 一、序言
- 二、延遲隊列實現
- 1、Redisson延時消息監聽注解和消息體
- 2、Redisson延時消息發布器
- 3、Redisson延時消息監聽處理器
- 三、測試用例
- 四、結語
一、序言
兩個月前接了一個4萬的私活,做一個線上商城小程序,在交易過程中不可避免的一個問題就是用戶下單后的訂單自動取消。
目前成熟的方案有通過RabbitMQ+死信隊列
、RabbitMQ+延遲消息插件
、RocketMQ定時消息推送
、Redisson延時隊列
來實現。
考慮到商城的定位和用戶體量,以及系統維護成本,其實完全沒有必要引入消息中間件,借助Redis其實就可以輕松實現這個需求。
加上Redisson客戶端本身就已經實現了很多分布式集合工具類,借助阻塞隊列和延時隊列就可輕松搞定。
當然,為了使用方便以及團隊協作,順便模仿@RabbitListener
封裝了一套基于注解的消息消費,廢話不多說,直接上代碼。
二、延遲隊列實現
1、Redisson延時消息監聽注解和消息體
延遲消息監聽器定義:
/*** Redisson延時隊列監聽器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 隊列名稱* @return*/String queueName();
}
消息體定義:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息內容*/private String msg;/*** 隊列名稱*/private String queueName;/*** 延時時間*/private long delayTime;private TimeUnit timeUnit;
}
2、Redisson延時消息發布器
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 發布延時信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("開始發布延遲消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}
這里我們借助RBlockingQueue
和RDelayedQueue
來實現,只有當延遲消息快到期時,消費者才能從阻塞隊列拉取到消息,否則消費者將一直阻塞。
3、Redisson延時消息監聽處理器
這里我們定義了一個BeanPostProcessor
的實現,目的就是為了掃描Spring容器中所有帶RedissonDelayedQueueListener
注解的Bean實例和方法。
/*** Redisson延遲隊列Bean后處理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 獲取最終的目標運行時對象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson隊列監聽注解,則啟動一個線程監聽隊列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("開始監聽Redisson延時隊列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("監聽到隊列[{}]延時消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}
這里我們掃描到指定Bean的方法后,會開啟一個異步線程,并輪詢拉取延時消息,如果消息沒過期,異步線程將會一直阻塞等待。
三、測試用例
/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}
啟動服務后,Bean后處理器會啟動異步線程監聽延時消息,如下:
2025-02-02 16:46:04.271 INFO [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 開始監聽Redisson延時隊列[redisson:delayed:queue]消息
瀏覽器直接輸入http://localhost:8000/delayed/msg
發布延時消息,10s后消費者進行處理,如下:
2025-02-02 16:43:11.107 INFO [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 監聽到隊列[redisson:delayed:queue]延時消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg
四、結語
雖說通過Redisson實現的延遲隊列也能實現支付訂單的自動取消,但是可用性相比專業的消息中間件還是尚有不足的。
比如消息生產者發送消息沒有確認機制,消息消費也沒有確認機制,這兩個環節都有可能導致消息丟失。
當然我們可以通過其它保障機制去補償,比如再加上定時任務掃表,把掃描時間可以設置長一點,保證最終的一致性。
在大型項目中還是優先推薦專業的消息中間件去實現延時消息消費。