RabbitMQ+PHP 教程六(RPC)

(using php-amqplib)

前提必讀

本教程假設RabbitMQ是安裝在標準端口上運行(5672)。如果您使用不同的主機、端口或憑據,則連接設置需要調整。

如果您在本教程中遇到困難,可以通過郵件列表與我們聯系。

開始

在第二個教程中,我們學習了如何使用工作隊列在多個工人之間分配耗時的任務。

但是如果我們需要在遠程計算機上運行一個函數并等待結果呢?嗯,那是另一回事了。這種模式通常稱為遠程過程調用或RPC。

在本教程中我們將使用RabbitMQ搭建一個RPC系統:一個客戶端和一個可擴展的RPC服務器。由于我們沒有任何值得分配的耗時的任務,所以我們將創建一個返回Fibonacci數的模擬一個RPC服務。

Client interface

為了說明如何使用RPC服務,我們將創建一個簡單的客戶類。它將公開一個名為調用的方法,該方法發送一個RPC請求并阻塞直到接收到結果為止:

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "\n";

關于RPC的一些建議

雖然RPC是計算中非常常見的模式,但它經常遭到批評。當程序員不知道函數調用是本地的,或者它是一個緩慢的RPC時,問題就出現了。這樣的混亂導致了不可預知的系統,并給調試增加了不必要的復雜性。而簡化軟件,濫用會導致難以維護的RPC代碼。

考慮到這一點,請考慮以下建議:

確保很明顯哪個函數調用是本地調用,并且它是遠程的。
記錄系統。使組件之間的依賴關系清晰。
處理錯誤案例。RPC服務器長時間處于下行狀態時,客戶端應如何響應?
有疑問時避免RPC。如果可以,則應該使用異步管道,而不是像阻塞這樣的RPC,結果被異步推送到下一個計算階段。

回調隊列(Callback queue)

一般在RabbitMQ做RPC是容易的。客戶端發送一條請求消息和一個響應消息的服務器回復。為了接收響應,我們需要向請求發送一個“回調”隊列地址。我們可以使用默認隊列。讓我們試試看:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);$msg = new AMQPMessage($payload,array('reply_to' => $queue_name));$channel->basic_publish($msg, '', 'rpc_queue');
# ... then code to read a response message from the callback_queue ...
消息屬性
AMQP協議(0-9-1 protocol)預定義了一套14個屬性,去一個消息。大多數屬性很少使用,除了以下內容:

delivery_mode: 將消息標記為持久性。 (with a value of 2) or transient (1). 您可能會從第二個教程中記住這個屬性。
content_type:用來描述編碼的MIME類型。例如,對于常用的JSON編碼,將此屬性設置為應用程序/ JSON是一個很好的做法。
reply_to:常用的名字一個回調隊列。
correlation_id:有助于將RPC響應與請求關聯起來。

Correlation Id

在上面介紹的方法中,我們建議為每個RPC請求創建一個回調隊列。這是非常低效的,但幸運的是有一個更好的方法——讓我們為每個客戶機創建一個回調隊列。

這引發了一個新問題,在隊列中收到了響應,不清楚響應的請求屬于哪個。那時候correlation_id屬性用于。我們將把它設置為每個請求的唯一值。稍后,當我們在回調隊列中接收消息時,我們將查看這個屬性,并在此基礎上,我們將能夠將響應與請求匹配。如果我們看到一個未知的correlation_id值,我們可以安全地忽略信息-它不屬于我們的請求。

您可能會問,為什么我們應該忽略回調隊列中的未知消息,而不是失敗出錯呢?這是由于服務器端可能出現競爭情況。雖然不太可能,RPC服務器可能在發送完答案后死亡,但在發出請求的確認消息之前。如果發生這種情況,重新啟動的RPC服務器將再次處理請求。這就是為什么在客戶機上我們必須優雅地處理重復響應,而RPC應該理想地是冪等的。

總結

clipboard.png

我們的RPC會像這樣工作:

當客戶端啟動時,它創建一個匿名的獨占回調隊列。

