【RabbitMQ】Spring Boot 結合 RabbitMQ 完成應用間的通信

???🔥個人主頁:?中草藥

🔥專欄:【中間件】企業級中間件剖析


?Spring 框架與 RabbitMQ 的整合主要通過?Spring AMQP(Advanced Message Queuing Protocol)模塊實現,提供了便捷的消息隊列開發能力。

引入依賴

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>

配置

#配置RabbitMQ的基本信息
spring:rabbitmq:host: 110.41.51.65port: 15673 #默認為5672username: studypassword: studyvirtual-host: bite #默認值為#或者以下這種方式rabbitmq:addresses: #amqp://username:password@Ip:port/virtual-host

一、工作隊列模式(Work Queue)

場景:多個消費者共享一個隊列,消息被輪詢分發(Round-Robin),用于任務分發和負載均衡(如耗時任務處理)。

producer

@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work(){//使用內置交換機 RoutingKey 和隊列名稱一致for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"Hello World"+i);}return "OK";}
}

@RabbitListener?是 Spring 框架中用于監聽 RabbitMQ 隊列的注解,通過使用這個注解,可以定義一個方法,以便從 RabbitMQ 隊列中接收消息(相當于消費者)。該注解支持多種參數類型,這些參數類型代表了從 RabbitMQ 接收到的消息和相關信息。

以下是一些常用的參數類型:

  1. String:返回消息的內容。
  2. Messageorg.springframework.amqp.core.Message):Spring AMQP 的 Message 類,返回原始的消息體以及消息的屬性,如消息 ID、內容、隊列信息等。
  3. Channelcom.rabbitmq.client.Channel):RabbitMQ 的通道對象,可以用于進行更高級的操作,如手動確認消息 。
@Component
public class WorkListener {@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener1(Message message, Channel channel) {System.out.println("Listener1:["+Constants.WORK_QUEUE+"]"+message+",channel:"+channel);}@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.WORK_QUEUE+"]"+message);}
}

觀察控制臺

輪詢分發:默認按順序將消息分發給不同消費者。

二、發布訂閱模式(Publish/Subscribe)

場景:消息廣播到所有綁定的隊列(如日志廣播、事件通知),使用?Fanout 交換機

聲明使用交換機和隊列 并綁定

@Configuration
public class RabbitMQConfig {@Bean("fanoutQueue1")public Queue funoutQueue1() {return QueueBuilder.durable(Constants.FUNOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue funoutQueue2() {return QueueBuilder.durable(Constants.FUNOUT_QUEUE2).build();}@Bean("funoutExchange")public FanoutExchange funoutExchange(){return ExchangeBuilder.fanoutExchange(Constants.FUNOUT_EXCHANGE).durable(true).build();}@Bean("funoutQueueBinding1")public Binding funoutQueueBinding1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){return BindingBuilder.bind(queue).to(funoutExchange);}@Bean("funoutQueueBinding2")public Binding funoutQueueBinding2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){return BindingBuilder.bind(queue).to(funoutExchange);}
}

producer


@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/funout")public String funout(){rabbitTemplate.convertAndSend(Constants.FUNOUT_EXCHANGE,"","Hello Spring funout");return "OK";}
}

listener

@Component
public class FunoutListener {@RabbitListener(queues = Constants.FUNOUT_QUEUE1)public void queueListener1(String message) {System.out.println("Listener1:["+Constants.FUNOUT_QUEUE1+"]"+message);}@RabbitListener(queues = Constants.FUNOUT_QUEUE2)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.FUNOUT_QUEUE2+"]"+message);}
}

在實際開發之中,一個 listener 監聽一個 queue?

三、路由模式(Routing)

場景:根據?路由鍵(Routing Key)?精準匹配,將消息發送到指定隊列,使用?Direct 交換機

聲明使用交換機和隊列 并綁定

@Configuration
public class RabbitMQConfig {//direct@Bean("directQueue1")public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}@Bean("directQueueBinding1")public Binding directQueueBinding1(@Qualifier("directQueue1") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("orange");}@Bean("directQueueBinding2")public Binding directQueueBinding2(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("black");}@Bean("directQueueBinding3")public Binding directQueueBinding3(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("orange");}
}

producer


