發布/訂閱
在上篇第二部分教程中,我們搭建了一個工作隊列。每個任務之分發給一個工作者(worker)。在本篇教程中,我們要做的之前完全不一樣——分發一個消息給多個消費者(consumers)。這種模式被稱為“發布/訂閱”。
為了描述這種模式,我們將會構建一個簡單的日志系統。它包括兩個程序——第一個程序負責發送日志消息,第二個程序負責獲取消息并輸出內容。
在我們的這個日志系統中,所有正在運行的接收方程序都會接受消息。我們用其中一個接收者(receiver)把日志寫入硬盤中,另外一個接受者(receiver)把日志輸出到屏幕上。
最終,日志消息被廣播給所有的接受者(receivers)。
交換器(Exchanges)
前面的教程,我們發送消息到隊列并從中取出消息。現在是時候介紹RabbitMq中完整的消息模型了。
讓我們簡單的概括一下之前的教程:
- 發布者(producer)是發布消息的應用程序。
- 隊列(queue)用于消息存儲的緩沖。
- 消費者(consumer)是接收消息的應用程序。
RabbitMQ消息模型的核心理念是:發布者(producer)不會直接發送任何消息給隊列。事實上,發布者(producer)甚至不知道消息是否已經被投遞到隊列。
發布者(producer)只需要把消息發送給一個交換器(exchange)。交換器非常簡單,它一邊從發布者方接收消息,一邊把消息推入隊列。交換器必須知道如何處理它接收到的消息,是應該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。這些規則是通過exchange type來定義的。
有幾個可供選擇的交換器類型:AMQPEXTYPEDIRECT,AMQPEXTYPEFANOUT,AMQPEXTYPEHEADER?orAMQPEXTYPETOPIC。我們在這里主要說明AMQPEXTYPE_FANOUT。先創建一個fanout類型的交換器,命名為logs:
$exchange->setName('logs'); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare();
fanout交換器很簡單,你可能從名字上就能猜測出來,它把消息發送給它所知道的所有隊列。這正是我們的日志系統所需要的。
交換器列表
rabbitmqctl能夠列出服務器上所有的交換器:
$ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.
這個列表中有一些叫做amq.*的交換器。這些都是默認創建的,不過這時候你還不需要使用他們。
匿名的交換器
前面的教程中我們對交換器一無所知,但仍然能夠發送消息到隊列中。因為我們使用了命名為空字符串(“”)默認的交換器。 回想我們之前是如何發布一則消息:
``` $exchange->publish($message, $routeKey);
```
exchange參數就是交換器的名稱。空字符串代表默認或者匿名交換器:消息將會根據指定的routing_key分發到指定的隊列。
在PHP的AMQP中如果exchange設置為匿名的話,是報錯的:PHP Fatal error: Uncaught exception ‘AMQPExchangeException’ with message ‘Invalid exchange name given, must be between 1 and 255 characters long.’
現在,我們就可以發送消息到一個具名交換器了:
$exchange->publish($message, '');
臨時隊列
你還記得之前我們使用的隊列名嗎( hello和task_queue)?給一個隊列命名是很重要的——我們需要把工作者(workers)指向正確的隊列。如果你打算在發布者(producers)和消費者(consumers)之間共享同隊列的話,給隊列命名是十分重要的。
但是這并不適用于我們的日志系統。我們打算接收所有的日志消息,而不僅僅是一小部分。我們關心的是最新的消息而不是舊的。為了解決這個問題,我們需要做兩件事情。
首先,當我們連接上RabbitMQ的時候,我們需要一個全新的、空的隊列。我們可以手動創建一個隨機的隊列名,或者讓服務器為我們選擇一個隨機的隊列名(推薦)。我們只要在調用$queue->declare();方法的時候,不提供queue參數就可以了:
$queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); $queue->declare();
這時候我們可以通過$queue->getName();獲得已經生成的隨機隊列名。它可能是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,當與消費者(consumer)斷開連接的時候,這個隊列應當被刪除。我們可以使用exclusive標識。
$queue->setFlags(AMQP_EXCLUSIVE);
綁定(Bindings)
我們已經創建了一個fanout類型的交換器和一個隊列。現在我們需要告訴交換器如何發送消息給我們的隊列。交換器和隊列之間的關系我們稱之為綁定(binding)。
$queue->bind($exchangeName, $queue->getName());
現在,logs交換器將會把消息添加到我們的隊列中。
綁定列表。
你可以使用rabbitmqctl list_bindings隊列出所有存在的綁定。.
整合代碼
發布日志消息的程序看起來和之前的沒有太大區別。最重要的改變就是我們把消息發送給logs交換器而不是匿名交換器。在發送的時候我們需要提供routingkey參數,但是它的值會被fanout交換器忽略。
emit_log.php
<?php$exchangeName = 'logs'; $message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1];$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); //$exchange->declare(); $exchange->declareExchange();$exchange->publish($message, ''); var_dump("[x] Sent $message");$connection->disconnect();
正如你看到的那樣,在連接成功之后,我們聲明了一個交換器,這一個是很重要的,因為不允許發布消息到不存在的交換器。
如果沒有綁定隊列到交換器,消息將會丟失。但這個沒有所謂,如果沒有消費者監聽,那么消息就會被忽略。
receive_logs.php
<?php$exchangeName = 'logs';$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); //$exchange->declare(); $exchange->declareExchange(); $queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); //$exchange->declare(); $queue->declareQueue(); $queue->bind($exchangeName, '');var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) {$queue->consume('callback'); } $connection->disconnect();function callback($envelope, $queue) {$msg = $envelope->getBody();var_dump(" [x] Received:" . $msg);$queue->nack($envelope->getDeliveryTag()); }
這樣我們就完成了。如果你想把日志保存到文件中,只需要打開控制臺輸入:
php receive_logs.php > logs_from_rabbit.log
如果你想在屏幕中查看日志,那么打開一個新的終端然后運行:
php receive_logs.php
當然還要發送日志:
php emit_log.php
?
使用方法,需要打開三個終端,兩個用來運行receive_logs.php腳本【先啟動】,一個用來運行emit_log.php腳本【后啟動】,結果為兩個receive_logs.php腳本會同時受到消息
運行效果:
?