Redisson - 實現延遲隊列

Redisson 延遲隊列

Redisson 是基于 Redis 的一款功能強大的 Java 客戶端。它提供了諸如分布式鎖、限流器、阻塞隊列、延遲隊列等高可用、高并發組件。

其中,RDelayedQueue 是對 Redis 數據結構的高階封裝,能讓你將消息延遲一定時間后再進入消費隊列。

延遲隊列的組成

Redisson 延遲隊列由兩個 Redis 隊列組成:

RDelayedQueue:延遲元素容器,暫存在 Redis 的 zset 中(按時間排序)
RBlockingQueue(目標隊列):當延遲時間到達后,Redisson 會自動將元素移動到這個隊列,供消費者消費。

Redisson 內部使用 定時輪詢線程 來掃描延遲數據并遷移至目標隊列。

原理

Redisson 的 RDelayedQueue 設計巧妙,它并非一個單一的 Redis 數據結構,而是結合了 Redis 的 ZSET (有序集合) 和 List (列表,具體實現為 RBlockingQueue) 來實現的

> 生產者  -- (元素, 延遲時間) -->  RDelayedQueue (API)
>                                      |
>                                      | (Redisson 內部)
>                                      V
>             +--------------------------------------------------+
>             |   ZSET (有序集合)                                |
>             |   Key: delayed_queue_name_zset                   |
>             |   Member: task_id_or_payload                     |
>             |   Score: execution_timestamp                     |
>             +--------------------------------------------------+
>                        ^
>                        |  (Eviction Scheduler 定期掃描)
>                        |  (到期任務)
>                        V
>             +--------------------------------------------------+
>             |   RBlockingQueue (目標隊列,基于 Redis List)      |
>             |   Key: destination_queue_name                    |
>             |   Value: task_payload                            |
>             +--------------------------------------------------+
>                                      ^
>                                      |
>                                      | (消費者 take()/poll())
>                                      V 消費者  <---------------------------

Redisson 延遲隊列的優勢

分布式特性:基于 Redis,天然支持分布式環境,多個生產者和消費者實例可以共享同一個延遲隊列系統。
高性能與持久化:依賴 Redis 的高性能特性。如果 Redis 配置了持久化 (AOF/RDB),延遲任務的元數據也能得到持久化保障。
易用性:Redisson 提供了簡潔易懂的 API,屏蔽了底層 ZSET 和 List 的復雜操作。
精確度較高:任務的到期判斷依賴 Redis 服務器的時間,通常比較準確。
支持任務取消:可以通過 RDelayedQueue.remove(object) 方法從延遲隊列中移除尚未到期的任務(前提是任務對象能被正確 equals 比較)。

Redisson隊列實踐

添加依賴

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.24.3</version>
</dependency>

Redisson 配置 (application.yml):

spring:

  application:name: redisson-delayed-order-app# Redisson 配置 (如果使用 redisson-spring-boot-starter,它會嘗試自動配置)# 你也可以提供一個 redisson.yaml 文件或在 Java Config 中配置 RedissonClient Bean# 例如,指定單個 Redis 服務器:redis: # Spring Boot 2.x 使用 spring.redis, Spring Boot 3.x 使用 spring.data.redis# Redisson starter 會嘗試使用這些配置,但更推薦使用 Redisson 自身的配置方式# address: redis://127.0.0.1:6379 # Redisson 推薦的格式host: 127.0.0.1port: 6379# database: 0# password:# Redisson 自己的配置方式 (更靈活,可以放在 redisson.yaml 中)
# redisson:
#   singleServerConfig:
#     address: "redis://127.0.0.1:6379"
#     database: 0
#   # codec: org.redisson.codec.JsonJacksonCodec # 推薦使用 Jackson 序列化

定義延遲任務處理器(消費者)

