Spring Boot+Redis Zset:三步構建高可靠延遲隊列系統

系統設計

架構圖

+----------------+       +-----------------+       +----------------+
|                |       |                 |       |                |
|  生產者        |------>| Redis ZSet      |------>| 定時任務消費者  |
|  (添加延遲任務) |       | (延遲隊列存儲)  |       | (掃描并處理任務)|
|                |       |                 |       |                |
+----------------+       +-----------------+       +----------------+↑                                              ||                                              ↓|                                   +---------------------++-----------------------------------|     任務處理器       || (執行具體業務邏輯)   |+---------------------+

核心流程

  1. 生產者將任務添加到Redis ZSet中,score為任務執行時間戳

  2. 定時任務定期掃描ZSet,找出score小于當前時間的任務

  3. 消費者線程池處理到期的任務

  4. 任務處理完成后從ZSet中移除

實現步驟

步驟一:添加依賴(pom.xml)

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Redis集成 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 定時任務 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><!-- JSON處理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok簡化代碼 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>

步驟二:配置Redis(application.yml)

spring:redis:host: localhostport: 6379password: database: 0lettuce:pool:max-active: 20max-idle: 10min-idle: 2max-wait: 10000ms# 自定義延遲隊列配置
delay:queue:key: "delay_queue"  # Redis ZSet鍵名batch-size: 10      # 每次處理任務數量interval: 5000      # 定時任務執行間隔(ms)thread-pool-size: 5 # 消費者線程池大小

