Redis 有序集合和集合一樣也是 string 類型元素的集合,且不允許重復的成員。
不同的是每個元素都會關聯一個 double 類型的分數。redis 正是通過分數來為集合中的成員進行從小到大的排序。
有序集合的成員是唯一的,但分數(score)卻可以重復。
集合是通過哈希表實現的,所以添加,刪除,查找的復雜度都是 O(1)。 集合中最大的成員數為 232 - 1 (4294967295, 每個集合可存儲40多億個成員)。
延遲隊列的設計思想是將隊列的延遲時間作為分數,按照這個進行排序
- 安裝依賴
Newtonsoft.Json 13.0.3
StackExchange.Redis 2.8.0
- 封裝Redis
using StackExchange.Redis;
namespace LedayQueue.RedisHelper
{public class RedisConnection{private readonly ConnectionMultiplexer _connection;public IDatabase _database;public RedisConnection(){_connection = ConnectionMultiplexer.Connect("localhost:6379");_database = _connection.GetDatabase();}public async Task AddToQueueAsync(string task, TimeSpan delay){var executionTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + delay.TotalSeconds;await _database.SortedSetAddAsync("delayedQueue", task, executionTime);}}
}
- 封裝background service
using StackExchange.Redis;namespace LedayQueue.RedisHelper
{public class DelayedQueueProcessor : BackgroundService{private readonly RedisConnection _connection;private const string QueueKey = "delayedQueue";public DelayedQueueProcessor(RedisConnection redisConnection){_connection = redisConnection;}protected override async Task ExecuteAsync(CancellationToken stoppingToken){while (!stoppingToken.IsCancellationRequested){var now = DateTimeOffset.UtcNow.ToUnixTimeSeconds();var tasks = await _connection._database.SortedSetRangeByScoreWithScoresAsync(QueueKey, 0, now);foreach (var task in tasks){// 處理任務var taskString = task.Element.ToString();ProcessTask(taskString);// 從隊列中移除任務await _connection._database.SortedSetRemoveAsync(QueueKey, task.Element);}await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken); // 每秒檢查一次}}private void ProcessTask(string content){Console.WriteLine(content);}}
}
- 注冊
builder.Services.AddSingleton<RedisConnection>();
builder.Services.AddHostedService<DelayedQueueProcessor>();
源碼
官網