RabbitMQ 入門教程(PHP版) 第三部分:發布/訂閱(Publish/Subscribe)

發布/訂閱

在上篇第二部分教程中,我們搭建了一個工作隊列。每個任務之分發給一個工作者(worker)。在本篇教程中,我們要做的之前完全不一樣——分發一個消息給多個消費者(consumers)。這種模式被稱為“發布/訂閱”。

為了描述這種模式,我們將會構建一個簡單的日志系統。它包括兩個程序——第一個程序負責發送日志消息,第二個程序負責獲取消息并輸出內容。

在我們的這個日志系統中,所有正在運行的接收方程序都會接受消息。我們用其中一個接收者(receiver)把日志寫入硬盤中,另外一個接受者(receiver)把日志輸出到屏幕上。

最終,日志消息被廣播給所有的接受者(receivers)。

交換器(Exchanges)

前面的教程,我們發送消息到隊列并從中取出消息。現在是時候介紹RabbitMq中完整的消息模型了。

讓我們簡單的概括一下之前的教程:

  • 發布者(producer)是發布消息的應用程序。
  • 隊列(queue)用于消息存儲的緩沖。
  • 消費者(consumer)是接收消息的應用程序。

RabbitMQ消息模型的核心理念是:發布者(producer)不會直接發送任何消息給隊列。事實上,發布者(producer)甚至不知道消息是否已經被投遞到隊列。

發布者(producer)只需要把消息發送給一個交換器(exchange)。交換器非常簡單,它一邊從發布者方接收消息,一邊把消息推入隊列。交換器必須知道如何處理它接收到的消息,是應該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。這些規則是通過exchange type來定義的。

有幾個可供選擇的交換器類型:AMQPEXTYPEDIRECT,AMQPEXTYPEFANOUT,AMQPEXTYPEHEADER?orAMQPEXTYPETOPIC。我們在這里主要說明AMQPEXTYPE_FANOUT。先創建一個fanout類型的交換器,命名為logs:

$exchange->setName('logs');
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->declare();

fanout交換器很簡單,你可能從名字上就能猜測出來,它把消息發送給它所知道的所有隊列。這正是我們的日志系統所需要的。

交換器列表

rabbitmqctl能夠列出服務器上所有的交換器:

$ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.

這個列表中有一些叫做amq.*的交換器。這些都是默認創建的,不過這時候你還不需要使用他們。

匿名的交換器

前面的教程中我們對交換器一無所知,但仍然能夠發送消息到隊列中。因為我們使用了命名為空字符串(“”)默認的交換器。 回想我們之前是如何發布一則消息:

``` $exchange->publish($message, $routeKey);

```

exchange參數就是交換器的名稱。空字符串代表默認或者匿名交換器:消息將會根據指定的routing_key分發到指定的隊列。

在PHP的AMQP中如果exchange設置為匿名的話,是報錯的:PHP Fatal error: Uncaught exception ‘AMQPExchangeException’ with message ‘Invalid exchange name given, must be between 1 and 255 characters long.’

現在,我們就可以發送消息到一個具名交換器了:

$exchange->publish($message, '');

臨時隊列

你還記得之前我們使用的隊列名嗎( hello和task_queue)?給一個隊列命名是很重要的——我們需要把工作者(workers)指向正確的隊列。如果你打算在發布者(producers)和消費者(consumers)之間共享同隊列的話,給隊列命名是十分重要的。

但是這并不適用于我們的日志系統。我們打算接收所有的日志消息,而不僅僅是一小部分。我們關心的是最新的消息而不是舊的。為了解決這個問題,我們需要做兩件事情。

首先,當我們連接上RabbitMQ的時候,我們需要一個全新的、空的隊列。我們可以手動創建一個隨機的隊列名,或者讓服務器為我們選擇一個隨機的隊列名(推薦)。我們只要在調用$queue->declare();方法的時候,不提供queue參數就可以了:

$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
$queue->declare();

這時候我們可以通過$queue->getName();獲得已經生成的隨機隊列名。它可能是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,當與消費者(consumer)斷開連接的時候,這個隊列應當被刪除。我們可以使用exclusive標識。

$queue->setFlags(AMQP_EXCLUSIVE);

綁定(Bindings)

我們已經創建了一個fanout類型的交換器和一個隊列。現在我們需要告訴交換器如何發送消息給我們的隊列。交換器和隊列之間的關系我們稱之為綁定(binding)。

$queue->bind($exchangeName, $queue->getName());

現在,logs交換器將會把消息添加到我們的隊列中。

綁定列表。

你可以使用rabbitmqctl list_bindings隊列出所有存在的綁定。.

整合代碼

發布日志消息的程序看起來和之前的沒有太大區別。最重要的改變就是我們把消息發送給logs交換器而不是匿名交換器。在發送的時候我們需要提供routingkey參數,但是它的值會被fanout交換器忽略。

emit_log.php

<?php$exchangeName = 'logs';
$message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1];$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
//$exchange->declare();
$exchange->declareExchange();$exchange->publish($message, '');
var_dump("[x] Sent $message");$connection->disconnect();

