消息交換機(Exchange)
RabbitMQ消息模型的核心理念是生產者永遠不會直接發送任何消息給隊列,一般的情況生產者甚至不知道消息應該發送到哪些隊列。
相反的,生產者只能發送消息給交換機(Exchange)。交換機的作用非常簡單,一邊接收從生產者發來的消息,另一邊把消息推送到隊列中。交換機必須清楚的知道消息如何處理它收到的每一條消息。是否應該追加到一個指定的隊列?是否應該追加到多個隊列?或者是否應該丟棄?這些規則通過交換機的類型進行定義。
交換機的類型有:direct,topic,headers 和 fanout。我們以fanout為例創建一個“logs”類型的交換機。
1 | channel.exchangeDeclare( "logs" ,? "fanout" ); |
fanout交換機非常簡單,它會廣播它收到的所有隊列的所有消息。
交換機命名
在前面的例子中,我們不了解交換機的任何概念,也能發送消息,這是因為我們使用了默認的交換機(""),但以后可以使用我們自定義的交換機了。
1 2 | channel.basicPublish( "" ,? "hello" ,?null,?message.getBytes());? // 空字符串交換機 channel.basicPublish(? "logs" ,? "" ,?null,?message.getBytes());? //logs 交換機 |
臨時隊列(Temporary Queues)
在前面的例子中,我們為隊列都指定了具體的名字(如hello和task_queue),給隊列命名是非常重要的事情,因為生產者和消費者是隊列名稱來傳遞消息的。
但是對于日志來說的消息隊列,我們會監聽所有的日志消息,而不是其中的一些子集。而且我們只關注當前發生的消息而不是歷史消息,要解決這些問題需要這么做:
首先,當我們連接Rabbit服務器時,我們需要一個新的空隊列。我們可以自己隨機生成一個隊列名字或者讓服務器隨機生成一個隊列名字。
其次,當消息消費者失去連接時,隊列應該自動刪除。
在Java中,我們使用不帶參數的queueDeclare()方法創建一個非持久化的,唯一的,用后自動刪除的隊列。
1 | String?queueName?=?channel.queueDeclare().getQueue(); |
queueName可能是像 amq.gen-JzTY20BRgKO-HjmUJj0wLg 這樣的隨機隊列名。
消息綁定(Bindings)
前面我們創建了一個fanout類型的交換機和隊列。現在需要告訴交換機發送消息到隊列。交換機和隊列之間的關系就是消息綁定(binding)。
使用下面的代碼logs交換機會將消息傳遞給隊列。
1 | channel.queueBind(queueName,? "logs" ,? "" ); |
將交換機和消息綁定放在一起
現在我們有一個提交日志的的消息生產者,它與我們之前的消息發送者并沒有太大的區別,唯一不同的地方是我們將消息發送到 logs 交換機,而不是沒有名字的交換機。當發送消息時,我們需要提供一個路由,盡管它在 fanout 交換機中并沒有什么作用。下面是提交日志的Java代碼。
EmitLog.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | package ?com.favccxx.favrabbit; import ?com.rabbitmq.client.Channel; import ?com.rabbitmq.client.Connection; import ?com.rabbitmq.client.ConnectionFactory; public ?class ?EmitLog?{ ? private ?static ?final ?String?EXCHANGE_NAME?=? "logs" ; ? public ?static ?void ?main(String[]?argv)? throws ?Exception?{ ?? ConnectionFactory?factory?=? new ?ConnectionFactory(); ?? factory.setHost( "localhost" ); ?? Connection?connection?=?factory.newConnection(); ?? Channel?channel?=?connection.createChannel(); ?? channel.exchangeDeclare(EXCHANGE_NAME,? "fanout" ); ?? String[]?sendMsgs?=?{ "I" ,? "saw" ,? "a" ,? "dog" }; ?? String?message?=?getMessage(sendMsgs); ?? channel.basicPublish(EXCHANGE_NAME,? "" ,? null ,?message.getBytes( "UTF-8" )); ?? System.out.println( "?[x]?Sent?'" ?+?message?+? "'" ); ?? channel.close(); ?? connection.close(); ? } ? private ?static ?String?getMessage(String[]?strings)?{ ?? if ?(strings.length?<? 1 ) ??? return ?"info:?Hello?World!" ; ?? return ?joinStrings(strings,? "?" ); ? } ? private ?static ?String?joinStrings(String[]?strings,?String?delimiter)?{ ?? int ?length?=?strings.length; ?? if ?(length?==? 0 ) ??? return ?"" ; ?? StringBuilder?words?=? new ?StringBuilder(strings[ 0 ]); ?? for ?( int ?i?=? 1 ;?i?<?length;?i++)?{ ??? words.append(delimiter).append(strings[i]); ?? } ?? return ?words.toString(); ? } } |
正如上面所示,與消息服務器建立連接后,聲明了一個交換機,這是因為系統不允許發布到空交換機。?如果沒有隊列綁定到交換機的話,消息就會丟失,但我們不用擔心。如果沒有消費者監聽消息的話,我們就丟棄該消息。
接收消息代碼ReceiveLogs.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | package ?com.favccxx.favrabbit; import ?java.io.IOException; import ?com.rabbitmq.client.AMQP; import ?com.rabbitmq.client.Channel; import ?com.rabbitmq.client.Connection; import ?com.rabbitmq.client.ConnectionFactory; import ?com.rabbitmq.client.Consumer; import ?com.rabbitmq.client.DefaultConsumer; import ?com.rabbitmq.client.Envelope; public ?class ?ReceiveLogs?{ ? private ?static ?final ?String?EXCHANGE_NAME?=? "logs" ; ? public ?static ?void ?main(String[]?argv)? throws ?Exception?{ ?? ConnectionFactory?factory?=? new ?ConnectionFactory(); ?? factory.setHost( "localhost" ); ?? Connection?connection?=?factory.newConnection(); ?? Channel?channel?=?connection.createChannel(); ?? channel.exchangeDeclare(EXCHANGE_NAME,? "fanout" ); ?? String?queueName?=?channel.queueDeclare().getQueue(); ?? channel.queueBind(queueName,?EXCHANGE_NAME,? "" ); ?? System.out.println( "?[*]?Waiting?for?messages.?To?exit?press?CTRL+C" ); ?? Consumer?consumer?=? new ?DefaultConsumer(channel)?{ ??? @Override ??? public ?void ?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties, ????? byte []?body)? throws ?IOException?{ ???? String?message?=? new ?String(body,? "UTF-8" ); ???? System.out.println( "?[x]?Received?'" ?+?message?+? "'" ); ??? } ?? }; ?? channel.basicConsume(queueName,? true ,?consumer); ? } } |
測試數據
運行幾個日志消息接收者實例,使用日志消息發送者發送消息,發現每個日志消息接收者都接收到同樣的數據,說明發布訂閱成功。
1 | ? [x]?Received? 'I?saw?a?dog' |