基于Redis的4種延時隊列實現方式

延時隊列是一種特殊的消息隊列,它允許消息在指定的時間后被消費。在微服務架構、電商系統和任務調度場景中,延時隊列扮演著關鍵角色。例如,訂單超時自動取消、定時提醒、延時支付等都依賴延時隊列實現。

Redis作為高性能的內存數據庫,具備原子操作、數據結構豐富和簡單易用的特性,本文將介紹基于Redis實現分布式延時隊列的四種方式。

1. 基于Sorted Set的延時隊列

原理

利用Redis的Sorted Set(有序集合),將消息ID作為member,執行時間戳作為score進行存儲。通過ZRANGEBYSCORE命令可以獲取到達執行時間的任務。

代碼實現

public class RedisZSetDelayQueue {private final StringRedisTemplate redisTemplate;private final String queueKey = "delay_queue:tasks";public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}/*** 添加延時任務* @param taskId 任務ID* @param taskInfo 任務信息(JSON字符串)* @param delayTime 延遲時間(秒)*/public void addTask(String taskId, String taskInfo, long delayTime) {// 計算執行時間long executeTime = System.currentTimeMillis() + delayTime * 1000;// 存儲任務詳情redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);// 添加到延時隊列redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);}/*** 輪詢獲取到期任務*/public List<String> pollTasks() {long now = System.currentTimeMillis();// 獲取當前時間之前的任務Set<String> taskIds = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now);if (taskIds == null || taskIds.isEmpty()) {return Collections.emptyList();}// 獲取任務詳情List<String> tasks = new ArrayList<>();for (String taskId : taskIds) {String taskInfo = (String) redisTemplate.opsForHash().get("delay_queue:details", taskId);if (taskInfo != null) {tasks.add(taskInfo);// 從集合和詳情中移除任務redisTemplate.opsForZSet().remove(queueKey, taskId);redisTemplate.opsForHash().delete("delay_queue:details", taskId);}}return tasks;}// 定時任務示例public void startTaskProcessor() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {try {List<String> tasks = pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 實際任務處理邏輯}
}

優缺點

優點

  • 實現簡單,易于理解
  • 任務按執行時間自動排序
  • 支持精確的時間控制

缺點

  • 需要輪詢獲取到期任務,消耗CPU資源
  • 大量任務情況下,ZRANGEBYSCORE操作可能影響性能
  • 沒有消費確認機制,需要額外實現

2. 基于List + 定時輪詢的延時隊列

原理

這種方式使用多個List作為存儲容器,按延遲時間的不同將任務分配到不同的隊列中。通過定時輪詢各個隊列,將到期任務移動到一個立即執行隊列。

代碼實現

