介紹
具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。
使用場景
延遲隊列在項目中的應用還是比較多的,尤其像電商類平臺:
訂單成功后,在30分鐘內沒有支付,自動取消訂單
外賣平臺發送訂餐通知,下單成功后60s給用戶推送短信。
如果訂單一直處于某一個未完結狀態時,及時處理關單,并退還庫存
淘寶新建商戶一個月內還沒上傳商品信息,將凍結商鋪等
該介紹來自其他文章
方案
下面的例子沒有進行封裝,所以代碼僅供參考
Redis過期事件
注意:
不保證在設定的過期時間立即刪除并發送通知,數據量大的時候會延遲比較大
不保證一定送達
發送即忘策略,不包含持久化
但是比如有些場景,對這個時間不是那么看重,并且有其他措施雙層保障,該實現方案是比較簡單。
redis自2.8.0之后版本提供Keyspace Notifications功能,允許客戶訂閱Pub / Sub頻道,以便以某種方式接收影響Redis數據集事件。
配置
需要修改配置啟用過期事件,比如在windows客戶端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改內容是:

--?取消注釋
notify-keyspace-events?Ex--?注釋
#notify-keyspace-events?""
然后重新啟動服務器,比如windows
.\redis-server.exe .\redis.windows.conf
或者linux中使用docker-compose重新部署redis
redis:container_name:?redisimage:?redishostname:?redisrestart:?alwaysports:?-?"6379:6379"volumes:?-?$PWD/redis/redis.conf:/etc/redis.conf-?/root/common-docker-compose/redis/data:/datacommand:?/bin/bash?-c?"redis-server?/etc/redis.conf"?#啟動執行指定的redis.conf文件
然后使用客戶端訂閱事件
--?windows
.\redis-cli--?linux
docker?exec?-it?容器標識?redis-clipsubscribe?__keyevent@0__:expired
控制臺訂閱
使用StackExchange.Redis組件訂閱過期事件
var?connectionMultiplexer?=?ConnectionMultiplexer.Connect(_redisConnection);
var?db?=?connectionMultiplexer.GetDatabase(0);db.StringSet("orderno:123456",?"訂單創建",?TimeSpan.FromSeconds(10));
Console.WriteLine("開始訂閱");var?subscriber?=?connectionMultiplexer.GetSubscriber();//訂閱庫0的過期通知事件
subscriber.Subscribe("__keyevent@0__:expired",?(channel,?key)?=>
{Console.WriteLine($"key過期?channel:{channel}?key:{key}");
});Console.ReadLine();
輸出結果:
key過期 channel:keyevent@0:expired key:orderno:123456
如果啟動多個客戶端監聽,那么多個客戶端都可以收到過期事件。
WebApi中訂閱
創建RedisListenService繼承自:BackgroundService
public?class?RedisListenService?:?BackgroundService
{private?readonly?ISubscriber?_subscriber;public?RedisListenService(IServiceScopeFactory?serviceScopeFactory){using?var?scope?=?serviceScopeFactory.CreateScope();var?configuration?=?scope.ServiceProvider.GetRequiredService<IConfiguration>();var?connectionMultiplexer?=?ConnectionMultiplexer.Connect(configuration["redis"]);var?db?=?connectionMultiplexer.GetDatabase(0);_subscriber?=?connectionMultiplexer.GetSubscriber();}protected?override?Task?ExecuteAsync(CancellationToken?stoppingToken){//訂閱庫0的過期通知事件_subscriber.Subscribe("__keyevent@0__:expired",?(channel,?key)?=>{Console.WriteLine($"key過期?channel:{channel}?key:{key}");});return?Task.CompletedTask;}
}
注冊該后臺服務
services.AddHostedService<RedisListenService>();
啟用項目,給redis指定庫設置值,等過期后會接收到過期通知事件。
RabbitMq延遲隊列
版本信息 ? Rabbitmq版本:3.10.5 ?Erlang版本:24.3.4.2
要使用rabbitmq做延遲是需要安裝插件(rabbitmq_delayed_message_exchange)的
插件介紹:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
將下載好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目錄下:
docker?run?-d?--name?myrabbit?-p?9005:15672?-p?5672:5672??-e?RABBITMQ_DEFAULT_VHOST=customer?-e?RABBITMQ_DEFAULT_USER=admin?-e?RABBITMQ_DEFAULT_PASS=123456?-v?d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez??rabbitmq:3-management-alpine
進入容器
docker?exec?-it?容器名稱/標識?bash
啟用插件
rabbitmq-plugins?enable?rabbitmq_delayed_message_exchange
查看是否啟用
rabbitmq-plugins?list
[E*]和[e*]表示啟用,然后重啟服務
rabbitmq-server?restart
然后在管理界面添加交換機都可以看到

生產消息
發送的消息類型是:x-delayed-message
[HttpGet("send/delay")]
public?string?SendDelayedMessage()
{var?factory?=?new?ConnectionFactory(){HostName?=?"localhost",//IP地址Port?=?5672,//端口號UserName?=?"admin",//用戶賬號Password?=?"123456",//用戶密碼VirtualHost?=?"customer"};using?var?connection?=?factory.CreateConnection();using?var?channel?=?connection.CreateModel();var?exchangeName?=?"delay-exchange";var?routingkey?=?"delay.delay";var?queueName?=?"delay_queueName";//設置Exchange隊列類型var?argMaps?=?new?Dictionary<string,?object>(){{"x-delayed-type",?"topic"}};//設置當前消息為延時隊列channel.ExchangeDeclare(exchange:?exchangeName,?type:?"x-delayed-message",?true,?false,?argMaps);channel.QueueDeclare(queueName,?true,?false,?false,?argMaps);channel.QueueBind(queueName,?exchangeName,?routingkey);var?time?=?1000?*?5;var?message?=?$"發送時間為?{DateTime.Now:yyyy-MM-dd?HH:mm:ss}?延時時間為:{time}";var?body?=?Encoding.UTF8.GetBytes(message);var?props?=?channel.CreateBasicProperties();//設置消息的過期時間props.Headers?=?new?Dictionary<string,?object>(){{??"x-delay",?time?}};channel.BasicPublish(exchange:?exchangeName,?routingKey:?routingkey,?basicProperties:?props,?body:?body);Console.WriteLine("成功發送消息:"?+?message);return?"success";
}
消費消息
消費消息我是弄了一個后臺任務(RabbitmqDelayedHostService)在處理
public?class?RabbitmqDelayedHostService?:?BackgroundService
{private?readonly?IModel?_channel;private?readonly?IConnection?_connection;public?RabbitmqDelayedHostService(){var?connFactory?=?new?ConnectionFactory//創建連接工廠對象{HostName?=?"localhost",//IP地址Port?=?5672,//端口號UserName?=?"admin",//用戶賬號Password?=?"123456",//用戶密碼VirtualHost?=?"customer"};_connection?=?connFactory.CreateConnection();_channel?=?_connection.CreateModel();//交換機名稱var?exchangeName?=?"exchangeDelayed";var?queueName?=?"delay_queueName";var?routingkey?=?"delay.delay";var?argMaps?=?new?Dictionary<string,?object>(){{"x-delayed-type",?"topic"}};_channel.ExchangeDeclare(exchange:?exchangeName,?type:?"x-delayed-message",?true,?false,?argMaps);_channel.QueueDeclare(queueName,?true,?false,?false,?argMaps);_channel.QueueBind(queue:?queueName,?exchange:?exchangeName,?routingKey:?routingkey);//聲明為手動確認_channel.BasicQos(0,?1,?false);}protected?override?Task?ExecuteAsync(CancellationToken?stoppingToken){var?queueName?=?"delay_queueName";var?consumer?=?new?EventingBasicConsumer(_channel);consumer.Received?+=?(model,?ea)?=>{var?message?=?Encoding.UTF8.GetString(ea.Body.ToArray());var?routingKey?=?ea.RoutingKey;Console.WriteLine($"接受到消息的時間為?{DateTime.Now:yyyy-MM-dd?HH:mm:ss},routingKey:{routingKey}?message:{message}?");//手動確認_channel.BasicAck(ea.DeliveryTag,?true);};_channel.BasicConsume(queue:?queueName,?autoAck:?false,?consumer:?consumer);return?Task.CompletedTask;}public?override?void?Dispose(){_connection.Dispose();_channel.Dispose();base.Dispose();}
}
注冊該后臺任務
services.AddHostedService<RabbitmqDelayedHostService>();
輸出結果
成功發送消息:發送時間為 2022-07-02 18:54:22 延時時間為:5000
成功發送消息:發送時間為 2022-07-02 18:54:22 延時時間為:5000
成功發送消息:發送時間為 2022-07-02 18:54:22 延時時間為:5000
成功發送消息:發送時間為 2022-07-02 18:54:23 延時時間為:5000
成功發送消息:發送時間為 2022-07-02 18:54:23 延時時間為:5000
成功發送消息:發送時間為 2022-07-02 18:54:23 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:22 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:22 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:22 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:23 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:23 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發送時間為 2022-07-02 18:54:23 延時時間為:5000
其他方案
Hangfire延遲隊列
BackgroundJob.Schedule(()?=>?Console.WriteLine("Delayed!"),TimeSpan.FromDays(7));
時間輪
Redisson DelayQueue
計時管理器