正如你看到的那樣,在連接成功之后,我們聲明了一個交換器,這一個是很重要的,因為不允許發布消息到不存在的交換器。

如果沒有綁定隊列到交換器,消息將會丟失。但這個沒有所謂,如果沒有消費者監聽,那么消息就會被忽略。

receive_logs.php

<?php$exchangeName = 'logs';$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
//$exchange->declare();
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setFlags(AMQP_EXCLUSIVE);
//$exchange->declare();
$queue->declareQueue();
$queue->bind($exchangeName, '');var_dump('[*] Waiting for messages. To exit press CTRL+C');
while (TRUE) {$queue->consume('callback');
}
$connection->disconnect();function callback($envelope, $queue) {$msg = $envelope->getBody();var_dump(" [x] Received:" . $msg);$queue->nack($envelope->getDeliveryTag());
}

這樣我們就完成了。如果你想把日志保存到文件中,只需要打開控制臺輸入:

 php receive_logs.php > logs_from_rabbit.log

如果你想在屏幕中查看日志,那么打開一個新的終端然后運行:

 php receive_logs.php

當然還要發送日志:

php emit_log.php

?

使用方法,需要打開三個終端,兩個用來運行receive_logs.php腳本【先啟動】,一個用來運行emit_log.php腳本【后啟動】,結果為兩個receive_logs.php腳本會同時受到消息

運行效果:

?

轉載于:https://www.cnblogs.com/-mrl/p/11102740.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/277450.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/277450.shtml
英文地址,請注明出處:http://en.pswp.cn/news/277450.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Proxmox VE 安裝、配置、使用之第二章 Proxmox VE 的安全性

第一章 Proxmox VE 的安全性一、 角色及權限圖2-1-1二、 Root 的密碼安全性把 Root 的實際密碼給出去, 在任何系統都是不符合安全規范的!所以在 Linux 里面, 最好把有需要 root 權限的使用者 放到 sudoers 的群組.# sudo usermod -a -G sudo testuserPVE 的權限設定方式 是由 u…

java allocate_Java中volatile關鍵字的最全總結

一、簡介volatile是Java提供的一種輕量級的同步機制。Java 語言包含兩種內在的同步機制&#xff1a;同步塊(或方法)和 volatile 變量&#xff0c;相比于synchronized(synchronized通常稱為重量級鎖)&#xff0c;volatile更輕量級&#xff0c;因為它不會引起線程上下文的切換和調…

縮放手勢 ScaleGestureDetector 源碼解析,這一篇就夠了

其實在我們日常的編程中&#xff0c;對于縮放手勢的使用并不是很經常&#xff0c;這一手勢主要是用在圖片瀏覽方面&#xff0c;比如下方例子。但是&#xff08;敲重點&#xff09;&#xff0c;作為 Android 入門的基礎來說&#xff0c;學習 ScaleGestureDetector 的使用&#x…

postgres的數據庫備份和恢復

備份和恢復 一條命令就可以解決很簡單: 這是備份的命令&#xff1a; pg_dump -h 127/0.0.1 -U postgres databasename > databasename.bak 指令解釋&#xff1a; pg_dump 是備份數據庫指令&#xff0c;164.82.233.54是數據庫的ip地址&#xff08;必須保證數據庫允許外部訪…

java 類的執行順序_Java中類的執行順序

講解在代碼中&#xff1a;package 類執行順序;/*** java類執行順序** 1、如果父類有靜態成員賦值或者靜態初始化塊&#xff0c;執行靜態成員賦值和靜態初始化塊* 2、如果類有靜態成員賦值或者靜態初始化塊&#xff0c;執行靜態成員賦值和靜態初始化塊* 3、將類的成員賦予初值(原…

ZooKeeper相關資料集錦

1、ZooKeeper相關概念總結 https://github.com/Snailclimb/JavaGuide/blob/master/docs/system-design/framework/ZooKeeper.md 2、ZooKeeper在Windows下的安裝和配置 https://blog.csdn.net/morning99/article/details/40426133 3、Curator框架應用 http://ifeve.com/zookeepe…

JQuery.Ajax()的data參數傳遞方式

最近&#xff0c;新學c# mvc&#xff0c;通過ajax post方式傳遞數據到controller。剛開始傳遞參數&#xff0c;controller中總是為null。現記錄一下&#xff0c;可能不全&#xff0c;純粹記個學習日記。 重點在于參數的方式&#xff0c;代碼為例子 1、這里 dataType: "js…

java如何實現封裝_java如何實現封裝

Java中類的封裝是如何實現的封裝是將對象的信息隱藏在對象內部&#xff0c;禁止外部程序直接訪問對象內部的屬性和方法。 java封裝類通過三個步驟實現&#xff1a; (1)修改屬性的可見性&#xff0c;限制訪問。 (2)設置屬性的讀取方法。 (3)在讀取屬性的方法中&#xff0c;添加對…

