延時隊列是一種特殊的消息隊列,它允許消息在指定的時間后被消費。在微服務架構、電商系統和任務調度場景中,延時隊列扮演著關鍵角色。例如,訂單超時自動取消、定時提醒、延時支付等都依賴延時隊列實現。
Redis作為高性能的內存數據庫,具備原子操作、數據結構豐富和簡單易用的特性,本文將介紹基于Redis實現分布式延時隊列的四種方式。
1. 基于Sorted Set的延時隊列
原理
利用Redis的Sorted Set(有序集合),將消息ID作為member,執行時間戳作為score進行存儲。通過ZRANGEBYSCORE
命令可以獲取到達執行時間的任務。
代碼實現
public class RedisZSetDelayQueue {private final StringRedisTemplate redisTemplate;private final String queueKey = "delay_queue:tasks";public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}/*** 添加延時任務* @param taskId 任務ID* @param taskInfo 任務信息(JSON字符串)* @param delayTime 延遲時間(秒)*/public void addTask(String taskId, String taskInfo, long delayTime) {// 計算執行時間long executeTime = System.currentTimeMillis() + delayTime * 1000;// 存儲任務詳情redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);// 添加到延時隊列redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);}/*** 輪詢獲取到期任務*/public List<String> pollTasks() {long now = System.currentTimeMillis();// 獲取當前時間之前的任務Set<String> taskIds = redisTemplate.opsForZSet().rangeByScore(queueKey, 0, now);if (taskIds == null || taskIds.isEmpty()) {return Collections.emptyList();}// 獲取任務詳情List<String> tasks = new ArrayList<>();for (String taskId : taskIds) {String taskInfo = (String) redisTemplate.opsForHash().get("delay_queue:details", taskId);if (taskInfo != null) {tasks.add(taskInfo);// 從集合和詳情中移除任務redisTemplate.opsForZSet().remove(queueKey, taskId);redisTemplate.opsForHash().delete("delay_queue:details", taskId);}}return tasks;}// 定時任務示例public void startTaskProcessor() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {try {List<String> tasks = pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 實際任務處理邏輯}
}
優缺點
優點
- 實現簡單,易于理解
- 任務按執行時間自動排序
- 支持精確的時間控制
缺點
- 需要輪詢獲取到期任務,消耗CPU資源
- 大量任務情況下,
ZRANGEBYSCORE
操作可能影響性能 - 沒有消費確認機制,需要額外實現
2. 基于List + 定時輪詢的延時隊列
原理
這種方式使用多個List作為存儲容器,按延遲時間的不同將任務分配到不同的隊列中。通過定時輪詢各個隊列,將到期任務移動到一個立即執行隊列。
代碼實現
public class RedisListDelayQueue {private final StringRedisTemplate redisTemplate;private final String readyQueueKey = "delay_queue:ready"; // 待處理隊列private final Map<Integer, String> delayQueueKeys; // 延遲隊列,按延時時間分級public RedisListDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 初始化不同延遲級別的隊列delayQueueKeys = new HashMap<>();delayQueueKeys.put(5, "delay_queue:delay_5s"); // 5秒delayQueueKeys.put(60, "delay_queue:delay_1m"); // 1分鐘delayQueueKeys.put(300, "delay_queue:delay_5m"); // 5分鐘delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分鐘}/*** 添加延時任務*/public void addTask(String taskInfo, int delaySeconds) {// 選擇合適的延遲隊列String queueKey = selectDelayQueue(delaySeconds);// 任務元數據,包含任務信息和執行時間long executeTime = System.currentTimeMillis() + delaySeconds * 1000;String taskData = executeTime + ":" + taskInfo;// 添加到延遲隊列redisTemplate.opsForList().rightPush(queueKey, taskData);System.out.println("Task added to " + queueKey + ": " + taskData);}/*** 選擇合適的延遲隊列*/private String selectDelayQueue(int delaySeconds) {// 找到最接近的延遲級別int closestDelay = delayQueueKeys.keySet().stream().filter(delay -> delay >= delaySeconds).min(Integer::compareTo).orElse(Collections.max(delayQueueKeys.keySet()));return delayQueueKeys.get(closestDelay);}/*** 移動到期任務到待處理隊列*/public void moveTasksToReadyQueue() {long now = System.currentTimeMillis();// 遍歷所有延遲隊列for (String queueKey : delayQueueKeys.values()) {boolean hasMoreTasks = true;while (hasMoreTasks) {// 查看隊列頭部任務String taskData = redisTemplate.opsForList().index(queueKey, 0);if (taskData == null) {hasMoreTasks = false;continue;}// 解析任務執行時間long executeTime = Long.parseLong(taskData.split(":", 2)[0]);// 檢查是否到期if (executeTime <= now) {// 通過LPOP原子性地移除隊列頭部任務String task = redisTemplate.opsForList().leftPop(queueKey);// 任務可能被其他進程處理,再次檢查if (task != null) {// 提取任務信息并添加到待處理隊列String taskInfo = task.split(":", 2)[1];redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);System.out.println("Task moved to ready queue: " + taskInfo);}} else {// 隊列頭部任務未到期,無需檢查后面的任務hasMoreTasks = false;}}}}/*** 獲取待處理任務*/public String getReadyTask() {return redisTemplate.opsForList().leftPop(readyQueueKey);}/*** 啟動任務處理器*/public void startTaskProcessors() {// 定時移動到期任務ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 移動任務線程scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);// 處理任務線程scheduler.scheduleAtFixedRate(() -> {String task = getReadyTask();if (task != null) {processTask(task);}}, 0, 100, TimeUnit.MILLISECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 實際任務處理邏輯}
}
優缺點
優點
- 分級隊列設計,降低單隊列壓力
- 相比Sorted Set占用內存少
- 支持隊列監控和任務優先級
缺點
- 延遲時間精度受輪詢頻率影響
- 實現復雜度高
- 需要維護多個隊列
- 時間判斷和隊列操作非原子性,需特別處理并發問題
3. 基于發布/訂閱(Pub/Sub)的延時隊列
原理
結合Redis發布/訂閱功能與本地時間輪算法,實現延遲任務的分發和處理。任務信息存儲在Redis中,而時間輪負責任務的調度和發布。
代碼實現
public class RedisPubSubDelayQueue {private final StringRedisTemplate redisTemplate;private final String TASK_TOPIC = "delay_queue:task_channel";private final String TASK_HASH = "delay_queue:tasks";private final HashedWheelTimer timer;public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 初始化時間輪,刻度100ms,輪子大小512this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);// 啟動消息訂閱subscribeTaskChannel();}/*** 添加延時任務*/public void addTask(String taskId, String taskInfo, long delaySeconds) {// 存儲任務信息到RedisredisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);// 添加到時間輪timer.newTimeout(timeout -> {// 發布任務就緒消息redisTemplate.convertAndSend(TASK_TOPIC, taskId);}, delaySeconds, TimeUnit.SECONDS);System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");}/*** 訂閱任務通道*/private void subscribeTaskChannel() {redisTemplate.getConnectionFactory().getConnection().subscribe((message, pattern) -> {String taskId = new String(message.getBody());// 獲取任務信息String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);if (taskInfo != null) {// 處理任務processTask(taskId, taskInfo);// 刪除任務redisTemplate.opsForHash().delete(TASK_HASH, taskId);}}, TASK_TOPIC.getBytes());}private void processTask(String taskId, String taskInfo) {System.out.println("Processing task: " + taskId + " - " + taskInfo);// 實際任務處理邏輯}// 模擬HashedWheelTimer類public static class HashedWheelTimer {private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);private final long tickDuration;private final TimeUnit unit;public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {this.tickDuration = tickDuration;this.unit = unit;}public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {long delayMillis = timeUnit.toMillis(delay);scheduler.schedule(() -> task.run(null), delayMillis, TimeUnit.MILLISECONDS);}public interface TimerTask {void run(Timeout timeout);}public interface Timeout {}}
}
優缺點
優點:
- 即時觸發,無需輪詢
- 高效的時間輪算法
- 可以跨應用訂閱任務
- 分離任務調度和執行,降低耦合
缺點:
- 依賴本地時間輪,非純Redis實現
- Pub/Sub模式無消息持久化,可能丟失消息
- 服務重啟時需要重建時間輪
- 訂閱者需要保持連接
4. 基于Redis Stream的延時隊列
原理
Redis 5.0引入的Stream是一個強大的數據結構,專為消息隊列設計。結合Stream的消費組和確認機制,可以構建可靠的延時隊列。
代碼實現
public class RedisStreamDelayQueue {private final StringRedisTemplate redisTemplate;private final String delayQueueKey = "delay_queue:stream";private final String consumerGroup = "delay_queue_consumers";private final String consumerId = UUID.randomUUID().toString();public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;// 創建消費者組try {redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xGroupCreate(delayQueueKey.getBytes(), consumerGroup, ReadOffset.from("0"), true);return "OK";});} catch (Exception e) {// 消費者組可能已存在System.out.println("Consumer group may already exist: " + e.getMessage());}}/*** 添加延時任務*/public void addTask(String taskInfo, long delaySeconds) {long executeTime = System.currentTimeMillis() + delaySeconds * 1000;Map<String, Object> task = new HashMap<>();task.put("executeTime", String.valueOf(executeTime));task.put("taskInfo", taskInfo);redisTemplate.opsForStream().add(delayQueueKey, task);System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);}/*** 獲取待執行的任務*/public List<String> pollTasks() {long now = System.currentTimeMillis();List<String> readyTasks = new ArrayList<>();// 讀取尚未處理的消息List<MapRecord<String, Object, Object>> records = redisTemplate.execute((RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {return connection.streamCommands().xReadGroup(consumerGroup.getBytes(),consumerId.getBytes(),StreamReadOptions.empty().count(10),StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">")));});if (records != null) {for (MapRecord<String, Object, Object> record : records) {String messageId = record.getId().getValue();Map<Object, Object> value = record.getValue();long executeTime = Long.parseLong((String) value.get("executeTime"));String taskInfo = (String) value.get("taskInfo");// 檢查任務是否到期if (executeTime <= now) {readyTasks.add(taskInfo);// 確認消息已處理redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return "OK";});// 可選:從流中刪除消息redisTemplate.opsForStream().delete(delayQueueKey, messageId);} else {// 任務未到期,放回隊列redisTemplate.execute((RedisCallback<String>) connection -> {connection.streamCommands().xAck(delayQueueKey.getBytes(),consumerGroup.getBytes(),messageId.getBytes());return "OK";});// 重新添加任務(可選:使用延遲重新入隊策略)Map<String, Object> newTask = new HashMap<>();newTask.put("executeTime", String.valueOf(executeTime));newTask.put("taskInfo", taskInfo);redisTemplate.opsForStream().add(delayQueueKey, newTask);}}}return readyTasks;}/*** 啟動任務處理器*/public void startTaskProcessor() {ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(() -> {try {List<String> tasks = pollTasks();for (String task : tasks) {processTask(task);}} catch (Exception e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);}private void processTask(String taskInfo) {System.out.println("Processing task: " + taskInfo);// 實際任務處理邏輯}
}
優缺點
優點:
- 支持消費者組和消息確認,提供可靠的消息處理
- 內置消息持久化機制
- 支持多消費者并行處理
- 消息ID包含時間戳,便于排序
缺點:
- 要求Redis 5.0+版本
- 實現相對復雜
- 仍需輪詢獲取到期任務
- 對未到期任務的處理相對繁瑣
性能對比與選型建議
實現方式 | 性能 | 可靠性 | 實現復雜度 | 內存占用 | 適用場景 |
---|---|---|---|---|---|
Sorted Set | ★★★★☆ | ★★★☆☆ | 低 | 中 | 任務量適中,需要精確調度 |
List + 輪詢 | ★★★★★ | ★★★☆☆ | 中 | 低 | 高并發,延時精度要求不高 |
Pub/Sub + 時間輪 | ★★★★★ | ★★☆☆☆ | 高 | 低 | 實時性要求高,可容忍服務重啟丟失 |
Stream | ★★★☆☆ | ★★★★★ | 高 | 中 | 可靠性要求高,需要消息確認 |
總結
在實際應用中,可根據系統規模、性能需求、可靠性要求和實現復雜度等因素進行選擇,也可以組合多種方式打造更符合業務需求的延時隊列解決方案。無論選擇哪種實現,都應關注可靠性、性能和監控等方面,確保延時隊列在生產環境中穩定運行。