RabbitMQ詳解(三)

一、分發到多Consumer(fanout)
二、Routing路由(Direct)
三、主題路由(Topic)

一、分發到多Consumer(fanout)
將同一個Message deliver到多個Consumer中。這個模式也被稱為"publish/subscribe"
創建一個日志系統,包含兩部分:第一部分發出log(Producer),第二部分接收到并打印(Consumer)。兩個Consumer,第一個將log寫到物理磁盤上;第二個將log輸出的屏幕。

1.發送消息流程:
?? ?1.Producer發送的Message實際上是發到了Exchange中。
?? ?2.Exchanges從Producer接收message投遞到queue中
?? ?3.Prducer發送的消息只是到達了Exchange中,Exchange具有不同的類型實現不同的分發方式

Exchnges的類型:direct、topic和fanout
fanout就是廣播模式,會將所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs', ?
?? ?type='fanout')?? //創建一個名字為logs,類型為fanout的Exchange:

1
2
3
4
5
6
7
8
9
10
11
[root@node112?~]#?rabbitmqctl?list_exchanges?//查看所有的Exchanges
Listing?exchanges?...
logs??fanout
amq.direct????direct
amq.fanout????fanout
amq.headers????headers
amq.match????headers
amq.rabbitmq.log????topic
amq.rabbitmq.trace????topic
amq.topic????topic
...done.

注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默認創建的。?

通過exchange,而不是routing_key來publish Message:
channel.basic_publish(exchange='logs', ?
?? ?routing_key='', ?
?? ?body=message) ?

2.臨時隊列
截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。
但是對于我們將要構建的日志系統,并不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:
?? ?1)每當Consumer連接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們選擇這個名字。方法:
?? ?result = channel.queue_declare()?
?? ?通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
?? ?2)當Consumer關閉連接時,這個queue要被deleted。可以加個exclusive的參數。方法:
?? ?result = channel.queue_declare(exclusive=True)?? //每次獲取的都是新的,單獨使用的
?? ?
3.Bindings綁定
?? ?創建好fanout類型的Exchange和沒有名字的queue后(實際上是RabbitMQ幫我們取的名字)Exchange通過bindings把它的Message發送到目標queue
?? ?channel.queue_bind(exchange='logs', ?
?? ??? ?queue=result.method.queue) ??? ?
?? ?使用命令rabbitmqctl list_bindings 查看bindings
?? ?
4.最終代碼
拓撲圖:
1.png

Producer,在這里就是產生log的program,基本上和前幾個都差不多。最主要的區別就是publish通過了exchange而不是routing_key。
emit_log.py script:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env?python
import?pika
import?sys
connection?=?pika.BlockingConnection(pika.ConnectionParameters(
????host='localhost'))
channel?=?connection.channel()
channel.exchange_declare(exchange='logs',
????type='fanout')
message?=?'?'.join(sys.argv[1:])?or?"info:?Hello?World!"
channel.basic_publish(exchange='logs',
????routing_key='',
????body=message)
print?"?[x]?Sent?%r"?%?(message,)
connection.close()

還有一點要注意的是我們聲明了exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丟棄的。
Consumer:receive_logs.py:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env?python
import?pika
connection?=?pika.BlockingConnection(pika.ConnectionParameters(
????host='localhost'))
channel?=?connection.channel()
channel.exchange_declare(exchange='logs',
????type='fanout')
result?=?channel.queue_declare(exclusive=True)
queue_name?=?result.method.queue
channel.queue_bind(exchange='logs',
????queue=queue_name)
print?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C'
def?callback(ch,?method,?properties,?body):
????print?"?[x]?%r"?%?(body,)
channel.basic_consume(callback,
????queue=queue_name,
????no_ack=True)
channel.start_consuming()

試運行:
?? ?Consumer1:$ python receive_logs.py > logs_from_rabbit.log? //追加到文件
?? ?Consumer2:python receive_logs.py //輸出到屏幕
?? ?Producer:python emit_log.py
也可通過修改callback自己寫文件
輸出結果如圖:
3.png