步驟三:創建任務模型(DelayTask.java)

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayTask {/*** 任務類型枚舉*/public enum TaskType {ORDER_TIMEOUT,   // 訂單超時處理EMAIL_REMINDER,  // 郵件提醒TASK_EXECUTION   // 定時任務執行}private TaskType type;    // 任務類型private String taskId;    // 任務唯一IDprivate String content;   // 任務內容private long createTime;  // 任務創建時間private long executeTime; // 任務執行時間// 重寫toString方法用于序列化@Overridepublic String toString() {return "DelayTask{" +"type=" + type +", taskId='" + taskId + '\'' +", content='" + content + '\'' +", createTime=" + createTime +", executeTime=" + executeTime +'}';}
}

步驟四:創建Redis配置類(RedisConfig.java)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用Jackson序列化GenericJackson2JsonRedisSerializer jacksonSerializer = new GenericJackson2JsonRedisSerializer();// Key序列化template.setKeySerializer(new StringRedisSerializer());// Value序列化template.setValueSerializer(jacksonSerializer);// Hash Key序列化template.setHashKeySerializer(new StringRedisSerializer());// Hash Value序列化template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}

步驟五:創建線程池配置類(ThreadPoolConfig.java)

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class ThreadPoolConfig {@Value("${delay.queue.thread-pool-size:5}")private int threadPoolSize;@Bean("taskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心線程數executor.setCorePoolSize(threadPoolSize);// 最大線程數executor.setMaxPoolSize(threadPoolSize * 2);// 隊列大小executor.setQueueCapacity(100);// 線程名前綴executor.setThreadNamePrefix("delay-task-");// 拒絕策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化executor.initialize();return executor;}
}

步驟六:創建延遲隊列服務(DelayQueueService.java)

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.Executor;@Slf4j
@Service
public class DelayQueueService {@Value("${delay.queue.key}")private String delayQueueKey;@Value("${delay.queue.batch-size}")private int batchSize;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate Executor taskExecutor;/*** 添加延遲任務* @param task 任務對象* @param delaySeconds 延遲秒數*/public void addTask(DelayTask task, long delaySeconds) {long executeTime = System.currentTimeMillis() + (delaySeconds * 1000);task.setExecuteTime(executeTime);// 添加到Redis ZSetredisTemplate.opsForZSet().add(delayQueueKey, task, executeTime);log.info("添加延遲任務成功, 任務ID: {}, 執行時間: {}", task.getTaskId(), executeTime);}/*** 定時掃描任務(每5秒執行一次)*/@Scheduled(fixedRateString = "${delay.queue.interval}")public void scanExpiredTasks() {long now = System.currentTimeMillis();log.debug("開始掃描延遲隊列, 當前時間: {}", now);// 獲取當前時間之前的所有任務Set<ZSetOperations.TypedTuple<Object>> tasks = redisTemplate.opsForZSet().rangeByScoreWithScores(delayQueueKey, 0, now, 0, batchSize);if (tasks == null || tasks.isEmpty()) {log.debug("未找到待處理任務");return;}log.info("發現 {} 個待處理任務", tasks.size());for (ZSetOperations.TypedTuple<Object> tuple : tasks) {Object taskObj = tuple.getValue();if (taskObj instanceof DelayTask) {DelayTask task = (DelayTask) taskObj;// 使用線程池異步處理任務taskExecutor.execute(() -> processTask(task));}}}/*** 處理任務* @param task 延遲任務*/@Asyncpublic void processTask(DelayTask task) {try {log.info("開始處理任務: {}", task.getTaskId());// 根據任務類型執行不同邏輯switch (task.getType()) {case ORDER_TIMEOUT:handleOrderTimeout(task);break;case EMAIL_REMINDER:sendReminderEmail(task);break;case TASK_EXECUTION:executeScheduledTask(task);break;default:log.warn("未知任務類型: {}", task.getType());}// 處理完成后從隊列中移除redisTemplate.opsForZSet().remove(delayQueueKey, task);log.info("任務處理完成并移除: {}", task.getTaskId());} catch (Exception e) {log.error("任務處理失敗: {}", task.getTaskId(), e);handleProcessingError(task);}}// 示例:訂單超時處理private void handleOrderTimeout(DelayTask task) {log.info("處理訂單超時任務: {}", task.getContent());// 實際業務邏輯:取消訂單、釋放庫存等// 模擬處理時間try {Thread.sleep(1000);} catch (InterruptedException ignored) {}}// 示例:發送提醒郵件private void sendReminderEmail(DelayTask task) {log.info("發送提醒郵件: {}", task.getContent());// 實際業務邏輯:調用郵件服務發送郵件}// 示例:執行定時任務private void executeScheduledTask(DelayTask task) {log.info("執行定時任務: {}", task.getContent());// 實際業務邏輯:執行定時任務}// 錯誤處理private void handleProcessingError(DelayTask task) {log.error("任務處理失敗,加入死信隊列: {}", task.getTaskId());// 可以將失敗任務移到死信隊列redisTemplate.opsForList().rightPush("delay:dead-letter", task);}
}

步驟七:創建測試Controller(DelayQueueController.java)

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/delay")
public class DelayQueueController {@Autowiredprivate DelayQueueService delayQueueService;/*** 添加延遲任務* @param type 任務類型 (1-訂單超時, 2-郵件提醒, 3-定時任務)* @param seconds 延遲秒數* @param content 任務內容*/@PostMapping("/add")public String addDelayTask(@RequestParam("type") int type,@RequestParam("seconds") long seconds,@RequestParam("content") String content) {// 創建任務IDString taskId = "TASK-" + System.currentTimeMillis();// 轉換任務類型DelayTask.TaskType taskType;switch (type) {case 1: taskType = DelayTask.TaskType.ORDER_TIMEOUT; break;case 2: taskType = DelayTask.TaskType.EMAIL_REMINDER; break;case 3: taskType = DelayTask.TaskType.TASK_EXECUTION; break;default: throw new IllegalArgumentException("無效的任務類型");}// 創建任務DelayTask task = new DelayTask(taskType, taskId, content, System.currentTimeMillis(), 0);// 添加任務delayQueueService.addTask(task, seconds);return "任務添加成功! ID: " + taskId;}/*** 查看隊列狀態*/@GetMapping("/status")public String queueStatus() {long size = delayQueueService.getQueueSize();return "當前延遲隊列任務數量: " + size;}
}

啟動類(DelayQueueApplication.java)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling // 啟用定時任務
@EnableAsync     // 啟用異步方法
public class DelayQueueApplication {public static void main(String[] args) {SpringApplication.run(DelayQueueApplication.class, args);}
}

方案優勢與注意事項

優勢

  1. 高性能:利用Redis內存操作和ZSet有序特性

  2. 低延遲:定時任務掃描保證任務及時處理

  3. 高可靠:任務處理失敗可進入死信隊列

  4. 可擴展:線程池支持并行處理多個任務

  5. 靈活配置:支持批量處理大小、掃描間隔等參數配置

注意事項

  1. 任務冪等性:確保任務可重復處理而不產生副作用

  2. 任務超時處理:長時間任務需考慮超時機制

  3. Redis持久化:根據業務需求配置RDB或AOF

  4. 分布式環境:多實例部署時需考慮任務競爭問題

  5. 監控告警:添加隊列積壓監控和任務失敗告警

擴展建議

  1. 添加管理界面

    • 查看隊列中的任務

    • 手動重試失敗任務

    • 統計任務處理成功率

  2. 分布式鎖優化

    // 在scanExpiredTasks方法中
    String lockKey = "delay_queue_lock";
    Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.SECONDS);if (lockAcquired != null && lockAcquired) {try {// 執行掃描任務邏輯} finally {redisTemplate.delete(lockKey);}
    }

  3. 任務優先級支持

    // 在添加任務時,可將優先級加入score計算
    double score = executeTime + (priority * 0.001);
  4. 延遲時間精確控制

    • 使用Redisson的DelayedQueue組件

    • 或使用Redis的Keyspace通知功能

這個實現方案提供了一個完整、可擴展的延遲隊列系統,適用于訂單超時處理、定時提醒、延遲任務執行等多種業務場景。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/90062.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/90062.shtml
英文地址,請注明出處:http://en.pswp.cn/web/90062.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MCP vs 傳統集成方案:REST API、GraphQL、gRPC的終極對比

MCP vs 傳統集成方案&#xff1a;REST API、GraphQL、gRPC的終極對比 &#x1f31f; Hello&#xff0c;我是摘星&#xff01; &#x1f308; 在彩虹般絢爛的技術棧中&#xff0c;我是那個永不停歇的色彩收集者。 &#x1f98b; 每一個優化都是我培育的花朵&#xff0c;每一個特…

SQL語句中鎖的使用與優化

一、鎖機制簡介1.定義在數據庫中&#xff0c;除了傳統的計算資源&#xff08;如CPU、RAM、I/O等&#xff09;的爭用以外&#xff0c;數據也是一種供需要用戶共享的資源。如何保證數據并發訪問的一致性、有效性是所有數據庫必須解決的一個問題&#xff0c;鎖沖突也是影響數據庫并…

Linux筆記1——簡介安裝

操作系統給用戶一個操作界面&#xff0c;用戶通過操作界面使用系統資源Linux內核管理控制硬件&#xff0c;和硬件打交道SCSI&#xff08;盤&#xff09;sd**;第一個*表示磁盤順序&#xff0c;第二個*表示分區。例如&#xff1a;sda\sdb\sdc,sda1,sda2NVMe&#xff08;盤&#x…

GoLand 部署第一個項目

前言&#xff1a;Go環境部署分為兩種模式&#xff0c;一種是基于GOPATH部署&#xff08;老版本&#xff09;&#xff0c;另一種是基于Module部署&#xff08;新版本v1.11開始&#xff09;。GOPATH&#xff1a;需要配置GOPATH路徑&#xff0c;將GOPATH目錄視為工作目錄&#xff…

Mosaic數據增強介紹

1. 核心概念與目標Mosaic 是一種在計算機視覺&#xff08;尤其是目標檢測任務&#xff09;中非常流行且強大的數據增強技術。它最早由 Ultralytics 的 Alexey Bochkovskiy 在 YOLOv4 中提出并推廣&#xff0c;后來被廣泛應用于 YOLOv5, YOLOv7, YOLOv8 等模型以及其他目標檢測框…

LINUX 722 邏輯卷快照

邏輯卷快照 lvcreate -L 128M -s -n lv1-snap /dev/vg1/lv1 lvs lvscan mount -o ro /dev/vg1/lv1 /mmt/lv1-snap dmsetup ls --tree 測試 lvs /dev/vg1/lv1-snap dd if/dev/zero of/uc1/test bs1M count40 lvs /dev/vg1/lv1-snap 問題 [rootweb ~]# cd /mnt [rootweb mnt]# m…

Springboot+vue個人健康管理系統的設計與實現

文章目錄前言詳細視頻演示具體實現截圖后端框架SpringBoot前端框架Vue持久層框架MyBaits成功系統案例&#xff1a;代碼參考數據庫源碼獲取前言 博主介紹:CSDN特邀作者、985高校計算機專業畢業、現任某互聯網大廠高級全棧開發工程師、Gitee/掘金/華為云/阿里云/GitHub等平臺持續…

數據結構 --棧和隊鏈

一.棧的概念一種特殊的線性表&#xff0c;只能從固定的一端插入和刪除元素。棧中元素遵循先進后出的原則。二.模擬實現public class MyStack {public int size;public int[] array;public MyStack(){array new int[10];}private void grow(){array Arrays.copyOf(array,array…

文檔處理控件TX Text Control系列教程:使用 C# .NET 將二維碼添加到 PDF 文檔

PDF 文檔通常是合同、發票、證書和報告的最終格式。盡管它們在設計上是靜態的&#xff0c;但用戶現在希望能夠與它們交互、驗證信息并直接從這些文件訪問數字服務。這時&#xff0c;二維碼就變得至關重要。 PDF 文檔中的二維碼將印刷或數字內容與動態在線體驗連接起來。用戶只需…

Google Chrome 谷歌瀏覽器全部版本集合

Google Chrome 谷歌瀏覽器全部版本集合 Collection of all software versions of Google Chrome. 項目介紹 本項目為Google Chrome谷歌瀏覽器的全部版本集合&#xff0c;方便大家下載舊版本使用。 因為Gitee項目限制倉庫1G大小&#xff0c;所以許多谷歌瀏覽器版本無法上傳。…

論文略讀:Towards Safer Large Language Models through Machine Unlearning

ACL 2024大型語言模型&#xff08;LLMs&#xff09;的迅猛發展展現了其在多個領域的巨大潛力&#xff0c;這主要得益于其廣泛的預訓練知識和出色的泛化能力。然而&#xff0c;當面對問題性提示&#xff08;problematic prompts&#xff09;時&#xff0c;LLMs 仍然容易生成有害…

深度學習 ---參數初始化以及損失函數

深度學習 —參數初始化以及損失函數 文章目錄深度學習 ---參數初始化以及損失函數一&#xff0c;參數初始化1.1 固定值初始化1.1.1 全0初始化1.1.2 全1初始化1.3 任意常數初始化1.2 隨機初始化一&#xff0c;參數初始化 神經網絡的參數初始化是訓練深度學習模型的關鍵步驟之一…

JS--M端事件

移動端&#xff08;Mobile 端&#xff0c;簡稱 M 端&#xff09;開發中&#xff0c;由于設備特性&#xff08;觸摸屏、手勢操作等&#xff09;&#xff0c;需要處理一些與桌面端不同的事件。這些事件主要針對觸摸交互、手勢識別等場景 一、觸摸事件&#xff08;Touch Events&am…

Linux網絡編程-tcp

tcp、udp對比&#xff1a;UDP1. 特點無連接&#xff1a;無需建立連接即可發送數據。不可靠&#xff1a;不保證數據順序或完整性。低延遲&#xff1a;適合實時性要求高的場景。2. 應用場景視頻/音頻流傳輸&#xff08;如直播&#xff09;。DNS 查詢、在線游戲。TCP1. 特點面向連…

記一次flink資源使用優化

一.現狀分析 現有任務的資源配置如下&#xff0c;根據ui監控中Garbage Collection可以發現&#xff0c;此任務頻繁的發生GC&#xff0c;且老年代GC時間較久二.整體memory使用分析如下Framework Heap&#xff08;框架堆內存&#xff09;用于Flink框架自身的堆內存&#xff08;如…

Vue底層換成啥了?如何更新DOM的?

摘要&#xff1a;之前的vue是使用虛擬 DOM的&#xff0c;但是Vue 3.6 帶來了一個意義重大的更新&#xff1a; Vapor Mode 渲染模式。Vue 渲染策略的演進&#xff1a; Vue 1.x&#xff1a; 基于模板渲染策略&#xff0c;直接將模板轉換為DOM元素&#xff0c;并為每個DOM元素創建…

0722 數據結構順序表

Part 1.順序表的代碼一.順序表的內存申請head.h: typedef int datatype;typedef struct sqlist {//數據元素datatype data[MAXSIZE];//順序表長度int len;}*sqlist; //*sqlist的作用: //sqlist:struct Sqlist * sqlist create();head.c: sqlist create() {sqlist list (sqlist)…

為何在 Vue 的 v-model 指令中不能使用可選鏈(Optional Chaining)?

Vue 的 v-model 是實現組件與數據雙向綁定的核心指令之一&#xff0c;它本質上是一個語法糖&#xff0c;用于簡化對表單元素和組件 props 的同步更新。然而&#xff0c;在 Vue 3&#xff08;以及 Vue 2 的某些模式下&#xff09;&#xff0c;開發者嘗試在 v-model 中使用 JavaS…

基于單片機智能藥盒/智能藥箱/定時吃藥系統

傳送門 &#x1f449;&#x1f449;&#x1f449;&#x1f449;其他作品題目速選一覽表 &#x1f449;&#x1f449;&#x1f449;&#x1f449;其他作品題目功能速覽 概述 本設計實現了一種基于單片機的智能藥盒&#xff0c;系統以微控制器&#xff08;如STM32&#xff…

(25)python+playwright自動化處理單選和多選按鈕-中

1.簡介上一篇中講解和介紹的單選框有點多&#xff0c;而且由于時間的關系&#xff0c;決定今天講解和分享復選框的相關知識。2.什么是單選框、復選框&#xff1f;單選按鈕一般叫raido button&#xff0c;就像我們在電子版的單選答題過程一樣&#xff0c;單選只能點擊一次&#…