代碼的參數說明在 第一小節的代碼中,如果需要可移步到第一節中查看
工作隊列
工作隊列(又稱:任務隊列——Task Queues)是為了避免等待一些占用大量資源、時間的操作。當我們把任務(Task)當作消息發送到隊列中,一個運行在后臺的工作者(worker)進程就會取出任務然后處理。當你運行多個工作者(workers),任務就會在它們之間共享。
使用工作隊列的一個好處就是它能夠并行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。
當我們在 n個Terminal 窗口中,運行消費者程序,就可以有多個消費者處理生產者生產的消息了 當隊列中的消息發送給消費者1的時候,就不會再發送給消費者2了。
消息確認
當我們處理消息的時候, 我們想知道,消費者在處理的過程中是否已經處理完成,沒有出現消費者掛機的狀態,這里就需要消息確認了, 不然隊列中的消息是否處理完成,不能明確, 有可能會丟失重要的數據。
消息響應默認是開啟的。之前的例子中我們可以使用no_ack=True標識把它關閉。是時候設置的第四個參數basic_consume為false (true 意味著不響應ack) ,當工作者(worker)完成了任務,就發送一個響應。
公平調度
RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有作出響應。它盲目的把第n-th條消息發給第n-th個消費者。 不會等侍是否處理完成
我們可以使用basic.qos方法,并設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工作者(worker),直到它已經處理了上一條消息并且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的工作者(worker)。
$channel->basic_qos(null, 1, null);
生產者
使用了 第一小節中的生產者,一次生成了20個消息
消費者
<?php
declare (strict_types = 1);namespace app\command;use Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;//工作隊列帶消息確認的 開幾個終端,就會有幾個消息者來消費。
//生產者可以使用 SimpleMQProduce 中的生產者
class WorkerWithAck extends Command
{protected function configure(){// 指令配置$this->setName('workerwithack')->setDescription('這是一個工作隊列,帶應答的');}protected function execute(Input $input, Output $output){//獲取連接$connection = $this->getConnection();//獲取通道$channel = $connection->channel();$channel->queue_declare("hello",false,false,false,false,false);$callback = function($msg){$msgbody = $msg->body;$msgbydyArr = json_decode($msgbody,true);echo $msgbydyArr["name"]."--".$msgbydyArr["age"]."--".$msgbydyArr["sex"].PHP_EOL;$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); //這里讓就是消息的應答了};//如果我們要讓消費者一條一條的處理消費,也就是說 只有consumer已經處理并確認了上一條message時queue才分派新的message給它//我們可以加上下面的這個代碼, 注意,是可以加,也可以不加。因為我們有了應答機制,消息是不會丟失的//$channel->basic_qos(null,1,null); //這句可加可不加$channel->basic_consume("hello","",false,false,false,false,$callback);while(count($channel->callbacks)){$channel->wait();}}protected function getConnection(){try{return new AMQPStreamConnection("192.168.3.228",5672,"admin","123456");}catch(Exception $e){throw new \Exception("創建隊列連接失敗");}}}
測試結果,兩個 工作隊列分別處理了,同一個生產者的數據,并且沒有重復