概述
workerman/rabbitmq 是一個異步RabbitMQ客戶端,使用AMQP協議。
RabbitMQ是一個基於AMQP(高級消息隊列協議)實現的開源消息組件,它主要用於在分布式系統中存儲和轉發消息。RabbitMQ由高性能、高可用以及高擴展性出名的Erlang語言寫成,具有高度的可靠性和可擴展性。它支持多種消息協議,包括AMQP、STOMP、MQTT等,並廣泛應用於消息隊列、消息中間件等領域。
RabbitMQ允許應用程序通過消息傳遞進行通信,這使得不同的應用程序可以在不同的語言和操作系統之間進行通信。
RabbitMQ的消息工作機制涉及消息從發送端到接收端的流轉過程。在這個過程中,消息首先被發送到交換機(Exchange),然後交換機根據路由規則將消息路由到一個或多個隊列(Queue)中。消費者(Consumer)從隊列中獲取消息並進行處理。
生產者和消費者
安裝
composer require workerman/rabbitmq
消費者
receive.php
<?php

declare(strict_types=1);

use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;

require_once __DIR__ . '/vendor/autoload.php';

// Coroutine-style consumer example: one worker that consumes from the
// 'hello-coroutine' queue and also publishes to it from a timer.
$worker = new Worker();
$worker->eventLoop = \Workerman\Events\Revolt::class;

$worker->onWorkerStart = function () {
    // Create RabbitMQ client and connect.
    $client = Client::factory([
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'heartbeat' => 60,
        // Callback registered under 'heartbeat_callback'; prints a marker line.
        'heartbeat_callback' => function () {
            echo " [-] coroutine-consumer-heartbeat\n";
        },
        // NOTE(review): the meaning/unit of 'interval' is not visible here — confirm
        // against the workerman/rabbitmq client options.
        'interval' => [100, 300],
    ])->connect();

    $channel = $client->channel();
    $channel->queueDeclare('hello-coroutine');

    // Consumer: print the content of every message delivered from the queue.
    // The trailing arguments ('', false, true) are presumably consumerTag,
    // noLocal and noAck as in Bunny's Channel::consume — TODO confirm.
    $channel->consume(
        function (Message $message, Channel $channel, \Bunny\AbstractClient $client) {
            echo " [>] Received ", $message->content, "\n";
        },
        'hello-coroutine',
        '',
        false,
        true
    );
    $client->run();
    echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

    // Producer: publish a timestamped message to the same queue every $interval seconds.
    $interval = 5;
    \Workerman\Timer::add($interval, function () use ($channel) {
        $message = 'Hello World By Self Timer. ' . time();
        $channel->publish($message, [], '', 'hello-coroutine');
        echo " [<] Sent $message\n";
    });
    echo " [!] Producer timer created, interval: $interval s.\n";
};

Worker::runAll();
運行命令
php receive.php start
基於 Workerman 發布
send.php
<?php

declare(strict_types=1);

use Workerman\RabbitMQ\Client;
use Workerman\Worker;

require_once __DIR__ . '/vendor/autoload.php';

// Producer example running inside a Workerman worker: publishes a timestamped
// message to the 'hello-coroutine' queue on a timer.
$worker = new Worker();
$worker->eventLoop = \Workerman\Events\Revolt::class;

$worker->onWorkerStart = function () {
    $client = Client::factory([
        'host' => 'host.docker.internal',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'heartbeat' => 60,
        // Callback registered under 'heartbeat_callback'; prints a marker line.
        'heartbeat_callback' => function () {
            echo "coroutine-producer-heartbeat\n";
        },
    ])->connect();

    $channel = $client->channel();
    $channel->queueDeclare('hello-coroutine');

    // Publish one message every 5 seconds.
    \Workerman\Timer::add(5, function () use ($channel) {
        $message = 'Hello World By Workerman Env Producer. ' . time();
        $channel->publish($message, [], '', 'hello-coroutine');
        echo " [x] Sent '$message'\n";
    });
};