@Component
public class OrderCloseConsumer implements InitializingBean {private static final String QUEUE_NAME = "order-close-queue";@Autowiredprivate RedissonClient redissonClient;@Autowiredprivate OrderService orderService;@Overridepublic void afterPropertiesSet() {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(QUEUE_NAME);RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);// 啟動消費線程Executors.newSingleThreadExecutor().submit(() -> {while (true) {try {String orderSn = blockingQueue.take(); // 阻塞等待orderService.closeOrder(orderSn); // 業務處理log.info("訂單超時關閉成功:{}", orderSn);} catch (Exception e) {log.error("延遲關單處理異常", e);}}});}
}

在下單時設置延遲任務

public void createOrder(String orderSn) {// 1. 業務入庫邏輯...// 2. 加入延遲隊列RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue("order-close-queue");RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(orderSn, 30, TimeUnit.MINUTES); // 30分鐘后執行log.info("訂單加入延遲關閉隊列:{}", orderSn);
}

關鍵注意事項

1、序列化 (Codec):
Redisson 默認使用 MarshallingCodec,它要求任務對象實現 java.io.Serializable。
推薦配置 RedissonClient 使用 org.redisson.codec.JsonJacksonCodec,這樣任務對象無需實現 Serializable,更靈活且跨語言友好。在 RedissonClient Bean 配置中設置:config.setCodec(new JsonJacksonCodec());

2、冪等性: 消費者的處理邏輯(如 processOrderTimeout)必須是冪等的。即使同一個任務被重復消費(如消費者處理中斷后任務重投,或 remove 失敗后任務仍到期),最終結果也應一致。通過檢查訂單當前狀態是保證冪等性的關鍵。

3、任務取消的 equals() 和 hashCode(): RDelayedQueue.remove(object) 依賴于任務對象的 equals() 和 hashCode() 方法來查找并刪除。確保任務載體類 (如 OrderTimeoutTask) 正確實現了這兩個方法,通常基于任務的唯一標識(如訂單號)。

4、消費者線程管理: 使用 InitializingBean 和 DisposableBean 來管理消費者線程的生命周期。消費者邏輯應在單獨的線程中執行,避免阻塞主線程。可以使用 Executors 創建線程池來并發處理任務。

5、錯誤處理與重試: 消費者循環中應有完善的 try-catch 塊,處理單個任務失敗的情況,避免整個消費者線程掛掉。可以考慮引入失敗任務記錄、告警或簡單的延時重試(但要注意避免無限重試)。

6、Redis 持久化: 為保證系統重啟后延遲任務不丟失(ZSET 中的元數據),Redis 服務器應配置 RDB 或 AOF 持久化。

7、消費者并發與伸縮:
可以啟動多個消費者實例(不同JVM進程),它們會從同一個目標 RBlockingQueue 中競爭獲取任務,實現負載均衡。
在單個消費者實例內部,也可以使用線程池來并發處理從隊列中獲取的任務。

8、監控: 關注 Redis 中 ZSET 和 List 的長度、Redisson 調度線程的健康狀況、任務處理的成功率和耗時等指標,以便及時發現和處理問題。

9、Redisson 版本: 確保使用較新的穩定版 Redisson,因為早期版本可能在延遲隊列的某些邊緣場景下存在問題。

總結

Redisson 的 RDelayedQueue 通過巧妙地結合 Redis ZSET 和 List,提供了一個強大、易用且高效的分布式延遲隊列解決方案。它非常適合如訂單超時關閉、延時通知、定時任務等場景。在實踐中,務必注意序列化、冪等性、任務取消的正確實現以及消費者端的健壯性設計,才能充分發揮其優勢,構建穩定可靠的分布式應用。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/83646.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/83646.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/83646.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

上門服務小程序訂單系統框架設計

一、邏輯分析 上門服務小程序訂單系統主要涉及服務展示、用戶下單、訂單處理、服務人員接單與服務完成反饋等核心流程。 服務展示&#xff1a;不同類型的上門服務&#xff08;如家政、維修等&#xff09;需要在小程序中展示詳細信息&#xff0c;包括服務名稱、價格、服務內容介…

