?
RabbitMq是一個消息中間件:它接收消息、轉發消息。你可以把它理解為一個郵局:當你向郵箱里寄出一封信后,郵遞員們就能最終將信送到收信人手中。?
RabbitMq、消息相關術語如下:
生產者:生產者只發送消息,發送消息的程序即為生產者:
消息隊列:消息隊列就相當于RabbitMq中的郵箱,消息存儲在消息隊列中。隊列本質上是一個大的消息緩存,它能存多少消息,取決于主機的內存和磁盤限制。多個生產者可以往同一個消息隊列中發送消息;多個消費者可以從同一個隊列中獲取數據。
消費者:消費者是一個等待接收消息的程序:
注意:生產者、消費者和RabbitMq可以在不同的機器上;在很多的應用中,一個生產者同時也可能是消費者。
在下面圖形中,“P”是消息的生產者,“C”是消息的消費者,中間的紅框是消息隊列,保存了從生產者那里接收到的準備轉發到消費方的消息。
?
Hello World
發送消息
生產者連接RabbitMq,發送一條簡單的消息”Hello World!“后就退出。
在Send.java類中,需要引入以下依賴包:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
給隊列起個名字:
public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {...}
}
創建連接到服務器的連接Collection:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {...
}
這個連接是套接字連接,為我們處理協議版本協商和身份驗證等。這里我們連接一個本地的RabbitMq因此是localhost,如果想連接到一個遠程的RabbitMq,只需要把localhst改成那臺機器的IP地址即可。
創建完連接之后,要繼續創建一個信道:Channel。使用try-with-resource表達式,因為Connection和Channel都實現了JAVA接口Closeable,屬于資源,需要關閉。使用try-with-resource就不需要顯示地在我們的代碼中進行關閉了。(關于信道,請參考文章最頂部的RabbitMq原理圖,它是TCP里面的虛擬鏈接,例如:電纜相當于一個TCP,信道就是里面的一個獨立光纖,一條TCP上面創建多條信道是沒有問題的;TCP一旦打開就會創建AMQP信道;無論是發布消息、接收消息、訂閱隊列,這些動作都是通過信道完成的)。
為了發送消息,我們還必須定義一個消息發送到的消息隊列:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
隊列聲明函數queueDeclare有多個參數,其分別是:
1. queue: 隊列的名稱?
2. durable: 是否持久化?
當durable = false時,隊列非持久化。因為隊列是存放在內存中的,所以當RabbitMQ重啟或者服務器重啟時該隊列就會丟失 ;
當durable = true時,隊列持久化。當RabbitMQ重啟后隊列不會丟失。RabbitMQ退出時它會將隊列信息保存到 Erlang自帶的Mnesia數據庫 中,當RabbitMQ重啟之后會讀取該數據庫 。
3. exclusive: 是否排外的 ;
當exclusive = true則設置隊列為排他的。如果一個隊列被聲明為排他隊列,該隊列僅對首次聲明它的連接(Connection)可見,是該Connection私有的,類似于加鎖,并在連接斷開connection.close()時自動刪除 ;
當exclusive = false則設置隊列為非排他的,此時不同連接(Connection)的管道Channel可以使用該隊列 ;
注意2點:
排他隊列是 基于連接(Connection) 可見的,同個連接(Connection)的不同管道 (Channel) 是可以同時訪問同一連接創建的排他隊列 。其他連接是訪問不了的 ,強制訪問將報錯:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello-testExclusice' in vhost '/'.;
以下聲明是沒問題的:
Channel channel = connection.createChannel();Channel channel2 = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, true, false, null);channel2.queueDeclare(QUEUE_NAME, false, true, false, null);=>如果是不同的 connection 創建的 channel 和 channel2,那么以上的=>channel2.queueDeclare()是會報錯的!!!!!!
"首次" 是指如果某個連接(Connection)已經聲明了排他隊列,其他連接是不允許建立同名的排他隊列的。這個與普通隊列不同:即使該隊列是持久化的(durable = true),一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列適用于一個客戶端同時發送和讀取消息的應用場景。
4. autoDelete: 是否自動刪除 ;如果autoDelete = true,當所有消費者都與這個隊列斷開連接時,這個隊列會自動刪除。注意: 不是說該隊列沒有消費者連接時該隊列就會自動刪除,因為當生產者聲明了該隊列且沒有消費者連接消費時,該隊列是不會自動刪除的。
5. arguments: 設置隊列的其他一些參數,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。
basicPublish() 方法是基礎的發布消息方法,它有四個參數
- String exchange : 交換機名, 當不使用交換機時,傳入“”空串。
- String routingKey :(路由地址) 發布消息的隊列, 無論channel綁定那個隊列,最終發布消息的隊列都由該字串指定。
- AMQP.BasicProperties props :消息的配置屬性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化。
- byte[] body :消息數據本體, 必須是byte數組
定義一個消息隊列時,只有該隊列不存在的時候才能被創建。
消息是二進制數組,因此你可以根據需要指定編碼。
完整的Send.java如下:?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}
接收消息
消費者監聽RabbitMq中的消息,因此與生產者發送一條消息就退出不同,消費者要保持運行狀態來接收消息并打印出來。
與生產者相同,我們需要創建Connetcion和Channel、定義隊列(需要監聽并接收消息的隊列):
public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");}
}
注意我們也在這里聲明隊列,因為我們可能在生產者之前啟動消費者。我們想要確保在我們嘗試消費消息的時候隊列就已經存在了。
這里我們為什么不使用try-with-resource表達式自動關閉channel和connection?這樣我們就可以使我們的程序一直保持運行狀態,如果把channel、connection這些關了,程序也就停止了。這就尷尬了,因為我們需要保持消費者一直處于異步監聽消息過來的狀態。
RabbitMq會將隊列中的消息異步地推送過來,我們需要提供一個回調函數來緩存消息直到我們需要用到這些消息:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
basicConsume方法會啟動一個消費者,并返回服務端生成的消費者標識,它的幾個參數是
1. queue:隊列名
2. autoAck:true 接收到傳遞過來的消息后自動acknowledged(應答服務器),false 接收到消息后不自動應答服務器
3. deliverCallback: 當一個消息發送過來后的回調接口
4. cancelCallback:當一個消費者取消訂閱時的回調接口;取消消費者訂閱隊列時除了使用
方法的返回值是服務端生成的消費者標識
完整的接收端代碼
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
接下來創建一個工作隊列,用于在多個工作者之間分配耗時的任務。
工作隊列(即任務隊列)的主要思想是避免立即執行那些需要等他們執行完成的資源密集型任務。相反,我們將任務安排在稍后完成。我們將任務封裝為消息并將其發送到隊列,后臺運行的工作進程將取出任務并執行完成。如果你啟動了多個工作者,這些任務將在多個工作者之間分享。
這個概念也即我們說的異步,在項目中,有時候一個簡單的Web請求,后臺要做一系列的操作。這時候,如果后臺執行完成之后再給前臺返回消息將會導致瀏覽器頁面等待從而出現假死狀態。因此,通常的做法是,在這個Http請求到后臺,后臺獲取到正確的參數等信息后立即給前臺返回一個成功標志,然后后臺異步地進行后續的操作。
準備
本章中,我們將發送字符串消息來模擬復雜的任務。這里因為沒有一個真實的復雜任務,因此用Thread.sleep()方法來模擬復雜耗時的任務。我們用字符串中的含點(“.")的數量來表示任務的復雜程度,一個點表示一秒鐘的耗時,例如:一個發送”Hello ...“字符串的任務將會耗時3秒鐘。
修改前面的Send.java為NewTask.java,允許從命令行發送消息。修改后的程序會把任務發送到工作隊列:
完整的NewTask.java代碼為:
public class NewTask {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws IOException, TimeoutException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");try(Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message = String.join(" ", argv);channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}
}
Recv.java也要做一些修改:模擬字符串消息中的每個點耗時1秒鐘,它將處理傳送過來的消息并執行任務,修改后的程序名字是Work.java:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
doWork是模擬執行過程中耗時的偽任務:
private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}
}
完整的Work.java為:
public class Worker {private final static String TASK_QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("localhost");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println(" [x] Done");}};boolean autoAck = true; // acknowledgment is covered belowchannel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}}
}
使用工作隊列的優點之一是能夠輕松地進行并行化操作。假設我們在做一個后臺日志收集系統,我們可以很容易地增加更多的Worker從而提高系統性能。
首先,我們同時啟動兩個Worker,接下來,我們先后啟動5個Task,并分別通過main()參數傳入五個字符串消息:
First message. Second message.. Third message... Fourth message.... Fifth message.....
看一下兩個Worker都接收到了什么樣的消息
?默認情況下,RabbitMQ將按順序將每個消息發送給下一個使用者。平均每個消費者將得到相同數量的消息。這種消息的調度方式稱之為循環調度,你可以開啟更多的Worker來進行測試。
因為消費者執行一個任務會有時間耗時,假設一個消費者在執行一個任務執行一半的時候掛掉了將會怎樣?消息會不會因此丟失?在我們目前的代碼里,一旦RabbitMq將一條消息轉發給了一個消費者后,將會立即將消息刪除(注意Worker.java里的autoAck目前設置為true),因此,在我們上面例子里,如kill掉一個正在處理數據的Worker,那么該數據將會丟失。不僅如此,所有那些指派給該Worker的還未處理的消息也會丟失。
但在實際工作中,我們并不希望一個Worker掛掉之后就會丟失數據,我們希望的是:如果該Worker掛掉了,所有轉發給該Worker的消息將會重新轉發給其他Worker進行處理(包括處理了一半的消息)。為了確保一條消息永不丟失,RabbitMq支持消息回執。消費者在接收到一條消息,并且成功處理完成之后會給RabbitMq回發一條確認ack確認消息,RabbitMq此時才會刪除該條消息。如果一個Worker正在處理一條消息時掛掉了(信道關閉、連接關閉、TCP連接丟失),它將沒有機會發送ack回執,RabbitMq就認為該消息沒有消費成功,于是便會將該消息重新放到隊列中,如果此時有其他消費者還是在線狀態,RabbitMq會立即將該條消息再轉發給其他在線的消費者。這種機制可以保證任何消息都不會丟失。
默認情況下,需要手動進行消息確認,在前面的例子里,我們通過autoAck=true顯示地關閉了手動消息確認,因此,RabbitMq將采用自動消息確認的機制。現在,我們修改我們的程序,采用手動發送回執的方式,當我們完成對消息的處理后,再手動發送回執確認:
channel.basicQos(1); // accept only one unack-ed message at a time (see below)DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
可以看到將autoAck設置為了false。
ack發送信道必須和接收消息的信道(channel)是同一個,如果嘗試通過一個不同的信道發送ack回執,將會拋出channel等級協議異常(官網說會拋出異常,但在實際測試中并沒有拋異常,只是該條消息得不到回執,從而也無法刪除)。另一個常見的錯誤是關閉了自動ack后忘了手動回執,雖然只是一個簡單的錯誤,但是帶來的后果卻是嚴重的,它將導致已經消費掉的消費不會被刪除,并且當消費該消息的消費者在退出之后,RabbitMq會將該條消息重新進行轉發,內存將被慢慢耗盡。我們可以通過下面的命令來檢查這種錯誤:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
?該命令有三列內容,第一列是在監聽的隊列名稱,第二列是Ready狀態的消息數量,第三列是Unacked的消息數量。
消息的持久化
前面講了如何保證當消費者掛掉之后消息不被丟失,但是,如果RabbitMq服務或者部署RabbitMq的服務器掛掉了之后,消息仍然會丟失。當RabbitMq崩潰之后,它將會忘記所有的隊列和消息,除非,有什么機制讓RabbitMq將隊列信息和消息保存下來。
要確保消息和隊列不會丟失,我們必須要確保兩件事情。
首先,我們要確保RabbitMq永遠不丟失隊列,要做到這點,我們在定義的時候就需要告訴RabbitMq它是需要持久化的,通過指定durable參數實現:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
雖然這個命令本身是正確的,但需要注意的是我們前面hello隊列是一個非持久化隊列,RabbitMq不允許重新定義一個已經存在的隊列(用不同的參數),否則會拋出異常。
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10)
要么重啟RabbitMq讓該臨時隊列消失,要么在控制臺將該隊列刪除,或重新創建一個新的隊列:
1 boolean durable = true;
2 channel.queueDeclare("task_queue", durable, false, false, null);
生產者和消費者要做同步修改。
做完上面這一步就保證了隊列(task_quee)的持久化,此時,即便RabbitMq崩潰了也不會丟失該隊列,當RabbitMq重啟后將自動重新加載該隊列。
其次,要確保消息也被持久化。要做到這一點,在生產者發布消息的時候需要指定消息的屬性為:PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
注意,即便設置了消息的持久化屬性也不能保證消息會被100%地寫入到磁盤中。因為RabbitMq在接收到消息和寫入到磁盤不是同步的,消息可能只是被寫入到緩存中而還沒來得及寫入磁盤,RabbitMq崩潰了,此時也會丟失消息。但無論如何,比前面簡單的消息隊列已經強大了很多。
公平調度
你可能已經注意到,此時任務調度仍然不能完全按照我們希望的方式工作。舉個例子,在只有兩個Worker的環境中,奇數的消息比較重,偶數的消息比較輕時,一個Worker將會一直處于忙碌狀態,而另一個Worker將會一直處于空閑狀態,但RabbitMq并不知道這種情況,它會依然均衡地向兩個Worker傳遞消息。發生這種情況是因為,當一個消息進入隊列之后,RabbitMq只是盲目地將該第n個消息轉發給第n個消費者,它并不關注每個消費者發了多少個回執。
為了解決這個問題,我們可以通過調用basicQos方法,給它傳入1。這將告訴RabbitMq不要同時給一個隊列轉發多于1條的消息,換句話說,在一個消費者沒有完成并回執前一條消息時,不要再給它轉發其他消息。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
?完整的代碼
NewTask.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = String.join(" ", argv);channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
Worker.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");try {doWork(message);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}private static void doWork(String task) {for (char ch : task.toCharArray()) {if (ch == '.') {try {Thread.sleep(1000);} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}}}}
}
?工作隊列模式的設想是每一條消息只會被轉發給一個消費者。接下來會講解另一種完全不一樣的場景:?我們會把一個消息轉發給多個消費者,這種模式稱之為發布-訂閱模式。
為了闡述這個模式,我們將會搭建一個簡單的日志系統,它包含兩種程序:一種發送日志消息,另一種接收并打印日志消息。在這個日志系統里,生產者發布的消息將會以廣播的形式讓每一個運行的消費者都可以獲取的到,我們讓其中一個消費者接收消息并寫入磁盤,另一個消費者接收消息并打印在電腦屏幕上。
交換器(Exchange)
回憶一下前面的內容:
- 一個生產者用以發送消息;
- 一個隊列緩存消息;
- 一個消費者用以消費隊列中的消息。
RabbitMq消息模式的核心思想是:一個生產者并不會直接往一個隊列中發送消息,事實上,生產者根本不知道它發送的消息將被轉發到哪些隊列。
實際上,生產者只能把消息發送給一個exchange,exchange只做一件簡單的事情:一方面它們接收從生產者發送過來的消息,另一方面,把接收到的消息推送給隊列。
一個exchage必須清楚地知道如何處理一條消息。
有四種類型的交換器,分別是:direct、topic、headers、fanout。主要講解最后一種:fanous(廣播模式)。下面創建一個fanout類型的交換器,將創建的交換機命名為logs,類型是fanout:
channel.exchangeDeclare("logs", "fanout");
廣播模式交換器很簡單,從字面意思也能理解,就是把接收到的消息推送給所有它知道的隊列。在我們的日志系統中正好需要這種模式。
如果想查看當前系統中有多少個exchange,可以使用以下命令:
sudo rabbitmqctl list_exchanges
或者通過控制臺查看(Exchanges標簽下):
可以看到有很多以amq.*開頭的交換器,以及(AMQP default)默認交換器,這些是默認創建的交換器。
在前面工作隊列的指南中,我們并未顯式的使用交換器(指定交換器為字符串""),但是依然可以將消息發送到隊列中,其實并不是我們沒有使用交換器,實際上是我們使用的是默認交換器,回顧一下我們之前是如何發送消息的:
channel.basicPublish("", "hello", null, message.getBytes());
第一個參數是交換器的名字,空字符串表示它是一個默認或無命名的交換器,消息將會由指定的路由鍵(第二個參數,routingKey,后面會講)轉發到隊列。
既然exchange可以指定為空字符串(""),那么可否指定為null?答案是:不能!
在AMQImpl類中的Publish()方法中,不光是exchange不能為null,routingKey路由鍵也不能為null,否則會拋出異常:
現在,可以顯式的使用剛創建的交換器:
channel.basicPublish( "logs", "", null, message.getBytes());
臨時隊列
前面我們使用的隊列都是有具體的隊列名(hello),創建命名隊列是很必要的,因為我們需要將消費者指向同一名字的隊列。因此,要想在生產者和消費者中間共享隊列就必須要使用命名隊列。
但是,現在講解的日志系統也可以使用非命名隊列(可以不手動命名),我們希望收到所有日志消息,并且我們希望總是接收到新的日志消息而不是舊的日志消息。為了解決這個問題,需要分兩步走。
首先,無論何時當消費者連接到RabbitMq,都需要一個新的、空的隊列來接收日志消息,因此,消費者在連接上RabbitMq之后需要創建一個任意名字的隊列或者讓RabbitMq生成一個任意的隊列名字。
其次,一旦該消費者斷開了與RabbitMq的連接,隊列也被自動刪除。
使用無參的queueDeclare(),就可以創建一個非持久化、專有的、自動刪除且名字隨機生成的隊列。
String queueName = channel.queueDeclare().getQueue();
綁定(Binding)
當廣播模式的交換器和隊列已經創建好了,接下來就是要告訴交換器向隊列里發送消息。交換器與隊列之間的關系稱之為綁定關系。
channel.queueBind(queueName, "logs", "");
queueBind的第三個參數是routingkey。
至此,交換器已經可以往隊列中發送消息了。
可以通過下列命令來查看隊列的綁定關系:
rabbitmqctl list_bindings
完整的代碼??
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLog {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel();) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "RabbitMq fanout......";channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));System.out.println(" [x] Sent '" + message + "'");}}
}
Connection創建完成之后,定義了exchange,這一步是必要的,因為如果沒有交換器將無法發送消息。如果沒有隊列綁定到該交換器上,那么,交換器收到的消息將會丟失掉,但是對本章的日志系統來說沒問題的,當沒有消費者時,就完全的放棄掉數據,消費者連接上時,只接收最新的日志消息就好。
public class ReceiveLogs {private static final String EXCHANGE_NAME = "logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);final String queue = channel.queueDeclare().getQueue();channel.queueBind(queue,EXCHANGE_NAME,"");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTa,delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(queue,true,deliverCallback,consumerTag -> {});}
}
basicConsume的autoAck設置為true,因為現在是廣播模式,每個消費者都會收到一樣的消息,并且這里給消費者生產的隨機名稱的隊列相當于是獨有的,所以在接收到消息之后立即發送確認回執是OK的。
現在已經可以把消息廣播給很多的消費者。接下來我們將增加一個特性:訂閱這些信息中的一些信息。例如,只將error級別的錯誤存儲到硬盤中,同時可以將所有級別(error、info、warning等)的日志都打印在控制臺上。
綁定(Bindings)
回顧一下創建綁定關系的代碼:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一個綁定是一個交換器與隊列之間的關系。意思是指:這個隊列對這個交換器的消息感興趣。
該方法同時還有另一個routing Key參數(第三個參數),為了避免與basicPublish參數中的路由鍵(routing key)混淆,我們稱之為綁定鍵(bingind key),下面展示了如何通過一個綁定key創建一個綁定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
綁定鍵(這里是"black")的含義依賴于交換器的類型。在我們的日志系統中,交換器類型是fanout,綁定鍵沒有任何意義,會被忽略掉。
直連交換機(Direct Exchange)
我們正在用的廣播模式的交換器并不夠靈活,它只是不加思索地進行廣播。現在使用direct exchange來代替。直連交換器的路由算法非常簡單:將消息推送到binding key與該消息的routing key相同的隊列。
請看下圖:
在該圖中,直連交換器X上綁定了兩個隊列。第一個隊列的綁定鍵orange,第二個隊列有兩個綁定鍵:black和green。在這種場景下,一個消息在發布時(basicPublish)指定的路由鍵若為orange,則該消息將只被路由到隊列Q1,若路由鍵為black或green的消息,將只被路由到隊列Q2。其他的消息都將被丟失。
多重綁定
同一個綁定鍵可以綁定到不同的隊列上去,在上圖中,交換器X與隊列Q2的綁定鍵也是black,在這種情況下,直連交換器將會和廣播交換器有著相同的行為,將消息推送到所有匹配的隊列。一個路由鍵為black的消息將會同時被推送到隊列Q1和Q2。
發送日志的代碼片段
//一如既往的先創建一個交換器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//發送消息:"severity"參數是日志系統中“info”、“warning”和“error”中的一個。
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
接收消息的代碼片段與之前的基本相同,只是需要在創建綁定關系時,指定severity的值(也就是綁定值):
String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
完整的代碼
EmitLogDirect.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLogDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");String severity = getSeverity(argv);String message = getMessage(argv);channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}}//..
}
發送者發送消息的routingKey和消息都來自于命令行傳遞過來的argv參數中。
ReceiveLogsDirect.java
import com.rabbitmq.client.*;public class ReceiveLogsDirect {private static final String EXCHANGE_NAME = "direct_logs";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");String queueName = channel.queueDeclare().getQueue();if (argv.length < 1) {System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");System.exit(1);}for (String severity : argv) {channel.queueBind(queueName, EXCHANGE_NAME, severity);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}
消費者接收消息時,queueBind的bindingKey也要來自于命令行的argv參數。
用direct交換器替換了fanout交換器,使得我們可以有選擇性地接收消息。盡管如此,仍然還有限制:不能基于多個標準進行路由。
在日志系統中,我們可能不僅希望根據日志等級訂閱日志,還希望根據日志來源訂閱日志。這個概念來自于unix工具syslog,它不僅可以根據日志等級(info/warn/crit...)來路由日志,同時還可以根據設備(auth/cron/kern...)來路由日志,這將更加靈活。我們可能希望只監聽來自'cron'的error級別日志,同時又要接收來自'kern'的所有級別的日志。我們的日志系統如果要實現這個功能,就需要使用到另外一種交換器:主題交換器(Topic Exchange)。
主題交換器(Topic Exchange)
發送到主題交換器的消息的routing key必須是由點號分開的一串單詞,這些單詞可以是任意的,但通常是與消息相關的一些特征。比如以下是幾個有效的routing key:?"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的單詞可以有很多,最大限制是255 bytes。
binding key必須與routing key模式一樣。Topic交換器的邏輯與direct交換器有點相似:使用特定路由鍵發送的消息將被發送到所有使用匹配的綁定鍵綁定的隊列。然而,綁定鍵有兩個特殊的情況,如下:
- *?表示匹配任意一個單詞
- #?表示匹配任意一個或多個單詞
下圖表示了這這兩個通配符的用法:
在這個例子中,我們將發送所有跟動物有關的消息,這些消息將會發送到由三個單詞,兩個點號組成的routing key,第一個單詞了表示的是速度,第二個單詞表示顏色,第三個單詞表示種類:
"<speed>.<colour>.<species>"。
我們創建三個綁定關系:隊列Q1綁定到綁定鍵*.orange.* ,隊列Q2綁定到*.*.rabbit和lazy.#。
總結下來就是:
- 隊列Q1對橘黃色(orange)顏色的所有動物感興趣;
- 隊列Q2對所有的兔子(rabbit)和所有慢吞吞(lazy)的動物感興趣。
一個路由為?"quick.orange.rabbit"的消息,將會被轉發到這兩個隊列,路由為"lazy.orange.elephant"的消息也被轉發給這兩個隊列,路由為?"quick.orange.fox"的消息將只被轉發到Q1隊列,路由為?"lazy.brown.fox"的消息將只被轉發到Q2隊列。"lazy.pink.rabbit"?只被轉發到Q2隊列一次(雖然它匹配綁定鍵*.*.rabbit和lazy.#),路由為?"quick.brown.fox"的消息與任何一個綁定鍵都不匹配,因此將會被丟棄。
如果我們發送的消息的的路由是由一個單詞“orangle"或4個單詞”quick.orangle.male.rabbit“將會怎樣?會因為與任何一個綁定鍵不匹配而被丟棄。若路由為?"lazy.orange.male.rabbit"的消息,因為匹配"lazy.#"綁定鍵,因而會被轉發到Q2隊列。
Topic交換器非常強大,可以像其他類型的交換器一樣工作:當一個隊列的綁定鍵是"#"是,它將會接收所有的消息,而不再考慮所接收消息的路由鍵,就像是fanout交換器一樣;當一個隊列的綁定鍵沒有用到”#“和”*“時,它又像direct交換一樣工作。
2、完整的代碼
下面是在我們日志系統中采用Topic交換器的完整代碼,日志消息的路由由兩個單詞組成:"<facility>.<severity>"。
EmitLogTopic.java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class EmitLogTopic {private final static String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost);try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String message = "A critical kernel error";String routingKey = "kern.critical";channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8"));System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");}}
}
ReceiveLogsTopic.java
import com.rabbitmq.client.*;public class ReceiveLogsTopic {private final static String EXCHANGE_NAME = "topic_logs";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String queueName = channel.queueDeclare().getQueue();if (args.length < 1) {System.err.println("Usage: ReceiveLogsTopic [binding_key]...");System.exit(1);}for (String bindingKey : args) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);}System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" +delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");};channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });}
}