AMQP RabbitMQ

轉載:http://blog.ftofficer.com/2010/03/translation-rabbitmq-python-rabbits-and-warrens/

官方介紹:http://www.rabbitmq.com/erlang-client-user-guide.html

開始吧

AMQP當中有四個概念非常重要:虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)。一個虛擬主機持有一組交換機、隊列和綁定。為什么需要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。因此,如果需要禁止A組訪問B組的交換機/隊列/綁定,必須為A和B分別創建一個虛擬主機。每一個RabbitMQ服務器都有一個默認的虛擬主機“/”。如果這就夠了,那現在就可以開始了。

交換機,隊列,還有綁定……天哪!

剛開始我思維的列車就是在這里脫軌的…… 這些鬼東西怎么結合起來的?

隊列(Queues)是你的消息(messages)的終點,可以理解成裝消息的容器。消息就一直在里面,直到有客戶端(也就是消費者,Consumer)連接到這個隊列并且將其取走為止。不過。你可以將一個隊列配置成這樣的:一旦消息進入這個隊列,biu~,它就煙消云散了。這個有點跑題了……

需要記住的是,隊列是由消費者(Consumer)通過程序建立的,不是通過配置文件或者命令行工具。這沒什么問題,如果一個消費者試圖創建一個已經存在的隊列,RabbitMQ就會起來拍拍他的腦袋,笑一笑,然后忽略這個請求。因此你可以將消息隊列的配置寫在應用程序的代碼里面。這個概念不錯。

OK,你已經創建并且連接到了你的隊列,你的消費者程序正在百無聊賴的敲著手指等待消息的到來,敲啊,敲啊…… 沒有消息。發生了什么?你當然需要先把一個消息放進隊列才行。不過要做這個,你需要一個交換機(Exchange)……

交換機可以理解成具有路由表的路由程序,僅此而已。每個消息都有一個稱為路由鍵(routing key)的屬性,就是一個簡單的字符串。交換機當中有一系列的綁定(binding),即路由規則(routes),例如,指明具有路由鍵 “X” 的消息要到名為timbuku的隊列當中去。先不討論這個,我們有點超前了。

你的消費者程序要負責創建你的交換機(復數)。啥?你是說你可以有多個交換機?是的,這個可以有,不過為啥?很簡單,每個交換機在自己獨立的進程當中執行,因此增加多個交換機就是增加多個進程,可以充分利用服務器上的CPU核以便達到更高的效率。例如,在一個8核的服務器上,可以創建5個交換機來用5個核,另外3個核留下來做消息處理。類似的,在RabbitMQ的集群當中,你可以用類似的思路來擴展交換機一邊獲取更高的吞吐量。

OK,你已經創建了一個交換機。但是他并不知道要把消息送到哪個隊列。你需要路由規則,即綁定(binding)。一個綁定就是一個類似這樣的規則:將交換機“desert(沙漠)”當中具有路由鍵“阿里巴巴”的消息送到隊列“hideout(山洞)”里面去。換句話說,一個綁定就是一個基于路由鍵將交換機和隊列連接起來的路由規則。例如,具有路由鍵“audit”的消息需要被送到兩個隊列,“log-forever”和“alert-the-big-dude”。要做到這個,就需要創建兩個綁定,每個都連接一個交換機和一個隊列,兩者都是由“audit”路由鍵觸發。在這種情況下,交換機會復制一份消息并且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。

現在復雜的東西來了:交換機有多種類型。他們都是做路由的,不過接受不同類型的綁定。為什么不創建一種交換機來處理所有類型的路由規則呢?因為每種規則用來做匹配分子的CPU開銷是不同的。例如,一個“topic”類型的交換機試圖將消息的路由鍵與類似“dogs.*”的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與“dogs”比較(“direct”類型的交換機)要消耗更多的CPU。如果你不需要“topic”類型的交換機帶來的靈活性,你可以通過使用“direct”類型的交換機獲取更高的處理效率。那么有哪些類型,他們又是怎么處理的呢?