Android apk裝機編譯類型: verify、speed-profile, speed與啟動耗時

Android apk裝機編譯類型: verify、speed-profile, speed與啟動耗時 Dex2oat (dalvik excutable file to optimized art file) &#xff0c;對 dex 文件進行編譯優化&#xff0c;Android 虛擬機可識別的是dex文件&#xff0c;應用運行過程如果每次都將dex文件加載內存&#xff…

winrm登錄失敗,指定的憑據被服務器拒絕

winrm登錄失敗&#xff0c;指定的憑據被服務器拒絕。 異常提示&#xff1a;the specified credentials were rejected by the server 在windows power shell執行 set-executionpolicy remotesigned winrm quickconfig winrm set winrm/config/service/auth {Basic"true…

Unity3D ET框架游戲腳本系統解析

前言 ET框架在Unity3D中實現的GamePlay腳本系統是一種革命性的、基于ECS&#xff08;實體-組件-系統&#xff09;架構的設計&#xff0c;它徹底改變了傳統的基于MonoBehaviour的游戲邏輯編寫方式。其核心思想是追求高性能、高解耦、易熱更新&#xff0c;特別適合大型復雜的網絡…

android與Qt類比

一、概念對應關系 Android RecyclerView 組件類比描述Qt 模型 - 視圖組件Qt 類比描述RecyclerView畫板&#xff08;容器&#xff09;QAbstractItemView視圖&#xff08;展示數據的容器&#xff0c;如列表、表格&#xff09;RecyclerView.Adapter畫布&#xff08;數據橋梁&…

Jenkins 2.479.1安裝和郵箱配置教程

1.安裝 在JDK安裝并設置環境變量完成后&#xff0c;下載官網對應的war版本&#xff0c;在對應目錄下打開命令行窗口并輸入 java -jar jenkins.war其余參數感興趣可以自行查閱&#xff0c;這里啟動的 jenkins 服務默認占用8080端口&#xff0c;在瀏覽器輸入 localhost:8080進入…

多分辨率 LCD 的 GUI 架構設計與實現

1.1多分辨率顯示系統的挑戰與解決方案 1.1.1 分辨率適配的核心問題 在嵌入式系統中,同時支持不同分辨率的 LCD(如 240160、320480 等)面臨以下挑戰: 布局適配:同一界面元素在不同分辨率下需要調整大小和位置 字體顯示:小分辨率屏幕需要更小的字體,而大分辨率需要更清…

11. MySQL事務管理(上)

1. CURD不加控制&#xff0c;會有什么問題&#xff1f; 火車票售票系統tickets表 id name nums 10 西安<->蘭州 1 客戶端A 客戶端B if (nums > 0) { if (nums > 0) { 賣票 賣票 // update numsnums - 1 update numsnums - 1 } } 當客戶端A檢查還有一張票時&#xf…

Beta分布Dirichlet分布

目錄 Beta分布Dirichlet分布Beta分布&Dirichlet分布從Dirichlet分布生成Beta樣本Beta分布&Dirichlet分布應用 Beta分布 Beta分布是定義在區間 [ 0 , 1 ] [0, 1] [0,1]上的連續概率分布&#xff0c;通常用于模擬概率或比例的隨機變量。Beta分布的概率密度函數&#xff…

嵌入式系統中常用的開源協議

目錄 1、GNU通用公共許可證&#xff08;GPL&#xff09; 2、GNU寬松通用公共許可證&#xff08;LGPL&#xff09; 3、MIT許可證 4、Apache許可證2.0 5、BSD許可證 6、如何選擇合適的協議 在嵌入式系統開發中&#xff0c;開源軟件的使用已成為主流趨勢。從物聯網設備到汽車…

告別延遲,擁抱速度:存儲加速仿真應用的解決方案【1】

需求分析 現代仿真&#xff08;如CFD流體動力學、FEA結構分析、電磁仿真、氣候模擬、自動駕駛場景仿真、芯片設計等&#xff09;會產生PB級甚至EB級的數據。海量數據的生成、處理和存儲&#xff0c;主要體現在以下幾個關鍵方面&#xff1a; 數據量爆炸式增長&#xff1a;高分…