Worker::runAll();
運行命令
php send.php start
基於 PHP-FPM 發布
script.php
<?php

declare(strict_types=1);

use Workerman\RabbitMQ\Client;

require_once __DIR__ . '/vendor/autoload.php';

// Producer example without a Workerman worker: connect, declare the queue,
// publish a single timestamped message and report the result.
$client = Client::factory([
    'host' => 'host.docker.internal',
    'port' => 5672,
    'user' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
    'heartbeat' => 60,
    // Callback registered under 'heartbeat_callback'; prints a marker line.
    'heartbeat_callback' => function () {
        echo "coroutine-producer-heartbeat\n";
    },
])->connect();

$channel = $client->channel();
$channel->queueDeclare('hello-coroutine');

$message = 'Hello World By Normal Producer. ' . time();
$res = $channel->publish($message, [], '', 'hello-coroutine');

// NOTE(review): $res is interpolated as-is; if publish() returns a bool,
// false will print as an empty string — confirm the return type.
echo " [x] Sent '$message', success: $res\n";
運行命令
php script.php
異步消費者
receive.php
<?php

use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;

require __DIR__ . '/vendor/autoload.php';

// Promise-style (asynchronous) consumer example: connect, open a channel,
// declare the 'hello' queue, then print every message received from it.
$worker = new Worker();

$worker->onWorkerStart = function () {
    (new Client())->connect()->then(function (Client $client) {
        return $client->channel();
    })->then(function (Channel $channel) {
        // The four boolean flags are presumably passive, durable, exclusive and
        // autoDelete as in Bunny's Channel::queueDeclare — TODO confirm.
        // Pass the channel on to the next step once the queue is declared.
        return $channel->queueDeclare('hello', false, false, false, false)->then(function () use ($channel) {
            return $channel;
        });
    })->then(function (Channel $channel) {
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
        // The trailing arguments ('', false, true) are presumably consumerTag,
        // noLocal and noAck as in Bunny's Channel::consume — TODO confirm.
        $channel->consume(
            function (Message $message, Channel $channel, Client $client) {
                echo " [x] Received ", $message->content, "\n";
            },
            'hello',
            '',
            false,
            true
        );
    });
};

Worker::runAll();
運行命令
php receive.php start
異步生產者
send.php
<?php

use Bunny\Channel;
use Bunny\Message;
use Workerman\Worker;
use Workerman\RabbitMQ\Client;

require __DIR__ . '/vendor/autoload.php';

// Promise-style (asynchronous) producer example: connect, open a channel,
// declare the 'hello' queue, publish one message, then close the channel
// and disconnect.
$worker = new Worker();

$worker->onWorkerStart = function () {
    (new Client())->connect()->then(function (Client $client) {
        return $client->channel();
    })->then(function (Channel $channel) {
        // The four boolean flags are presumably passive, durable, exclusive and
        // autoDelete as in Bunny's Channel::queueDeclare — TODO confirm.
        // Pass the channel on to the next step once the queue is declared.
        return $channel->queueDeclare('hello', false, false, false, false)->then(function () use ($channel) {
            return $channel;
        });
    })->then(function (Channel $channel) {
        echo " [x] Sending 'Hello World!'\n";
        // Keep passing the channel down the chain after the publish resolves.
        return $channel->publish('Hello World!', [], '', 'hello')->then(function () use ($channel) {
            return $channel;
        });
    })->then(function (Channel $channel) {
        echo " [x] Sent 'Hello World!'\n";
        // Grab the client before closing the channel so it can be disconnected afterwards.
        $client = $channel->getClient();
        return $channel->close()->then(function () use ($client) {
            return $client;
        });
    })->then(function (Client $client) {
        $client->disconnect();
    });
};

Worker::runAll();
運行命令
php send.php start