php8.0 使用 rabbitmq 要使用 3.6版本以上的, 并且還要開啟 php.ini中的 socket 擴展
php think make:command SimpleMQProduce //創建一個生產者命令行
php think make:command SimpleMQConsumer //創建一個消費者命令行
代碼中的消息持久化的說明
RabbitMQ 消息持久化
持久化是為提高rabbitmq消息的可靠性,防止在異常情況(重啟,關閉,宕機)下數據的丟失。設置完隊列和消息的持久化,并不能完全保證消息不會丟失。盡管它告訴 RabbitMQ 將消息保存到磁盤,但當 RabbitMQ 接受消息但尚未保存消息時,仍有一個較短的時間窗口。另外, RabbitMQ 不會對每條消息都執行 fsync(2) – 它可能只是保存到緩存中,而不是真正寫入磁盤。持久性保證并不強,但對于我們的簡單任務隊列來說已經足夠了。如果您需要更強的保證,則可以使用發布者確認(RabbitMQ 消息確認機制)
生產者代碼
<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;//本類是模式中的 生產者 produce
class SimpleMQProduce extends Command
{protected function configure(){// 指令配置$this->setName('SimpleMQ')->setDescription('這是一個隊列的 Hello模式 (最簡單的應用模式),使用了默認的交換機,只需要建一個隊列就可以了');}protected function execute(Input $input, Output $output){//獲取連接$connection = $this->connectRabbitMQ();//獲取連接的通道$channel = $connection->channel();//直接創建一個隊列/*** 關于 queue_declare參數的說明* params queue 隊列的名稱* params passive 是否消極的聲明隊列,如果存在,就把隊列的信息返回, 如果沒有就拋出錯誤,(是的, 你沒看錯,這個參數很雞肋,所以一般為 false)* params durable 是否持久化,意思是說就算隊列服務掛了, 也不會丟失隊列* params exclusive 是否排外,如果設置為true ,表示只有本次連接中的channel 可以訪問,其它channel 是不可以訪問的* params auto_delete 設置是否自動刪除。為true 則設置隊列為自動刪除。自動刪除的前提是, 至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除* params nowait 相當于做一個異步版的聲明, 如果設置成true, 就是說方法調用完就結束,也不用等待創建隊列是否成功,一般也設為false*/$channel->queue_declare("hello",false,false,false,false,false);for ($i = 0; $i < 20; $i++) {$message = ["name"=>"huang".$i,"age"=>$i,"sex"=>"man".$i];$msg = new AMQPMessage(json_encode($message),["delivery_mode"=>AMQPMessage::DELIVERY_MODE_PERSISTENT //使消息持久化]);$channel->basic_publish($msg,"",'hello'); //簡單模式下,routing_key 和 隊列名稱是一樣的sleep(1);}//生產者調用完成后要關閉資源$channel->close();}//這個方法是建立一個 rabbitMQ 的資源連接protected function connectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("隊列連接失敗");}}}
消費者代碼
<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class SimpleMQConsumer extends Command
{protected function configure(){// 指令配置$this->setName('simplemqconsumer')->setDescription('這是rabbitMQ的簡單模式的消費者代碼');}protected function execute(Input $input, Output $output){//創建一個資源連接$connection = $this->connectRabbitMQ();$channel = $connection->channel();//定義一個處理函數$callback = function($msg){$body = $msg->body;$bodyArr = json_decode($body,true);echo $bodyArr["name"] ."--".$bodyArr["age"]. "--".$bodyArr["sex"];};//聲明隊列,這個函數的 passive ,如果有不返回這個隊列的信息,如果沒有就拋出異常$channel->queue_declare("hello",false,false,false,false,false);//從隊列中消費數據/*** 參數說明* params queue 隊列名稱* params consumer_tag 消費者標簽* params no_local AMQP標準 RabbitMQ沒有實現, 默認為false* params no_ack 收到消息后是否不需要回復 如果為true表示不回復, 一般是要回復的 設置為 false* params exclusive 排他消費,即這個隊列只能由一個消費者來消費* params nowait 相當于一個異步,在執行完成之后返回結果,不堵塞* params callback 處理消息的回調方法*/$channel->basic_consume("hello",'',false,true,false,false,$callback);//等待隊列執行完成while(count($channel->callbacks)){$channel->wait();}}//這個方法是建立一個 rabbitMQ 的資源連接protected function connectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("隊列連接失敗");}}
}