SpringCloud系列教程:微服務的未來(二十五)-基于注解的聲明隊列交換機、消息轉換器、業務改造

前言

在現代分布式系統中,消息隊列是實現服務解耦和異步處理的關鍵組件。Spring框架提供了強大的支持,使得與消息隊列(如RabbitMQ、Kafka等)的集成變得更加便捷和靈活。本文將深入探討如何利用Spring的注解驅動方式來配置和管理隊列、交換機、消息轉換器等組件,從而實現一個高效且可擴展的消息處理架構。

在本博客中,我們將重點介紹:

如何使用Spring的注解方式配置RabbitMQ的隊列和交換機。
如何配置消息轉換器(如Jackson2JsonMessageConverter)來處理不同格式的消息。
如何根據業務需求對現有代碼進行改造,將消息隊列引入到系統中,從而實現消息的異步處理與解耦。
通過這篇文章,您將了解如何使用Spring框架的注解配置簡化消息隊列的管理,同時提升系統的可擴展性和維護性。


基于注解的聲明隊列交換機

利用SpringAMQP聲明DirectExchange并與隊列綁定
需求如下:

  1. 在consumer服務中,聲明隊列direct.queue1和direct.queue2
  2. 在consumer服務中,聲明交換機hmall.direct,將兩個隊列與其綁定
  3. 在consumer服務中,編寫兩個消費者方法,分別監聽direct.queue1和direct.queue2

在這里插入圖片描述
基于Bean聲明隊列和交換機代碼如下:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct")}@Beanpublic Queue directQueue1(){return new Queue("direct.queuue1");}@Beanpublic Binding directQueue1bindingRed( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1bindingBlue( Queue directQueue1, DirectExchange directExchange ){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return new Queue("direct.queuue2");}@Beanpublic Binding directQueue2bindingRed( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2bindingYellow( Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

SpringAMOP還提供了基于@RabbitListener注解來聲明隊列和交換機的方式

@RabbitListener(bindings =@QueueBinding(value = @Queue(name =direct.queue1),exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenDirectQueuel(string msg){System.out.println("消費者1接收到Direct消息:【+msg+"】");
}

接收者代碼如下:

	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void listenDirectQueue1(String message)throws Exception {log.info("消費者1監聽到direct.queue2的消息,["+message+"]");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))

消息轉換器

消息轉換器
需求:測試利用SpringAMQP發送對象類型的消息

  • 聲明一個隊列,名為object.queue
  • 編寫單元測試,向隊列中直接發送一條消息,消息類型為Map
  • 在控制臺查看消息,總結你能發現的問題
// 準備消息
Map<String,0bject>msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age"21);

創建隊列object.queue
在這里插入圖片描述
測試代碼如下:

	@Testpublic void TestSendObject(){Map<String, Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 18);//3.發送消息 參數分別是:交換機名稱、RoutingKey(暫時為空)、消息rabbitTemplate.convertAndSend("object.queue",msg);}

在控制臺上找到object.queue中得到消息
在這里插入圖片描述
Spring的對消息對象的處理是由org.springframework.amgp.support.converter.MessageConverter來處理的。而默認實現是SimpleMessageConverter,基于IDK的ObjectOutputStream完成序列化。存在下列問題:

  • JDK的序列化有安全風險
  • JDK序列化的消息太大
  • JDK序列化的消息可讀性差

建議采用JSON序列化代替默認的JDK序列化,要做兩件事情:
在publisher和consumer中都要引入jackson依賴:

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

在publisher和consumer中都要配置Messageconverter:

@Bean
public MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();
}

在這里插入圖片描述
在這里插入圖片描述
消費者代碼:

	@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg)throws Exception {log.info("消費者監聽到pbject.queue的消息,["+msg+"]");}

在這里插入圖片描述
運行結果如下:
在這里插入圖片描述

業務改造

需求:改造余額支付功能,不再同步調用交易服務的0penFeign接口,而是采用異步MO通知交易服務更新訂單狀態。
在這里插入圖片描述
在trade-service微服務消費者配置和pay-service微服務發送者都配置MQ依賴

	<!--消息發送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在trade-service微服務和pay-service微服務添加上RabbitMQ配置信息

