(using php-amqplib)
前提必讀
本教程假設RabbitMQ是安裝在標準端口上運行(5672)。如果您使用不同的主機、端口或憑據,則連接設置需要調整。
如果您在本教程中遇到困難,可以通過郵件列表與我們聯系。
開始
在第二個教程中,我們學習了如何使用工作隊列在多個工人之間分配耗時的任務。
但是如果我們需要在遠程計算機上運行一個函數并等待結果呢?嗯,那是另一回事了。這種模式通常稱為遠程過程調用或RPC。
在本教程中我們將使用RabbitMQ搭建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。由于我們沒有任何值得分配的耗時的任務,所以我們將創建一個返回Fibonacci
數的模擬一個RPC服務。
Client interface
為了說明如何使用RPC服務,我們將創建一個簡單的客戶類。它將公開一個名為調用的方法,該方法發送一個RPC請求并阻塞直到接收到結果為止:
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";
關于RPC的一些建議
雖然RPC是計算中非常常見的模式,但它經常遭到批評。當程序員不知道函數調用是本地的,或者它是一個緩慢的RPC時,問題就出現了。這樣的混亂導致了不可預知的系統,并給調試增加了不必要的復雜性。而簡化軟件,濫用會導致難以維護的RPC代碼。
考慮到這一點,請考慮以下建議:
確保很明顯哪個函數調用是本地調用,并且它是遠程的。
記錄系統。使組件之間的依賴關系清晰。
處理錯誤案例。RPC服務器長時間處于下行狀態時,客戶端應如何響應?
有疑問時避免RPC。如果可以,則應該使用異步管道,而不是像阻塞這樣的RPC,結果被異步推送到下一個計算階段。
回調隊列(Callback queue)
一般在RabbitMQ做RPC是容易的。客戶端發送一條請求消息和一個響應消息的服務器回復。為了接收響應,我們需要向請求發送一個“回調”隊列地址。我們可以使用默認隊列。讓我們試試看:
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$msg = new AMQPMessage($payload,array('reply_to' => $queue_name));$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協議(0-9-1 protocol)預定義了一套14個屬性,去一個消息。大多數屬性很少使用,除了以下內容:
delivery_mode
: 將消息標記為持久性。 (with a value of 2) or transient (1). 您可能會從第二個教程中記住這個屬性。content_type
:用來描述編碼的MIME類型。例如,對于常用的JSON編碼,將此屬性設置為應用程序/ JSON是一個很好的做法。reply_to
:常用的名字一個回調隊列。correlation_id
:有助于將RPC響應與請求關聯起來。
Correlation Id
在上面介紹的方法中,我們建議為每個RPC請求創建一個回調隊列。這是非常低效的,但幸運的是有一個更好的方法——讓我們為每個客戶機創建一個回調隊列。
這引發了一個新問題,在隊列中收到了響應,不清楚響應的請求屬于哪個。那時候correlation_id屬性用于。我們將把它設置為每個請求的唯一值。稍后,當我們在回調隊列中接收消息時,我們將查看這個屬性,并在此基礎上,我們將能夠將響應與請求匹配。如果我們看到一個未知的correlation_id值,我們可以安全地忽略信息-它不屬于我們的請求。
您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是失敗出錯呢?這是由于服務器端可能出現競爭情況。雖然不太可能,RPC服務器可能在發送完答案后死亡,但在發出請求的確認消息之前。如果發生這種情況,重新啟動的RPC服務器將再次處理請求。這就是為什么在客戶機上我們必須優雅地處理重復響應,而RPC應該理想地是冪等的。
總結
我們的RPC會像這樣工作:
當客戶端啟動時,它創建一個匿名的獨占回調隊列。
一個RPC請求,客戶端發送消息,兩個屬性:reply_to
,設置回調隊列和correlation_id
,它被設置為每個請求的唯一值。
請求被發送到一個rpc_queue隊列。
RPC worker(又名:服務器)正在等待該隊列上的請求。當一個請求時,它的工作和發送消息的結果返回給客戶端,使用從reply_to隊列。
客戶機等待回調隊列上的數據。當消息出現時,它檢查correlation_id屬性。如果它與請求的值匹配,則返回對應用程序的響應。
匯總
Fibonacci 遞歸源碼:
function fib($n) {if ($n == 0)return 0;if ($n == 1)return 1;return fib($n-1) + fib($n-2);
}
``
我們聲明fibonacci(斐波那契)函數。它只假設有效的正整數輸入。(不要指望這一個能為大數字工作,而且這可能是最慢的遞歸實現)。我們的RPC服務器rpc_server.php代碼看起來像這樣:
<?php
require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n) {
if ($n == 0)return 0;
if ($n == 1)return 1;
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requestsn";
$callback = function($req) {
$n = intval($req->body);
echo " [.] fib(", $n, ")\n";$msg = new AMQPMessage((string) fib($n),array('correlation_id' => $req->get('correlation_id')));$req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
服務器代碼相當簡單:像往常一樣,我們從建立連接、通道和聲明隊列開始。我們可能需要運行多個服務器進程。為了分散負載同樣多的服務器需要設置`prefetch_count`, 設置`$channel.basic_qos`美元。我們用`basic_consume`訪問隊列。然后,我們進入while循環,在其中等待請求消息,完成工作并發送響應。我們rpc_client.php RPC客戶端代碼:
<?php
require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
class FibonacciRpcClient {
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;public function __construct() {$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');$this->channel = $this->connection->channel();list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);$this->channel->basic_consume($this->callback_queue, '', false, false, false, false,array($this, 'on_response'));
}
public function on_response($rep) {if($rep->get('correlation_id') == $this->corr_id) {$this->response = $rep->body;}
}public function call($n) {$this->response = null;$this->corr_id = uniqid();$msg = new AMQPMessage((string) $n,array('correlation_id' => $this->corr_id,'reply_to' => $this->callback_queue));$this->channel->basic_publish($msg, '', 'rpc_queue');while(!$this->response) {$this->channel->wait();}return intval($this->response);
}
};
$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";
?>
現在是一個很好的時間來讓我們完整的示例源代碼rpc_client.php和rpc_server.php。我們的RPC服務現在準備好了。我們可以啟動服務器:
php rpc_server.php
# => [x] Awaiting RPC requests
請求斐波那契數運行客戶機:
php rpc_client.php
# => [x] Requesting fib(30)
``
這里介紹的設計并不是RPC服務的唯一實現,但它有一些重要的要點:
如果RPC服務器太慢,您可以通過運行另一個服務器來擴展。試著在一個新的控制臺再運行第一個:rpc_server.php。
在客戶端,RPC只需要發送和接收一條消息。不喜歡queue_declare需要同步調用。因此,RPC客戶機只需要一次RPC請求的一次網絡往返。
我們的代碼仍然非常簡單,并沒有試圖解決更復雜(但重要)的問題,例如:
如果沒有服務器運行,客戶端應該如何反應?
客戶端應該對RPC有某種超時嗎?
如果服務器發生故障并引發異常,是否應該轉發給客戶端?
在處理前防止無效傳入消息(如檢查邊界、類型)。
如果您想進行實驗,您可能會發現management UI對于查看隊列非常有用。