@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable String routingKey){rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"Hello Spring direct "+routingKey);return "direct OK";}
}

listener

@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String message) {System.out.println("Listener1:["+Constants.DIRECT_QUEUE1+"]"+message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.DIRECT_QUEUE2+"]"+message);}
}

如果某一個隊列即綁定了black和orange,將會分別發送到隊列

四、通配符模式(Topics)

場景:根據路由鍵的?通配符規則?匹配隊列,使用?Topic 交換機,支持?*(匹配一個單詞)和?#(匹配多個單詞)。

Topics 和 Routing 模式的區別是:

  1. topics 模式使用的交換機類型為 topic (Routing 模式使用的交換機類型為 direct)
  2. topic 類型的交換機在匹配規則上進行了擴展,Binding Key 支持通配符匹配

聲明使用交換機和隊列 并綁定

@Configuration
public class RabbitMQConfig {//TOPIC@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("topicQueueBinding1")public Binding topicQueueBinding1(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue1") Queue topicQueue){return BindingBuilder.bind(topicQueue).to(topicExchange()).with("*.orange.*");}@Bean("topicQueueBinding2")public Binding topicQueueBinding2(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){return BindingBuilder.bind(topicQueue).to(topicExchange).with("*.*.rabbit");}@Bean("topicQueueBinding3")public Binding topicQueueBinding3(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){return BindingBuilder.bind(topicQueue).to(topicExchange).with("lazy.#");}
}

producer


@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable String routingKey){rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"Hello Spring topic "+routingKey);return "topic OK";}
}

listener

@Component
public class TopicListener {@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void queueListener1(String message) {System.out.println("Listener1:["+Constants.TOPIC_QUEUE1+"]"+message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.TOPIC_QUEUE2+"]"+message);}
}

五、完成應用通信

SpringBoot 整合 RabbitMQ 實現應用通信是微服務/分布式系統中常見的異步解耦方案。

以此圖為實例

訂單系統

@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue() {return QueueBuilder.durable("order.create").build();}
}@RestController
@RequestMapping("/order")
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){rabbitTemplate.convertAndSend("","order.create","訂單信息,訂單ID:"+ UUID.randomUUID());//這里僅僅是模擬演示,實際的下單操作比較復雜,包括參數的校驗,數據庫存儲等等 業務代碼省略return "下單成功";}
}

?物流系統

@Component
@RabbitListener(queues = "order.create")
public class OrderListener {@RabbitHandler//該注解根據所識別的數據類型不同自動分配不同的方法public void handleOrder(String orderInfo) {System.out.println("接收到新的訂單消息:"+orderInfo);//接收到訂單消息后,進行相應的業務出路 代碼省略}@RabbitHandler//在此處為方便演示我們將order-service 讓此項目應用//正確的做法是將OrderInfo抽取出來單獨作為一個包,兩個service都引用這個包public void handleOrder2(OrderInfo orderInfo) {System.out.println("接收到新的訂單消息:"+orderInfo);}
}

@RabbitHandler?注解用于標記方法,這些方法會根據消息的類型來處理接收到的消息。當一個消息監聽器容器接收到消息時,它會根據消息的類型選擇合適的?@RabbitHandler?注解的方法來處理該消息。?

測試結果

當在發送的是一個對象時,為保證對象的可讀性,我們要保證對象可被序列化,且通過 Jackson2JsonMessageConverter 將其從原生序列化改為Json格式

@Configuration
public class RabbitMQConfig {@Beanpublic Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;}
}

對于Listener,為保證同樣具有解讀json的能力,也應該去加上相同的配置?

@Data
public class OrderInfo implements Serializable {//實現序列化接口private String orderId;private String name;
}

?可觀察結果


對時間的慷慨,就等于慢性自殺。——奧斯特洛夫斯基

🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀

以上,就是本期的全部內容啦,若有錯誤疏忽希望各位大佬及時指出💐

? 制作不易,希望能對各位提供微小的幫助,可否留下你免費的贊呢🌸?

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/72145.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/72145.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/72145.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

CDefView::_GetPIDL函數分析之ListView_GetItem函數的參數item的item.mask 為LVIF_PARAM

