通過Redisson構建延時隊列并實現注解式消費

目錄

  • 一、序言
  • 二、延遲隊列實現
    • 1、Redisson延時消息監聽注解和消息體
    • 2、Redisson延時消息發布器
    • 3、Redisson延時消息監聽處理器
  • 三、測試用例
  • 四、結語

一、序言

兩個月前接了一個4萬的私活,做一個線上商城小程序,在交易過程中不可避免的一個問題就是用戶下單后的訂單自動取消。

目前成熟的方案有通過RabbitMQ+死信隊列RabbitMQ+延遲消息插件RocketMQ定時消息推送Redisson延時隊列來實現。

考慮到商城的定位和用戶體量,以及系統維護成本,其實完全沒有必要引入消息中間件,借助Redis其實就可以輕松實現這個需求。

加上Redisson客戶端本身就已經實現了很多分布式集合工具類,借助阻塞隊列和延時隊列就可輕松搞定。

當然,為了使用方便以及團隊協作,順便模仿@RabbitListener封裝了一套基于注解的消息消費,廢話不多說,直接上代碼。


二、延遲隊列實現

1、Redisson延時消息監聽注解和消息體

延遲消息監聽器定義:

/*** Redisson延時隊列監聽器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 隊列名稱* @return*/String queueName();
}

消息體定義:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息內容*/private String msg;/*** 隊列名稱*/private String queueName;/*** 延時時間*/private long delayTime;private TimeUnit timeUnit;
}

