一.簡單模式
1.1.核心邏輯
生產者 → 隊列 → 單個消費者(1:1 直連),消息被消費后自動從隊列刪除。
1.2.關鍵特性
- 無交換器(其實使用的是默認交換機不是顯示指定),直接指定隊列?
- 消息默認自動確認(autoAck),易丟失消息?
1.3.應用場景
單任務即時處理(如聊天消息、簡單日志)
1.4.架構圖
1.5.代碼示例
????????在Rabbit中,生產者發送完消息后,就結束了,之后的操作就與生產者無關了,而消費者是被動接收的,一直處于監聽狀態。?
- pom依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
- 生產者?
package com.example.demo.rabbitmq.simple;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {public static void main(String[] args)throws Exception {// 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//設置主機地址connectionFactory.setHost("127.0.0.1");//設置連接端口號:默認為 5672connectionFactory.setPort(5672);// 虛擬主機名稱:默認為/connectionFactory.setVirtualHost("/");//設置連接用戶名;默認為guestconnectionFactory.setUsername("guest");//設置連接密碼;默認為guestconnectionFactory.setPassword("guest");//1 創建連接Connection connection = connectionFactory.newConnection();//創建頻道Channel channel=connection.createChannel();/*** 聲明(創建)隊列* 如果沒有一個名字叫simp1e-queue的隊列,則會創建該隊列,如果有則不會創建,所以該方法在確認有消息該消息隊列的情況下可以省略* queue 參數1:隊列名稱* durable 參數2:是否定義持久化隊列,當MQ重啟之后還在* exclusive 參數3:是否獨占本次連接。若獨占,只能有一個消費者監聽這個隊列目Connection關閉時刪除這個隊列* autoDelete 參數4:是否在不使用的時候自動刪除隊列,也就是在沒有Consumer時自動刪除* arquments 參數5:隊列其它參數*/channel.queueDeclare("simple_queue", true, false, false, null);//要發送的信息String message="Hello RabbitMQ!";/*** 指定消息隊列* 參數1:交換機名稱,如果沒有指定則使用默認Default_Exchange* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:配置信息* 參數4:消息內容*/channel.basicPublish( "","simple_queue", null, message.getBytes());channel.close();connection.close();}
}
?生產者main方法執行日志
????????如果你先啟動生產者main方法,那么你可以在RabbitMQ的web頁面可以看到在隊列中有一條消息。如果消費者一直都是監聽的,那大概率看不到,因為生產者發送消息的那一刻立馬就被消費者接收了,在消息隊列中就刪除了。
?
- 消費者
package com.example.demo.rabbitmq.simple;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory factory = new ConnectionFactory();//2.設置參數factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");//3.創建連接 ConnectionConnection connection = factory.newConnection();//4.創建channe1Channel channel = connection.createChannel();/***5.創建隊列* 如果沒有一個名字叫simp1e-queue的隊列,則會創建該隊列,如果有則不會創建,所以該方法在確認有消息該消息隊列的情況下可以省略* 數1.queue:隊列名稱* 參數2.durab1e:是否持久化。如果持久化,則當MQ重啟之后還在* 參數3.exclusive:是否獨占。* 參數4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉* 參數5.arguments:隊列其它參數*/channel.queueDeclare("simple_queue", true, false, false, null);// 接收消息DefaultConsumer consumer=new DefaultConsumer(channel){/*** 接收到消息后,此方法將被調用* @param consumerTag 標識* @param envelope 獲取一些信息,交換機,路由key...* @param properties 配置信息* @param body 數據* @throws IOException*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+consumerTag);System.out.println("Exchange:"+envelope.getExchange());System.out.println("RoutingKey:"+envelope.getRoutingKey());System.out.println("properties:"+properties);System.out.println("body:"+new String(body));}};// 參數1.queue:隊列名稱// 參數2.autoAck:是否自動確認,類似咱們發短信,發送成功會收到一個確認消息// 參數3.callback:回調對象// 消費者類似一個監聽程序,主要是用來監聽消息channel.basicConsume("simple_queue",true,consumer);}
}
?接收后,消息隊列就是空的了。
你也可以點擊上面圖片中消息隊列的名稱simple_queue,查看下面具體詳情,如下面圖片。?
?
在查看消息時,注意在右上角選擇頁面的刷新頻率。?
? ?
二.工作模式
2.1.核心邏輯
生產者 → 隊列 →?多個消費者并行消費(1:N)。
工作模式與簡單模式唯一的不同在于它有多個消費者,當隊列中有消息時,多個消費者競爭,每條消息僅被一個消費者處理。
2.2.關鍵特性
- 多個消費者競爭消費同一隊列,默認輪詢分發(Round-Robin)
- 可配置?
basicQos(prefetchCount)
?實現公平分發(能者多勞)
2.3.應用場景
資源密集型任務并行處理(如文件轉碼、批量郵件)
2.4.架構圖
2.5.代碼示例
main方法啟動消費者1和消費者2,當然你可以多創建幾個消費者,復制簡單模式中的代碼即可。
修改下生產者代碼,創建一個新的消息隊列,并且發送10條消息
消費者1接收到的信息
?
消費者2接收到的消息
?
三.發布訂閱模式
交換器類型:fanout
(廣播)
3.1.核心邏輯
生產者 →?Fanout交換器?→ 綁定隊列 → 所有消費者
忽略路由鍵(Routing Key)?,消息復制到所有綁定隊列。
3.3.關鍵特性
- 一條消息被多個消費者獨立消費(廣播)
- 需顯式綁定隊列到交換器?
3.3.應用場景
事件廣播(如用戶注冊后同時發郵件、短信)
3.4.架構圖
?如上圖所示,發布訂閱模式有以下特點;
- 指定類型的交換機;
- 多個消息隊列,交換機會將一條消息發布到每一個消息隊列中;
- 每個消息隊列可以有一個或者多個消費者;
3.5.代碼示例
生產者代碼
package com.example.demo.rabbitmq.fanout;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String EXCHANGE_NAME="test_fanout_exchange";public static void main(String[] args)throws Exception {// 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//設置主機地址connectionFactory.setHost("127.0.0.1");//設置連接端口號:默認為 5672connectionFactory.setPort(5672);// 虛擬主機名稱:默認為/connectionFactory.setVirtualHost("/");//設置連接用戶名;默認為guestconnectionFactory.setUsername("guest");//設置連接密碼;默認為guestconnectionFactory.setPassword("guest");//1 創建連接Connection connection = connectionFactory.newConnection();//創建頻道Channel channel=connection.createChannel();/*** 創建交換機* 參數1:交換機名稱* 參數2:交換機類型* 參數3.durable:是否持久化* 參數4.autoDelete:自動刪除* 參數5.internal:內部使用,一般false* 參數6.arquments:其它參數*///這兩個方法是一樣的//channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, false, false, null);/*** 聲明(創建)隊列* 如果沒有一個名字叫simp1e-queue的隊列,則會創建該隊列,如果有則不會創建,所以該方法在確認有消息該消息隊列的情況下可以省略* queue 參數1:隊列名稱* durable 參數2:是否定義持久化隊列,當MQ重啟之后還在* exclusive 參數3:是否獨占本次連接。若獨占,只能有一個消費者監聽這個隊列目Connection關閉時刪除這個隊列* autoDelete 參數4:是否在不使用的時候自動刪除隊列,也就是在沒有Consumer時自動刪除* arquments 參數5:隊列其它參數*/channel.queueDeclare("fanout_queue_1", true, false, false, null);channel.queueDeclare("fanout_queue_2", true, false, false, null);channel.queueDeclare("fanout_queue_3", true, false, false, null);/*** 綁定隊列到交換機* 參數1:隊列名稱* 參數2:交換機名稱* 參數3:路由key 交換機的類型為fanout,為空*/channel.queueBind("fanout_queue_1", EXCHANGE_NAME, "");channel.queueBind("fanout_queue_2", EXCHANGE_NAME, "");channel.queueBind("fanout_queue_3", EXCHANGE_NAME, "");//要發送的信息String message="Hello RabbitMQ!";/*** 指定消息隊列* 參數1:交換機名稱,如果沒有指定則使用默認Default_Exchange* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:配置信息* 參數4:消息內容*/channel.basicPublish( EXCHANGE_NAME,"", null, message.getBytes());channel.close();connection.close();}
}
上面代碼有以下作用:
- 創建指定類型的交換機(如果有,不創建);
- 創建三個消息隊列(如果有,不創建);
- 綁定交換機與消息隊列;
- 發送消息;?
運行后可在Rabbit MQ的管理頁面查看到下面的內容
交換機
點擊交換機名稱,查看綁定關系,綁定了三個消息隊列?
?
點擊任何一個消息隊列,你都可以看到有一條消息
?
或者你可以根據下圖查看消息隊列中的消息
?
如果你要查看某個隊列中的具體消息,點擊隊列名稱,找到Get messages。
如果不修改圖中的任何一個選項,是不會刪除隊列中的消息的。者三個消息隊列中,都有一條消息【Hello RabbitMQ!】這就是廣播的效果
?
消費端代碼?
消費端代碼并沒有變化,與簡單模式和工作模式的沒有不同,因為消費端監聽的是消息隊列,只需要修改消息隊列名稱后運行即可。
?
在上面圖片中,我只創建了兩個消費者,分別監聽了隊列1和隊列2。通過下面可以看到,隊列1和隊列2的消息被接收了。如果你感興趣,可以多加幾個消費者,注意:一個隊列可以有多個消費者。
?
四.路由模式
交換器類型:direct
(精確匹配)
4.1.核心邏輯
生產者 →?Direct交換器?→?匹配路由鍵的隊列?→ 消費者
路由鍵需與綁定鍵(Binding Key)完全一致。
4.2.關鍵特性
- 實現消息分類投遞(如按日志級別分發)
- 隊列可綁定多個路由鍵?
4.3.應用場景
精準路由(如ERROR日志存數據庫,INFO日志打印)
4.4.架構圖