二、Routing路由(Direct)
對于上一個日志系統改進。能夠使用不同的severity來監聽不同等級的log。比如我們希望只有error的log才保存到磁盤上。
1.Bindings綁定
之前的綁定
channel.queue_bind(exchange=exchange_name, ?
?? ?queue=queue_name) ?
綁定其實就是關聯了exchange和queue。或者這么說:queue對exchagne的內容感興趣,exchange要把它的Message deliver到queue中。
實際上,綁定可以帶routing_key 這個參數。其實這個參數的名稱和basic_publish 的參數名是相同了。為了避免混淆,我們把它成為binding key。
?? ?使用一個key來創建binding :
channel.queue_bind(exchange=exchange_name, ?
?? ?queue=queue_name, ?
?? ?routing_key='black')?
對于fanout的exchange來說,這個參數是被忽略的。

2.Direct Exchange
通過Bindings key完全匹配
圖Direct路由模型
Direct.png

exchange X和兩個queue綁定在一起。Q1的binding key是orange。Q2的binding key是black和green。
當P publish key是orange時,exchange會把它放到Q1。如果是black或者green那么就會到Q2。其余的Message都會被丟棄。

3.多重綁定(Multiple Bindings)
多個queue綁定同一個key是可以的。對于下圖的例子,Q1和Q2都綁定了black。也就是說,對于routing key是black的Message,會被deliver到Q1和Q2。其余的Message都會被丟棄。
圖muliti-bindings
multi.png

4.生產者和消費者
生產者:
===========================================================================

1
2
3
4
5
6
7
8
channel.exchange_declare(exchange='direct_logs',??
????type='direct')??
//創建一個direct的exchange。使用log的severity作為routing?key,這樣Consumer可以針對不同severity的log進行不同的處理。
publish:
channel.basic_publish(exchange='direct_logs',??
????routing_key=severity,?
????body=message)??
//涉及三種severity:'info',?'warning',?'error'.

消費者:
===========================================================================

1
2
3
4
5
6
7
result?=?channel.queue_declare(exclusive=True)??
queue_name?=?result.method.queue??
for?severity?in?severities:??
????channel.queue_bind(exchange='direct_logs',??
????????queue=queue_name,??
????????routing_key=severity)?
//queue需要綁定severity

5.最終版本
圖:direct_2
direct_2.png

emit_log_direct.py?
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env?python
import?pika
import?sys
connection?=?pika.BlockingConnection(pika.ConnectionParameters(
????host='localhost'))
channel?=?connection.channel()
channel.exchange_declare(exchange='direct_logs',
????type='direct')
severity?=?sys.argv[1]?if?len(sys.argv)?>?1?else?'info'
message?=?'?'.join(sys.argv[2:])?or?'Hello?World!'
channel.basic_publish(exchange='direct_logs',
????routing_key=severity,
????body=message)
print?"?[x]?Sent?%r:%r"?%?(severity,?message)
connection.close()

receive_logs_direct.py:?
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env?python??
import?pika??
import?sys??
connection?=?pika.BlockingConnection(pika.ConnectionParameters(??
????host='localhost'))??
channel?=?connection.channel()??
channel.exchange_declare(exchange='direct_logs',??
????type='direct')??
result?=?channel.queue_declare(exclusive=True)??
queue_name?=?result.method.queue??
severities?=?sys.argv[1:]??
if?not?severities:??
????print?>>?sys.stderr,?"Usage:?%s?[info]?[warning]?[error]"?%?\??
????????(sys.argv[0],)??
????sys.exit(1)??
for?severity?in?severities:??????
????channel.queue_bind(exchange='direct_logs',??
????????queue=queue_name,??
????????routing_key=severity)??
print?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C'??
def?callback(ch,?method,?properties,?body):??
????print?"?[x]?%r:%r"?%?(method.routing_key,?body,)??
channel.basic_consume(callback,??
????queue=queue_name,??
????no_ack=True)??
channel.start_consuming()

===========================================================================
試運行:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log?
?? ?//把warning和error的log記錄到一個文件中
$ python receive_logs_direct.py info warning error ?
?? ?//打印所有log到屏幕?? ?

