目錄
一 MQ技術選型
1 運行rabbitmq
2 基本介紹
3 快速入門
1 交換機負責路由消息給隊列
2 數據隔離
二 Java客戶端
1 快速入門
2 WorkQueue
3 FanOut交換機
4 Direct交換機
5 Topic交換機
*6 聲明隊列交換機
1 在配置類當中聲明
2 使用注解的方式指定
7 消息轉換器
*前景引入
維度 | 異步通訊 | 同步通訊 | RabbitMQ 的定位 |
---|---|---|---|
交互方式 | 通過中間件間接通信,無阻塞等待 | 直接通信,需實時響應 | 作為異步通訊的核心載體,支持消息緩存與路由 |
耦合度 | 低(生產者和消費者解耦) | 高(調用方依賴被調用方可用性) | 通過隊列解耦系統,提升容錯性 |
適用場景 | 高并發、耗時任務、事件驅動架構 | 實時性要求高的簡單交互 | 天然適合異步場景,也可通過 RPC 支持同步需求 |
性能與擴展性 | 高吞吐,支持水平擴展 | 受限于實時響應能力 | 通過集群、負載均衡優化異步性能 |
一 MQ技術選型
MQ(message Queue)消息隊列,字面來看就是存放消息的隊列。也就是異步調用中的Broke。
1 運行rabbitmq
在虛擬機上安裝Docker_虛擬機安裝docker-CSDN博客
拉取鏡像
- docker pull rabbitmq:3-management
在容器當中運行
- docker run ...
借助端口訪問
2 基本介紹
核心概念總結
角色 | 作用 | 類比 |
---|---|---|
Publisher | 發送消息的程序 | 寄信人 |
Exchange | 按規則將消息分發到隊列 | 郵局分揀員 |
Queue | 存儲消息的容器 | 郵箱 |
Consumer | 從隊列取消息并處理的程序 | 收信人 |
Virtual Host | 隔離不同業務的消息環境(如測試、生產) | 郵局內的獨立部門 |
3 快速入門
1 交換機負責路由消息給隊列
添加成功
找到一臺交換機
需要添加綁定隊列從而實現路由給隊列
消息路由成功
2 數據隔離
RabbitMQ 中的 虛擬主機(vhost) 可以用一個簡單的比喻來理解:它就像一臺大型服務器中的“獨立房間”,每個房間都有自己的門禁系統、家具和規則,互不干擾。以下是它的核心作用:
實現:
先添加一個用戶
現在這個用戶還沒有虛擬主機,這里其是無法訪問之前創建的隊列,是與之前的虛擬主機隔離開的
現在退出原先的用戶,以剛剛創建的用戶信息登錄,然后添加一個虛擬主機
現在就可以在現在的用戶之下的虛擬主機上創建新的隊列
二 Java客戶端
1 快速入門
實現:
1 導入spring-amqp依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2 添加隊列
3 配置MQ地址
4 發送消息
@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "simple.queue1";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}
5 隊列
6 在消費者的相關方法中定義
@RabbitListener(queues = "simple.queue1")public void listenSimpleQueue(String msg) {System.out.println("消費者收到了simple.queue的消息:【" + msg + "】");}
7 然后將項目啟動,再在測試類中發送消息,控制臺會實時監控到發送的消息
8 隊列當中的消息拿出來在控制臺里面就沒有消息了
2 WorkQueue
任務模型:簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列當中的消息。
一個隊列多個消費者,可以緩解消息堆積問題。
1 配置項
2 不寫的話(默認一人一半,處理不完在隊列里等待)
3 新增一個隊列
4 兩個消費者(消費能力不同,消費能力相同應該是輪詢消費)
@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 work.queue的消息:【" + msg + "】");Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費者2 收到了 work.queue的消息...... :【" + msg + "】");Thread.sleep(200);}
5 生產者
@Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {String msg = "hello, worker, message_" + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}
6 測試
3 FanOut交換機
真正生產環境都會經過exchange來發送消息,而不是直接發送到隊列,交換機的類型有以下三種
Fanout模式會將接受到的消息廣播到跟其綁定的每一個隊列,廣播模式。
例子
1 先將隊列聲明好
2 再聲明交換機同時與隊列綁定
3 消費者
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {System.out.println("消費者2 收到了 fanout.queue2的消息:【" + msg + "】");}
4 生產者
@Testvoid testSendFanout() {String exchangeName = "hmall.fanout";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, null, msg);}
測試結果:
為什么第二個參數是?null
?
在你的代碼中,第二個參數是 null
,這是為了配合 Fanout 交換機 的特性。以下是關鍵點:
Fanout 交換機的特性
- Fanout 交換機(也稱為廣播交換機)會將消息?無條件廣播到所有綁定到該交換機的隊列,完全忽略路由鍵。
- 因此,在使用 Fanout 交換機時,路由鍵(
routingKey
)可以設為?null
,因為交換機不會使用它來決定消息的路由規則。
4 Direct交換機
這種交換機可以實現與Fanout交換機相同的效果同時也可以實現定向的效果。
需求
1 創建隊列與交換機
(交換機需要給routingKey值)
2 消費者
@RabbitListener(queues = "direct.queue1") // 直接監聽名為 direct.queue1 的隊列
public void listenDirectQueue1(String msg) {System.out.println("消費者1 收到了 direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2") // 直接監聽名為 direct.queue2 的隊列
public void listenDirectQueue2(String msg) {System.out.println("消費者2 收到了 direct.queue2的消息:【" + msg + "】");
}
3 生產者
@Testvoid testSendDirect() {String exchangeName = "hmall.direct";String msg = "藍色通知,警報解除,哥斯拉是放的氣球";rabbitTemplate.convertAndSend(exchangeName, "blue", msg);}
測試:
發送的路由鍵 | 接收隊列 | 觸發的消費者 |
---|---|---|
red | direct.queue1, direct.queue2 | 消費者1 + 消費者2 |
blue | direct.queue1 | 消費者1 |
yellow | direct.queue2 | 消費者2 |
可以根據需求更改生產者的代碼邏輯:
5 Topic交換機
Topic 交換機是 RabbitMQ 中基于模式匹配的路由機制,允許通過通配符(*
?和?#
)實現靈活的路由規則。
需求
實現:
聲明隊列和交換機
消費者
@RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 topic.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg) throws InterruptedException {System.out.println("消費者2 收到了 topic.queue2的消息:【" + msg + "】");}
生產者
@Testvoid testSendTopic() {String exchangeName = "hmall.topic";String msg = "今天天氣挺不錯,我的心情的挺好的";rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);}
測試:可以根據需求修改發送的RoutingKey
Direct交換機與Topic的差異
特性 | Direct 交換機 | Topic 交換機 |
---|---|---|
路由鍵匹配方式 | 精確匹配(完全一致) | 模式匹配(支持通配符?* ?和?# ) |
靈活性 | 低(適合簡單路由) | 高(適合復雜路由場景) |
典型場景 | 訂單狀態變更、任務分發 | 日志分類、多維度消息分發 |
*6 聲明隊列交換機
為了改善在控制臺創建隊列交換機的笨重,可以使用相關接口
聲明隊列和交換機
實現:
1 在配置類當中聲明
Fanout的
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {// fanoutExchange 定義交換機@Beanpublic FanoutExchange fanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("hmall.fanout2");}// queue 創建隊列@Beanpublic Queue fanoutQueue3(){// QueueBuilder.durable("ff").build();//持久化return new Queue("fanout.queue3");}// 綁定隊列和交換機@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}// 創建隊列@Beanpublic Queue fanoutQueue4(){return new Queue("fanout.queue4");}// 綁定隊列和交換機@Beanpublic Binding fanoutBinding4(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}
Direct的
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;// @Configuration
public class DirectConfiguration {// 定義交換機@Beanpublic DirectExchange directExchange() {return new DirectExchange("hmall.direct");}// 創建隊列@Beanpublic Queue directQueue1() {return new Queue("direct.queue1");}// 隊列與交換機進行綁定@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}// 隊列與交換機進行綁定@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}// 創建隊列@Beanpublic Queue directQueue2() {return new Queue("direct.queue2");}// 隊列與交換機進行綁定@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}// 隊列與交換機進行綁定@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}
2 使用注解的方式指定
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("消費者1 收到了 direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("消費者2 收到了 direct.queue2的消息:【" + msg + "】");}
通過使用?@RabbitListener
?的?bindings
?+?@QueueBinding
?注解的方式,不需要手動創建隊列、交換機或綁定關系。
-
檢查資源是否存在:
Spring 會通過?RabbitAdmin
?組件向 RabbitMQ 服務器發起檢查,確認隊列、交換機是否已存在。 -
自動創建缺失的資源:
-
若隊列?
direct.queue1
?或?direct.queue2
?不存在,會根據?@Queue
?注解的配置(如?name
、durable
)自動創建隊列。 -
若交換機?
hmall.direct
?不存在,會根據?@Exchange
?注解的配置(如?name
、type
)自動創建交換機。
-
-
自動綁定隊列到交換機:
根據?key
?指定的路由鍵,將隊列與交換機綁定(如?direct.queue1
?綁定?red
?和?blue
?路由鍵)。
7 消息轉換器
使用
1.?SimpleMessageConverter
(默認)
-
行為:
-
支持?
String
、byte[]
、Serializable
?對象。 -
若消息是?
Serializable
?對象,使用 Java 原生序列化。
-
-
問題:
-
強耦合:發送方和接收方必須有相同的類路徑(否則反序列化失敗)。
-
安全性差:Java 原生序列化易受攻擊(如反序列化漏洞)。
-
2.?Jackson2JsonMessageConverter
(推薦)
-
行為:
-
將對象轉換為 JSON 字符串,再轉為?
byte[]
。 -
反序列化時,將 JSON 還原為對象(需指定目標類型)。
-
-
優勢:
-
跨語言兼容:JSON 是通用格式,非 Java 客戶端也可解析。
-
松耦合:不強制要求發送方和接收方的類路徑一致。
-
安全性高:避免 Java 原生序列化漏洞
-
1 依賴引入
<!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>
2 Bean的創建
// 消息轉換器@Beanpublic MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}
3 消費者
@RabbitListener(queues = "object.queue")public void listenObject(Map<String, Object> msg) throws InterruptedException {System.out.println("消費者 收到了 object.queue的消息:【" + msg + "】");}
4 生產者
@Testvoid testSendObject() {Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);rabbitTemplate.convertAndSend("object.queue", msg);}
5 在實際業務當中的使用