2019獨角獸企業重金招聘Python工程師標準>>>
一、Publish/Subscribe(發布/訂閱)(using the Java Client)
在前面的教程中,我們創建了一個work Queue(工作隊列)。工作隊列背后的假設是每個任務是交付給一個工作者(worker) 也就是均勻分給每個消費者。在本部分,我們將做一些完全不同的事情,我們將提供一個消息到多個消費者。這種模式被稱為“發布/訂閱”。
為了說明這個模式,我們將構建一個簡單的日志系統。它將包括兩個項目:
第一個將發出日志消息 第二個將接收并打印它們。
在我們的日志系統,每運行一次,接收器項目將得到消息的副本。這樣我們能夠運行一個接收機并且可以直接記錄到磁盤,同時我們可以運行另一個接收器,看到屏幕上的日志。 注:從本質上講,發表日志消息廣播給所有的接收者。
下面讓我們腦中帶幾個問題,讓我們一步一步去解決:
如果我把消息分配給所有的消費者,我們將怎么做呢?
二、Exchanges(交換機)
在前部分的教程中,我們從一個隊列發送和接收消息。現在是時候讓Rabbit推出完整的消息模型。
讓我們快速復習我們前面的教程::
- 生產者是一個用戶發送消息的應用程序。
- 一個隊列是存儲消息的緩沖區。
- 消費者是一個用戶應用程序接收消息。
相反,生產者只能發送Exchanges (消息交換區)。交換是一個非常簡單的事情。 一方面它從生產者那收到消息并推他們到另一邊隊列。交換區必須知道如何處理它收到一條消息:
- 它應該被加到一個特定的隊列嗎?
- 它應該被加到多隊列?
- 或者它應該丟棄嗎?
交換的規則定義的類型。

如上圖所示:X表示Exchange(交換機);
有一些可用的交換類型direct,?topic,?headers?and?fanout。我們將專注于最后一個——fanout。讓我們創建一個這種類型的交換,稱之為日志:
channel.exchangeDeclare("logs", "fanout");
fanout交換非常簡單。你大概可以猜到的名字,只是廣播所有的消息接收隊列它知道。而這正是我們需要為我們的記錄器。
問題:
①? exchange list?列出所有?(交換機)列表
sudo rabbitmqctl list_exchanges
Listing exchanges ...direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.
在此列表中有一些amq* 交換器 與默認(匿名)交換。這些都是默認創建的,但可能你不需要使用它們。
② 缺省名字的 exchange(交換機)
在前部分的教程中我們對exchange 一無所知,,但仍然能夠將消息發送到隊列。這是可能的,因為我們是使用一個 默認的交換,我們確定的空字符串(" ") 。
記得之前我們發布一個消息:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數是該交換區的名稱;空字符串表示默認或無名的交換,:如果routingKey存在的話,消息路由到指定的隊列的名稱。
現在,我們可以發布我們的交換器:
channel.basicPublish( "logs", "", null, message.getBytes());
三、Temporary queues(臨時隊列)
你可能記得以前我們使用的隊列都是指定名稱的(還記得hello和task_queue嗎?)。對我們來說命名一個隊列是至關重要的,
當你想在生產者和消費者中分享隊列的時候,給一個隊列的名稱是必須的。
但是那些都不是日志記錄系統所需要的,我們希望能夠獲得所有的日志信息,而不只是其中的一部分,而且我們只對當前正在傳遞的信息感興趣,對舊的日志信息不感興趣,要解決這些問題,我們需要分兩個步驟:
首先當我們鏈接到RabbitMQ服務器的時候,需要一個新的、空的隊列,為了做到這點,可以創建一個隨機名的隊列, 或者更好的方法就是讓服務器選擇一個隨機的隊列名。
其次,當斷開與隊列的連接時,消費者應該被自動刪除掉。 在Java客戶端,我們通過一個無參數的queueDeclare()方法為我們創建一個非持久的、唯一的、能自動刪除的隊列與隊列名稱
String queueName = channel.queueDeclare().getQueue();
在這點上,queueName包含了一個隨機隊列名稱。例如它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。 四、Bindings(綁定)

channel.queueBind(queueName, "logs", "");
五、Putting it all together(發布者/訂閱者 實現)
生產者代碼和之前的發送消息的代碼并沒有太大的區別,最重要的變化是,我們現在要將發布的消息傳遞給logs exchange來代替無名的exchange(之前的是""),
在發送消息時需要提供一個routingKey,它對于fanout exchange是非常重要的,不能被忽視的,這里的EmitLog.java代碼如下
- </pre><pre?name="code"?class="java">import?java.io.IOException;??
- import?com.rabbitmq.client.ConnectionFactory;??
- import?com.rabbitmq.client.Connection;??
- import?com.rabbitmq.client.Channel;??
- ??
- public?class?EmitLog?{??
- ??
- ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
- ??
- ????public?static?void?main(String[]?argv)??
- ??????????????????throws?java.io.IOException?{??
- ??
- ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
- ????????factory.setHost("localhost");??
- ????????Connection?connection?=?factory.newConnection();??
- ????????Channel?channel?=?connection.createChannel();??
- ??
- ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");??
- ??
- ????????String?message?=?getMessage(argv);??
- ??
- ????????channel.basicPublish(EXCHANGE_NAME,?"",?null,?message.getBytes());??
- ????????System.out.println("?[x]?Sent?'"?+?message?+?"'");??
- ??
- ????????channel.close();??
- ????????connection.close();??
- ????}??
- ????//...??
- }??
接收端:
- import?java.io.IOException;??
- import?com.rabbitmq.client.ConnectionFactory;??
- import?com.rabbitmq.client.Connection;??
- import?com.rabbitmq.client.Channel;??
- import?com.rabbitmq.client.QueueingConsumer;??
- ??
- public?class?ReceiveLogs?{??
- ??
- ????private?static?final?String?EXCHANGE_NAME?=?"logs";??
- ??
- ????public?static?void?main(String[]?argv)??
- ??????????????????throws?java.io.IOException,??
- ??????????????????java.lang.InterruptedException?{??
- ??
- ????????ConnectionFactory?factory?=?new?ConnectionFactory();??
- ????????factory.setHost("localhost");??
- ????????Connection?connection?=?factory.newConnection();??
- ????????Channel?channel?=?connection.createChannel();??
- ??
- ????????channel.exchangeDeclare(EXCHANGE_NAME,?"fanout");??
- ????????String?queueName?=?channel.queueDeclare().getQueue();??
- ????????channel.queueBind(queueName,?EXCHANGE_NAME,?"");??
- ??
- ????????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");??
- ??
- ????????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);??
- ????????channel.basicConsume(queueName,?true,?consumer);??
- ??
- ????????while?(true)?{??
- ????????????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();??
- ????????????String?message?=?new?String(delivery.getBody());??
- ??
- ????????????System.out.println("?[x]?Received?'"?+?message?+?"'");??
- ????????}??
- ????}??
- }??
像以前一樣,我們開始做編譯
$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java
如果你想將日志保存到一個文件,打開一個控制臺并運行 $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
如果你想看到日志在你的屏幕上,產生一個新的終端并運行: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs
發布日志類型: $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
使用rabbitmqctl list_bindings實際上您可以驗證綁定和隊列的代碼是否是我們想要的? 有兩個ReceiveLogs。 $ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.