Fanout Exchange?– 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。

Direct Exchange?– 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog

Topic Exchange?– 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。我在RedHat的朋友做了一張不錯的圖,來表明topic交換機是如何工作的:

Source:?Red Hat Messaging Tutorial: 1.3 Topic Exchange

持久化這些小東西們

你花了大量的時間來創建隊列、交換機和綁定,然后,砰~服務器程序掛了。你的隊列、交換機和綁定怎么樣了?還有,放在隊列里面但是尚未處理的消息們呢?

放松~如果你是用默認參數構造的這一切的話,那么,他們,都,biu~,灰飛煙滅了。是的,RabbitMQ重啟之后會干凈的像個新生兒。你必須重做所有的一切,亡羊補牢,如何避免將來再度發生此類杯具?

隊列和交換機有一個創建時候指定的標志durable,直譯叫做堅固的。durable的唯一含義就是具有這個標志的隊列和交換機會在重啟之后重新建立,它不表示說在隊列當中的消息會在重啟后恢復。那么如何才能做到不只是隊列和交換機,還有消息都是持久的呢?

但是首先一個問題是,你真的需要消息是持久的嗎?對于一個需要在重啟之后回復的消息來說,它需要被寫入到磁盤上,而即使是最簡單的磁盤操作也是要消耗時間的。如果和消息的內容相比,你更看重的是消息處理的速度,那么不要使用持久化的消息。不過對于我們@DigiTar來說,持久化很重要。

當你將消息發布到交換機的時候,可以指定一個標志“Delivery Mode”(投遞模式)。根據你使用的AMQP的庫不同,指定這個標志的方法可能不太一樣(我們后面會討論如何用Python搞定)。簡單的說,就是將Delivery Mode設置成2,也就是持久的(persistent)即可。一般的AMQP庫都是將Delivery Mode設置成1,也就是非持久的。所以要持久化消息的步驟如下:

  1. 將交換機設成 durable。
  2. 將隊列設成 durable。
  3. 將消息的 Delivery Mode 設置成2 。

就這樣,不是很復雜,起碼沒有造火箭復雜,不過也有可能犯點小錯誤。

下面還要羅嗦一個東西……綁定(Bindings)怎么辦?我們無法在創建綁定的時候設置成durable。沒問題,如果你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。類似的,如果刪除了某個隊列或交換機(無論是不是durable),依賴它的綁定都會自動刪除。

注意兩點:

  • RabbitMQ 不允許你綁定一個非堅固(non-durable)的交換機和一個durable的隊列。反之亦然。要想成功必須隊列和交換機都是durable的。
  • 一旦創建了隊列和交換機,就不能修改其標志了。例如,如果創建了一個non-durable的隊列,然后想把它改變成durable的,唯一的辦法就是刪除這個隊列然后重現創建。因此,最好仔細檢查創建的標志。

開始喂蛇了~

【譯注】說喂蛇是因為Python的圖標是條蛇。

AMQP的一個空白地帶是如何在Python當中使用。對于其他語言有一大坨材料。

  • Java –?http://www.rabbitmq.com/java-client.html
  • .NET –?http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.5.0/rabbitmq-dotnet-client-1.5.0-user-guide.pdf
  • Ruby –?http://somic.org/2008/06/24/ruby-amqp-rabbitmq-example/

但是對Python老兄來說,你需要花點時間來挖掘一下。所以我寫了這個,這樣別的家伙們就不需要經歷我這種抓狂的過程了。

首先,我們需要一個Python的AMQP庫。有兩個可選:

  • py-amqplib?– 通用的AMQP
  • txAMQP?– 使用?Twisted?框架的AMQP庫,因此允許異步I/O。