public class RedisListDelayQueue {private final StringRedisTemplate redisTemplate;private final String readyQueueKey = "delay_queue:ready";  // 待處理隊列private final Map<Integer, String> delayQueueKeys;  // 延遲隊列,按延時時間分級public RedisListDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 初始化不同延遲級別的隊列delayQueueKeys = new HashMap<>();delayQueueKeys.put(5, "delay_queue:delay_5s");     // 5秒delayQueueKeys.put(60, "delay_queue:delay_1m");    // 1分鐘delayQueueKeys.put(300, "delay_queue:delay_5m");   // 5分鐘delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分鐘}/*** 添加延時任務*/public void addTask(String taskInfo, int delaySeconds) {// 選擇合適的延遲隊列String queueKey = selectDelayQueue(delaySeconds);// 任務元數據,包含任務信息和執行時間long executeTime = System.currentTimeMillis() + delaySeconds * 1000;String taskData = executeTime + ":" + taskInfo;// 添加到延遲隊列redisTemplate.opsForList().rightPush(queueKey, taskData);System.out.println("Task added to " + queueKey + ": " + taskData);}/*** 選擇合適的延遲隊列*/private String selectDelayQueue(int delaySeconds) {// 找到最接近的延遲級別int closestDelay = delayQueueKeys.keySet().stream().filter(delay -> delay >= delaySeconds).min(Integer::compareTo).orElse(Collections.max(delayQueueKeys.keySet()));return delayQueueKeys.get(closestDelay);}/*** 移動到期任務到待處理隊列*/public void moveTasksToReadyQueue() {long now = System.currentTimeMillis();// 遍歷所有延遲隊列for (String queueKey : delayQueueKeys.values()) {boolean hasMoreTasks = true;while (hasMoreTasks) {// 查看隊列頭部任務String taskData = redisTemplate.opsForList().index(queueKey, 0);if (taskData == null) {hasMoreTasks = false;continue;}// 解析任務執行時間long executeTime = Long.parseLong(taskData.split(":", 2)[0]);// 檢查是否到期if (executeTime <= now) {// 通過LPOP原子性地移除隊列頭部任務String task = redisTemplate.opsForList().leftPop(queueKey);// 任務可能被其他進程處理,再次檢查if (task != null) {// 提取任務信息并添加到待處理隊列String taskInfo = task.split(":", 2)[1];redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);System.out.println("Task moved to ready queue: " + taskInfo);}} else {// 隊列頭部任務未到期,無需檢查后面的任務hasMoreTasks = false;}}}}/*** 獲取待處理任務*/public String getReadyTask() {return redisTemplate.opsForList().leftPop(readyQueueKey);}/*** 啟動任務處理器*/public void startTaskProcessors() {// 定時移動到期任務ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 移動任務線程scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);// 處理任務線程scheduler.scheduleAtFixedRate(() -> {String task = getReadyTask();if (task != null) {processTask(task);}}, 0, 100, TimeUnit.MILLISECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 實際任務處理邏輯}
}

優缺點

優點

  • 分級隊列設計,降低單隊列壓力
  • 相比Sorted Set占用內存少
  • 支持隊列監控和任務優先級

缺點

  • 延遲時間精度受輪詢頻率影響
  • 實現復雜度高
  • 需要維護多個隊列
  • 時間判斷和隊列操作非原子性,需特別處理并發問題

3. 基于發布/訂閱(Pub/Sub)的延時隊列

原理

結合Redis發布/訂閱功能與本地時間輪算法,實現延遲任務的分發和處理。任務信息存儲在Redis中,而時間輪負責任務的調度和發布。

代碼實現

public class RedisPubSubDelayQueue {private final StringRedisTemplate redisTemplate;private final String TASK_TOPIC = "delay_queue:task_channel";private final String TASK_HASH = "delay_queue:tasks";private final HashedWheelTimer timer;public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 初始化時間輪,刻度100ms,輪子大小512this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);// 啟動消息訂閱subscribeTaskChannel();}/*** 添加延時任務*/public void addTask(String taskId, String taskInfo, long delaySeconds) {// 存儲任務信息到RedisredisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);// 添加到時間輪timer.newTimeout(timeout -> {// 發布任務就緒消息redisTemplate.convertAndSend(TASK_TOPIC, taskId);}, delaySeconds, TimeUnit.SECONDS);System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");}/*** 訂閱任務通道*/private void subscribeTaskChannel() {redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {String taskId = new String(message.getBody());// 獲取任務信息String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);if (taskInfo != null) {// 處理任務processTask(taskId, taskInfo);// 刪除任務redisTemplate.opsForHash().delete(TASK_HASH, taskId);}}, TASK_TOPIC.getBytes());}private void processTask(String taskId, String taskInfo) {System.out.println("Processing task: " + taskId + " - " + taskInfo);// 實際任務處理邏輯}// 模擬HashedWheelTimer類public static class HashedWheelTimer {private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);private final long tickDuration;private final TimeUnit unit;public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {this.tickDuration = tickDuration;this.unit = unit;}public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {long delayMillis = timeUnit.toMillis(delay);scheduler.schedule(() -> task.run(null), delayMillis, TimeUnit.MILLISECONDS);}public interface TimerTask {void run(Timeout timeout);}public interface Timeout {}}
}

優缺點

優點

  • 即時觸發,無需輪詢
  • 高效的時間輪算法
  • 可以跨應用訂閱任務
  • 分離任務調度和執行,降低耦合