用了30天整理的一些GO語言學習資料,2019請你加油

因為極其優秀的并發性能&#xff0c;Google的親兒子Go語言站上了風潮之巔。出現在21世紀的GO語言&#xff0c;雖然不能如愿對C取而代之&#xff0c;但是其近C的執行性能和近解析型語言的開發效率以及近乎于完美的編譯速度&#xff0c;已經風靡全球。特別是在云項目中&#xff0…

Kubernetes網絡設計原則

在配置集群網絡插件或者實踐K8S 應用/服務部署請時刻想到這些原則&#xff1a; 1.每個Pod都擁有一個獨立IP地址&#xff0c;Pod內所有容器共享一個網絡命名空間2.集群內所有Pod都在一個直接連通的扁平網絡中&#xff0c;可通過IP直接訪問 所有容器之間無需NAT就可以直接互相訪問…

php token 驗證,PHP如何實現Token驗證

PHP如何實現Token驗證首先將Token進行解析&#xff1b;然后根據解析出來的信息部分驗證是否過期&#xff0c;如果未過期再將解析出的信息部分進行加密&#xff1b;最后將加密出來的數據和解析出來簽名進行比對&#xff0c;如果相同則驗證成功。示例代碼&#xff1a;<?php f…

關于Linux fontconfig 字體庫的坑

01、安裝字體軟件yum -y install fontconfig然后把字體拷過去就行了 cd /usr/share/fonts fc-list 這是查看02、拷貝字體到指定目錄 cp simsun.ttc /usr/share/fonts/然后把字體拷過去就行了 cd /usr/share/fonts 03、驗證字體安裝情況 fc-list //"宋體"中文字體…

滿江紅.互聯網

小小寰球&#xff0c;有多少信息瞬抵。互聯網&#xff0c;幾多濤生&#xff0c;幾多云逸。螞蟻緣槐近大國&#xff0c;菜鳥搭枝成鳳翼。正臺風綠葉下臨安&#xff0c;何足懼?多少事&#xff0c;從來急&#xff1b;天地轉&#xff0c;光陰隙。一百年太久&#xff0c;只爭朝夕。…

Python startswith()函數 與 endswith函數

函數&#xff1a;startswith() 作用&#xff1a;判斷字符串是否以指定字符或子字符串開頭一、函數說明語法&#xff1a;string.startswith(str, beg0,endlen(string)) 或string[beg:end].startswith(str)參數說明&#xff1a;string&#xff1a; 被檢測的字符串str&#xff1a;…

GitLab 在多分支中的一個push

情景&#xff1a;a.本地庫新建的分支&#xff0c;而Git服務器沒有這個分支服務器分支master本地新建分支&#xff1a;rdar-MS&#xff0c;并git checkout rdar-MS上masterrdar-testrdar-MS更改rdar-MS分支上的文件&#xff0c;git add .git commit -m " "后&#xf…

php post 獲取xml,php 獲取post的xml數據并解析示例

這篇文章主要為大家詳細介紹了php 獲取post的xml數據并解析示例&#xff0c;具有一定的參考價值&#xff0c;可以用來參考一下。對php獲取post過來的xml數據并解析感興趣的小伙伴&#xff0c;下面一起跟隨512筆記的小編兩巴掌來看看吧&#xff01;如何獲取請求的xml數據,對方通…

值得一用的Windows軟件

該清單僅本人使用后所作推薦&#xff0c;可能會比較主觀&#xff0c;所以僅供參考哈。可能某些軟件鏈接會失效&#xff0c;可以自行百度搜索下載即可。 殺軟 火絨安全&#xff1a;國內殺毒軟件的一股清流&#xff0c;界面簡潔&#xff0c;無推廣。現在已經開啟了 5.0 公測&…

《JavaScript模式》讀書筆記一:基本技巧

《JavaScript模式》的讀書筆記&#xff0c;個人向&#xff01;更新進度隨我的閱讀進度 基本技巧 盡量少用全局變量 防止變量污染注意JS變量提升問題盡量使用單一var模式&#xff0c;只使用一個var在函數頂部進行變量聲明function fun () {var a 1,b2,sum ab,函數體//} for循環…

Python字符串處理全攻略(四):常用內置方法輕松掌握

文章目錄 引言Python字符串常用內置方法切片功能介紹語法示例注意事項 str.isalpha()功能介紹語法示例注意事項 str.isdigit()功能介紹語法示例注意事項總結 str.isalnum()功能介紹語法示例注意事項總結 str.isupper()功能介紹語法示例注意事項 islower()功能介紹語法示例注意事…

php空間限制磁盤限額,ORA-01536:超出表空間XXXX的空間限額

問題描述&#xff1a;在FMIS2600用戶下進行某個DDL或DML操作時&#xff0c;提示&#xff1a;ORA-01536&#xff1a;超出表空間FMIS2600 的空間限額 或者 ORA-01950: 對表空間/*******************ORA-01536&#xff1a;超出表空間XXXX的空間限額*******************//*********…