根據你的需求,py-amqplib或者txAMQP都是可以的。因為是基于Twisted的,txAMQP可以保證用異步IO構建超高性能的AMQP程序。但是Twisted編程本身就是一個很大的主題……因此清晰起見,我們打算用 py-amqplib。更新:請參見Esteve Fernandez關于txAMQP的使用和代碼樣例的回復。

AMQP支持在一個TCP連接上啟用多個MQ通信channel,每個channel都可以被應用作為通信流。每個AMQP程序至少要有一個連接和一個channel。

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
password="guest", virtual_host="/", insist=False)
chan = conn.channel()

每個channel都被分配了一個整數標識,自動由Connection()類的.channel()方法維護。或者,你可以使用.channel(x)來指定channel標識,其中x是你想要使用的channel標識。通常情況下,推薦使用.channel()方法來自動分配channel標識,以便防止沖突。

現在我們已經有了一個可以用的連接和channel。現在,我們的代碼將分成兩個應用,生產者(producer)和消費者(consumer)。我們先創建一個消費者程序,他會創建一個叫做“po_box”的隊列和一個叫“sorting_room”的交換機:

chan.queue_declare(queue="po_box", durable=True,
exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,
auto_delete=False,)

這段代碼干了啥?首先,它創建了一個名叫“po_box”的隊列,它是durable的(重啟之后會重新建立),并且最后一個消費者斷開的時候不會自動刪除(auto_delete=False)。在創建durable的隊列(或者交換機)的時候,將auto_delete設置成false是很重要的,否則隊列將會在最后一個消費者斷開的時候消失,與durable與否無關。如果將durable和auto_delete都設置成True,只有尚有消費者活動的隊列可以在RabbitMQ意外崩潰的時候自動恢復。

(你可以注意到了另一個標志,稱為“exclusive”。如果設置成True,只有創建這個隊列的消費者程序才允許連接到該隊列。這種隊列對于這個消費者程序是私有的)。

還有另一個交換機聲明,創建了一個名字叫“sorting_room”的交換機。auto_delete和durable的含義和隊列是一樣的。但是,.excange_declare() 還有另外一個參數叫做type,用來指定要創建的交換機的類型(如前面列出的):fanout,?direct?和?topic.

到此為止,你已經有了一個可以接收消息的隊列和一個可以發送消息的交換機。不過我們需要創建一個綁定,把它們連接起來。

chan.queue_bind(queue=”po_box”, exchange=”sorting_room”,
routing_key=”jason”)

這個綁定的過程非常直接。任何送到交換機“sorting_room”的具有路由鍵“jason” 的消息都被路由到名為“po_box” 的隊列。

現在,你有兩種方法從隊列當中取出消息。第一個是調用chan.basic_get(),主動從隊列當中拉出下一個消息(如果隊列當中沒有消息,chan.basic_get()會返回None, 因此下面代碼當中print msg.body 會在沒有消息的時候崩掉):

msg = chan.basic_get("po_box")
print msg.body
chan.basic_ack(msg.delivery_tag)

但是如果你想要應用程序在消息到達的時候立即得到通知怎么辦?這種情況下不能使用chan.basic_get(),你需要用chan.basic_consume()注冊一個新消息到達的回調。

def recv_callback(msg):print 'Received: ' + msg.body
chan.basic_consume(queue='po_box', no_ack=True,
callback=recv_callback, consumer_tag="testtag")
while True:chan.wait()
chan.basic_cancel("testtag")

chan.wait()?放在一個無限循環里面,這個函數會等待在隊列上,直到下一個消息到達隊列。chan.basic_cancel()用來注銷該回調函數。參數consumer_tag?當中指定的字符串和chan.basic_consume()?注冊的一直。在這個例子當中chan.basic_cancel()?不會被調用到,因為上面是個無限循環…… 不過你需要知道這個調用,所以我把它放在了代碼里。