一個RPC請求,客戶端發送消息,兩個屬性:reply_to,設置回調隊列和correlation_id,它被設置為每個請求的唯一值。

請求被發送到一個rpc_queue隊列。

RPC worker(又名:服務器)正在等待該隊列上的請求。當一個請求時,它的工作和發送消息的結果返回給客戶端,使用從reply_to隊列。

客戶機等待回調隊列上的數據。當消息出現時,它檢查correlation_id屬性。如果它與請求的值匹配,則返回對應用程序的響應。

匯總

Fibonacci 遞歸源碼:

function fib($n) {if ($n == 0)return 0;if ($n == 1)return 1;return fib($n-1) + fib($n-2);
}
``
我們聲明fibonacci(斐波那契)函數。它只假設有效的正整數輸入。(不要指望這一個能為大數字工作,而且這可能是最慢的遞歸實現)。我們的RPC服務器rpc_server.php代碼看起來像這樣:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('rpc_queue', false, false, false, false);

function fib($n) {

if ($n == 0)return 0;
if ($n == 1)return 1;
return fib($n-1) + fib($n-2);

}

echo " [x] Awaiting RPC requestsn";
$callback = function($req) {

$n = intval($req->body);
echo " [.] fib(", $n, ")\n";$msg = new AMQPMessage((string) fib($n),array('correlation_id' => $req->get('correlation_id')));$req->delivery_info['channel']->basic_publish($msg, '', $req->get('reply_to'));
$req->delivery_info['channel']->basic_ack($req->delivery_info['delivery_tag']);

};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {

$channel->wait();

}

$channel->close();
$connection->close();

?>


服務器代碼相當簡單:像往常一樣,我們從建立連接、通道和聲明隊列開始。我們可能需要運行多個服務器進程。為了分散負載同樣多的服務器需要設置`prefetch_count`, 設置`$channel.basic_qos`美元。我們用`basic_consume`訪問隊列。然后,我們進入while循環,在其中等待請求消息,完成工作并發送響應。我們rpc_client.php RPC客戶端代碼:

<?php

require_once DIR . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

class FibonacciRpcClient {

private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;public function __construct() {$this->connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');$this->channel = $this->connection->channel();list($this->callback_queue, ,) = $this->channel->queue_declare("", false, false, true, false);$this->channel->basic_consume($this->callback_queue, '', false, false, false, false,array($this, 'on_response'));
}
public function on_response($rep) {if($rep->get('correlation_id') == $this->corr_id) {$this->response = $rep->body;}
}public function call($n) {$this->response = null;$this->corr_id = uniqid();$msg = new AMQPMessage((string) $n,array('correlation_id' => $this->corr_id,'reply_to' => $this->callback_queue));$this->channel->basic_publish($msg, '', 'rpc_queue');while(!$this->response) {$this->channel->wait();}return intval($this->response);
}

};

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo " [.] Got ", $response, "n";

?>

現在是一個很好的時間來讓我們完整的示例源代碼rpc_client.php和rpc_server.php。我們的RPC服務現在準備好了。我們可以啟動服務器:

php rpc_server.php
# => [x] Awaiting RPC requests

請求斐波那契數運行客戶機:

php rpc_client.php
# => [x] Requesting fib(30)
``
這里介紹的設計并不是RPC服務的唯一實現,但它有一些重要的要點:

如果RPC服務器太慢,您可以通過運行另一個服務器來擴展。試著在一個新的控制臺再運行第一個:rpc_server.php。

在客戶端,RPC只需要發送和接收一條消息。不喜歡queue_declare需要同步調用。因此,RPC客戶機只需要一次RPC請求的一次網絡往返。

我們的代碼仍然非常簡單,并沒有試圖解決更復雜(但重要)的問題,例如:

如果沒有服務器運行,客戶端應該如何反應?

客戶端應該對RPC有某種超時嗎?

如果服務器發生故障并引發異常,是否應該轉發給客戶端?

在處理前防止無效傳入消息(如檢查邊界、類型)。

如果您想進行實驗,您可能會發現management UI對于查看隊列非常有用。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/284009.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/284009.shtml
英文地址,請注明出處:http://en.pswp.cn/news/284009.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