spring:rabbitmq:host: 192.168.244.136port: 5672virtual-host: /hmallusername: hmallpassword: 1234

因為消費者和發送者都需要消息轉換器,故直接將代碼寫到hm-common服務中,在config包中創建MqConfig類

package com.hmall.common.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

同時trade-service微服務和pay-service微服務是無法自動掃描到該類,采用SpringBoot自動裝配的原理,在resource文件夾下的META-INF文件夾下的spring.factories文件中添加類路徑:
在這里插入圖片描述

在接收者trade-service微服務中創建PayStatusListener

package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue("trade.pay.success.queue"),exchange = @Exchange(value = "pay.direct"),key = "pay.success"))public void ListenPaySuccess(Long orderId) {orderService.markOrderPaySuccess(orderId);}}

修改pay-service服務下的com.hmall.pay.service.impl.PayOrderServiceImpl類中的tryPayOrderByBalance方法:

@Service
@RequiredArgsConstructor
@Slf4j
public class PayOrderServiceImpl extends ServiceImpl<PayOrderMapper, PayOrder> implements IPayOrderService {private final RabbitTemplate rabbitTemplate;...@Override@Transactionalpublic void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查詢支付單PayOrder po = getById(payOrderDTO.getId());// 2.判斷狀態if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 訂單不是未支付,狀態異常throw new BizIllegalException("交易已支付或關閉!");}// 3.嘗試扣減余額userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付單狀態boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或關閉!");}// 5.修改訂單狀態// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.direct", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息發送失敗,支付單id:{}, 交易單id:{}", po.getId(), po.getBizOrderNo(), e);}}
}

在這里插入圖片描述
在這里插入圖片描述


總結

本文介紹了基于Spring框架的注解方式來配置消息隊列、交換機以及消息轉換器的實現方法。通過注解配置,開發者可以更輕松地創建和管理RabbitMQ等消息隊列的組件,而無需過多的 XML 配置或繁瑣的手動配置。具體來說,我們探討了如何:

使用 @RabbitListener 和 @EnableRabbit 注解配置消息監聽器和消息隊列。
配置消息轉換器,特別是如何通過 Jackson2JsonMessageConverter 將消息轉換為JSON格式,從而實現數據的序列化與反序列化。
結合業務需求,講解如何對現有系統進行改造,集成消息隊列,實現異步處理和服務解耦。
通過這些配置和改造,系統的消息處理能力得到了增強,性能和可擴展性也得到了顯著提升。消息隊列的使用不僅能夠減少服務之間的緊耦合,還能夠通過異步方式提高系統的響應速度和吞吐量。

希望本博客能夠幫助您理解Spring在消息隊列方面的強大功能,并為您的業務應用提供參考。隨著系統復雜度的增加,合理的使用消息隊列將成為構建高可用、高性能系統的關鍵之一。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/896225.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/896225.shtml
英文地址,請注明出處:http://en.pswp.cn/news/896225.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

國產編輯器EverEdit - 文本編輯器的關鍵特性:文件變更實時監視,多頭編輯不掉坑

1 監視文件變更 1.1 應用場景 某些時候&#xff0c;用戶會使用多個編輯器打開同一個文件&#xff0c;如果在A編輯器修改保存&#xff0c;但是B編輯器沒有重新打開&#xff0c;直接在B編輯器修改再保存&#xff0c;則可能造成在A編輯器中修改的內容丟失&#xff0c;因此&#x…

HAProxy介紹與編譯安裝

目錄 1、HAProxy介紹 2、HAProxy編譯安裝 Centos 基礎環境 Ubuntu 基礎環境 編譯安裝HAProxy 驗證HAProxy版本 HAProxy啟動腳本 配置文件 啟動haproxy 驗證haproxy狀態 查看haproxy的狀態頁面 1、HAProxy介紹 HAProxy是法國開發者 威利塔羅(Willy Tarreau) 在2000年…