需要注意的另一個東西是no_ack參數。這個參數可以傳給chan.basic_get()chan.basic_consume(),默認是false。當從隊列當中取出一個消息的時候,RabbitMQ需要應用顯式地回饋說已經獲取到了該消息。如果一段時間內不回饋,RabbitMQ會將該消息重新分配給另外一個綁定在該隊列上的消費者。另一種情況是消費者斷開連接,但是獲取到的消息沒有回饋,則RabbitMQ同樣重新分配。如果將no_ack?參數設置為true,則py-amqplib會為下一個AMQP請求添加一個no_ack屬性,告訴AMQP服務器不需要等待回饋。但是,大多數時候,你也許想要自己手工發送回饋,例如,需要在回饋之前將消息存入數據庫。回饋通常是通過調用chan.basic_ack()方法,使用消息的delivery_tag屬性作為參數。參見chan.basic_get()?的實例代碼。

好了,這就是消費者的全部代碼。(下載:amqp_consumer.py)

不過沒有人發送消息的話,要消費者何用?所以需要一個生產者。下面的代碼示例表明如何將一個簡單消息發送到交換區“sorting_room”,并且標記為路由鍵“jason” :

msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")

你也許注意到我們設置消息的delivery_mode屬性為2,因為隊列和交換機都設置為durable的,這個設置將保證消息能夠持久化,也就是說,當它還沒有送達消費者之前如果RabbitMQ重啟則它能夠被恢復。

剩下的最后一件事情(生產者和消費者都需要調用的)是關閉channel和連接:

chan.close()
conn.close()

很簡單吧。(下載:amqp_publisher.py)

轉載于:https://www.cnblogs.com/orez88/articles/1864328.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/378906.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/378906.shtml
英文地址,請注明出處:http://en.pswp.cn/news/378906.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

fsync與fflush的關系和區別

read/write/fsync與fread/fwrite/fflush的關系和區別 read/write/fsync: linux底層操作; 內核調用, 涉及到進程上下文的切換,即用戶態到核心態的轉換,這是個比較消耗性能的操作。 fread/fwrite/fflush:…

lumanager mysql密碼_LuManager單獨安裝mysqli

首先確定你正在使用的php版本以及php.ini的位置,LuManager自帶了幾個版本。如果是默認安裝,應該是5.2.17。php.ini的位置應該是在/usr/local/php_fcgi/lib/php.ini要確定這些信息,可以自己編寫一個 info.phpphpinfo();?>把文件存放到網站…

數據庫系統數據庫管理系統_數據庫管理系統介紹

數據庫系統數據庫管理系統數據庫 (Database) A database is a collection of related data. In database any user can efficiently access the data which users want to retrieve. It can be anything from a simple collection of roll numbers, names, addresses and phone…

vba將select的值直接賦給變量

strSql ""strSql strSql & " select max(number) from dbo.#DATA" & vbCrLfrss.Open strSql, cnn numb rss.Fields(0)rss.Close轉載于:https://www.cnblogs.com/zigewb/archive/2013/02/06/2900645.html

set_exception_handler 自定義異常處理

剛才已經說過了set_error_handler這個函數,作用就是自定義錯誤處理, 那么現在就來簡單的說一下set_exception_handler,看名字我們就能發現,這說的是自定義異常處理。 呵呵,我聰明吧?來,先看一下…

如何獲取ubuntu源碼包里面的源碼進行編譯

如何獲取ubuntu源碼包里面的源碼進行編譯 1、在獲取源碼包之前,確保在軟件源配置文件 /etc/apt/sources.list中添加了deb-src項 2、使用如下命令獲取xxx源碼包的詳細信息: sudo apt-cache showsrc xxx 這用來查詢當前鏡像站點中是否有該源碼包。 3、源碼包中通常…

python 示例_帶有示例的Python字典popitem()方法

python 示例字典popitem()方法 (Dictionary popitem() Method) popitem() method is used to remove random/last inserted item from the dictionary. popitem()方法用于從字典中刪除隨機/最后插入的項目。 Before the Python version 3.7, it removes random item and from …

