- Publisher:生產者,不再發送消息到隊列中,而是發給交換機
- Exchange:交換機,一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
- Queue:消息隊列也與以前一樣,接收消息、緩存消息。不過隊列一定要與交換機綁定。
- Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
交換機的類型有四種:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機
- Direct:訂閱,基于RoutingKey(路由key)發送給訂閱了消息的隊列
- Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符
- Headers:頭匹配,基于MQ的消息頭匹配,用的較少。
Fanout交換機
簡單點來說,就是生產者把消息發給交換機,交換機根據路由(綁定規則)來轉發消息給隊列,消費者訂閱隊列,獲得消息。
Fanout,英文翻譯是扇出,我覺得在MQ中叫廣播更合適。
在廣播模式下,消息發送流程是這樣的:
- 1) 可以有多個隊列
- 2) 每個隊列都要綁定到Exchange(交換機)
- 3) 生產者發送的消息,只能發送到交換機
- 4) 交換機把消息發送給綁定過的所有隊列
- 5) 訂閱隊列的消費者都能拿到消息
注意:我下面的代碼都是在上一個加依賴的基礎上的,可看我上一個文檔
01利用官方文檔的
消息發送:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.util.Scanner;public class EmitLog {//定義交換機private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {//創建連接工廠ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//建立連接和通道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//通過channel.exchangeDeclare方法聲明了一個名為EXCHANGE_NAME(即"logs")的交換機// 并指定了其類型為fanout。fanout類型的交換機會將消息廣播到所有與之綁定的隊列中。channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//發送消息Scanner scanner=new Scanner(System.in);while(scanner.hasNext()){String message = scanner.nextLine();//channel.basicPublish方法將消息發布到前面聲明的交換機中。// 注意,這里的routingKey(即第二個參數)為空字符串"",因為對于fanout類型的交換機來說,routingKey是不起作用的。channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}
接收:
注意一定要創建隊列,不然只有交換機沒用
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs {private static final String EXCHANGE_NAME = "fanout-exchange";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();//創建通道Channel channel1 = connection.createChannel();Channel channel2 = connection.createChannel();//聲明交換機channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");//隊列的名字String queueName = "星星";//創建隊列channel1.queueDeclare(queueName, true, false, false, null);channel1.queueBind(queueName, EXCHANGE_NAME, "");String queueName1 = "晨晨";//創建隊列channel2.queueDeclare(queueName1, true, false, false, null);channel2.queueBind(queueName1, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [1] Received '" + message + "'");};DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [2] Received '" + message + "'");};channel1.basicConsume(queueName, true, deliverCallback, consumerTag -> { });channel2.basicConsume(queueName1, true, deliverCallback2, consumerTag -> { });}
}
可以看到fanout類型,生產者發送一個消息,所有的消費者都能接收到,這個類型不用設置路由
02注解形式的:
消息發送
先在配置類中聲明交換機,隊列,以及綁定關系
public static final String FANOUT_QUEUE_1 = "fanout.queue.1";public static final String FANOUT_QUEUE_2 = "fanout.queue.2";public static final String FANOUT_EXCHANGE = "fanout.exchange";@Beanpublic Queue fanoutQueue1() {return new Queue(FANOUT_QUEUE_1);}@Beanpublic Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE_2);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}@Beanpublic Binding binding1() {return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}@Beanpublic Binding binding2() {return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}
Test類中添加測試方法:
@Test
public void testFanoutExchange() {// 交換機名稱String exchangeName = "fanout.exchange";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收
在添加兩個方法,作為消費者:
@RabbitListener(queues = "fanout.queue.1")
public void listenFanoutQueue1(String msg) {System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue.2")
public void listenFanoutQueue2(String msg) {System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}
總結
交換機的作用是什么?
- 接收publisher發送的消息
- 將消息按照規則路由到與之綁定的隊列
- 不能緩存消息,路由失敗,消息丟失
- FanoutExchange的會將消息路由到每個綁定的隊列
Direct交換機
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發送方在 向 Exchange發送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列,而是根據消息的
Routing Key
進行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
此處我省略了官方文檔那種,直接springboot注解那種的,而且不再bean。而是全注解那種的,也是對黑馬代碼的進一步優化,不手動操作添加
下面是黑馬的案例
案例需求如圖:
- 聲明一個名為
hmall.direct
的交換機 - 聲明隊列
direct.queue1
,綁定hmall.direct
,bindingKey
為blud
和red
- 聲明隊列
direct.queue2
,綁定hmall.direct
,bindingKey
為yellow
和red
- 在
consumer
服務中,編寫兩個消費者方法,分別監聽direct.queue1和direct.queue2 - 在publisher中編寫測試方法,向
hmall.direct
發送消息
先在配置類加入下面的然后啟動一下:
/** 基于注解的來聲明交換機和隊列及其綁定關系 */@RabbitListener( bindings = @QueueBinding(exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue1"),key = {"red", "blue"}))public void rabbitListener5(String message) {System.out.println("紅藍: " + message);}@RabbitListener( bindings = @QueueBinding(exchange = @Exchange(name = "heima.direct", type = ExchangeTypes.DIRECT),value = @org.springframework.amqp.rabbit.annotation.Queue(name = "direct.queue2"),key = {"yellow","red"}))public void rabbitListener6(String message) {System.out.println("黃紅: " + message);}
消息發送
在Test類中添加測試方法:
@Testpublic void testSendDirectExchange() {// 交換機名稱String exchangeName = "heima.direct";// 消息String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!";// 發送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}@Testpublic void testSendDirectExchange01() {// 交換機名稱String exchangeName = "heima.direct";// 消息String message = "最新報道,哥斯拉是居民自治巨型氣球,虛驚一場!";// 發送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);}
消息接收:
@RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String msg) {System.out.println("消費者1接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String msg) {System.out.println("消費者2接收到direct.queue2的消息:【" + msg + "】");}
先點擊測試的那個red的運行一下,在啟動項目:
我們再切換為blue這個key:
你會發現,只有消費者1收到了消息:
###總結
描述下Direct交換機與Fanout交換機的差異?
- Fanout交換機將消息路由給每一個與之綁定的隊列
- Direct交換機根據RoutingKey判斷路由給哪個隊列
- 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似