前言
在現代分布式系統中,消息隊列是實現服務解耦和異步處理的關鍵組件。Spring框架提供了強大的支持,使得與消息隊列(如RabbitMQ、Kafka等)的集成變得更加便捷和靈活。本文將深入探討如何利用Spring的注解驅動方式來配置和管理隊列、交換機、消息轉換器等組件,從而實現一個高效且可擴展的消息處理架構。
在本博客中,我們將重點介紹:
如何使用Spring的注解方式配置RabbitMQ的隊列和交換機。
如何配置消息轉換器(如Jackson2JsonMessageConverter)來處理不同格式的消息。
如何根據業務需求對現有代碼進行改造,將消息隊列引入到系統中,從而實現消息的異步處理與解耦。
通過這篇文章,您將了解如何使用Spring框架的注解配置簡化消息隊列的管理,同時提升系統的可擴展性和維護性。
基于注解的聲明隊列交換機
利用SpringAMQP聲明DirectExchange并與隊列綁定
需求如下:
- 在consumer服務中,聲明隊列direct.queue1和direct.queue2
- 在consumer服務中,聲明交換機hmall.direct,將兩個隊列與其綁定
- 在consumer服務中,編寫兩個消費者方法,分別監聽direct.queue1和direct.queue2
基于Bean聲明隊列和交換機代碼如下:
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct")}@Beanpublic Queue directQueue1(){return new Queue("direct.queuue1");}@Beanpublic Binding directQueue1bindingRed( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1bindingBlue( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queuue2");}@Beanpublic Binding directQueue2bindingRed( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2bindingYellow( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
SpringAMOP還提供了基于@RabbitListener注解來聲明隊列和交換機的方式
@RabbitListener(bindings =@QueueBinding(value = @Queue(name =direct.queue1),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueuel(string msg){System.out.println("消費者1接收到Direct消息:【+msg+"】");
}
接收者代碼如下:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String message)throws Exception {log.info("消費者1監聽到direct.queue2的消息,["+message+"]");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))
消息轉換器
消息轉換器
需求:測試利用SpringAMQP發送對象類型的消息
- 聲明一個隊列,名為object.queue
- 編寫單元測試,向隊列中直接發送一條消息,消息類型為Map
- 在控制臺查看消息,總結你能發現的問題
// 準備消息
Map<String,0bject>msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age",21);
創建隊列object.queue
測試代碼如下:
@Testpublic void TestSendObject(){Map<String, Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 18);//3.發送消息 參數分別是:交換機名稱、RoutingKey(暫時為空)、消息rabbitTemplate.convertAndSend("object.queue",msg);}
在控制臺上找到object.queue中得到消息
Spring的對消息對象的處理是由org.springframework.amgp.support.converter.MessageConverter來處理的。而默認實現是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列問題:
- JDK的序列化有安全風險
- JDK序列化的消息太大
- JDK序列化的消息可讀性差
建議采用JSON序列化代替默認的JDK序列化,要做兩件事情:
在publisher和consumer中都要引入jackson依賴:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
在publisher和consumer中都要配置Messageconverter:
@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}
消費者代碼:
@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg)throws Exception {log.info("消費者監聽到pbject.queue的消息,["+msg+"]");}
運行結果如下:
業務改造
需求:改造余額支付功能,不再同步調用交易服務的0penFeign接口,而是采用異步MO通知交易服務更新訂單狀態。
在trade-service微服務消費者配置和pay-service微服務發送者都配置MQ依賴
<!--消息發送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
在trade-service微服務和pay-service微服務添加上RabbitMQ配置信息
spring:rabbitmq:host: 192.168.244.136port: 5672virtual-host: /hmallusername: hmallpassword: 1234
因為消費者和發送者都需要消息轉換器,故直接將代碼寫到hm-common服務中,在config包中創建MqConfig類
package com.hmall.common.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
同時trade-service微服務和pay-service微服務是無法自動掃描到該類,采用SpringBoot自動裝配的原理,在resource文件夾下的META-INF文件夾下的spring.factories文件中添加類路徑:
在接收者trade-service微服務中創建PayStatusListener
package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue("trade.pay.success.queue"),exchange = @Exchange(value = "pay.direct"),key = "pay.success"))public void ListenPaySuccess(Long orderId) {orderService.markOrderPaySuccess(orderId);}}
修改pay-service服務下的com.hmall.pay.service.impl.PayOrderServiceImpl類中的tryPayOrderByBalance方法:
@Service
@RequiredArgsConstructor
@Slf4j
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {private final RabbitTemplate rabbitTemplate;...@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查詢支付單PayOrder po = getById(payOrderDTO.getId());// 2.判斷狀態if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 訂單不是未支付,狀態異常throw new BizIllegalException("交易已支付或關閉!");}// 3.嘗試扣減余額userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付單狀態boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或關閉!");}// 5.修改訂單狀態// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息發送失敗,支付單id:{}, 交易單id:{}", po.getId(), po.getBizOrderNo(), e);}}
}
總結
本文介紹了基于Spring框架的注解方式來配置消息隊列、交換機以及消息轉換器的實現方法。通過注解配置,開發者可以更輕松地創建和管理RabbitMQ等消息隊列的組件,而無需過多的 XML 配置或繁瑣的手動配置。具體來說,我們探討了如何:
使用 @RabbitListener 和 @EnableRabbit 注解配置消息監聽器和消息隊列。
配置消息轉換器,特別是如何通過 Jackson2JsonMessageConverter 將消息轉換為JSON格式,從而實現數據的序列化與反序列化。
結合業務需求,講解如何對現有系統進行改造,集成消息隊列,實現異步處理和服務解耦。
通過這些配置和改造,系統的消息處理能力得到了增強,性能和可擴展性也得到了顯著提升。消息隊列的使用不僅能夠減少服務之間的緊耦合,還能夠通過異步方式提高系統的響應速度和吞吐量。
希望本博客能夠幫助您理解Spring在消息隊列方面的強大功能,并為您的業務應用提供參考。隨著系統復雜度的增加,合理的使用消息隊列將成為構建高可用、高性能系統的關鍵之一。