CDefView::_GetPIDL函數分析之ListView_GetItem函數的參數item的item.mask 為LVIF_PARAM 第一部分&#xff1a; 1: kd> t SHELL32!CDefView::_GetPIDL: 001b:77308013 55 push ebp 1: kd> dv this 0x00000015 i 0n21 …

MongoDB分頁實現方式對比:PageRequest vs Skip/Limit

MongoDB分頁實現方式對比&#xff1a;PageRequest vs Skip/Limit 一、基本概念1.1 PageRequest分頁1.2 Skip/Limit分頁 二、主要區別2.1 使用方式2.2 參數計算2.3 適用場景PageRequest適用場景&#xff1a;Skip/Limit適用場景&#xff1a; 三、性能考慮3.1 PageRequest的性能特…

Manus(一種AI代理或自動化工具)與DeepSeek(一種強大的語言模型或AI能力)結合使用任務自動化和智能決策

一、Manus與DeepSeek差異 十分好奇DeepSeek和Manus究竟誰更厲害些&#xff0c;DeepSeek是知識型大腦&#xff0c;Manus則是全能型執行者。即DeepSeek專注于語言處理、知識整合與專業文本生成。其核心優勢在于海量參數支持的深度學習和知識推理能力&#xff0c;例如撰寫論文、潤…

UI自動化:poium測試庫

以下是關于 poium 測試庫 的詳細介紹&#xff0c;涵蓋其核心功能、使用方法及與原生 Selenium 的對比&#xff0c;幫助快速掌握這一工具&#xff1a; 1. poium 簡介 定位&#xff1a;基于 Selenium 的 Page Object 模式增強庫&#xff0c;專注于簡化元素定位和頁面操作。 核心…

C#結構體(Struct)詳解

在 C# 中&#xff0c;?結構體&#xff08;struct&#xff09;? 是一種值類型數據類型&#xff0c;適用于封裝小型數據組。與類&#xff08;class&#xff09;不同&#xff0c;結構體在棧&#xff08;Stack&#xff09;上分配內存&#xff0c;且賦值時會發生值復制。以下是結構…

UVC攝像頭命令推流,推到rv1126里面去

ffmpeg命令查詢UVC設備 .\ffmpeg.exe -list_devices true -f dshow -i dummy 上圖是查詢UVC設備的效果圖&#xff0c;畫紅框的部分是UVC設備的設備名稱"USB2.0 PC CAMERA"和設備號 "device_pnp_\\?\usb#vid_1908&pid_2310&mi_00#8&39abfe5&0&a…

Linux中的基本指令(上)

目錄 ls指令 判斷linux中文件 pwd指令 認識路徑 ?編輯 絕對路徑/相對路徑 cd指令 簡要理解用戶 理解家目錄 echo指令和printf指令 touch指令 mkdir指令 cat指令 tree指令 rmdir指令和rm指令 man指令 cp指令 which指令 alias 指令 date指令 cal指令 理解…

多數元素——面試經典150題(力扣)

題目 給定一個大小為 n 的數組 nums &#xff0c;返回其中的多數元素。多數元素是指在數組中出現次數 大于 ? n/2 ? 的元素。 你可以假設數組是非空的&#xff0c;并且給定的數組總是存在多數元素。 示例 1&#xff1a; 輸入&#xff1a;nums [3,2,3] 輸出&#xff1a;3 …

Qt 數據庫操作(Sqlite)

數據庫簡介 關于數據庫的基礎知識這里就不做介紹了&#xff0c;相關博客可以查看&#xff1a; SQL基礎知識 數據庫學霸筆記 上面博客都寫的比較詳細&#xff0c;本文主要介紹如何使用Qt進行數據庫相關操作&#xff0c;數據庫分為關系型數據庫和非關系型數據&#xff0c;關系…

網絡安全 api 網絡安全 ast技術

隨著應用或者API被攻擊利用已經越來越多&#xff0c;雖然來自開源組件的漏洞加劇了這一現象的發生&#xff0c;但是&#xff0c;其實主要還是在于應用程序或者API本身沒有做好防范&#xff0c;根源在于源代碼本身的質量沒有嚴格把控。AST是指Application Security Testing&…

Mac 配置 Maven JDK

