延時隊列與優先級隊列
在消息隊列的最后一篇文章中,我們再來學習兩個非常常見的隊列功能。一個是延時隊列,一個是優先級隊列。它們的應用場景非常多,也非常有意思,不同的消息隊列工具都提供了不同的實現,同樣的,Redis 在 Laravel 框架中還是通過邏輯代碼來實現類似功能的,非常值得大家來好好研究一下。
延時隊列
延時隊列,從名字就可以看出,隊列里面的消息會延時一會,也就是等一會才會被消費。這個功能非常常用,比如說最經典的就是電商中下訂單后不支付。通常,我們會設定一個時間,比如 20 分鐘內如果不支付,訂單就自動取消。這個功能就可以通過延時隊列來實現,下訂單后,馬上向延時隊列發送一條消息,并且設置延遲時間為 20 分鐘。然后等 20 分鐘之后,消費者開始消費這條消息,可以簡單的判斷一下比如訂單還是未支付狀態,就把訂單狀態改為關閉的。
其它還有很多例子,比如像是定時采集爬蟲之類的,也是這種延時隊列的常見場景。總之,這種隊列也是一種非常常見的隊列功能。我們先來看一下,在 Laravel 框架中,使用 Redis 隊列驅動是如何實現這個延時隊列功能的。
Laravel框架中使用 Redis 實現
在 Laravel 中,只需要在任務分發,也就是入隊的時候,使用一個 delay() 方法就可以了。
//?app/Console/Commands/P6.php
//?………………
public?function?handle()
{Queue6::dispatch('任務發送時間:'?.?date('Y-m-d?H:i:s'))->delay(now()->addSeconds(random_int(0,10)));return?0;
}
//?………………
這個 delay() 方法接收一個 now() 助手函數返回的 Carbon 類型的時間對象,這個對象是 Laravel 框架中的一個組件。使用代碼中的方法,就可以添加按秒延時的隊列,具體的延時時間是 0 到 10 的隨機數。now() 助手函數還有其它很多方法可以添加分鐘、小時、毫秒等等,是非常好用的一套日期時間對象工具。
任務對象里面沒什么特別的,就是打印了一下接收到的消息和處理的時間。
//?app/Jobs/Queue6.php
//?………………
public?function?handle()
{//echo?'接收到了消息:'?.$this->msg,?'?處理時間:',date('Y-m-d?H:i:s'),PHP_EOL;
}
//?………………
好了,現在我們就多運行幾次任務分派,向隊列中多添加幾條消息數據吧。
>?php?artisan?p:q6
>?php?artisan?p:q6
>?php?artisan?p:q6
然后觀察隊列消費輸出的結果。
>?php?artisan?queue:work
[2023-01-03?14:16:53][b5ee7d7c-9d79-4b26-b87f-1ef8e265000a]?Processing:?App\Jobs\Queue6
接收到了消息:任務發送時間:2023-01-03?14:16:53?處理時間:2023-01-03?14:16:53
[2023-01-03?14:16:53][b5ee7d7c-9d79-4b26-b87f-1ef8e265000a]?Processed:??App\Jobs\Queue6
[2023-01-03?14:16:59][d133609f-341e-4442-821d-256aaa8ed9a9]?Processing:?App\Jobs\Queue6
接收到了消息:任務發送時間:2023-01-03?14:16:54?處理時間:2023-01-03?14:16:59
[2023-01-03?14:16:59][d133609f-341e-4442-821d-256aaa8ed9a9]?Processed:??App\Jobs\Queue6
[2023-01-03?14:16:59][775af430-cc13-45cd-8c9b-7eb705110b48]?Processing:?App\Jobs\Queue6
接收到了消息:任務發送時間:2023-01-03?14:16:53?處理時間:2023-01-03?14:16:59
[2023-01-03?14:16:59][775af430-cc13-45cd-8c9b-7eb705110b48]?Processed:??App\Jobs\Queue6
注意看中間那一條,它的任務發送時間是 54 秒,但它是在中間被消費的,最后一條數據的任務發送時間是比它早的。這也就是說,中間這條數據的延時時間更長一些。大家也可以將具體的延時秒數添加到消息體中,然后在消費的時候打印出來,這樣看得就很清楚了。不過如果直接觀察消費者,也能看出消息都是在不同的時間段內消費的,是有延時的效果的。
這個功能是怎么實現的呢?還記得我們之前在 Redis 系列中講過的 Sorted Set 這個數據類型吧?當時我們就說過,TP 以及 Laravel 中的延時隊列都是通過有序集合來實現的。
有序集合除了數據本身外,還有一個 score 分數字段可以用于排序。聰明的你一定想到了,直接將時間戳當做 score 就可以實現按指定時間排序的功能了。同時,我們也可以先查詢小于當前時間戳分數的數據,然后只取出這一部分的數據。現在你可以再添加幾條數據,但不要開消費者。然后到 Redis 中,就會看到 laravel_database_queues:default:delayed ?這樣一個集合。
127.0.0.1:6379>?ZRANGE?laravel_database_queues:default:delayed?0?-1?withscores
1)?"{\"uuid\":\"bfbfec4b-ffb4-4259-b299-4fe18866a741\",\"displayName\":\"App\\\\Jobs\\\\Queue6\",\"job\":\"Illuminate\\\\Queue\\\\CallQueuedHandler@call\",\"maxTries\":null,\"maxExceptions\":null,\"failOnTimeout\":false,\"backoff\":null,\"timeout\":null,\"retryUntil\":null,\"data\":{\"commandName\":\"App\\\\Jobs\\\\Queue6\",\"command\":\"O:15:\\\"App\\\\Jobs\\\\Queue6\\\":11:{s:3:\\\"msg\\\";s:40:\\\"\\u4efb\\u52a1\\u53d1\\u9001\\u65f6\\u95f4\\uff1a2023-01-03?14:21:04\\\";s:3:\\\"job\\\";N;s:10:\\\"connection\\\";N;s:5:\\\"queue\\\";N;s:15:\\\"chainConnection\\\";N;s:10:\\\"chainQueue\\\";N;s:19:\\\"chainCatchCallbacks\\\";N;s:5:\\\"delay\\\";O:25:\\\"Illuminate\\\\Support\\\\Carbon\\\":3:{s:4:\\\"date\\\";s:26:\\\"2023-01-03?14:31:04.417193\\\";s:13:\\\"timezone_type\\\";i:3;s:8:\\\"timezone\\\";s:3:\\\"UTC\\\";}s:11:\\\"afterCommit\\\";N;s:10:\\\"middleware\\\";a:0:{}s:7:\\\"chained\\\";a:0:{}}\"},\"id\":\"bfbfec4b-ffb4-4259-b299-4fe18866a741\",\"attempts\":0,\"type\":\"job\",\"tags\":[],\"pushedAt\":\"1672755664.4408\"}"
2)?"1672756264"
怎么樣,是不是分數就是時間戳。這下整體延時隊列的實現就不用我多說了吧。我們使用 ZREMRANGEBYRANK 或者 ZPOPMIN 命令都可以拿到最新的數據,但是,Laravel 里面的更復雜一些。它是先把延時隊列的遷移到 laravel_database_queues:default 隊列,然后再進行普通隊列的 POP 處理。在 /vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php 中,pop() 方法第一行就是調用一下 migrate() 方法。這個方法內部會繼續調用 migrateExpiredJobs() 方法,傳遞的參數為 $queue.':delayed'
和 $queue
參數名稱為 $from
和 $to
。已經很明顯了吧,最后調用 /vendor/laravel/framework/src/Illuminate/Queue/LuaScripts.php 中的 migrateExpiredJobs() 方法,這個方法里面是一個 Lua 腳本,腳本中就是使用 zremrangebyrank 命令根據 score 順序獲取數據,接著再 rpush 到 default 隊列中。
public?static?function?migrateExpiredJobs()
{return?<<<'LUA'
--?Get?all?of?the?jobs?with?an?expired?"score"...
local?val?=?redis.call('zrangebyscore',?KEYS[1],?'-inf',?ARGV[1])--?If?we?have?values?in?the?array,?we?will?remove?them?from?the?first?queue
--?and?add?them?onto?the?destination?queue?in?chunks?of?100,?which?moves
--?all?of?the?appropriate?jobs?onto?the?destination?queue?very?safely.
if(next(val)?~=?nil)?thenredis.call('zremrangebyrank',?KEYS[1],?0,?#val?-?1)for?i?=?1,?#val,?100?doredis.call('rpush',?KEYS[2],?unpack(val,?i,?math.min(i+99,?#val)))--?Push?a?notification?for?every?job?that?was?migrated...for?j?=?i,?math.min(i+99,?#val)?doredis.call('rpush',?KEYS[3],?1)endend
endreturn?val
LUA;
}
剩下的就和普通隊列的處理流程一樣了。具體的處理過程大家可以參考我之前的 Laravel 系列中關于隊列那一篇文章的講解。當然,更好的還是大家自己去源碼里看一下,其實 default 默認隊列就是普通的 list 類型了,直接 rpush 配合 lpop 的經典隊列操作。
RabbitMQ延時隊列
這回輪到 RabbitMQ 沒有了,是的,RabbitMQ 里面沒有延時隊列的實現。額,不是說 RabbitMQ 是一個標準隊列嘛?注意,延時隊列只是一個隊列功能,和我們之前學過的持久化、確認、異常處理等功能相比,它確實是一個可有可無的功能。因此,并不是每個隊列系統都一定要實現這個功能的。而且,我們可以利用各種邏輯業務的方式來實現,比如說在 RabbitMQ 中,最方便的實現延時隊列的方式就是利用上節課我們學習過的死信隊列。
還記得死信隊列有一個條件就是超過消息的有效時間吧。利用這個有效時間,我們可以完全不寫普通消費者,讓消息全部等到有效時間后過期,然后讓死信消費者成為延時隊列消費者。
我們之前演示的是在隊列定義時設置隊列的消息過期時間,如果只使用這種形式,那么整個隊列中所有的消息過期時間都是一樣的,這個明顯不符合我們的需求。所幸,消息對象,也就是 AMQPMessage 對象的 expiration
屬性,也可以設置一個過期時間。它和隊列定義中的 x-message-ttl
一起存在的話,誰小就按誰先過期。
//?5.rq.p.php
//?………………
$channel->queue_declare('hello5',?false,?true,?false,?false,?false,?new?AMQPTable(['x-message-ttl'=>10000,?//?隊列里所有的數據?10?秒過期'x-dead-letter-exchange'=>'dead_letter',?//?死信到某個交換機'x-dead-letter-routing-key'=>'',?//?死信路由
]));//?創建消息
$msg?=?new?AMQPMessage('Hello?World!'?.?time(),['expiration'=>?3000,??//?消息?3?秒過期]);
$channel->basic_publish($msg,?'',?'hello5');?//?將消息放入隊列中
//?………………
調用生產者發送消息。
>?php?5.rq.p.php
生產者向消息隊列中發送信息:Hello?World!
只開死信消費者就可以了,不需要去消費 hello5 隊列。
>?php?5.rq.c.deadletter.php
等待死信隊列消息,或者使用?Ctrl+C?退出程序。
死信隊列接收到數據:?Hello?World!1672800558?時間:1672800561
注意看,我們生產時間和死信消費的時間正好差 3 秒。
除了最簡單的使用死信隊列之外,RabbitMQ 還有專門的延時隊列插件,這個大家可以自己去看一下哦。
優先級隊列
延時隊列還是挺有意思的吧?下面我們再來看一個優先級隊列。之前在學數據結構的時候,我們沒講過,如果是更詳細的一些數據結構教材中,直接就會有優先級隊列的實現,一般是通過大頂堆或者小頂堆的方式來實現。另外,PHP 的 SPL 擴展中也有通過大頂堆實現的優先級隊列對象 SplPriorityQueue ,有興趣的小伙伴可以自行了解一下哦。
Laravel 中的優先隊列
我們還是先來看 Laravel 實現的優先級隊列,它其實并不是一個完全的優先級隊列實現,因為它針對的其實是不同的隊列,而不是同一個隊列中給不同的消息賦予不同的優先級。
//?app/Jobs/Queue6.php
//?………………
public?function?handle()
{//for?($i?=?6;?$i?>?0;?$i--)?{$queue?=?'default';if?($i%3?==?1)?{$queue?=?'A';}?else?if?($i%3?==?2)?{$queue?=?'B';}sleep(random_int(0,?2));Queue6::dispatch('測試優先級,當前優先隊列為:'?.?$queue?.?',入隊時間:'?.?date("Y-m-d?H:i:s"))->onQueue($queue);}
}
//?………………
在這個隊列生產者中,我們使用 onQueue() 方法,其實是將消息數據放到了不同的隊列中,分別是 A、B 和默認的 default 三個隊列。每條消息在分派時都有 0 至 2 秒隨機的時間間隔。查看 Redis ,確實是不同名稱的隊列。
127.0.0.1:6379>?keys?laravel_database*
1)?"laravel_database_queues:A"
2)?"laravel_database_queues:B"
3)?"laravel_database_queues:A:notify"
4)?"laravel_database_queues:default"
5)?"laravel_database_queues:default:notify"
6)?"laravel_database_queues:B:notify"
額外補充一點, 隊列名前綴 laravel_database_
,這個東西是在 config/database.php 的 redis 配置中的。
'redis'?=>?[//?………………'options'?=>?[//?………………'prefix'?=>?env('REDIS_PREFIX',?Str::slug(env('APP_NAME',?'laravel'),?'_').'_database_'),],//?………………
],
然后,通過消費者的 --queue
參數,來指定隊列處理的優先級,注意它的順序,先寫的隊列名會優先處理。
>?php?artisan?queue:work?--queue=B,A,default
[2023-01-04?01:38:38][325c0e6d-3bdc-4ec0-84f9-57092a66753c]?Processing:?App\Jobs\Queue6
接收到了消息:測試優先級,當前優先隊列為:B,入隊時間:2023-01-04?01:38:35?處理時間:2023-01-04?01:38:38
[2023-01-04?01:38:38][325c0e6d-3bdc-4ec0-84f9-57092a66753c]?Processed:??App\Jobs\Queue6
[2023-01-04?01:38:38][c1f1d065-dfeb-413c-8fd9-1bf9bb2c8bdc]?Processing:?App\Jobs\Queue6
接收到了消息:測試優先級,當前優先隊列為:B,入隊時間:2023-01-04?01:38:36?處理時間:2023-01-04?01:38:38
[2023-01-04?01:38:38][c1f1d065-dfeb-413c-8fd9-1bf9bb2c8bdc]?Processed:??App\Jobs\Queue6
[2023-01-04?01:38:38][c902a1ec-242c-4a0c-b4d8-c509d7d9a9f2]?Processing:?App\Jobs\Queue6
接收到了消息:測試優先級,當前優先隊列為:A,入隊時間:2023-01-04?01:38:35?處理時間:2023-01-04?01:38:38
[2023-01-04?01:38:38][c902a1ec-242c-4a0c-b4d8-c509d7d9a9f2]?Processed:??App\Jobs\Queue6
[2023-01-04?01:38:38][0d96ec9e-5fd6-4c34-93b6-c29ff1749a02]?Processing:?App\Jobs\Queue6
接收到了消息:測試優先級,當前優先隊列為:A,入隊時間:2023-01-04?01:38:36?處理時間:2023-01-04?01:38:38
[2023-01-04?01:38:38][0d96ec9e-5fd6-4c34-93b6-c29ff1749a02]?Processed:??App\Jobs\Queue6
[2023-01-04?01:38:38][90ef0eb0-6bf7-4e2a-8cf7-0124b4855b81]?Processing:?App\Jobs\Queue6
接收到了消息:測試優先級,當前優先隊列為:default,入隊時間:2023-01-04?01:38:35?處理時間:2023-01-04?01:38:38
[2023-01-04?01:38:38][90ef0eb0-6bf7-4e2a-8cf7-0124b4855b81]?Processed:??App\Jobs\Queue6
[2023-01-04?01:38:38][ef754a45-eaef-470c-a09e-5e7cdfacd778]?Processing:?App\Jobs\Queue6
接收到了消息:測試優先級,當前優先隊列為:default,入隊時間:2023-01-04?01:38:35?處理時間:2023-01-04?01:38:38
[2023-01-04?01:38:38][ef754a45-eaef-470c-a09e-5e7cdfacd778]?Processed:??App\Jobs\Queue6
看出效果了吧,B 隊列里的數據最先處理,即使第二條數據的入隊時間是靠后的,它也會優先被處理。然后再處理 A 隊列中的數據,最后才會處理默認的 default 隊列中的數據。
其實從這里也能看出來,Laravel 是使用了一個取巧的辦法,畢竟 Redis 原生并不支持優先級隊列。所以它是通過消費者指定隊列名稱的方式,并按名稱順序來實現的優先級隊列。
RabbitMQ消息優先級
好了,我們再來看 RabbitMQ 的優先級隊列。它就是真正傳統意義上的單個隊列中,不同消息有不同優先級的實現了。
//?6.rq.p.php
use?PhpAmqpLib\Connection\AMQPStreamConnection;
use?PhpAmqpLib\Message\AMQPMessage;
use?PhpAmqpLib\Wire\AMQPTable;require_once?__DIR__?.?'/vendor/autoload.php';//?建立連接
$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');
$channel?=?$connection->channel();?//?獲取頻道$channel->queue_declare('hello6',?false,?true,?false,?false,?false,?new?AMQPTable(['x-max-priority'=>10,?//?設置最大優先級
]));//?創建消息
for?($i?=?6;?$i?>?0;?$i--)?{$priority?=?random_int(0,?2);$body?=?'優先消息測試,當前優先級為:'?.?$priority;$msg?=?new?AMQPMessage($body,['priority'?=>?$priority]);$channel->basic_publish($msg,?'',?'hello6');?//?將消息放入隊列中echo?"生產者向消息隊列中發送信息:"?.?$body,?PHP_EOL;
}
在上面的代碼中,我們需要先設置一個隊列的優先級容量 x-max-priority
,也就是在這個隊列中,最大的優先級就到 10 。這個值可以設置到更大,但是官方推薦就到 10 就可以了。
然后在消息對象 AMQPMessage 的屬性中,使用 priority 來設置每條消息具體的優先級。
>?php?6.rq.p.php
生產者向消息隊列中發送信息:優先消息測試,當前優先級為:0
生產者向消息隊列中發送信息:優先消息測試,當前優先級為:2
生產者向消息隊列中發送信息:優先消息測試,當前優先級為:0
生產者向消息隊列中發送信息:優先消息測試,當前優先級為:1
生產者向消息隊列中發送信息:優先消息測試,當前優先級為:1
生產者向消息隊列中發送信息:優先消息測試,當前優先級為:2
運行后,我們可以看到,入隊時每條消息的優先級都是隨機的,這里是沒有順序的。但是在消費時,就能明顯地看到消息是按優先級從高到低被消費的。
>?php?6.rq.c.php
等待消息,或者使用?Ctrl+C?退出程序。
接收到數據:?優先消息測試,當前優先級為:2
接收到數據:?優先消息測試,當前優先級為:2
接收到數據:?優先消息測試,當前優先級為:1
接收到數據:?優先消息測試,當前優先級為:1
接收到數據:?優先消息測試,當前優先級為:0
接收到數據:?優先消息測試,當前優先級為:0
總結
今天學習的兩種隊列功能是比較常見的兩種隊列功能,同時,我們也看到了在 Redis 中其實都是沒有這兩個功能的實現的,但是,Laravel 框架通過業務代碼以及各種邏輯技巧的方式實現了它們。另外,RabbitMQ 也是沒有默認完全的延時隊列的功能,也需要通過取巧的方式來實現。而對于我們來說,了解它們的實現以及背后的原理,學習大佬們的處理邏輯,也能獲得非常多的收獲與成長。
完結撒花。
是的,消息隊列系列結束了。就是這么簡單,6 篇小文章。內容不多,但是我們已經了解了什么消息隊列,什么發布訂閱模式,消息隊列的可用性是如何保障的,以及非常好玩的兩個擴展隊列功能。意猶未盡嗎?其實呀,消息隊列核心的內容真的就是這些,并沒有太多很高深的內容。就像我們最早說的,不管什么消息隊列中間件工具,本質上都是我們最基礎的那個隊列數據結構的實現。只不過變成了一個獨立的組件,再加上各種功能和優化罷了。
如果你對消息隊列還十分有興趣,那么你現在應該更加深入地學習一下 RabbitMQ ,就是我前面沒有細說的交換機路由相關的功能,它能夠衍生出更多的隊列應用功能。想要更加深入的話,那么你可以再挑戰一下 Kafka ,現階段排名第一的消息隊列系統,天生為大數據服務的,去挑戰更恐怖的超強性能以及天生分布式的隊列系統。這方面的系統學習我就不多說了,沒別的,刷文檔,刷 B 站視頻就好啦,畢竟 RabbitMQ 和 Kafka 都是熱門應用,資料多的是!
至于之前提到過的其它消息隊列,如果你在工作用到了,再詳細深入的學習吧,我更推薦的還是 Redis(Laravel框架實現)、RabbitMQ、Kafka 這三個。RabbitMQ 和 Kafka 任選其一深入研究,Laravel 的 Redis 隊列實現對于我們 PHPer 來說會更親切,可以深入源碼學習哦。
測試代碼:
https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/6.rq.c.php
https://github.com/zhangyue0503/dev-blog/blob/master/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97/source/6.rq.p.php