缺點

  • 依賴本地時間輪,非純Redis實現
  • Pub/Sub模式無消息持久化,可能丟失消息
  • 服務重啟時需要重建時間輪
  • 訂閱者需要保持連接

4. 基于Redis Stream的延時隊列

原理

Redis 5.0引入的Stream是一個強大的數據結構,專為消息隊列設計。結合Stream的消費組和確認機制,可以構建可靠的延時隊列。

代碼實現

public class RedisStreamDelayQueue {private final StringRedisTemplate redisTemplate;private final String delayQueueKey = "delay_queue:stream";private final String consumerGroup = "delay_queue_consumers";private final String consumerId = UUID.randomUUID().toString();public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 創建消費者組try {redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xGroupCreate(delayQueueKey.getBytes(), consumerGroup, ReadOffset.from("0"), true);return "OK";});} catch (Exception e) {// 消費者組可能已存在System.out.println("Consumer group may already exist: " + e.getMessage());}}/*** 添加延時任務*/public void addTask(String taskInfo, long delaySeconds) {long executeTime = System.currentTimeMillis() + delaySeconds * 1000;Map<String, Object> task = new HashMap<>();task.put("executeTime", String.valueOf(executeTime));task.put("taskInfo", taskInfo);redisTemplate.opsForStream().add(delayQueueKey, task);System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);}/*** 獲取待執行的任務*/public List<String> pollTasks() {long now = System.currentTimeMillis();List<String> readyTasks = new ArrayList<>();// 讀取尚未處理的消息List<MapRecord<String, Object, Object>> records = redisTemplate.execute((RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {return connection.streamCommands().xReadGroup(consumerGroup.getBytes(),consumerId.getBytes(),StreamReadOptions.empty().count(10),StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">")));});if (records != null) {for (MapRecord<String, Object, Object> record : records) {String messageId = record.getId().getValue();Map<Object, Object> value = record.getValue();long executeTime = Long.parseLong((String) value.get("executeTime"));String taskInfo = (String) value.get("taskInfo");// 檢查任務是否到期if (executeTime <= now) {readyTasks.add(taskInfo);// 確認消息已處理redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return "OK";});// 可選:從流中刪除消息redisTemplate.opsForStream().delete(delayQueueKey, messageId);} else {// 任務未到期,放回隊列redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return "OK";});// 重新添加任務(可選:使用延遲重新入隊策略)Map<String, Object> newTask = new HashMap<>();newTask.put("executeTime", String.valueOf(executeTime));newTask.put("taskInfo", taskInfo);redisTemplate.opsForStream().add(delayQueueKey, newTask);}}}return readyTasks;}/*** 啟動任務處理器*/public void startTaskProcessor() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {try {List<String> tasks = pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 實際任務處理邏輯}
}

優缺點

優點

  • 支持消費者組和消息確認,提供可靠的消息處理
  • 內置消息持久化機制
  • 支持多消費者并行處理
  • 消息ID包含時間戳,便于排序

缺點

  • 要求Redis 5.0+版本
  • 實現相對復雜
  • 仍需輪詢獲取到期任務
  • 對未到期任務的處理相對繁瑣

性能對比與選型建議

實現方式性能可靠性實現復雜度內存占用適用場景
Sorted Set★★★★☆★★★☆☆任務量適中,需要精確調度
List + 輪詢★★★★★★★★☆☆高并發,延時精度要求不高
Pub/Sub + 時間輪★★★★★★★☆☆☆實時性要求高,可容忍服務重啟丟失
Stream★★★☆☆★★★★★可靠性要求高,需要消息確認

總結

在實際應用中,可根據系統規模、性能需求、可靠性要求和實現復雜度等因素進行選擇,也可以組合多種方式打造更符合業務需求的延時隊列解決方案。無論選擇哪種實現,都應關注可靠性、性能和監控等方面,確保延時隊列在生產環境中穩定運行。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/901944.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/901944.shtml
英文地址,請注明出處:http://en.pswp.cn/news/901944.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

GN ninja 工程化構建例程