三、主題路由(Topic)
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以點號“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。
?? ?對于routing_key,有兩個特殊字符(在正則表達式里叫元字符):
?? ?* (星號) 代表任意 一個單詞
?? ?# (hash) 0個或者多個單詞
示例:
Producer發送消息時需要設置routing_key,routing_key包含三個單詞和兩個點號。
?? ?第一個key是描述了celerity(靈巧,敏捷),第二個是colour(色彩),第三個是species(物種):"<celerity>.<colour>.<species>"。
在這里我們創建了兩個綁定: Q1 的binding key 是"*.orange.*"; Q2 是? "*.*.rabbit" 和 "lazy.#":
?? ?Q1 感興趣所有orange顏色的動物
?? ?Q2 感興趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"將會發送到Q1和Q2中。消息"lazy.orange.elephant" 也會發送到Q1和Q2。但是"quick.orange.fox" 會發送到Q1;"lazy.brown.fox"會發送到Q2。"lazy.pink.rabbit" 也會發送到Q2,但是盡管兩個routing_key都匹配,它也只是發送一次。"quick.brown.fox" 會被丟棄。
如果發送的單詞不是3個呢? 答案要看情況,因為#是可以匹配0個或任意個單詞。比如"orange" or "quick.orange.male.rabbit",它們會被丟棄。如果是lazy那么就會進入Q2。類似的還有 "lazy.orange.male.rabbit",盡管它包含四個單詞。

Topic exchange和其他exchange
?? ?由于有"*" (star) and "#" (hash), Topic exchange 非常強大并且可以轉化為其他的exchange:
?? ?如果binding_key 是 "#" - 它會接收所有的Message,不管routing_key是什么,就像是fanout exchange。
?? ?如果 "*" (star) and "#" (hash) 沒有被使用,那么topic exchange就變成了direct exchange。

2.代碼實現
The code for emit_log_topic.py:
========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env?python
import?pika
import?sys
connection?=?pika.BlockingConnection(pika.ConnectionParameters(
????host='localhost'))
channel?=?connection.channel()
channel.exchange_declare(exchange='topic_logs',
????type='topic')
routing_key?=?sys.argv[1]?if?len(sys.argv)?>?1?else?'anonymous.info'
message?=?'?'.join(sys.argv[2:])?or?'Hello?World!'
channel.basic_publish(exchange='topic_logs',
????routing_key=routing_key,
????body=message)
print?"?[x]?Sent?%r:%r"?%?(routing_key,?message)
connection.close()

========================================================================

The code for receive_logs_topic.py: ?? ?
========================================================================?? ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env?python
import?pika
import?sys
connection?=?pika.BlockingConnection(pika.ConnectionParameters(
????host='localhost'))
channel?=?connection.channel()
channel.exchange_declare(exchange='topic_logs',
????type='topic')
?????
result?=?channel.queue_declare(exclusive=True)
queue_name?=?result.method.queue
binding_keys?=?sys.argv[1:]
if?not?binding_keys:
????print?>>?sys.stderr,?"Usage:?%s?[binding_key]..."?%?(sys.argv[0],)
????sys.exit(1)
for?binding_key?in?binding_keys:
????channel.queue_bind(exchange='topic_logs',
????????queue=queue_name,
????????routing_key=binding_key)
print?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C'
def?callback(ch,?method,?properties,?body):
????print?"?[x]?%r:%r"?%?(method.routing_key,?body,)
channel.basic_consume(callback,
????queue=queue_name,
????no_ack=True)
channel.start_consuming()

?? ?
3.運行和結果
??? python receive_logs_topic.py "#"? //接收所有的log
??? python receive_logs_topic.py "kern.*"? //接收所有kern facility的log
??? python receive_logs_topic.py "*.critical"? //僅僅接收critical的log:?
??? python receive_logs_topic.py "kern.*" "*.critical"? //可以創建多個綁定:?
??? python emit_log_topic.py "kern.critical" "A critical kernel error"? //Producer產生一個log:"kern.critical" type:?
?? ?
參考:?? ?
http://www.rabbitmq.com/tutorials/tutorial-three-python.html










本文轉自MT_IT51CTO博客,原文鏈接:http://blog.51cto.com/hmtk520/2051247,如需轉載請自行聯系原作者

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/286089.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/286089.shtml
英文地址,請注明出處:http://en.pswp.cn/news/286089.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

重磅 | Linux內核5.19初步支持LoongArch架構

經過龍芯中科與內核社區一年多的緊密合作&#xff0c;北京時間2022年6月4日清晨&#xff0c;Linux內核社區正式合并LoongArch架構支持代碼。隨著Linux-5.19的rc1版本的正式發布&#xff0c;LoongArch體系結構主體部分的源碼已合并到內核主線之中&#xff0c;其余相關代碼正在進…