vue封裝gsap自定義動畫指令

1、指令文件封裝 import { gsap } from gsap;// 動畫類型配置 const ANIMATION_TYPES {// 縮放scale: {from: { scale: 0.5, opacity: 0 },to: { scale: 1, opacity: 1 },hide: { scale: 0.5, opacity: 0 },},// 透明度fade: {from: { opacity: 0 },to: { opacity: 1, ease: …

HTTP 如何升級成 HTTPS

有一個自己的項目需要上線&#xff0c;域名解析完成后&#xff0c;發現只能使用 http 協議&#xff0c;這在瀏覽器上會限制&#xff0c;提示用戶不安全&#xff0c;所以需要把 HTTP 升級成 HTTPS 協議&#xff0c;但又不想花錢。 前提條件&#xff1a; 已經配置好 Nginx 服務器…

測試面試題總結一

目錄 列表、元組、字典的區別 nvicat連接出現問題如何排查 mysql性能調優 python連接mysql數據庫方法 參數化 pytest.mark.parametrize 裝飾器 list1 [1,7,4,5,5,6] for i in range(len(list1): assert list1[i] < list1[i1] 這段程序有問題嘛&#xff1f; pytest.i…

[藍橋杯]密文搜索

密文搜索 題目描述 福爾摩斯從 X 星收到一份資料&#xff0c;全部是小寫字母組成。 他的助手提供了另一份資料&#xff1a;許多長度為 8 的密碼列表。 福爾摩斯發現&#xff0c;這些密碼是被打亂后隱藏在先前那份資料中的。 請你編寫一個程序&#xff0c;從第一份資料中搜…

打卡第36天:模型可視化以及推理

知識點回顧&#xff1a; 1.三種不同的模型可視化方法&#xff1a;推薦torchinfo打印summary權重分布可視化 2.進度條功能&#xff1a;手動和自動寫法&#xff0c;讓打印結果更加美觀 3.推理的寫法&#xff1a;評估模式 作業&#xff1a;調整模型定義時的超參數&#xff0c;對…

8天Python從入門到精通【itheima】-68(元組)

目錄 65節——元組的定義和操作 1.學習目標 2.為什么要學習元組 3.元組的定義 4.定義元組的注意事項 5.元組的嵌套 6.元組的相關操作 【1】index方法 【2】count方法 【3】len方法 7.元組的遍歷 【1】while循環進行元組的遍歷 【2】for循環進行元組的變量 Python …

鏈表題解——環形鏈表【LeetCode】

141. 環形鏈表 方法一 核心思想&#xff1a; 使用一個集合 seen 來記錄已經訪問過的節點。遍歷鏈表&#xff0c;如果當前節點已經存在于集合中&#xff0c;說明鏈表存在環&#xff1b;否則&#xff0c;將當前節點添加到集合中&#xff0c;繼續遍歷。如果遍歷結束&#xff08;h…

【免費數據】1980-2022年中國2384個站點的水質數據

水&#xff0c;是生命之源&#xff0c;關乎著地球上每一個生物的生存與發展。健康的水生生態系統維持著整個水生態的平衡與活力&#xff1b;更是確保人類能持續獲得清潔水源的重要保障。水質數據在水質研究、海洋生物量測算以及生物多樣性評估等諸多關鍵領域都扮演著舉足輕重的…

分享推薦高精度磁阻式磁編碼器芯片

磁編碼器其通過感應旋轉磁場來實現角度、轉速的測量&#xff0c;因此&#xff0c;相較于傳統的光編碼器&#xff0c;磁編碼器對粉塵、污垢和油脂等污染物有很強的耐受性&#xff0c;即使在較為惡劣的環境中仍能夠保持高分辨率與檢測精度&#xff0c;安裝和維護簡捷方便&#xf…