2、Redisson延時消息發布器

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 發布延時信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("開始發布延遲消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}

這里我們借助RBlockingQueueRDelayedQueue來實現,只有當延遲消息快到期時,消費者才能從阻塞隊列拉取到消息,否則消費者將一直阻塞。

3、Redisson延時消息監聽處理器

這里我們定義了一個BeanPostProcessor 的實現,目的就是為了掃描Spring容器中所有帶RedissonDelayedQueueListener注解的Bean實例和方法。

/*** Redisson延遲隊列Bean后處理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 獲取最終的目標運行時對象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson隊列監聽注解,則啟動一個線程監聽隊列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("開始監聽Redisson延時隊列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("監聽到隊列[{}]延時消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}

這里我們掃描到指定Bean的方法后,會開啟一個異步線程,并輪詢拉取延時消息,如果消息沒過期,異步線程將會一直阻塞等待。


三、測試用例

/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}

啟動服務后,Bean后處理器會啟動異步線程監聽延時消息,如下:

2025-02-02 16:46:04.271 INFO  [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 開始監聽Redisson延時隊列[redisson:delayed:queue]消息

瀏覽器直接輸入http://localhost:8000/delayed/msg發布延時消息,10s后消費者進行處理,如下:

2025-02-02 16:43:11.107 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 監聽到隊列[redisson:delayed:queue]延時消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg

四、結語

雖說通過Redisson實現的延遲隊列也能實現支付訂單的自動取消,但是可用性相比專業的消息中間件還是尚有不足的。

比如消息生產者發送消息沒有確認機制,消息消費也沒有確認機制,這兩個環節都有可能導致消息丟失。

當然我們可以通過其它保障機制去補償,比如再加上定時任務掃表,把掃描時間可以設置長一點,保證最終的一致性。

在大型項目中還是優先推薦專業的消息中間件去實現延時消息消費。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/67792.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/67792.shtml
英文地址,請注明出處:http://en.pswp.cn/web/67792.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MVC 文件夾:架構之美與實際應用

MVC 文件夾:架構之美與實際應用 引言 MVC(Model-View-Controller)是一種設計模式,它將應用程序分為三個核心組件:模型(Model)、視圖(View)和控制器(Controller)。這種架構模式不僅提高了代碼的可維護性和可擴展性,而且使得開發流程更加清晰。本文將深入探討MVC文…

【PyQt】lambda函數,實現動態傳遞參數

為什么需要 lambda&#xff1f; 在 PyQt5 中&#xff0c;clicked 信號默認會傳遞一個布爾值&#xff08;表示按鈕是否被選中&#xff09;。如果我們希望將按鈕的文本內容傳遞給槽函數&#xff0c;需要通過 lambda 函數顯式傳遞參數。 這樣可以實現將按鈕內容傳遞給槽函數&…

pytorch深度Q網絡

人工智能例子匯總&#xff1a;AI常見的算法和例子-CSDN博客 DQN 引入了深度神經網絡來近似Q函數&#xff0c;解決了傳統Q-learning在處理高維狀態空間時的瓶頸&#xff0c;尤其是在像 Atari 游戲這樣的復雜環境中。DQN的核心思想是使用神經網絡 Q(s,a;θ)Q(s, a; \theta)Q(s,…

Baklib構建高效協同的基于云的內容中臺解決方案

內容概要 隨著云計算技術的飛速發展&#xff0c;內容管理的方式也在不斷演變。企業面臨著如何在數字化轉型過程中高效管理和協同處理內容的新挑戰。為應對這些挑戰&#xff0c;引入基于云的內容中臺解決方案顯得尤為重要。 Baklib作為創新型解決方案提供商&#xff0c;致力于…

DeepSeek-R1 論文. Reinforcement Learning 通過強化學習激勵大型語言模型的推理能力

論文鏈接&#xff1a; [2501.12948] DeepSeek-R1: Incentivizing Reasoning Capability in LLMs via Reinforcement Learning 實在太長&#xff0c;自行扔到 Model 里&#xff0c;去翻譯去提問吧。 工作原理&#xff1a; 主要技術&#xff0c;就是訓練出一些專有用途小模型&…

C++泛型編程指南03-CTAD

文章目錄 C17 自定義類型推斷指引&#xff08;CTAD&#xff09;深度解析一、基礎概念1. 核心作用2. 工作原理 二、標準庫中的 CTAD 應用1. 容器類型推導2. 智能指針推導3. 元組類型推導 三、自定義推導指引語法1. 基本語法結構2. 典型應用場景 四、推導指引設計模式1. 迭代器范…

deepseek+vscode自動化測試腳本生成

近幾日Deepseek大火,我這里也嘗試了一下,確實很強。而目前vscode的AI toolkit插件也已經集成了deepseek R1,這里就介紹下在vscode中利用deepseek幫助我們完成自動化測試腳本的實踐分享 安裝AI ToolKit并啟用Deepseek 微軟官方提供了一個針對AI輔助的插件,也就是 AI Toolk…

電介質超表面中指定渦旋的非線性生成

渦旋光束在眾多領域具有重要應用&#xff0c;但傳統光學器件產生渦旋光束的方式限制了其在集成系統中的應用。超表面的出現為渦旋光束的產生帶來了新的可能性&#xff0c;尤其是在非線性領域&#xff0c;盡管近些年來已經有一些研究&#xff0c;但仍存在諸多問題&#xff0c;如…

基于Springboot+mybatis+mysql+html圖書管理系統2

基于Springbootmybatismysqlhtml圖書管理系統2 一、系統介紹二、功能展示1.用戶登陸2.用戶主頁3.圖書查詢4.還書5.個人信息修改6.圖書管理&#xff08;管理員&#xff09;7.學生管理&#xff08;管理員&#xff09;8.廢除記錄&#xff08;管理員&#xff09; 三、數據庫四、其它…

重構字符串(767)

767. 重構字符串 - 力扣&#xff08;LeetCode&#xff09; 解法&#xff1a; class Solution { public:string reorganizeString(string s){string res;//因為1 < s.length < 500 &#xff0c; uint64_t 類型足夠uint16_t n s.size();if (n 0) {return res;}unordere…

本地部署DeepSeek方法

本地部署完成后的效果如下圖&#xff0c;整體與chatgpt類似&#xff0c;只是模型在本地推理。 我們在本地部署主要使用兩個工具&#xff1a; ollamaopen-webui ollama是在本地管理和運行大模型的工具&#xff0c;可以直接在terminal里和大模型對話。open-webui是提供一個類…

游戲引擎 Unity - Unity 啟動(下載 Unity Editor、生成 Unity Personal Edition 許可證)

Unity Unity 首次發布于 2005 年&#xff0c;屬于 Unity Technologies Unity 使用的開發技術有&#xff1a;C# Unity 的適用平臺&#xff1a;PC、主機、移動設備、VR / AR、Web 等 Unity 的適用領域&#xff1a;開發中等畫質中小型項目 Unity 適合初學者或需要快速上手的開…

【開源免費】基于Vue和SpringBoot的公寓報修管理系統(附論文)

本文項目編號 T 186 &#xff0c;文末自助獲取源碼 \color{red}{T186&#xff0c;文末自助獲取源碼} T186&#xff0c;文末自助獲取源碼 目錄 一、系統介紹二、數據庫設計三、配套教程3.1 啟動教程3.2 講解視頻3.3 二次開發教程 四、功能截圖五、文案資料5.1 選題背景5.2 國內…

Haskell語言的多線程編程

Haskell語言的多線程編程 Haskell是一種基于函數式編程范式的編程語言&#xff0c;以其強大的類型系統和懶惰求值著稱。近年來&#xff0c;隨著多核處理器的發展&#xff0c;多線程編程變得日益重要。雖然Haskell最初并不是為了多線程而設計&#xff0c;但它的設計理念和工具集…

《蒼穹外賣》項目學習記錄-Day11訂單統計

根據起始時間和結束時間&#xff0c;先把begin放入集合中用while循環當begin不等于end的時候&#xff0c;讓begin加一天&#xff0c;這樣就把這個區間內的時間放到List集合。 查詢每天的訂單總數也就是查詢的時間段是大于當天的開始時間&#xff08;0點0分0秒&#xff09;小于…

【python】python油田數據分析與可視化(源碼+數據集)【獨一無二】

&#x1f449;博__主&#x1f448;&#xff1a;米碼收割機 &#x1f449;技__能&#x1f448;&#xff1a;C/Python語言 &#x1f449;專__注&#x1f448;&#xff1a;專注主流機器人、人工智能等相關領域的開發、測試技術。 【python】python油田數據分析與可視化&#xff08…

FBX SDK的使用:基礎知識

Windows環境配置 FBX SDK安裝后&#xff0c;目錄下有三個文件夾&#xff1a; include 頭文件lib 編譯的二進制庫&#xff0c;根據你項目的配置去包含相應的庫samples 官方使用案列 動態鏈接 libfbxsdk.dll, libfbxsdk.lib是動態庫&#xff0c;需要在配置屬性->C/C->預…

【單層神經網絡】基于MXNet庫簡化實現線性回歸

寫在前面 同最開始的兩篇文章 完整程序及注釋 導入使用的庫# 基本 from mxnet import autograd, nd, gluon # 模型、網絡 from mxnet.gluon import nn from mxnet import init # 學習 from mxnet.gluon import loss as gloss # 數據集 from mxnet.gluon…

【爬蟲】JS逆向解決某藥的商品價格加密

??????????歡迎來到我的博客?????????? ??作者:秋無之地 ??簡介:CSDN爬蟲、后端、大數據領域創作者。目前從事python爬蟲、后端和大數據等相關工作,主要擅長領域有:爬蟲、后端、大數據開發、數據分析等。 ??歡迎小伙伴們點贊????、收藏??、…

OpenAI開源戰略反思:中國力量推動AI產業變革

在周五的Reddit問答會上&#xff0c;OpenAI首席執行官Sam Altman罕見承認公司正面臨來自中國科技企業的強勁挑戰。這位向來強硬的硅谷領軍者坦言&#xff0c;以深度求索&#xff08;DeepSeek&#xff09;為代表的中國AI公司正在改寫行業游戲規則。 這場歷時三小時的對話揭示了…