C語言試題五十之請編寫一個函數void function(char *ss),其功能時:將字符串ss中所有下標為奇數位置上的字母轉換為大寫(若位置上不是字母,則不轉換)。

??個人主頁:個人主頁 ??系列專欄:C語言試題200例目錄 ??推薦一款刷算法、筆試、面經、拿大公司offer神器 ?? 點擊跳轉進入網站 ?作者簡介:大家好,我是碼莎拉蒂,CSDN博客專家(全站排名Top 50),阿里云博客專家、51CTO博客專家、華為云享專家 1、題目 請編寫一個…

【MATLAB統計分析與應用100例】案例011:matlab讀取Excel數據,調用regress函數作一元線性回歸分析

數據擬合效果預覽: 文章目錄 1. 讀取數據,繪制散點圖2. 計算相關系數3. 繪制回歸直線4. 剔除異常數據,重新調用regress函數作一元線性回歸1. 讀取數據,繪制散點圖 ClimateData = xlsread(examp08_01.xls); % 從Excel文件讀取數據 x &

“*** IS NOT TRANSLATED IN …….. 解決辦法

首先引起提示的原因是因為Lint 代碼檢查工具發現你的項目中&#xff08;或者引用的三方庫&#xff09;有部分string.xml文件內容做了國際化操作&#xff0c;但卻不完整&#xff0c;有些文本內容并沒有相應的國際化翻譯&#xff0c;在android開發中常見于項目引用的Libraries第三…

[轉] ArcEngine 產生專題圖

小生原文 ArcEngine 產生專題圖 ArcEngine提供多個著色對象用于產生專題圖&#xff0c;可以使用標準著色方案&#xff0c;也可以自定義著色方案&#xff0c;ArcEngine提供8中標準著色方案。 一、SimpleRenderer專題圖 是使用單一符號進行著色分類&#xff0c;不涉及對要素的數據…

iVX無代碼挑戰五秒游戲制作

一、五秒挑戰游戲簡介及思考 制作iVX 低代碼項目需要進入在線IDE&#xff1a;https://editor.ivx.cn/ 五秒挑戰游戲指的是點擊一個按鈕開始計時&#xff0c;隨后需要用戶再次點擊計時按鈕&#xff0c;將會停止計時&#xff0c;當計時的時間等于五秒時將挑戰成功&#xff0c;否…

C語言試題五十一之已知學生的記錄是由學號和學習成績構成,n名學生的數據已存入s結構體數組中。請編寫函數fun,該函數的功能是:找出成績最高的學生記錄,通過形參返回主函數(規定只有一個最高分)。

??個人主頁:個人主頁 ??系列專欄:C語言試題200例目錄 ??推薦一款刷算法、筆試、面經、拿大公司offer神器 ?? 點擊跳轉進入網站 ?作者簡介:大家好,我是碼莎拉蒂,CSDN博客專家(全站排名Top 50),阿里云博客專家、51CTO博客專家、華為云享專家 1、題目 請編寫一個…

CSS 巧用 :before和:after

前幾天的晚上較全面的去看了下css的一些文檔和資料&#xff0c;大部分的樣式運用都沒什么大問題了&#xff0c;只是有些許較陌生&#xff0c;但是也知道他們的存在和實現的是什么樣式。今天主要想在這篇學習筆記中寫的也不多&#xff0c;主要是針對:before和:after寫一些內容&a…

MAUI 入門教程系列(4.通用主機)