TKMybatis 介紹和使用

目錄 一、什么是 TKMybatis 二、TKMybatis 使用 2.1 Springboot 項目中加入依賴 2.2 使用講解 2.2.1 實體類中使用 2.2.2 dao中使用 2.2.3 Service 層中使用 2.3 實際案例 2.3.1 dao 層使用 2.3.2 service 層使用 一、什么是 TKMybatis TKMybatis 是基于 Mybatis 框…

angularjs的ng-repeat回調

首先html代碼是這樣的&#xff1a; <label>Name des Leiters:</label><select name"leaderID" id"selectLeaderID"><option ng-repeat"manager in managers" value"leader_id{{manager.id}}&leader_name{{manager…

sed和vim練習

1、刪除/etc/grub2.conf文件中所有以空白開頭的行行首的空白字符sed s^[[:space:]]\ /etc/grub2.conf2、刪除/etc/fstab文件中所有以#開頭&#xff0c;后面至少跟一個空白字符的行的行首的#和空白字符sed -n s^#[[:space:]]\p /etc/fstab3、在/root/install.log每一行行首增加#…

WinForm(三)揭開可視化控件的面紗

WinForm所見即所得的UI設計框架&#xff0c;開發效率確實有所提升&#xff0c;同時降低了編程門檻&#xff0c;讓WinForm更普及。拖拖拽拽就能設計出一個界面&#xff0c;那么我們拖拽的這些東西是什么&#xff1f;它們是什么原理&#xff1f;。WinForm我覺得很好的一點是&…

淺談 maxMemory , totalMemory , freeMemory 和 OOM 與 native Heap

作者&#xff1a;林冠宏 / 指尖下的幽靈 掘金&#xff1a;https://juejin.im/user/587f0dfe128fe100570ce2d8 博客&#xff1a;http://www.cnblogs.com/linguanh/ GitHub &#xff1a; https://github.com/af913337456/ 騰訊云專欄&#xff1a; https://cloud.tencent.com/deve…

RestTemplate 詳解

在項目中&#xff0c;當我們需要遠程調用一個 HTTP 接口時&#xff0c;我們經常會用到 RestTemplate 這個類。這個類是 Spring 框架提供的一個工具類。Spring 官網對它的介紹如下&#xff1a; RestTemplate: The original Spring REST client with a synchronous, template met…

初識Spark2.0之Spark SQL

內存計算平臺Spark在今年6月份的時候正式發布了spark2.0&#xff0c;相比上一版本的spark1.6版本&#xff0c;在內存優化&#xff0c;數據組織&#xff0c;流計算等方面都做出了較大的改變&#xff0c;同時更加注重基于DataFrame數據組織的MLlib&#xff0c;更加注重機器學習整…

webpack開發Vue配置

一直以來使用webpack都是用的別人的配置&#xff0c;這幾天自己學習了一下。 項目地址&#xff1a;https://github.com/donghaohao... 新建整個工程 npm init安裝依賴&#xff0c;這里我們開發vue項目&#xff0c;npm install vue --save&#xff0c;然后是開發時的依賴npm ins…

ABP詳細教程——模塊類

概述模塊化是ABP vNext的最大亮點&#xff0c;也是ABP vNext框架的核心&#xff0c;而模塊類是ABP vNext框架模塊化的核心要素。這一章節&#xff0c;我就從模塊類的用法、運行機制、源代碼等層面&#xff0c;帶大家詳細了解ABP vNext的模塊類。用法在ABP的約定中&#xff0c;每…

[轉]Eureka工作原理

目錄 Eureka 工作原理 Eureka 核心概念 自我保護機制 Eureka 集群原理 Eurka 工作流程 總結 Eureka 工作原理 上節內容為大家介紹了&#xff0c;注冊中心 Eureka 產品的使用&#xff0c;以及如何利用 Eureka 搭建單臺和集群的注冊中心。這節課我們來繼續學習 Eureka&…

centos7下別名(alias)的特殊用法

