前些天發現了一個巨牛的人工智能學習網站,通俗易懂,風趣幽默,忍不住分享一下給大家。點擊跳轉到教程。
PS:更多詳情見 AMQP主頁 :http://www.amqp.org/?。
一、AMQP 是什么
AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)是一個進程間傳遞異步消息的網絡協議。
二、AMQP模型
工作過程
發布者(Publisher)發布消息(Message),經由交換機(Exchange)。
交換機根據路由規則將收到的消息分發給與該交換機綁定的隊列(Queue)。
最后 AMQP 代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。
深入理解
1、發布者、交換機、隊列、消費者都可以有多個。同時因為 AMQP 是一個網絡協議,所以這個過程中的發布者,消費者,消息代理 可以分別存在于不同的設備上。
2、發布者發布消息時可以給消息指定各種消息屬性(Message Meta-data)。有些屬性有可能會被消息代理(Brokers)使用,然而其他的屬性則是完全不透明的,它們只能被接收消息的應用所使用。
3、從安全角度考慮,網絡是不可靠的,又或是消費者在處理消息的過程中意外掛掉,這樣沒有處理成功的消息就會丟失。基于此原因,AMQP 模塊包含了一個消息確認(Message Acknowledgements)機制:當一個消息從隊列中投遞給消費者后,不會立即從隊列中刪除,直到它收到來自消費者的確認回執(Acknowledgement)后,才完全從隊列中刪除。
4、在某些情況下,例如當一個消息無法被成功路由時(無法從交換機分發到隊列),消息或許會被返回給發布者并被丟棄。或者,如果消息代理執行了延期操作,消息會被放入一個所謂的死信隊列中。此時,消息發布者可以選擇某些參數來處理這些特殊情況。
三、Exchange交換機
交換機是用來發送消息的 AMQP 實體。
交換機拿到一個消息之后將它路由給一個或零個隊列。
它使用哪種路由算法是由交換機類型和綁定(Bindings)規則所決定的。
AMQP 0-9-1 的代理提供了四種交換機:
交換機可以有兩個狀態:持久(durable)、暫存(transient)。
持久化的交換機會在消息代理(broker)重啟后依舊存在,而暫存的交換機則不會(它們需要在代理再次上線后重新被聲明)。
并不是所有的應用場景都需要持久化的交換機。
默認交換機
默認交換機(default exchange)實際上是一個由消息代理預先聲明好的沒有名字(名字為空字符串)的直連交換機(direct exchange)。
它有一個特殊的屬性使得它對于簡單應用特別有用處:那就是每個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
舉個栗子:當你聲明了一個名為 “search-indexing-online” 的隊列,AMQP 代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是為 “search-indexing-online”。因此,當攜帶著名為 “search-indexing-online” 的路由鍵的消息被發送到默認交換機的時候,此消息會被默認交換機路由至名為 “search-indexing-online” 的隊列中。換句話說,默認交換機看起來貌似能夠直接將消息投遞給隊列,盡管技術上并沒有做相關的操作。
直連交換機
直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應綁定鍵的隊列。直連交換機用來處理消息的單播路由(unicast routing)(盡管它也可以處理多播路由)。下邊介紹它是如何工作的:
1)將一個隊列綁定到某個交換機上時,賦予該綁定一個綁定鍵(Binding Key),假設為R;
2)當一個攜帶著路由鍵(Routing Key)為R的消息被發送給直連交換機時,交換機會把它路由給綁定鍵為R的隊列。
直連交換機的隊列通常是循環分發任務給多個消費者(我們稱之為輪詢)。比如說有3個消費者,4個任務。分別分發每個消費者一個任務后,第4個任務又分發給了第一個消費者。綜上,我們很容易得出一個結論,在 AMQP 0-9-1 中,消息的負載均衡是發生在消費者(consumer)之間的,而不是隊列(queue)之間。
直連型交換機圖例:
當生產者(P)發送消息時 Rotuing key=booking 時,這時候將消息傳送給 Exchange,Exchange 獲取到生產者發送過來消息后,會根據自身的規則進行與匹配相應的 Queue,這時發現 Queue1 和 Queue2 都符合,就會將消息傳送給這兩個隊列。
如果我們以 Rotuing key=create 和 Rotuing key=confirm 發送消息時,這時消息只會被推送到 Queue2 隊列中,其他 Routing Key 的消息將會被丟棄。
扇型交換機
扇型交換機(funout exchange)將消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。如果 N 個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這所有的 N 個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。
因為扇型交換機投遞消息的拷貝到所有綁定到它的隊列,所以他的應用案例都極其相似:
大規模多用戶在線(MMO)游戲可以使用它來處理排行榜更新等全局事件
體育新聞網站可以用它來近乎實時地將比分更新分發給移動客戶端
分發系統使用它來廣播各種狀態和配置更新
在群聊的時候,它被用來分發消息給參與群聊的用戶。(AMQP 沒有內置 presence 的概念,因此 XMPP 可能會是個更好的選擇)
扇型交換機圖例:
上圖所示,生產者(P)生產消息 1 將消息 1 推送到 Exchange,由于 Exchange Type=fanout 這時候會遵循 fanout 的規則將消息推送到所有與它綁定 Queue,也就是圖上的兩個 Queue 最后兩個消費者消費。
主題交換機
前面提到的 direct 規則是嚴格意義上的匹配,換言之 Routing Key 必須與 Binding Key 相匹配的時候才將消息傳送給 Queue.
而Topic 的路由規則是一種模糊匹配,可以通過通配符滿足一部分規則就可以傳送。
它的約定是:
1)binding key 中可以存在兩種特殊字符 “” 與“#”,用于做模糊匹配,其中 “” 用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個)
2)routing key 為一個句點號 “.” 分隔的字符串(我們將被句點號 “. ” 分隔開的每一段獨立的字符串稱為一個單詞),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
binding key 與 routing key 一樣也是句點號 “.” 分隔的字符串
主題交換機圖例:
當生產者發送消息 Routing Key=F.C.E 的時候,這時候只滿足 Queue1,所以會被路由到 Queue 中,如果 Routing Key=A.C.E 這時候會被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 時,這里只會發送一條消息到 Queue2 中。
主題交換機擁有非常廣泛的用戶案例。無論何時,當一個問題涉及到那些想要有針對性的選擇需要接收消息的 多消費者 / 多應用(multiple consumers/applications) 的時候,主題交換機都可以被列入考慮范圍。
使用案例:
分發有關于特定地理位置的數據,例如銷售點
由多個工作者(workers)完成的后臺任務,每個工作者負責處理某些特定的任務
股票價格更新(以及其他類型的金融數據更新)
涉及到分類或者標簽的新聞更新(例如,針對特定的運動項目或者隊伍)
云端的不同種類服務的協調
分布式架構 / 基于系統的軟件封裝,其中每個構建者僅能處理一個特定的架構或者系統。
頭交換機
headers 類型的 Exchange 不依賴于 routing key 與 binding key 的匹配規則來路由消息,而是根據發送的消息內容中的 headers 屬性進行匹配。
頭交換機可以視為直連交換機的另一種表現形式。但直連交換機的路由鍵必須是一個字符串,而頭屬性值則沒有這個約束,它們甚至可以是整數或者哈希值(字典)等。靈活性更強(但實際上我們很少用到頭交換機)。工作流程:
1)綁定一個隊列到頭交換機上時,會同時綁定多個用于匹配的頭(header)。
2)傳來的消息會攜帶header,以及會有一個 “x-match” 參數。當 “x-match” 設置為 “any” 時,消息頭的任意一個值被匹配就可以滿足條件,而當 “x-match” 設置為 “all” 的時候,就需要消息頭的所有值都匹配成功。
交換機小結
四、Queue隊列
AMQP 中的隊列(queue)跟其他消息隊列或任務隊列中的隊列是很相似的:它們存儲著即將被應用消費掉的消息。
隊列屬性
隊列跟交換機共享某些屬性,但是隊列也有一些另外的屬性。
Name
Durable(消息代理重啟后,隊列依舊存在)
Exclusive(只被一個連接(connection)使用,而且當連接關閉后隊列即被刪除)
Auto-delete(當最后一個消費者退訂后即被刪除)
Arguments(一些消息代理用他來完成類似與 TTL 的某些額外功能)
隊列創建
隊列在聲明(declare)后才能被使用。如果一個隊列尚不存在,聲明一個隊列會創建它。如果聲明的隊列已經存在,并且屬性完全相同,那么此次聲明不會對原有隊列產生任何影響。如果聲明中的屬性與已存在隊列的屬性有差異,那么一個錯誤代碼為 406 的通道級異常就會被拋出。
隊列持久化
持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啟的時候,它依舊存在。沒有被持久化的隊列稱作暫存隊列(Transient queues)。并不是所有的場景和案例都需要將隊列持久化。
持久化的隊列并不會使得路由到它的消息也具有持久性。倘若消息代理掛掉了,重新啟動,那么在重啟的過程中持久化隊列會被重新聲明,無論怎樣,只有經過持久化的消息才能被重新恢復。
五、Consumer消費者
消息如果只是存儲在隊列里是沒有任何用處的。被應用消費掉,消息的價值才能夠體現。在 AMQP 0-9-1 模型中,有兩種途徑可以達到此目的:
1)將消息投遞給應用 (“push API”)
2)應用根據需要主動獲取消息 (“pull API”)
使用 push API,應用(application)需要明確表示出它在某個特定隊列里所感興趣的,想要消費的消息。如是,我們可以說應用注冊了一個消費者,或者說訂閱了一個隊列。一個隊列可以注冊多個消費者,也可以注冊一個獨享的消費者(當獨享消費者存在時,其他消費者即被排除在外)。
每個消費者(訂閱者)都有一個叫做消費者標簽的標識符。它可以被用來退訂消息。消費者標簽實際上是一個字符串。
六、消息機制
消息確認
消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。而且網絡原因也有可能引起各種問題。這就給我們出了個難題,AMQP 代理在什么時候刪除消息才是正確的?AMQP 0-9-1 規范給我們兩種建議:
1)自動確認模式:當消息代理(broker)將消息發送給應用后立即刪除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
2)顯式確認模式:待應用(application)發送一個確認回執(acknowledgement)后再刪除消息。(使用 AMQP 方法:basic.ack)
如果一個消費者在尚未發送確認回執的情況下掛掉了,那 AMQP 代理會將消息重新投遞給另一個消費者。如果當時沒有可用的消費者了,消息代理會死等下一個注冊到此隊列的消費者,然后再次嘗試投遞。
拒絕消息
當一個消費者接收到某條消息后,處理過程有可能成功,有可能失敗。應用可以向消息代理表明,本條消息由于 “拒絕消息(Rejecting Messages)” 的原因處理失敗了(或者未能在此時完成)。
當拒絕某條消息時,應用可以告訴消息代理如何處理這條消息——銷毀它或者重新放入隊列。
當此隊列只有一個消費者時,請確認不要由于拒絕消息并且選擇了重新放入隊列的行為而引起消息在同一個消費者身上無限循環的情況發生。
在 AMQP 中,basic.reject 方法用來執行拒絕消息的操作。但 basic.reject 有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被稱作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 擴展來解決這個問題。
預取消息
在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每個消費者一次可以接受多少條消息是非常有用的。這可以在試圖批量發布消息的時候起到簡單的負載均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生產應用每分鐘才發送一條消息,這說明處理工作尚在運行。)
注意,RabbitMQ 只支持通道級的預取計數,而不是連接級的或者基于大小的預取。
消息屬性
AMQP 模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以至于 AMQP 0-9-1 明確的定義了它們,并且應用開發者們無需費心思思考這些屬性名字所代表的具體含義。例如:
Content type(內容類型)
Content encoding(內容編碼)
Routing key(路由鍵)
Delivery mode (persistent or not)
投遞模式(持久化 或 非持久化)
Message priority(消息優先權)
Message publishing timestamp(消息發布的時間戳)
Expiration period(消息有效期)
Publisher application id(發布應用的 ID)
有些屬性是被 AMQP 代理所使用的,但是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱作消息頭(headers)。他們跟 HTTP 協議的 X-Headers 很相似。消息屬性需要在消息被發布的時候定義。
消息主體
AMQP 的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被 AMQP 代理當作不透明的字節數組來對待。
消息代理不會檢查或者修改有效載荷。消息可以只包含屬性而不攜帶有效載荷。它通常會使用類似 JSON 這種序列化的格式數據,為了節省,協議緩沖器和 MessagePack 將結構化數據序列化,以便以消息的有效載荷的形式發布。AMQP 及其同行者們通常使用 “content-type” 和 “content-encoding” 這兩個字段來與消息溝通進行有效載荷的辨識工作,但這僅僅是基于約定而已。
消息持久化
消息能夠以持久化的方式發布,AMQP 代理會將此消息存儲在磁盤上。如果服務器重啟,系統會確認收到的持久化消息未丟失。
簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,并不會使得此消息具有持久化性質:它完全取決與消息本身的持久模式(persistence mode)。將消息以持久化方式發布時,會對性能造成一定的影響(就像數據庫操作一樣,健壯性的存在必定造成一些性能犧牲)。
七、其他
連接
AMQP 連接通常是長連接。AMQP 是一個使用 TCP 提供可靠投遞的應用層協議。AMQP 使用認證機制并且提供 TLS(SSL)保護。當一個應用不再需要連接到 AMQP 代理的時候,需要優雅的釋放掉 AMQP 連接,而不是直接將 TCP 連接關閉。
通道
有些應用需要與 AMQP 代理建立多個連接。無論怎樣,同時開啟多個 TCP 連接都是不合適的,因為這樣做會消耗掉過多的系統資源并且使得防火墻的配置更加困難。AMQP 0-9-1 提供了通道(channels)來處理多連接,可以把通道理解成共享一個 TCP 連接的多個輕量化連接。
在涉及多線程 / 進程的應用中,為每個線程 / 進程開啟一個通道(channel)是很常見的,并且這些通道不能被線程 / 進程共享。
一個特定通道上的通訊與其他通道上的通訊是完全隔離的,因此每個 AMQP 方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個通道準備的。
虛擬主機
為了在一個單獨的代理上實現多個隔離的環境(用戶、用戶組、交換機、隊列 等),AMQP 提供了一個虛擬主機(virtual hosts - vhosts)的概念。這跟 Web servers 虛擬主機概念非常相似,這為 AMQP 實體提供了完全隔離的環境。當連接被建立的時候,AMQP 客戶端來指定使用哪個虛擬主機。
AMQP 是可擴展的
AMQP 0-9-1 擁有多個擴展點:
1)定制化交換機類型:可以讓開發者們實現一些開箱即用的交換機類型尚未很好覆蓋的路由方案。例如 geodata-based routing。)
2)交換機和隊列的聲明中可以包含一些消息代理能夠用到的額外屬性。例如 RabbitMQ 中的 per-queue message TTL 即是使用該方式實現。)
3)特定消息代理的協議擴展。例如 RabbitMQ 所實現的擴展。
新的 AMQP 0-9-1 方法類可被引入。)
4)消息代理可以被其他的插件擴展,例如 RabbitMQ 的管理前端 和 已經被插件化的 HTTP API。
這些特性使得 AMQP 0-9-1 模型更加靈活,并且能夠適用于解決更加寬泛的問題。
AMQP 0-9-1 客戶端生態系統
AMQP 0-9-1 擁有眾多的適用于各種流行語言和框架的客戶端。其中一部分嚴格遵循 AMQP 規范,提供 AMQP 方法的實現。另一部分提供了額外的技術,方便使用的方法和抽象。有些客戶端是異步的(非阻塞的),有些是同步的(阻塞的),有些將這兩者同時實現。有些客戶端支持 “供應商的特定擴展”(例如 RabbitMQ 的特定擴展)。
因為 AMQP 的主要目標之一就是實現交互性,所以對于開發者來講,了解協議的操作方法而不是只停留在弄懂特定客戶端的庫就顯得十分重要。這樣一來,開發者使用不同類型的庫與協議進行溝通時就會容易的多。
參考資料
[1]:RabbitMQ中文文檔
[2]:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
————————————————