前言對于ASP.NET Core 開發人員而言, 這并不陌生, 當ASP.NET Core應用程序啟動時, 會創建默認的應用程序主機, 我們可以為應用程序配置所有的依賴關系、系統設置, 最終啟動。如下所示:using IHost host Host.CreateDefaultBuilder(args).ConfigureServices((_, services) >…

【MATLAB統計分析與應用100例】案例012:matlab讀取Excel數據,調用robustfit函數作穩健回歸

穩健回歸效果預覽: 文章目錄 1. 讀取數據2. 調用robustfit函數作穩健回歸3 .繪制殘差和權重的散點圖4. 繪制regress函數和robustfit函數對應的回歸直線5. 擬合效果1. 讀取數據 ClimateData = xlsread(examp08_01.xls); % 從Excel文件讀取數據 x

Android單擊、長按獲取當前觸點坐標下(TextView)文字字符

package com.*.*.*.utils;import android.graphics.Rect; import android.text.Layout; import android.widget.TextView;public class TextViewUtils {/**獲取TextView某一個字符的坐標位置return 返回的是相對坐標parms tvparms index 字符索引*/public static Rect getTextV…

后臺頁制作01《ivx低代碼簽到系統制作》

制作iVX 低代碼項目需要進入在線IDE&#xff1a;https://editor.ivx.cn/ 一、簽到系統思考 簽到系統一般是指公布一個簽到鏈接或者二維碼&#xff0c;隨后用戶掃碼后即可完成簽到。 那如何制作呢&#xff1f;首先我們可以先不考慮簽到頁面的制作&#xff0c;既然簽到暫時沒有…

個人作業-Week2

第一部分 調研&#xff0c; 評測 運行平臺 win 8 軟件版本&#xff1a;微軟必應詞典桌面版 3.5.2 BUG標題&#xff1a;必應背單詞無法發音 BUG詳細描述&#xff1a;如圖&#xff0c;左邊為必應詞典該單詞的搜索&#xff0c;可以發音&#xff0c;而右邊必應背單詞中該單詞的發音…

Blazor WebAssembly + Grpc Web=未來?

Blazor WebAssembly是什么首先來說說WebAssembly是什么&#xff0c;WebAssembly是一個可以使C#,Java,Golang等靜態強類型編程語言&#xff0c;運行在瀏覽器中的標準&#xff0c;瀏覽器廠商基于此標準實現執行引擎。在實現了WebAssembly標準引擎之后&#xff0c;瀏覽器中可以執行…

C語言試題五十二之學生的記錄由學號和成績組稱個,n名大學生得數據已在主函數中放入結構體數組a中,請編寫函數fun,它的功能時:按分數的高低排列學生的記錄,高分在前。

??個人主頁:個人主頁 ??系列專欄:C語言試題200例目錄 ??推薦一款刷算法、筆試、面經、拿大公司offer神器 ?? 點擊跳轉進入網站 ?作者簡介:大家好,我是碼莎拉蒂,CSDN博客專家(全站排名Top 50),阿里云博客專家、51CTO博客專家、華為云享專家 1、題目 請編寫一個…

Xtrabackup備份MySQL

一、安裝Xtrabackup 1234# wget --no-check-certificate http://www.percona.com/downloads/percona-release/redhat/0.1-4/percona-release-0.1-4.noarch.rpm# rpm -ivh percona-release-0.1-4.noarch.rpm# yum list | grep percona# yum -y install percona-xtrabackup-24二、…

Can't create directory 'E:\Repositories\***\db\transactions\138-41.txn':

遇到這種問題應該是遷移SVN庫時丟失了文件夾引起的&#xff0c;直接在服務器上找到對應項目的db文件夾&#xff0c;手動創建“transactions”目錄和“txn-protorevs”目錄即可正常提交。

[它山之石] 一件事情,假設你不能說清楚,十有八九你就做不好

記得有一次開會。我的頭兒說了標題所寫的這句話,自己深以為然。 有過較多解決這個問題的經歷的人可能會有這種感覺&#xff0c;非常多時候&#xff0c;面對一個問題。我們即使沒有全然將之想清 楚。也可以基于已有的經驗給出一個可以work的解決方式&#xff0c;當然這樣的情況下…

【MATLAB統計分析與應用100例】案例013:matlab讀取Excel數據,調用nlinfit函數作一元非線性回歸

1. 一元線性回歸分析效果預覽 2. matlab完整實現代碼 %讀取數據,繪制散點圖** HeadData = xlsread(examp08_02.xls); %從Excel文

C語言試題五十三之將所有大于1小于整數m的非素數存入xx所指的數組中,非素數的個數通過k傳回。

??個人主頁:個人主頁 ??系列專欄:C語言試題200例目錄 ??推薦一款刷算法、筆試、面經、拿大公司offer神器 ?? 點擊跳轉進入網站 ?作者簡介:大家好,我是碼莎拉蒂,CSDN博客專家(全站排名Top 50),阿里云博客專家、51CTO博客專家、華為云享專家 1、題目 請編寫一個…