文章目錄 1. 前言?2. 工程實例??2.1 工程目錄結構2.2 工程頂層.gn文件2.3 工具鏈配置.gn文件2.4 編譯配置.gn文件2.5 編譯目標配置.gn文件2.6 工程接口文件2.7 動態庫編譯.gn文件2.8 動態庫源文件2.9 靜態庫編譯.gn文件2.10 靜態庫源文件2.11 主程序編譯.gn文件2.12 主程序源…

基于亞博K210開發板——內存卡讀寫文件

開發板 亞博K210開發板 實驗目的 本實驗主要學習 K210 通過 SPI 讀寫內存卡文件的功能 實驗準備 實驗元件 開發板自帶的 TF 卡、LCD 顯示屏 &#xff08;提前準備好 FAT32 格式的TF 卡。TF 插入 TF 卡槽的時候注意方向&#xff0c;TF 卡的金手指那一面需要面向開發板&am…

51單片機實驗五:A/D和D/A轉換

一、實驗環境與實驗器材 環境&#xff1a;Keli&#xff0c;STC-ISP燒寫軟件,Proteus. 器材&#xff1a;TX-1C單片機&#xff08;STC89C52RC&#xff09;、電腦。 二、 實驗內容及實驗步驟 1.A/D轉換 概念&#xff1a;模數轉換是將連續的模擬信號轉換為離散的數字信…

C++ 常用的智能指針

C 智能指針 一、智能指針類型概覽 C 標準庫提供以下智能指針&#xff08;需包含頭文件 <memory>&#xff09;&#xff1a; unique_ptr&#xff1a;獨占所有權&#xff0c;不可復制&#xff0c; 可移動shared_ptr&#xff1a;共享所有權&#xff0c;用于引用計數weak_pt…

6.8.最小生成樹

一.復習&#xff1a; 1.生成樹&#xff1a; 對于一個連通的無向圖&#xff0c;假設圖中有n個頂點&#xff0c;如果能找到一個符合以下要求的子圖&#xff1a; 子圖中包含圖中所有的頂點&#xff0c;同時各個頂點保持連通&#xff0c; 而且子圖的邊的數量只有n-1條&#xff0…

Spring Boot 集成金蝶 API 演示

? Spring Boot 集成金蝶 API 演示&#xff1a;登錄 / 注銷 Cookie 保存 本文將通過 Spring Boot 完整實現一套金蝶接口集成模型&#xff0c;包括&#xff1a; ? 普通登錄? AppSecret 登錄? 注銷? Cookie 保存與復用 &#x1f4c5; 項目結構 src/ ├── controller/ │…

React 受控表單綁定基礎

React 中最常見的幾個需求是&#xff1a; 渲染一組列表綁定點擊事件表單數據與組件狀態之間的綁定 受控表單綁定是理解表單交互的關鍵之一。 &#x1f4cd;什么是受控組件&#xff1f; 在 React 中&#xff0c;所謂“受控組件”&#xff0c;指的是表單元素&#xff08;如 &l…

基于FPGA的AES加解密系統verilog實現,包含testbench和開發板硬件測試

目錄 1.課題概述 2.系統測試效果 3.核心程序與模型 4.系統原理簡介 4.1 字節替換&#xff08;SubBytes&#xff09; 4.2 行移位&#xff08;ShiftRows&#xff09; 4.3 列混合&#xff08;MixColumns&#xff09; 4.4 輪密鑰加&#xff08;AddRoundKey&#xff09; 4.…

6.5 GitHub監控系統實戰:雙通道采集+動態調度打造高效運維體系

GitHub Sentinel Agent 定期更新功能設計與實現 關鍵詞:GitHub API 集成、定時任務調度、Python 爬蟲開發、SMTP 郵件通知、系統穩定性保障 1. GitHub 項目數據獲取功能 1.1 雙通道數據采集架構設計 #mermaid-svg-ZHJIMXcMAyDHVhmV {font-family:"trebuchet ms",v…

Explorer++:輕量級高效文件管理器!!

項目簡介 Explorer 是一款專為Windows操作系統設計的輕量級且高效的文件管理器。作為Windows資源管理器的強大替代方案&#xff0c;它提供了豐富的特性和優化的用戶體驗&#xff0c;使得文件管理和組織變得更加便捷高效。無論是專業用戶還是普通用戶&#xff0c;都能從中受益&a…

