一、分發到多Consumer(fanout)
二、Routing路由(Direct)
三、主題路由(Topic)
一、分發到多Consumer(fanout)
將同一個Message deliver到多個Consumer中。這個模式也被稱為"publish/subscribe"
創建一個日志系統,包含兩部分:第一部分發出log(Producer),第二部分接收到并打印(Consumer)。兩個Consumer,第一個將log寫到物理磁盤上;第二個將log輸出的屏幕。
1.發送消息流程:
?? ?1.Producer發送的Message實際上是發到了Exchange中。
?? ?2.Exchanges從Producer接收message投遞到queue中
?? ?3.Prducer發送的消息只是到達了Exchange中,Exchange具有不同的類型實現不同的分發方式
Exchnges的類型:direct、topic和fanout
fanout就是廣播模式,會將所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs', ?
?? ?type='fanout')?? //創建一個名字為logs,類型為fanout的Exchange:
1 2 3 4 5 6 7 8 9 10 11 | [root@node 112 ?~]#?rabbitmqctl?list_exchanges?//查看所有的Exchanges Listing?exchanges?... logs??fanout amq.direct????direct amq.fanout????fanout amq.headers????headers amq.match????headers amq.rabbitmq.log????topic amq.rabbitmq.trace????topic amq.topic????topic ...done. |
注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默認創建的。?
通過exchange,而不是routing_key來publish Message:
channel.basic_publish(exchange='logs', ?
?? ?routing_key='', ?
?? ?body=message) ?
2.臨時隊列
截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。
但是對于我們將要構建的日志系統,并不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:
?? ?1)每當Consumer連接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們選擇這個名字。方法:
?? ?result = channel.queue_declare()?
?? ?通過result.method.queue 可以取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
?? ?2)當Consumer關閉連接時,這個queue要被deleted。可以加個exclusive的參數。方法:
?? ?result = channel.queue_declare(exclusive=True)?? //每次獲取的都是新的,單獨使用的
?? ?
3.Bindings綁定
?? ?創建好fanout類型的Exchange和沒有名字的queue后(實際上是RabbitMQ幫我們取的名字)Exchange通過bindings把它的Message發送到目標queue
?? ?channel.queue_bind(exchange='logs', ?
?? ??? ?queue=result.method.queue) ??? ?
?? ?使用命令rabbitmqctl list_bindings 查看bindings
?? ?
4.最終代碼
拓撲圖:
Producer,在這里就是產生log的program,基本上和前幾個都差不多。最主要的區別就是publish通過了exchange而不是routing_key。
emit_log.py script:
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | #!/usr/bin/env?python import?pika import?sys connection?=?pika.BlockingConnection(pika.ConnectionParameters( ???? host= 'localhost' )) channel?=?connection.channel() channel.exchange_declare(exchange= 'logs' , ???? type= 'fanout' ) message?=? '?' .join(sys.argv[ 1: ])?or? "info:?Hello?World!" channel.basic_publish(exchange= 'logs' , ???? routing_key= '' , ???? body=message) print ?"?[x]?Sent?%r" ?%?(message,) connection.close() |
還有一點要注意的是我們聲明了exchange。publish到一個不存在的exchange是被禁止的。如果沒有queue bindings exchange的話,log是被丟棄的。
Consumer:receive_logs.py:
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | #!/usr/bin/env?python import?pika connection?=?pika.BlockingConnection(pika.ConnectionParameters( ???? host= 'localhost' )) channel?=?connection.channel() channel.exchange_declare(exchange= 'logs' , ???? type= 'fanout' ) result?=?channel.queue_declare(exclusive=True) queue_name?=?result.method.queue channel.queue_bind(exchange= 'logs' , ???? queue=queue_name) print ?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C' def?callback(ch,?method,?properties,?body): ???? print ?"?[x]?%r" ?%?(body,) channel.basic_consume(callback, ???? queue=queue_name, ???? no_ack=True) channel.start_consuming() |
試運行:
?? ?Consumer1:$ python receive_logs.py > logs_from_rabbit.log? //追加到文件
?? ?Consumer2:python receive_logs.py //輸出到屏幕
?? ?Producer:python emit_log.py
也可通過修改callback自己寫文件
輸出結果如圖:
二、Routing路由(Direct)
對于上一個日志系統改進。能夠使用不同的severity來監聽不同等級的log。比如我們希望只有error的log才保存到磁盤上。
1.Bindings綁定
之前的綁定
channel.queue_bind(exchange=exchange_name, ?
?? ?queue=queue_name) ?
綁定其實就是關聯了exchange和queue。或者這么說:queue對exchagne的內容感興趣,exchange要把它的Message deliver到queue中。
實際上,綁定可以帶routing_key 這個參數。其實這個參數的名稱和basic_publish 的參數名是相同了。為了避免混淆,我們把它成為binding key。
?? ?使用一個key來創建binding :
channel.queue_bind(exchange=exchange_name, ?
?? ?queue=queue_name, ?
?? ?routing_key='black')?
對于fanout的exchange來說,這個參數是被忽略的。
2.Direct Exchange
通過Bindings key完全匹配
圖Direct路由模型
exchange X和兩個queue綁定在一起。Q1的binding key是orange。Q2的binding key是black和green。
當P publish key是orange時,exchange會把它放到Q1。如果是black或者green那么就會到Q2。其余的Message都會被丟棄。
3.多重綁定(Multiple Bindings)
多個queue綁定同一個key是可以的。對于下圖的例子,Q1和Q2都綁定了black。也就是說,對于routing key是black的Message,會被deliver到Q1和Q2。其余的Message都會被丟棄。
圖muliti-bindings
4.生產者和消費者
生產者:
===========================================================================
1 2 3 4 5 6 7 8 | channel.exchange_declare(exchange= 'direct_logs' ,?? ???? type= 'direct' )?? //創建一個direct的exchange。使用log的severity作為routing?key,這樣Consumer可以針對不同severity的log進行不同的處理。 publish: channel.basic_publish(exchange= 'direct_logs' ,?? ???? routing_key=severity,? ???? body=message)?? //涉及三種severity: 'info' ,? 'warning' ,? 'error' . |
消費者:
===========================================================================
1 2 3 4 5 6 7 | result?=?channel.queue_declare(exclusive=True)?? queue_name?=?result.method.queue?? for?severity?in?severities:?? ???? channel.queue_bind(exchange= 'direct_logs' ,?? ???????? queue=queue_name,?? ???????? routing_key=severity)? //queue需要綁定severity |
5.最終版本
圖:direct_2
emit_log_direct.py?
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | #!/usr/bin/env?python import?pika import?sys connection?=?pika.BlockingConnection(pika.ConnectionParameters( ???? host= 'localhost' )) channel?=?connection.channel() channel.exchange_declare(exchange= 'direct_logs' , ???? type= 'direct' ) severity?=?sys.argv[ 1 ]?if?len(sys.argv)?>? 1 ?else? 'info' message?=? '?' .join(sys.argv[ 2: ])?or? 'Hello?World!' channel.basic_publish(exchange= 'direct_logs' , ???? routing_key=severity, ???? body=message) print ?"?[x]?Sent?%r:%r" ?%?(severity,?message) connection.close() |
receive_logs_direct.py:?
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #!/usr/bin/env?python?? import?pika?? import?sys?? connection?=?pika.BlockingConnection(pika.ConnectionParameters(?? ???? host= 'localhost' ))?? channel?=?connection.channel()?? channel.exchange_declare(exchange= 'direct_logs' ,?? ???? type= 'direct' )?? result?=?channel.queue_declare(exclusive=True)?? queue_name?=?result.method.queue?? severities?=?sys.argv[ 1: ]?? if?not?severities:?? ???? print ?>>?sys.stderr,? "Usage:?%s?[info]?[warning]?[error]" ?%?\?? ???????? (sys.argv[ 0 ],)?? ???? sys.exit( 1 )?? for?severity?in?severities:?????? ???? channel.queue_bind(exchange= 'direct_logs' ,?? ???????? queue=queue_name,?? ???????? routing_key=severity)?? print ?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C' ??def?callback(ch,?method,?properties,?body):?? ???? print ?"?[x]?%r:%r" ?%?(method.routing_key,?body,)?? channel.basic_consume(callback,?? ???? queue=queue_name,?? ???? no_ack=True)?? channel.start_consuming() |
===========================================================================
試運行:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log?
?? ?//把warning和error的log記錄到一個文件中
$ python receive_logs_direct.py info warning error ?
?? ?//打印所有log到屏幕?? ?
三、主題路由(Topic)
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以點號“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,當然最長不能超過255 bytes。
?? ?對于routing_key,有兩個特殊字符(在正則表達式里叫元字符):
?? ?* (星號) 代表任意 一個單詞
?? ?# (hash) 0個或者多個單詞
示例:
Producer發送消息時需要設置routing_key,routing_key包含三個單詞和兩個點號。
?? ?第一個key是描述了celerity(靈巧,敏捷),第二個是colour(色彩),第三個是species(物種):"<celerity>.<colour>.<species>"。
在這里我們創建了兩個綁定: Q1 的binding key 是"*.orange.*"; Q2 是? "*.*.rabbit" 和 "lazy.#":
?? ?Q1 感興趣所有orange顏色的動物
?? ?Q2 感興趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"將會發送到Q1和Q2中。消息"lazy.orange.elephant" 也會發送到Q1和Q2。但是"quick.orange.fox" 會發送到Q1;"lazy.brown.fox"會發送到Q2。"lazy.pink.rabbit" 也會發送到Q2,但是盡管兩個routing_key都匹配,它也只是發送一次。"quick.brown.fox" 會被丟棄。
如果發送的單詞不是3個呢? 答案要看情況,因為#是可以匹配0個或任意個單詞。比如"orange" or "quick.orange.male.rabbit",它們會被丟棄。如果是lazy那么就會進入Q2。類似的還有 "lazy.orange.male.rabbit",盡管它包含四個單詞。
Topic exchange和其他exchange
?? ?由于有"*" (star) and "#" (hash), Topic exchange 非常強大并且可以轉化為其他的exchange:
?? ?如果binding_key 是 "#" - 它會接收所有的Message,不管routing_key是什么,就像是fanout exchange。
?? ?如果 "*" (star) and "#" (hash) 沒有被使用,那么topic exchange就變成了direct exchange。
2.代碼實現
The code for emit_log_topic.py:
========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | #!/usr/bin/env?python import?pika import?sys connection?=?pika.BlockingConnection(pika.ConnectionParameters( ???? host= 'localhost' )) channel?=?connection.channel() channel.exchange_declare(exchange= 'topic_logs' , ???? type= 'topic' ) routing_key?=?sys.argv[ 1 ]?if?len(sys.argv)?>? 1 ?else? 'anonymous.info' message?=? '?' .join(sys.argv[ 2: ])?or? 'Hello?World!' channel.basic_publish(exchange= 'topic_logs' , ???? routing_key=routing_key, ???? body=message) print ?"?[x]?Sent?%r:%r" ?%?(routing_key,?message) connection.close() |
========================================================================
The code for receive_logs_topic.py: ?? ?
========================================================================?? ?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #!/usr/bin/env?python import?pika import?sys connection?=?pika.BlockingConnection(pika.ConnectionParameters( ???? host= 'localhost' )) channel?=?connection.channel() channel.exchange_declare(exchange= 'topic_logs' , ???? type= 'topic' ) ???? ?result?=?channel.queue_declare(exclusive=True) queue_name?=?result.method.queue binding_keys?=?sys.argv[ 1: ] if?not?binding_keys: ???? print ?>>?sys.stderr,? "Usage:?%s?[binding_key]..." ?%?(sys.argv[ 0 ],) ???? sys.exit( 1 ) for?binding_key?in?binding_keys: ???? channel.queue_bind(exchange= 'topic_logs' , ???????? queue=queue_name, ???????? routing_key=binding_key) print ?'?[*]?Waiting?for?logs.?To?exit?press?CTRL+C' def?callback(ch,?method,?properties,?body): ???? print ?"?[x]?%r:%r" ?%?(method.routing_key,?body,) channel.basic_consume(callback, ???? queue=queue_name, ???? no_ack=True) channel.start_consuming() |
?? ?
3.運行和結果
??? python receive_logs_topic.py "#"? //接收所有的log
??? python receive_logs_topic.py "kern.*"? //接收所有kern facility的log
??? python receive_logs_topic.py "*.critical"? //僅僅接收critical的log:?
??? python receive_logs_topic.py "kern.*" "*.critical"? //可以創建多個綁定:?
??? python emit_log_topic.py "kern.critical" "A critical kernel error"? //Producer產生一個log:"kern.critical" type:?
?? ?
參考:?? ?
http://www.rabbitmq.com/tutorials/tutorial-three-python.html
本文轉自MT_IT51CTO博客,原文鏈接:http://blog.51cto.com/hmtk520/2051247,如需轉載請自行聯系原作者