優化算法的意義,之二。

前一篇分析了求質數的兩個算法,在代碼執行效率和系統開銷兩方面進行了比較。 這在通信系統的設計和實現中,是非常重要的兩點。因為需要同時面對的是巨大的用戶群,和復雜的業務應用,通信系統的設計經常要面臨魚與熊掌間的選擇。 用…

srs配置文件分析

配置文件中的每一項都是一個SrsConfDirective對象。 例子:vhost 1、 整個vhost 是一個SrsConfDirective對象。 1.1、名字:std::string name vhost 1.2、參數:std::vectorstd::string args第0個值 defaultVhost 1.3、子SrsConfDirective&a…

寄存器(CPU工作原理)03 - 零基礎入門學習匯編語言08

第二章:寄存器(CPU工作原理)03 讓編程改變世界 Change the world by program 物理地址 CPU訪問內存單元時要給出內存單元的地址。所有的內存單元構成的存儲空間是一個一維的線性空間。 我們將這個唯一的地址稱為物理地址。 16位結構的CPU…

判別Linux是CentOs還是Ubuntu的最簡單方法

在終端執行以下兩條命令即可 CentOs:yum -help Ubuntu:apt-get -help

threadgroup_Java ThreadGroup toString()方法與示例

threadgroupThreadGroup類的toString()方法 (ThreadGroup Class toString() method) toString() method is available in java.lang package. toString()方法在java.lang包中可用。 toString() method is used to returns string denotation of this thread group (i.e. this m…

240多個jQuery插件

文件上傳(File upload)Ajax File Upload.jQUploader.Multiple File Upload plugin. jQuery File Style.Styling an input type file.Progress Bar Plugin.表單驗證(Form Validation)jQuery Validation.Auto Help.Simple jQuery form validation.jQuery XAV - form validations…

解壓縮命令

.Tar.gz 解壓:Tar zxvf FileName.Tar.gz 壓縮:Tar zcvf FileName.Tar.gz DirName 大致總結了一下Linux下各種格式的壓縮包的壓縮、解壓方法。但是部分方法我沒有用到,也就不全,希望大家幫我補充,我將隨時修改完善&…

Anaconda下安裝OpenCV和Tensorflow(最簡潔高效的方法)

安裝Tensorflow 1,打開Anaconda Navigator 2,手動創建tensorflow環境,這個和你的python版本號一致哈(方法一第一步之后,輸入python即可查看當前的版本) 3,手動搜索并下載添加 4,…

Java System類console()方法及示例

系統類console()方法 (System class console() method) console() method is available in java.lang package. console()方法在java.lang包中可用。 console() method is used to return the console object which is uniquely associated with the current JVM(Java Virtual …

使用FD_CLOEXEC實現close-on-exec,關閉子進程無用文件描述符

我們經常會碰到需要fork子進程的情況,而且子進程很可能會繼續exec新的程序。這就不得不提到子進程中無用文件描述符的問題! fork函數的使用本不是這里討論的話題,但必須提一下的是:子進程以寫時復制(COW,C…

mysql_result 對應mysqli哪個_php – 如何在mysqli中轉換mysql_result?

這個代碼以前是在mysql中,現在因為它已被棄用,我決定在mysqli中轉換我的代碼,但是我在我的頁面中有這個問題有分頁,在它使用mysql之前沒有錯誤,但現在我得到了這一行出錯:Warning: mysqli_fetch_assoc() expects exactly 1 parameter, 2 given錯誤是顯而易見的,我知…

Anaconda中下載速度賊慢?

清華鏡像 大致步驟如下: 1,windsR輸入cmd,打開命令終端頁面 輸入conda config --set show_channel_urls yes 2,在相應的C盤目錄下會生成.condarc文件 3,用記事本或者notepad打開進行替換 channels:- defaults show_c…