不使用 Homebrew&#xff0c;創建指定版本 JDK 1、官網下載指定版本并安裝……省略 2、vi &#xff5e;/.zshrc 同時要檢查 bash_profile 是否存在。 if [ -f ~/.bash_profile ] ; thensource ~/.bash_profile fiJAVA_HOME_11/Library/Java/JavaVirtualMachines/jdk-11.0.1…

【每日學點HarmonyOS Next知識】拖拽調整列表順序、tab回彈、自定義彈窗this、狀態變量修飾枚舉

1、HarmonyOS 功能實現&#xff08;拖拽調整列表順序&#xff09;&#xff1f; 可參考&#xff1a; import curves from ohos.curves; import Curves from ohos.curvesEntry Component struct ListItemExample {State private arr: number[] [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]…

Django部署Filemanagement

Pycharm搭建項目安裝虛擬環境 mysqlclient對mysql的安裝&#xff0c;配置有要求 pymsql偽裝成mysqlclient&#xff0c;pymysql可以操縱mysql pip install pymysql操作sql5.7 mysql8.0會出現與pycharm不兼容問題&#xff0c;會報錯&#xff0c;所以降到5.7 # 進入mysql 需要…

【病毒分析】熊貓燒香病毒分析及其查殺修復

目錄 前言 一、樣本概況 1.1 樣本信息 1.2 測試環境及工具 1.3 分析目標 二、具體行為分析 2.1 主要行為 2.1.1 惡意程序對用戶造成的危害 2.2 惡意代碼分析 2.2.1 加固后的惡意代碼樹結構圖(是否有加固) 2.2.2 惡意程序的代碼分析片段 三、解決方案(或總結) 3.1 …

Spring Boot集成Spring Statemachine

Spring Statemachine 是 Spring 框架下的一個模塊&#xff0c;用于簡化狀態機的創建和管理&#xff0c;它允許開發者使用 Spring 的特性&#xff08;如依賴注入、AOP 等&#xff09;來構建復雜的狀態機應用。以下是關于 Spring Statemachine 的詳細介紹&#xff1a; 主要特性 …

數組總和 (leetcode 40

leetcode系列 文章目錄 一、核心操作二、外層配合操作三、核心模式代碼總結 去重方式和之前三數之和一樣&#xff0c;也可以用used數組去重&#xff0c;但本次嘗試使用set去重 一、核心操作 如果count為0了&#xff0c;則證明正好減到了0&#xff0c;就可以收獲&#xff0c;…

sqli-lab靶場學習(八)——Less26-28

前言 25關已經出現了初步的一些關鍵字過濾&#xff0c;通過雙寫可以繞過。后面的關卡&#xff0c;我們會遇到更多關鍵字過濾&#xff0c;需要各種技巧繞過。 Less26 第26關寫了會過濾空格和注釋符。有很多的答案&#xff0c;會用%a0替代空格&#xff0c;但據說這是sqli-labs部…

python:VOC格式數據集轉換為YOLO數據集格式

作者&#xff1a;CSDN _養樂多_ 本文將介紹如何將目標檢測中常用的VOC格式數據集轉換為YOLO數據集&#xff0c;并進行數據集比例劃分&#xff0c;從而方便的進行YOLO目標檢測。 如果不想分兩步&#xff0c;可以直接看第三節代碼。 文章目錄 一、將VOC格式數據集轉換為YOLO格…

Docker容器安裝軟件(完整版)

文章目錄 一、安裝Docker1.1 docker 相關的命令1.2 配置鏡像加速 二. 安裝es2.1 創建網絡2.2 拉取鏡像2.3 創建掛載點目錄2.4 部署單點es&#xff0c;創建es容器2.5 編寫elasticsearch.yml2.6 重啟es容器2.7 測試Elasticsearch是否安裝成功 三. 基于Docker安裝Kibana3.1 拉取鏡…

LINUX 指令大全

Linux服務器上有許多常用的命令&#xff0c;可以幫助你管理文件、目錄、進程、網絡和系統配置等。以下是一些常用的Linux命令&#xff1a; 文件和目錄管理 ls&#xff1a;列出當前目錄中的文件和子目錄 bash lspwd&#xff1a;顯示當前工作目錄的路徑 bash pwdcd&#xff1a;切…