版權聲明&#xff1a;轉載請注明出處:http://blog.csdn.net/dajitui2024 https://blog.csdn.net/dajitui2024/article/details/79438200 參考&#xff1a;https://www.cyberciti.biz/faq/bash-bypass-alias-command-on-linux-macos-unix/ 正常情況下&#xff0c;定義過的別名&a…

解決WDCP3環境gbk網站編碼程序亂碼問題

因為默認WDCP V3版本環境編碼格式是UTF-8版本&#xff0c;如果我們程序采用的是GBK編碼肯定都會有亂碼問題。 我們到WDCP后臺&#xff0c;"網站管理"-"PHP設置"&#xff0c;看到上圖所示&#xff0c;準備直接在線編輯PHP.INI文件。 這里我們找到"defa…

重談聯想5G編碼投票事件

此前&#xff0c;司馬南談了聯想好幾個問題&#xff0c;其中最尖銳的要屬國有資產流失&#xff0c;這是聯想管理層無法回避的死穴。不過&#xff0c;司馬南批判聯想5G投票背刺H公司&#xff0c;這基本就是造謠了。當年&#xff0c;媒體把編碼投票炒作的很厲害&#xff0c;抨擊聯…

JStorm2.1.1集群的安裝和使用

為什么80%的碼農都做不了架構師&#xff1f;>>> JStorm2.1.1集群的安裝和使用 Storm是一個免費開源、分布式、高容錯的實時計算系統&#xff0c;而JStorm是阿里巴巴開源的基于Storm采用Java重寫的一套分布式實時流計算框架&#xff0c;在性能和支持的集群規模上做了…

Hystrix 原理

Hystrix是什么&#xff1f; Hystrix是Netflix開源庫&#xff0c;這是一個針對分布式系統的延遲和容錯庫。 Hystrix 供分布式系統使用&#xff0c;提供延遲和容錯功能&#xff0c;隔離遠程系統、訪問和第三方程序庫的訪問點&#xff0c;防止級聯失敗&#xff0c;保證復雜的分布…

「深度」無人機實名制政策特稿|市場看好、資本關注,“反黑飛”正在崛起

從政策和需求來看&#xff0c;“反黑飛”越來越重要&#xff0c;市場也正在不斷崛起。 對于大多數人來說&#xff0c;今天是最適合明目張膽“裝嫩”的六一兒童節。不過&#xff0c;在無人機廠商和無人機玩家的眼里&#xff0c;今天是無人機實名制政策正式實施的日子。 近年來&…

在navicat中新建數據庫

前言&#xff1a; 在本地新建一個名為editor的數據庫&#xff1b; 過程&#xff1a; 1.&#xff1b; 2.選擇&#xff1a;utf8mb4 -- UTF-8 Unicode字符集&#xff0c;原因在于&#xff1a;utf8mb4兼容utf8&#xff0c;且比utf8能表示更多的字符。&#xff0c;而且它支持表情符號…

MASA Stack 第三期社區例會

MASA Blazor 0.5.0發版內容功能Autocomplete&#xff1a;支持通過設置AutoSelectFirst參數開啟自動選擇第一項的功能&#xff0c;支持CacheItems參數&#xff0c;增強使用上下鍵的用戶體驗。BottomNavigation&#xff1a;&#xff1a;一個替代側邊欄的新組件。它主要用于移動應…

MySQL添加用戶、刪除用戶與授權

MySql中添加用戶,新建數據庫,用戶授權,刪除用戶,修改密碼(注意每行后邊都跟個;表示一個命令語句結束): 1.新建用戶 1.1 登錄MYSQL&#xff1a; >mysql -u root -p >密碼 1.2 創建用戶&#xff1a; mysql> insert into mysql.user(Host,User,Password) values("lo…

[轉]高并發架構設計之--「服務降級」、「服務限流」與「服務熔斷」

目錄 服務降級 1 、簡介 2 、使用場景 3 、核心設計 3.1 分布式開關 3.2 自動降級分類 3.3 配置中心 3.4 處理策略 3.5 降級分類 3.6 服務降級要考慮的問題 4 、高級特性 4.1 分級降級 4.2 降級權值 5 、總結與展望 服務限流 一、為什么要做服務限流設計&…