?RabbitMQ 是遵從AMQP 協議的, 換句話說, RabbitMQ 就是AMQP
協議的Erlang 的實現(當然RabbitMQ 還支持STOMP2 、MQTT3 等協議) 0 AMQP 的模型架構
和RabbitMQ 的模型架構是一樣的,生產者將消息發送給交換器,交換器和隊列綁定。當生產
者發送消息時所攜帶的RoutingKey 與綁定時的BindingKey 相匹配時,消息即被存入相應的隊
列之中。消費者可以訂閱相應的隊列來獲取消息。
RabbitMQ 中的交換器、交換器類型、隊列、綁定、路由鍵等都是遵循的AMQP 協議中相
應的概念。目前RabbitMQ 最新版本默認支持的是AMQP 0-9-1 。本書中如無特殊說明,都以
AQMP 0-9-1 為基準進行介紹。
STOMP. 即Simple (or Stre阻úng) Text Oriented Messaging Protocol. 簡單(流〕文本面向消息協議,它提供了一個可互操作的
連接格式,運行STOMP 客戶端與任意STOMP 消息代理C Broker ) 進行交互。STOMP 協議由于設計簡單, 易于開發客戶端,
因此在多種語言和平臺上得到廣泛的應用.
MQTT. 即Message Queuing Telemetry Transport. 消息隊列遙澳~傳輸,是ffiM 開發的一個即時通信協議,有可能成為物聯網
的重要組成部分.該協議支持所有平臺,幾乎可以把所有物聯網和外部連接起來,被用來當作傳感器和制動器的通信協議。
AMQP 協議本身包括三層:
- Module Layer: 位于協議最高層,主要定義了一些供客戶端調用的命令,客戶端可以利
用這些命令實現自己的業務邏輯。例如,客戶端可以使用Queue . Declare 命令聲明
一個隊列或者使用Basic.Consume 訂閱消費一個隊列中的消息。
-
Session Layer: 位于中間層,主要負責將客戶端的命令發送給服務器,再將服務端的應
答返回給客戶端,主要為客戶端與服務器之間的通信提供可靠性同步機制和錯誤處理。 -
Transport Layer: 位于最底層,主要傳輸二進制數據流,提供幀的處理、信道復用、錯
誤檢測和數據表示等。
AMQP 說到底還是一個通信協議,通信協議都會涉及報文交互,從low-level 舉例來說,
AMQP 本身是應用層的協議,其填充于TCP 協議層的數據部分。而從high-level 來說, AMQP
是通過協議命令進行交互的。AMQP 協議可以看作一系列結構化命令的集合,這里的命令代表
一種操作,類似于HTTP 中的方法(GET 、POST 、PUT 、DELETE 等) 。
?AMQP生產者流轉過程
為了形象地說明AMQP 協議命令的流轉過程,這里截取代碼清單1-1 中的關鍵代碼,如代
碼清單2-2 所示。
代碼清單2-2 簡潔版生產者代碼
Connection connection = factory .n ewConnection() ; //創建連接
Channel channel = connection.createChannel() ; //創建信道
String message = "Hello World! ";
channel.basicPublish(EXCHANGE NAME , ROUTING KEY ,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
//關閉資源
channel.close() ;
connection . close();
當客戶端與Broker 建立連接的時候,會調用factory .newC onnection 方法,這個方法
會進一步封裝成Protocol Header 0-9-1 的報文頭發送給Broker ,以此通知Broker 本次交互采用
的是AMQPO-9-1 協議,緊接著Broker 返回Connection.Start 來建立連接,在連接的過程
中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok , Connection.
Open/ .Open-Ok 這6 個命令的交互。
當客戶端調用connection .createChannel 方法準備開啟信道的時候,其包裝
Channel . Open 命令發送給Broker ,等待Channel.Open-Ok 命令。
當客戶端發送消息的時候,需要調用channel . basicPublish 方法,對應的AQMP 命
令為Basic.Publish ,注意這個命令和前面涉及的命令略有不同,這個命令還包含了Content
Header 和Content Body 0 Content Header 里面包含的是消息體的屬性,例如,投遞模式、優先級等,而Content Body 包含消息體本身。
當客戶端發送完消息需要關閉資源時,涉及Channel.Close/.Close-Ok 與
Connection.Close/.Close-Ok 的命令交互。詳細流轉過程如圖2-10 所示。
?
?
消費者的流轉過程, 參考代碼清單1-2 , 截取消費端的關鍵代碼如代碼清
單2-3 所示。
代碼清單2-3 簡潔版消費者代碼
Connection connection = factory.newConnection(addresses);// 創建連接
final Channel channel = connection . createChannel() ; //創建信道
Consumer consumer = new DefaultConsumer(channel) ()//_省略實現
channel . basicQos(64) ;
channel.basicConsume(QUEUE NAME , consumer) ;
//等待回調函數執行完畢之后,關閉資源
TimeUnit . SECONDS . sleep(5) ;
channel . close();
connection.close();
其詳細流轉過程如圖2-11 所示。
消費者客戶端同樣需要與Broker 建立連接,與生產者客戶端一樣,協議交互同樣涉及
Connection.Start/ . Start-Ok 、Connection.Tune/.Tune-Ok 和Connection.
Open/ . Open-Ok 等,圖2-11 中省略了這些步驟,可以參考圖2-10 。
緊接著也少不了在Connection 之上建立Channe l,和生產者客戶端一樣,協議涉及
Channel . Open/Open-Oko
如果在消費之前調用了channel . basicQos(int prefetchCount) 的方法來設置消費
者客戶端最大能"保持"的未確認的消息數,那么協議流轉會涉及Basic.Qos/.Qos-Ok 這
兩個AMQP 命令。
在真正消費之前,消費者客戶端需要向Broker 發送Basic.Consume 命令(即調用
channel.basicConsume 方法〉將Channel 置為接收模式,之后Broker 回執
Basic . Consume - Ok 以告訴消費者客戶端準備好消費消息。緊接著Broker 向消費者客戶端推
送(Push) 消息,即Basic.Deliver 命令,有意思的是這個和Basic.Publish 命令一樣會
攜帶Content Header 和Content Body 0
消費者接收到消息并正確消費之后,向Broker 發送確認,即Basic.Ack 命令。
在消費者停止消費的時候,主動關閉連接,這點和生產者一樣,涉及
Channel . Close/ . Close-Ok 手口Connection.Close/ . Close-Ok 。
AMQP 命令概覽
AMQP 0-9-1 協議中的命令遠遠不止上面所涉及的這些,為了讓讀者在遇到其他命令的時候
能夠迅速查閱相關信息,下面列舉了AMQP 0-9-1 協議主要的命令,包含名稱、是否包含內容
體(Content Body) 、對應客戶端中相應的方法及簡要描述等四個維度進行說明,具體如表2-1
所示。
?
?
?