根據上圖,路由模式有以下特點
- 特定類型的交換機?
direct;
- 指定具體的路由,交換機根據路由將消息發送到對應的隊列中;
- 需要注意的是,交換機到隊列的路由規則,可以多個。
4.5.代碼示例
消費者
消費端的代碼沒什么不同,在這里,我創建了兩個消費端,各監聽一個消息隊列。
生產者
package com.example.demo.rabbitmq.routing;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer {private static final String EXCHANGE_NAME="direct_exchange";public static void main(String[] args)throws Exception {// 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//設置主機地址connectionFactory.setHost("127.0.0.1");//設置連接端口號:默認為 5672connectionFactory.setPort(5672);// 虛擬主機名稱:默認為/connectionFactory.setVirtualHost("/");//設置連接用戶名;默認為guestconnectionFactory.setUsername("guest");//設置連接密碼;默認為guestconnectionFactory.setPassword("guest");//1 創建連接Connection connection = connectionFactory.newConnection();//創建頻道Channel channel=connection.createChannel();/*** 創建交換機* 參數1:交換機名稱* 參數2:交換機類型* 參數3.durable:是否持久化* 參數4.autoDelete:自動刪除* 參數5.internal:內部使用,一般false* 參數6.arquments:其它參數*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);/*** 聲明(創建)隊列* 如果沒有一個名字叫simp1e-queue的隊列,則會創建該隊列,如果有則不會創建,所以該方法在確認有消息該消息隊列的情況下可以省略* queue 參數1:隊列名稱* durable 參數2:是否定義持久化隊列,當MQ重啟之后還在* exclusive 參數3:是否獨占本次連接。若獨占,只能有一個消費者監聽這個隊列目Connection關閉時刪除這個隊列* autoDelete 參數4:是否在不使用的時候自動刪除隊列,也就是在沒有Consumer時自動刪除* arquments 參數5:隊列其它參數*/channel.queueDeclare("direct_queue_1", true, false, false, null);channel.queueDeclare("direct_queue_2", true, false, false, null);/*** 綁定隊列到交換機* 參數1:隊列名稱* 參數2:交換機名稱* 參數3:路由key*/channel.queueBind("direct_queue_1", EXCHANGE_NAME, "error");channel.queueBind("direct_queue_2", EXCHANGE_NAME, "info");//要發送的信息String message="日志信息:張三調用了delete方法.錯誤了,目志級別error";/*** 指定消息隊列* 參數1:交換機名稱,如果沒有指定則使用默認Default_Exchange* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:配置信息* 參數4:消息內容*/channel.basicPublish( EXCHANGE_NAME,"error", null, message.getBytes());channel.close();connection.close();}
}
?在上面代碼中,重點在于:
- 創建了一個交換機,類型為DIRECT;
- 創建了兩個隊列;
- 綁定交換機與隊列的關系,并指定路由;發送消息時需指定交換機名稱和路由key;
運行上面代碼后,消費者1接收到了error消息。
修改生產者代碼中的路由key,再次執行?
消費者2接收到了信息
?
五.主題模式
交換器類型:topic
(模糊匹配)
5.1.核心邏輯
生產者 →?Topic交換器?→?通配符匹配的隊列?→ 消費者
路由鍵支持?*
(匹配一詞)和?#
(匹配多詞),如?user.*.order
。
5.2.關鍵特性
- 動態路由(如按用戶興趣訂閱消息)
- 綁定鍵格式示例:
news.#
(接收所有新聞)
5.3.應用場景
動態消息分發(如電商系統按用戶標簽推送促銷)
5.4.架構圖
主題模式和路由模式的區別在于,主題模式的路由key可以模糊匹配 。
將交換機的類型設置為topic類型,在綁定隊列時配置路由key,可以設置模糊匹配的規則,如下圖
在生產者發送消息后,交換機根據路由開始匹配,將消息發送到所有匹配的隊列中。
六.頭模式
交換器類型:headers
(鍵值對匹配)
6.1.核心邏輯
生產者 →?Headers交換器?→?匹配消息頭的隊列?→ 消費者
通過?x-match
?指定?all
(全匹配)或?any
(任一匹配)。
6.2.關鍵特性
- 不依賴路由鍵,用消息頭(Headers)路由?
- 性能較低,極少使用?
6.3.應用場景
特殊路由需求(如按消息語言或版本過濾)
6.4.架構圖
????????該模式與上文中的各個模式完全不同,在頭模式中,生產者不指定或創建消息隊列,不綁定交換機與消息隊列。這部分功能在消費者中。
在生產者中,有以下幾步:
- 聲明或創建交換機;
- 構建消息屬性,指定消息頭;
6.5.代碼案例
消費者
在該模式中,必須先啟動消費者,因為生產者沒有指定消息隊列,如果先啟動生產者,會導致數據丟失。
package com.example.demo.rabbitmq.head;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//1.創建連接工廠ConnectionFactory factory = new ConnectionFactory();//2.設置參數factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");//3.創建連接 ConnectionConnection connection = factory.newConnection();//4.創建channe1Channel channel = connection.createChannel();//5.創建交換機channel.exchangeDeclare("headers_exchange", BuiltinExchangeType.HEADERS, true, false, false, null);/***5.創建隊列* 如果沒有一個名字叫simp1e-queue的隊列,則會創建該隊列,如果有則不會創建,所以該方法在確認有消息該消息隊列的情況下可以省略* 數1.queue:隊列名稱* 參數2.durab1e:是否持久化。如果持久化,則當MQ重啟之后還在* 參數3.exclusive:是否獨占。* 參數4.autoDelete:是否自動刪除。當沒有Consumer時,自動刪除掉* 參數5.arguments:隊列其它參數*/channel.queueDeclare("headers_queue_1", true, false, false, null);// 設置綁定參數(完全匹配)Map<String, Object> bindingArgs = new HashMap<>();bindingArgs.put("x-match", "all"); // 必須所有Header匹配bindingArgs.put("format", "JSON");bindingArgs.put("priority", "high");/*** 綁定隊列到交換機* 參數1:隊列名稱* 參數2:交換機名稱* 參數3:路由key 為空* 參數4:綁定參數*/channel.queueBind("headers_queue_1", "headers_exchange", "", bindingArgs);// 接收消息// 消費消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received (ALL match): " + message +" Headers: " + delivery.getProperties().getHeaders());};/*** 監聽消息* 參數1.queue:隊列名稱* 參數2.autoAck:是否自動確認,類似咱們發短信,發送成功會收到一個確認消息* 參數3.callback:回調對象* 參數4.cancelCallback:取消消費的回調* 參數5.arguments:消費者其它參數*/channel.basicConsume("headers_queue_1",true,deliverCallback,consumerTag->{});}
}
生產者
package com.example.demo.rabbitmq.head;import com.rabbitmq.client.*;import java.util.HashMap;
import java.util.Map;public class Producer {private static final String EXCHANGE_NAME="headers_exchange";public static void main(String[] args)throws Exception {// 創建連接工廠ConnectionFactory connectionFactory = new ConnectionFactory();//設置主機地址connectionFactory.setHost("127.0.0.1");//設置連接端口號:默認為 5672connectionFactory.setPort(5672);// 虛擬主機名稱:默認為/connectionFactory.setVirtualHost("/");//設置連接用戶名;默認為guestconnectionFactory.setUsername("guest");//設置連接密碼;默認為guestconnectionFactory.setPassword("guest");//1 創建連接Connection connection = connectionFactory.newConnection();//創建頻道Channel channel=connection.createChannel();/*** 創建交換機* 參數1:交換機名稱* 參數2:交換機類型* 參數3.durable:是否持久化* 參數4.autoDelete:自動刪除* 參數5.internal:內部使用,一般false* 參數6.arquments:其它參數*/channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS, true, false, false, null);//設置頭消息Map<String, Object> headers = new HashMap<>();headers.put("format", "JSON");headers.put("priority", "high");// 構建消息屬性AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers) //設置頭消息.build();String message = "Header Exchange Test Message";/*** 指定消息隊列* 參數1:交換機名稱,如果沒有指定則使用默認Default_Exchange* 參數2:路由key,簡單模式可以傳遞隊列名稱* 參數3:配置信息* 參數4:消息內容*/channel.basicPublish( EXCHANGE_NAME,"", properties, message.getBytes());channel.close();connection.close();}
}
?消費者接收到的消息
當然Rabbit MQ還有其它模式,如,RPC模式:遠程過程調用,本質上是同步調用,和我們使用OpenFeign調用遠程接口一樣,有機會再說。