python類型轉換深淺拷貝

1.類型轉換 1.1 int(x):轉化為一個整數&#xff0c;只能轉換由純數字組成的字符串 float->int 浮點型強轉整形會去掉小數點后面的數&#xff0c;只保留整數部分 a 1.2 print(type(a)) #<class float> b int(a) print(type(b)) #<class int>print(int…

分布式光纖聲波振動技術在鉆井泄漏檢測中的應用

在石油天然氣的鉆井作業中&#xff0c;及時發現并定位泄漏點對于保障開采安全、降低環境污染以及避免經濟損失至關重要。傳統的泄漏檢測方法往往存在局限性&#xff0c;而分布式光纖聲波振動技術憑借其獨特的優勢&#xff0c;正逐漸成為鉆井過程中尋找泄漏的有力工具。 技術原理…

rtconfig.cpython-313.pyc 在 .gitignore文件中寫入 *.pyc 文件仍然沒有被忽略?

在 .gitignore 文件中添加 *.pyc 和 *.*.pyc 規則時&#xff0c;如果 .pyc 文件仍然沒有被忽略&#xff0c;可能有以下幾種原因&#xff1a; 1. 已經被 Git 跟蹤的文件 即使您在 .gitignore 中指定了忽略 .pyc 文件&#xff0c;Git 仍然會跟蹤已經被提交到版本庫中的文件。如…

機器學習---KNN算法核心原理和思路分析

文章目錄 1.算法介紹2.過擬合和欠擬合3.幾種不同的距離4.特征的歸一化處理 特此聲明&#xff1a;該內容是學習耿直哥的相關機器學習理論&#xff0c;也是文章里面的部分圖片素材的來源 1.算法介紹 KNN全稱叫做K Nearset Neighbor,翻譯之后就是K個最近的鄰居&#xff1b; 其實…

書生大模型實戰營14-MindSearch深度解析實踐

文章目錄 L2——進階島MindSearch深度解析實踐1 MindSearch 簡介2 開發環境配置2.1. 打開codespace主頁&#xff0c;選擇Blank模板進行創建2.2. 創建conda環境隔離并安裝依賴 3. 獲取硅基流動API KEY4. 啟動MindSearch4.1. 啟動后端4.2. 啟動前端 5. 部署到自己的 HuggingFace …

uniapp實現app的pdf預覽

實現效果 文件準備 static下添加該pdf文件&#xff08;下載地址&#xff1a;https://gitee.com/shallow-winds/resource_package/tree/master/%E6%96%B9%E6%B3%95%E4%B8%80/html&#xff09; 使用web-view進行展示&#xff1a; 在這里插入代碼片 <web-view :src"u…

重啟 nginx

首先確認Nginx是否已經安裝&#xff0c;并檢查它的安裝位置。 執行以下命令來檢查&#xff1a; which nginx#例如&#xff1a;顯示/usr/local/nginx/sbin/nginx方法1&#xff1a;直接使用完整路徑啟動Nginx 1.1、啟動 Nginx&#xff0c;執行命令&#xff1a; sudo /usr/loca…

java實現多圖合成mp4和視頻附件下載

java實現多圖合成mp4和視頻附件下載 在wutool中&#xff0c;封裝了視頻處理工具類&#xff0c;基于javacv和ffmpeg庫&#xff0c;實現多圖合成mp4、視頻http附件下載等。 關于wutool wutool是一個java代碼片段收集庫&#xff0c;針對特定場景提供輕量解決方案&#xff0c;只…

ollama修改監聽ip: 0.0.0.0

確認Ollama綁定IP地址 默認情況下&#xff0c;Ollama可能僅監聽本地回環地址&#xff08;127.0.0.1&#xff09;。要允許外部訪問&#xff0c;需將其配置為監聽所有IP&#xff08;0.0.0.0&#xff09;或指定IP&#xff08;如10…19&#xff09;。 修改啟動命令&#xff08;推薦…

STM32-溫濕度上傳OneNET項目

