系統設計
架構圖
+----------------+ +-----------------+ +----------------+ | | | | | | | 生產者 |------>| Redis ZSet |------>| 定時任務消費者 | | (添加延遲任務) | | (延遲隊列存儲) | | (掃描并處理任務)| | | | | | | +----------------+ +-----------------+ +----------------+↑ || ↓| +---------------------++-----------------------------------| 任務處理器 || (執行具體業務邏輯) |+---------------------+
核心流程
生產者將任務添加到Redis ZSet中,score為任務執行時間戳
定時任務定期掃描ZSet,找出score小于當前時間的任務
消費者線程池處理到期的任務
任務處理完成后從ZSet中移除
實現步驟
步驟一:添加依賴(pom.xml)
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Redis集成 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!-- 定時任務 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-quartz</artifactId></dependency><!-- JSON處理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!-- Lombok簡化代碼 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>
</dependencies>
步驟二:配置Redis(application.yml)
spring:redis:host: localhostport: 6379password: database: 0lettuce:pool:max-active: 20max-idle: 10min-idle: 2max-wait: 10000ms# 自定義延遲隊列配置
delay:queue:key: "delay_queue" # Redis ZSet鍵名batch-size: 10 # 每次處理任務數量interval: 5000 # 定時任務執行間隔(ms)thread-pool-size: 5 # 消費者線程池大小
步驟三:創建任務模型(DelayTask.java)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class DelayTask {/*** 任務類型枚舉*/public enum TaskType {ORDER_TIMEOUT, // 訂單超時處理EMAIL_REMINDER, // 郵件提醒TASK_EXECUTION // 定時任務執行}private TaskType type; // 任務類型private String taskId; // 任務唯一IDprivate String content; // 任務內容private long createTime; // 任務創建時間private long executeTime; // 任務執行時間// 重寫toString方法用于序列化@Overridepublic String toString() {return "DelayTask{" +"type=" + type +", taskId='" + taskId + '\'' +", content='" + content + '\'' +", createTime=" + createTime +", executeTime=" + executeTime +'}';}
}
步驟四:創建Redis配置類(RedisConfig.java)
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);// 使用Jackson序列化GenericJackson2JsonRedisSerializer jacksonSerializer = new GenericJackson2JsonRedisSerializer();// Key序列化template.setKeySerializer(new StringRedisSerializer());// Value序列化template.setValueSerializer(jacksonSerializer);// Hash Key序列化template.setHashKeySerializer(new StringRedisSerializer());// Hash Value序列化template.setHashValueSerializer(jacksonSerializer);template.afterPropertiesSet();return template;}
}
步驟五:創建線程池配置類(ThreadPoolConfig.java)
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class ThreadPoolConfig {@Value("${delay.queue.thread-pool-size:5}")private int threadPoolSize;@Bean("taskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心線程數executor.setCorePoolSize(threadPoolSize);// 最大線程數executor.setMaxPoolSize(threadPoolSize * 2);// 隊列大小executor.setQueueCapacity(100);// 線程名前綴executor.setThreadNamePrefix("delay-task-");// 拒絕策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化executor.initialize();return executor;}
}
步驟六:創建延遲隊列服務(DelayQueueService.java)
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Set;
import java.util.concurrent.Executor;@Slf4j
@Service
public class DelayQueueService {@Value("${delay.queue.key}")private String delayQueueKey;@Value("${delay.queue.batch-size}")private int batchSize;@Resourceprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate Executor taskExecutor;/*** 添加延遲任務* @param task 任務對象* @param delaySeconds 延遲秒數*/public void addTask(DelayTask task, long delaySeconds) {long executeTime = System.currentTimeMillis() + (delaySeconds * 1000);task.setExecuteTime(executeTime);// 添加到Redis ZSetredisTemplate.opsForZSet().add(delayQueueKey, task, executeTime);log.info("添加延遲任務成功, 任務ID: {}, 執行時間: {}", task.getTaskId(), executeTime);}/*** 定時掃描任務(每5秒執行一次)*/@Scheduled(fixedRateString = "${delay.queue.interval}")public void scanExpiredTasks() {long now = System.currentTimeMillis();log.debug("開始掃描延遲隊列, 當前時間: {}", now);// 獲取當前時間之前的所有任務Set<ZSetOperations.TypedTuple<Object>> tasks = redisTemplate.opsForZSet().rangeByScoreWithScores(delayQueueKey, 0, now, 0, batchSize);if (tasks == null || tasks.isEmpty()) {log.debug("未找到待處理任務");return;}log.info("發現 {} 個待處理任務", tasks.size());for (ZSetOperations.TypedTuple<Object> tuple : tasks) {Object taskObj = tuple.getValue();if (taskObj instanceof DelayTask) {DelayTask task = (DelayTask) taskObj;// 使用線程池異步處理任務taskExecutor.execute(() -> processTask(task));}}}/*** 處理任務* @param task 延遲任務*/@Asyncpublic void processTask(DelayTask task) {try {log.info("開始處理任務: {}", task.getTaskId());// 根據任務類型執行不同邏輯switch (task.getType()) {case ORDER_TIMEOUT:handleOrderTimeout(task);break;case EMAIL_REMINDER:sendReminderEmail(task);break;case TASK_EXECUTION:executeScheduledTask(task);break;default:log.warn("未知任務類型: {}", task.getType());}// 處理完成后從隊列中移除redisTemplate.opsForZSet().remove(delayQueueKey, task);log.info("任務處理完成并移除: {}", task.getTaskId());} catch (Exception e) {log.error("任務處理失敗: {}", task.getTaskId(), e);handleProcessingError(task);}}// 示例:訂單超時處理private void handleOrderTimeout(DelayTask task) {log.info("處理訂單超時任務: {}", task.getContent());// 實際業務邏輯:取消訂單、釋放庫存等// 模擬處理時間try {Thread.sleep(1000);} catch (InterruptedException ignored) {}}// 示例:發送提醒郵件private void sendReminderEmail(DelayTask task) {log.info("發送提醒郵件: {}", task.getContent());// 實際業務邏輯:調用郵件服務發送郵件}// 示例:執行定時任務private void executeScheduledTask(DelayTask task) {log.info("執行定時任務: {}", task.getContent());// 實際業務邏輯:執行定時任務}// 錯誤處理private void handleProcessingError(DelayTask task) {log.error("任務處理失敗,加入死信隊列: {}", task.getTaskId());// 可以將失敗任務移到死信隊列redisTemplate.opsForList().rightPush("delay:dead-letter", task);}
}
步驟七:創建測試Controller(DelayQueueController.java)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/delay")
public class DelayQueueController {@Autowiredprivate DelayQueueService delayQueueService;/*** 添加延遲任務* @param type 任務類型 (1-訂單超時, 2-郵件提醒, 3-定時任務)* @param seconds 延遲秒數* @param content 任務內容*/@PostMapping("/add")public String addDelayTask(@RequestParam("type") int type,@RequestParam("seconds") long seconds,@RequestParam("content") String content) {// 創建任務IDString taskId = "TASK-" + System.currentTimeMillis();// 轉換任務類型DelayTask.TaskType taskType;switch (type) {case 1: taskType = DelayTask.TaskType.ORDER_TIMEOUT; break;case 2: taskType = DelayTask.TaskType.EMAIL_REMINDER; break;case 3: taskType = DelayTask.TaskType.TASK_EXECUTION; break;default: throw new IllegalArgumentException("無效的任務類型");}// 創建任務DelayTask task = new DelayTask(taskType, taskId, content, System.currentTimeMillis(), 0);// 添加任務delayQueueService.addTask(task, seconds);return "任務添加成功! ID: " + taskId;}/*** 查看隊列狀態*/@GetMapping("/status")public String queueStatus() {long size = delayQueueService.getQueueSize();return "當前延遲隊列任務數量: " + size;}
}
啟動類(DelayQueueApplication.java)
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableScheduling // 啟用定時任務
@EnableAsync // 啟用異步方法
public class DelayQueueApplication {public static void main(String[] args) {SpringApplication.run(DelayQueueApplication.class, args);}
}
方案優勢與注意事項
優勢
高性能:利用Redis內存操作和ZSet有序特性
低延遲:定時任務掃描保證任務及時處理
高可靠:任務處理失敗可進入死信隊列
可擴展:線程池支持并行處理多個任務
靈活配置:支持批量處理大小、掃描間隔等參數配置
注意事項
任務冪等性:確保任務可重復處理而不產生副作用
任務超時處理:長時間任務需考慮超時機制
Redis持久化:根據業務需求配置RDB或AOF
分布式環境:多實例部署時需考慮任務競爭問題
監控告警:添加隊列積壓監控和任務失敗告警
擴展建議
添加管理界面:
查看隊列中的任務
手動重試失敗任務
統計任務處理成功率
分布式鎖優化:
// 在scanExpiredTasks方法中 String lockKey = "delay_queue_lock"; Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "locked", 30, TimeUnit.SECONDS);if (lockAcquired != null && lockAcquired) {try {// 執行掃描任務邏輯} finally {redisTemplate.delete(lockKey);} }
任務優先級支持:
// 在添加任務時,可將優先級加入score計算 double score = executeTime + (priority * 0.001);
延遲時間精確控制:
使用Redisson的DelayedQueue組件
或使用Redis的Keyspace通知功能
這個實現方案提供了一個完整、可擴展的延遲隊列系統,適用于訂單超時處理、定時提醒、延遲任務執行等多種業務場景。