7、生命周期:魔法的呼吸節奏——React 19 新版鉤子

一、魔法呼吸的本質 "每個組件都是活體魔法生物&#xff0c;呼吸節奏貫穿其生命始終&#xff0c;"鄧布利多的冥想盆中浮現三維相位圖&#xff0c;"React 19的呼吸式鉤子&#xff0c;讓組件能量流轉如尼可勒梅的煉金術&#xff01;" ——以霍格沃茨魔法生理…

理解計算篇--正則表達式轉NFA--理論部分

空正則表達式轉NFA單字符正則表達式轉NFA拼接正則表達式轉NFA選擇正則表達式轉NFA重復正則表達式轉NFA 正則表達式轉NFA–實戰部分 空正則表達式轉NFA 轉換步驟&#xff1a; 構建1個只有1個狀態的NFA起始狀態也是接受狀態沒有規則&#xff0c;即規則集為空 單字符正則表達式…

穩態模型下的異步電機調速【運動控制系統】

異步電動機&#xff1a; n1是同步轉速&#xff08;電機和磁芯同步時候的轉速&#xff09; n&#xff1a;電機的實際轉速 異步電動機恒壓頻比的概念&#xff0c;為什么基頻以下可以采取恒壓頻率&#xff0c;基頻以上不可以采用恒壓頻比&#xff1a; 異步電動機的恒壓頻比&…

【KWDB 創作者計劃】_算法篇---Stockwell變換

文章目錄 前言一、Stockwell變換原理詳解1.1 連續S變換定義1.2 離散S變換1.3簡介 二、S變換的核心特點2.1頻率自適應的時頻分辨率2.1.1高頻區域2.1.2低頻區域 2.2無交叉項干擾2.3完全可逆2.4相位保持2.5與傅里葉譜的直接關系 三、應用領域3.1地震信號分析3.2生物醫學信號處理3.…

云計算(Cloud Computing)概述——從AWS開始

李升偉 編譯 無需正式介紹亞馬遜網絡服務&#xff08;Amazon Web Services&#xff0c;簡稱AWS&#xff09;。作為行業領先的云服務提供商&#xff0c;AWS為全球開發者提供了超過170項隨時可用的服務。 例如&#xff0c;Adobe能夠獨立于IT團隊開發和更新軟件。通過AWS的服務&…

Python爬蟲第17節-動態渲染頁面抓取之Selenium使用下篇

目錄 引言 一、獲取節點信息 1.1 獲取屬性 1.2 獲取文本值 1.3 獲取ID、位置、標簽名、大小 二、切換Frame 三、延時等待 3.1 隱式等待 3.2 顯式等待 四、前進后退 五、Cookies 六、選項卡管理 七、異常處理 引言 這一節我們繼續講解Selenium的使用下篇&#xff0…

容器docker入門學習

這里寫目錄標題 容器容器的軟件廠商 dockerdocker引擎 虛擬化虛擬化技術 docker安裝詳解1、安裝檢查2、安裝yum相關的工具3、安裝docker-ce軟件4、查看docker版本5、啟動docker服務6、設置docker開機啟動7、查看有哪些docker容器運行進程8、查看容器里有哪些鏡像9、下載nginx軟…

文獻總結:NIPS2023——車路協同自動駕駛感知中的時間對齊(FFNet)

FFNet 一、文獻基本信息二、背景介紹三、相關研究1. 以自車為中心的3D目標檢測2. 車路協同3D目標檢測3. 特征流 四、FFNet網絡架構1. 車路協同3D目標檢測任務定義2. 特征流網絡2.1 特征流生成2.2 壓縮、傳輸與解壓縮2.3 車輛傳感器數據與基礎設施特征流融合 3. 特征流網絡訓練流…

git 出現 port 443 Connection timed out

梯子正常延遲不算嚴重&#xff0c;但在使用git push時反復出現 fatal: unable to access https://github.com/irvingwu5/xxxx.git/ Error in the HTTP2 framing layer Failed to connect to github.com port 443 after 136353 ms: Connection timed out 將git的網絡配置與梯子…