一、項目需求 使用 ESP8266 連接 OneNET 云平臺&#xff0c;并通過 MQTT 協議上傳 DHT11 獲取的溫濕度值。 二、項目框圖 三、DHT11工作原理 參考于良許嵌入式手把手教你玩轉DHT11&#xff08;原理驅動&#xff09; | 良許嵌入式 3.1 正常工作驗證 #? 上電后&#xff…

百度首頁上線 DeepSeek 入口,免費使用

大家好&#xff0c;我是小悟。 百度首頁正式上線了 DeepSeek 入口&#xff0c;這一重磅消息瞬間在技術圈掀起了驚濤駭浪&#xff0c;各大平臺都被刷爆了屏。 百度這次可太給力了&#xff0c;PC 端開放僅 1 小時&#xff0c;就有超千萬人涌入體驗。這速度&#xff0c;簡直比火…

Ubuntu:wvp-GB28181-pro安裝、運行

參考 https://doc.wvp-pro.cn 下載源碼 GitHub - 648540858/wvp-GB28181-pro: WEB VIDEO PLATFORM是一個基于GB28181-2016標準實現的網絡視頻平臺&#xff0c;支持NAT穿透&#xff0c;支持海康、大華、宇視等品牌的IPC、NVR、DVR接入。支持國標級聯&#xff0c;支持rtsp/rtm…

c++入門-------命名空間、缺省參數、函數重載

C系列 文章目錄 C系列前言一、命名空間二、缺省參數2.1、缺省參數概念2.2、 缺省參數分類2.2.1、全缺省參數2.2.2、半缺省參數 2.3、缺省參數的特點 三、函數重載3.1、函數重載概念3.2、構成函數重載的條件3.2.1、參數類型不同3.2.2、參數個數不同3.2.3、參數類型順序不同 前言…

Linux提權之計劃任務反彈shell提權(十一)

實驗環境還是一樣的 當獲取一個linux普通用戶的時&#xff0c;查看計劃任務 首先我們退出root用戶 cat /etc/crontab crontab -l 查看當前用戶命令 /var/spool/cron/crontabs/root 這個目錄是root任務文件 默認是不是root權限是 看不到 tail -f /var/log/syslog 咋們切換用…

論文解讀 | AAAI'25 Cobra:多模態擴展的大型語言模型,以實現高效推理

點擊藍字 關注我們 AI TIME歡迎每一位AI愛好者的加入&#xff01; 點擊 閱讀原文 觀看作者講解回放&#xff01; 個人信息 作者&#xff1a;趙晗&#xff0c;浙江大學-西湖大學聯合培養博士生 內容簡介 近年來&#xff0c;在各個領域應用多模態大語言模型&#xff08;MLLMs&…

ai-financial-agent - 為金融投資打造的AI代理

探索人工智能在投資研究中的應用。本項目僅用于**教育**目的&#xff0c;不用于真實交易或投資。 作者聲明&#xff1a; 本項目僅用于教育和研究目的。 不用于真實交易或投資不提供任何保證或擔保過去的表現并不代表未來的結果Creator 對經濟損失不承擔任何責任咨詢財務顧問…

Open WebUI選擇模型為空,解決辦法(for DeepSeek)

標簽&#xff1a; DeepSeek&#xff1b; Open WebUI&#xff1b; 問題&#xff1a;Open WebUI選擇模型為空&#xff0c;解決辦法 &#xff08;for DeepSeek&#xff09; 操作系統&#xff1a;Ubuntu 22 硬件&#xff1a;臺式電腦 Ubuntu 22系統&#xff0c;DeepSeek安裝成功&…

歐幾里得算法第二彈---計算多個數的最大公約數

如果要求多個數的 GCD&#xff0c;可以先求前兩個數的 GCD&#xff0c;然后用這個結果與下一個數求 GCD&#xff0c;依次類推。 為什么可以用前兩個數的 GCD 與下一個數繼續求 GCD&#xff0c;從而得到所有數的 GCD 呢&#xff1f;&#xff08;之前我不知道&#xff0c;自己也沒…