SpringAMQP 快速入門
- 1. 創建項目
- 2. 快速入門
- 2.2.1 消息發送
- 2.2.2 消息接收
- 3. 交換機
- 3.1 Fanout Exchange(扇出交換機)
- 3.1.1 創建隊列與交換機
- 3.1.2 消息接收
- 3.1.3 消息發送
- 3.2 Direct Exchange(直連交換機)
- 3.2.1 創建交換機與隊列 綁定 Routing Key
- 3.2.2 消息接收
- 3.1.3 消息發送
- 3.3 Topic Exchange(主題交換機)
- 3.3.1 創建交換機與隊列 綁定 Routing Key
- 3.3.2 消息接收
- 3.3.3 消息發送
- 4. 聲明隊列和交換機
- 4.1 聲明隊列
- 4.2 聲明交換機
- 4.3 聲明綁定關系
- 4.4 測試
- 4.5 基于注解聲明
- 5. 消息轉換器
- 5.1 測試默認消息轉換器
- 5.2 配置JSON轉換器
- 5.3 消息接收
Spring AMQP(Advanced Message Queuing Protocol)是 Spring 框架的一個模塊,用于簡化在基于消息的應用程序中使用消息隊列的開發。它建立在 AMQP 協議之上,提供了與消息中間件(如 RabbitMQ)集成的便捷方式。
以下是 Spring AMQP 的主要特點和概念:
- 簡化的消息生產者和消費者: Spring AMQP 提供了簡單的模板(
AmqpTemplate
)用于發送和接收消息,大大簡化了消息生產者和消費者的開發。 - 聲明式的消息監聽器容器: Spring AMQP 允許使用注解聲明消息監聽器,而不需要手動編寫消息消費者。通過
@RabbitListener
注解,可以將一個方法標識為消息監聽器,以便在接收到消息時自動調用該方法。 - 消息轉換: Spring AMQP 提供了靈活的消息轉換機制,可以將消息從一種格式轉換為另一種格式,以便與不同類型的消息隊列進行集成。
- 聲明式的隊列、交換機和綁定: 使用 Spring AMQP,可以通過注解聲明式地定義隊列、交換機和綁定關系,而不需要在代碼中顯式創建這些對象。
- 事務支持: Spring AMQP 支持事務,可以在消息發送和接收過程中使用事務來確保消息的可靠性。
- 異常處理: 提供了豐富的異常體系,方便開發者處理在消息處理過程中可能發生的異常情況。
- 集成 Spring Boot: Spring AMQP 很好地集成到 Spring Boot 中,通過簡單的配置即可快速搭建基于消息的應用。
- 并發性: 允許配置消息監聽器容器的并發性,以便同時處理多個消息。
1. 創建項目
選擇 AMQP 依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.hzy</groupId><artifactId>SpringAMQP-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>SpringAMQP-demo</name><description>SpringAMQP-demo</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
在SpringAMQP-demo中新建兩個子模塊 publisher
、consumer
2. 快速入門
前面在消息隊列 - RabbitMQ這篇博客中添加了一個 test 用戶和 /test 虛擬主機,現在在 /test 中創建一個隊列
2.2.1 消息發送
配置 publisher
的 application.yml
文件
spring:rabbitmq:host: 192.168.193.40port: 5672username: testpassword: testvirtual-host: /test
然后在publisher
中編寫測試類SpringAmqpTest
,并利用RabbitTemplate
實現消息發送:
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testQueue1() {// 隊列名稱String queueName = "test.queue1";// 消息String message = "hello";// 發送消息rabbitTemplate.convertAndSend(queueName, message);}
}
成功發送到 test.queue1
中
2.2.2 消息接收
在consumer
的application.yml
中添加配置:
spring:rabbitmq:host: 192.168.193.40port: 5672username: testpassword: testvirtual-host: /test
新建一個類SpringRabbitListener
監聽隊列 test.queue1
@Component
public class SpringRabbitListener {// 利用RabbitListener來聲明要監聽的隊列信息// 將來一旦監聽的隊列中有了消息,就會推送給當前服務,調用當前方法,處理消息。// 方法體中接收的就是消息體的內容@RabbitListener(queues = "test.queue1")public void listenTestQueueMessage(String msg) {System.out.println("spring 消費者接收到消息:【" + msg + "】");}
}
啟動服務后就接收到了消息
隊列中的消息被消費
3. 交換機
3.1 Fanout Exchange(扇出交換機)
扇出交換機將消息廣播到與交換機綁定的所有隊列,忽略路由鍵。適用于廣播消息給多個消費者的場景,不關心消息的具體內容。
3.1.1 創建隊列與交換機
創建隊列
創建交換機
綁定隊列到交換機
3.1.2 消息接收
在SpringRabbitListener
中添加兩個方法分別監聽 fanout.queue1 和 fanout.queue2
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1Message(String msg) {System.out.println("spring 消費者接收 fanout.queue1 消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2Message(String msg) {System.out.println("spring 消費者接收 fanout.queue2 消息:【" + msg + "】");}
3.1.3 消息發送
在 SpringAmqpTest
中添加消息發送方法
@Testpublic void SendFanoutExchange() {// 交換機名稱String exchangeName = "test.fanout";// 消息String message = "hello, test.fanout";rabbitTemplate.convertAndSend(exchangeName, "", message);}
成功接收到消息
3.2 Direct Exchange(直連交換機)
直連交換機是最簡單的交換機類型,它將消息路由到與消息中的路由鍵完全匹配的隊列中,在消息生產者指定的路由鍵和隊列的綁定鍵完全相同時,消息將被發送到相應的隊列。
3.2.1 創建交換機與隊列 綁定 Routing Key
創建隊列
創建交換機
綁定 Routing Key
3.2.2 消息接收
在SpringRabbitListener
中添加兩個方法分別監聽 fanout.queue1 和 fanout.queue2
@RabbitListener(queues = "direct.queue1")public void listenDirectQueue1Message(String msg) {System.out.println("spring 消費者1接收 direct.queue1 消息:【" + msg + "】");}@RabbitListener(queues = "direct.queue2")public void listenFanoutQueue2Message(String msg) {System.out.println("spring 消費者2接收 direct.queue2 消息:【" + msg + "】");}
3.1.3 消息發送
在 SpringAmqpTest
中添加消息發送方法
@Testpublic void SendDirectExchange() {// 交換機名稱String exchangeName = "test.direct";// 消息String message = "hello, red";rabbitTemplate.convertAndSend(exchangeName, "red", message);}
Routing Key 為 red時兩個隊列都能收到消息
修改 Routing Key 為 blue
@Testpublic void SendDirectExchange() {// 交換機名稱String exchangeName = "test.direct";// 消息String message = "hello, blue";rabbitTemplate.convertAndSend(exchangeName, "blue", message);}
只有 direct.queue2 能收到消息
3.3 Topic Exchange(主題交換機)
Topic Exchange
和 Direct Exchange
類似區別在于使用直連交換機時,消息的路由鍵(Routing Key)需要與隊列綁定時指定的路由鍵完全匹配。使用主題交換機時,消息的路由鍵可以使用通配符進行模式匹配,支持更靈活的消息路由規則。
路由鍵中可以使用 *
(匹配一個單詞)和 #
(匹配零個或多個單詞)通配符。
適用于需要根據一定的模式匹配將消息路由到不同隊列的場景,可以處理更復雜的消息路由需求。
3.3.1 創建交換機與隊列 綁定 Routing Key
創建隊列
創建交換機
綁定 Routing Key
3.3.2 消息接收
在SpringRabbitListener
中添加兩個方法分別監聽 topic.queue1 和 topic.queue2
@RabbitListener(queues = "topic.queue1")public void listenTopicQueue1Message(String msg) {System.out.println("spring 消費者1接收 topic.queue1 消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2Message(String msg) {System.out.println("spring 消費者2接收 topic.queue2 消息:【" + msg + "】");}
3.3.3 消息發送
在 SpringAmqpTest
中添加消息發送方法
@Testpublic void SendTopicExchange() {// 交換機名稱String exchangeName = "test.topic";// 消息String message = "郵件通知";rabbitTemplate.convertAndSend(exchangeName, "mail.notices", message);message = "微信通知";rabbitTemplate.convertAndSend(exchangeName, "wechat.notices", message);message = "今日新聞";rabbitTemplate.convertAndSend(exchangeName, "today.news", message);}
4. 聲明隊列和交換機
在 Spring AMQP 中,聲明隊列和交換機是連接到 RabbitMQ 之前的重要步驟。這些聲明定義了你的消息傳遞系統的基礎架構,包括隊列和交換機的名稱、類型以及與其相關的其他屬性。
之前我們都是基于RabbitMQ控制臺來創建隊列、交換機。但是在實際開發時,隊列和交換機是程序員定義的,將來項目上線,又要交給運維去創建。那么程序員就需要把程序中運行的所有隊列和交換機都寫下來,交給運維。在這個過程中是很容易出現錯誤的。
因此推薦的做法是由程序啟動時檢查隊列和交換機是否存在,如果不存在自動創建。
Spring AMQP 提供了用來聲明隊列、交換機、及其綁定關系的的類:
- Queue:用于聲明隊列,可以用工廠類QueueBuilder構建
- Exchange:用于聲明交換機,可以用工廠類ExchangeBulider構建
- Binding:用于聲明隊列和交換機的綁定關系,可以用工廠類BindingBuilder構建
4.1 聲明隊列
創建 一個配置類 FanoutConfiguration
@Configuration
public class FanoutConfiguration {
}
SpringAMQP 提供了 Queue類用來創建隊列
在配置類中添加方法
@Beanpublic Queue fanoutQueue3(){// durable() 持久化隊列QueueBuilder durable = QueueBuilder.durable("fanout.queue3");return durable.build();}
4.2 聲明交換機
SpringAMQP提供了一個Exchange接口,來表示所有不同類型的交換機:
在配置類中添加方法
@Beanpublic FanoutExchange fanoutExchange(){ExchangeBuilder exchangeBuilder = ExchangeBuilder.fanoutExchange("test.fanout2");return exchangeBuilder.build();}
4.3 聲明綁定關系
SpringAMQP 提供了 Binding 類 來綁定隊列于交換機
在配置類中添加方法
@Beanpublic Binding fanoutBinding3(FanoutExchange fanoutExchange,Queue fanoutQueue3){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}
4.4 測試
啟動服務,查看控制臺可以看到隊列、交換機、和綁定關系都成功創建。
4.5 基于注解聲明
基于@Bean的方式聲明隊列和交換機比較麻煩,每添加一個隊列、交換機、綁定關系都要寫一個@Bean方法。Spring還提供了基于注解方式來聲明。
使用注解方式聲明 Fanout
交換機與隊列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "fanout.queue3", durable = "true"),exchange = @Exchange(value = "test.fanout2", type = "fanout")))public void listenFanoutQueue3Message(String msg) {System.out.println("消費者 接收 fanout.queue3 消息:【" + msg + "】");}
使用注解方式聲明Direct
交換機與隊列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.queue1", durable = "true"),exchange = @Exchange(name = "test.direct", type = "direct"),key = {"red","blue"} ))public void listenDirectQueue1Message(String msg) {System.out.println("spring 消費者1接收 direct.queue1 消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.queue2", durable = "true"),exchange = @Exchange(name = "test.direct", type = "direct"),key = {"red","yellow"}))public void listenDirectQueue2Message(String msg) {System.out.println("spring 消費者2接收 direct.queue2 消息:【" + msg + "】");}
使用注解方式聲明Topic
交換機與隊列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.queue1", durable = "true"),exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC),key = "#.notices"))public void listenTopicQueue1Message(String msg) {System.out.println("spring 消費者1接收 topic.queue1 消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "topic.queue2", durable = "true"),// type = ExchangeTypes.TOPIC 或者 "topic" 默認 "direct"exchange = @Exchange(name = "test.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2Message(String msg) {System.out.println("spring 消費者2接收 topic.queue2 消息:【" + msg + "】");}
可以看到使用@RabbitListener
注解的方式比 @Bean
方式簡單很多
5. 消息轉換器
5.1 測試默認消息轉換器
發送一個map集合
@Testpublic void testSendMapQueue1() {// 隊列名稱String queueName = "test.queue1";// 消息Map<String,String> map = new HashMap<>();map.put("name","zs");// 發送消息rabbitTemplate.convertAndSend(queueName, map);}
在控制臺查看消息
可以看到默認使用的序列化方式是JDK序列化,眾所周知,JDK序列化存在下列問題:
- 數據體積過大
- 有安全漏洞
- 可讀性差
使用我們需要使用可讀性更高更輕量級的序列化方式:JSON
5.2 配置JSON轉換器
顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
在publisher
和consumer
兩個服務中都引入依賴:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
配置消息轉換器,在publisher
和consumer
兩個服務的啟動類中添加一個Bean即可:
@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
再測試一次在控制臺中查看
public void listenTestQueueMessage(Map<String, String> msg) {System.out.println("消費者接收到test.queue1消息:【" + msg + "】");}