Springboot-RabbitMQ 消息隊列使用

一、概念介紹:

RabbitMQ中幾個重要的概念介紹:

  • Channels:信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的 TCP 連接內地虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。

  • Exchanges:交換器,用來接收生產者發送的消息并將這些消息路由給服務器中的隊列。

    • 交換機類型主要有以下幾種:
    • Direct Exchange(直連交換機):這種類型的交換機根據消息的Routing Key(路由鍵)進行精確匹配,只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景。
    • Fanout Exchange(扇形交換機):這種類型的交換機采用廣播模式,它會將消息發送給所有綁定到該交換機的隊列,不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。
    • Topic Exchange(主題交換機):這種類型的交換機支持基于模式匹配的路由鍵,可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現更復雜的消息路由邏輯。
    • Headers Exchange(頭交換機):這種類型的交換機不處理路由鍵,而是根據發送的消息內容中的headers屬性進行匹配。適用于需要在消息頭中攜帶額外信息的場景。
  • Queues:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。

二、引入依賴:

 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

三、添加配置信息

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestlistener:simple:acknowledge-mode: manual  # 手動提交

四、Direct Exchange(直連交換機)模式

1、新建配置文件 RabbitDirectConfig類

package com.example.direct;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 直連交換機--這種類型的交換機根據消息的Routing Key(路由鍵)進行精確匹配,* 只有綁定了相同路由鍵的隊列才會收到消息。適用于點對點的消息傳遞場景*/
@Configuration
public class RabbitDirectConfig {/*** 隊列名稱*/public static final String QUEUE_MESSAGE ="QUEUE_MESSAGE";public static final String QUEUE_USER ="QUEUE_USER";/*** 交換機*/public static final String EXCHANGE="EXCHANGE_01";/*** 路由*/public static final String ROUTING_KEY="ROUTING_KEY_01";@Beanpublic Queue queue01() {return new Queue(QUEUE_MESSAGE, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic Queue queue02() {return new Queue(QUEUE_USER, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic DirectExchange exchange01() {return new DirectExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding demoBinding() {return BindingBuilder.bind(queue01()).to(exchange01()).with(ROUTING_KEY);}@Beanpublic Binding demoBinding2() {return BindingBuilder.bind(queue02()).to(exchange01()).with(ROUTING_KEY);}
}

2、添加消息生產者 Producer類

package com.example.direct;import com.example.entity.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class Producer {@ResourceRabbitTemplate rabbitTemplate;public void sendMessageByExchangeANdRoute(String message){rabbitTemplate.convertAndSend(RabbitDirectConfig.EXCHANGE, RabbitDirectConfig.ROUTING_KEY,message);}/*** 默認交換器,隱式地綁定到每個隊列,路由鍵等于隊列名稱。* @param message*/public void sendMessageByQueue(String message){rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_MESSAGE,message);}public void sendMessage(User user){rabbitTemplate.convertAndSend(RabbitDirectConfig.QUEUE_USER,user);}
}

3、添加消息消費者

package com.example.direct;import com.example.entity.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class Consumer {@RabbitListener(queues = RabbitDirectConfig.QUEUE_USER)public void onMessage(User user){System.out.println("收到的實體bean消息:"+user);}@RabbitListener(queues = RabbitDirectConfig.QUEUE_MESSAGE)public void onMessage2(String message){System.out.println("收到的字符串消息:"+message);}
}

4、 測試

package com.example;import com.example.entity.User;
import com.example.direct.Producer;
import com.example.fanout.FanoutProducer;
import com.example.topic.TopicProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
class SpringbootRabbitMqApplicationTests {@ResourceProducer producer;@Testpublic void sendMessage() throws InterruptedException {producer.sendMessageByQueue("哈哈");producer.sendMessage(new User().setAge(10).setName("wasin"));}
}

五、Topic Exchange(主題交換機)模式

1、新建RabbitTopicConfig類

package com.example.topic;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 主題交換機--這種類型的交換機支持基于模式匹配的路由鍵,* 可以使用通配符*(匹配一個單詞)和#(匹配零個或多個單詞)進行匹配。適用于實現更復雜的消息路由邏輯。*/
@Configuration
public class RabbitTopicConfig {/*** 交換機*/public static final String EXCHANGE = "EXCHANGE_TOPIC1";/*** 隊列名稱*/public static final String QUEUE_TOPIC1 = "QUEUE_TOPIC";/*** 路由* "*" 與 "#",用于做模糊匹配。其中 "*" 用于匹配一個單詞,"#" 用于匹配多個單詞(可以是零個)* 可以匹配 aa.wasin.aa.bb  wasin.aa.bb  wasin.aa ....* aa.bb.wasin.cc 無法匹配*/public static final String ROUTING_KEY1 = "*.wasin.#";@Beanpublic Queue queue() {return new Queue(QUEUE_TOPIC1, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic TopicExchange exchange() {return new TopicExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding binding() {return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY1);}
}

2、新建 消息生產者和發送者

  • TopicProducer類
package com.example.topic;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class TopicProducer {@ResourceRabbitTemplate rabbitTemplate;/*** @param routeKey 路由* @param message 消息*/public void sendMessageByQueue(String routeKey, String message){rabbitTemplate.convertAndSend(RabbitTopicConfig.EXCHANGE,routeKey,message);}}
  • TopicConsumer類
package com.example.topic;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Slf4j
@Component
public class TopicConsumer {@RabbitListener(queues = RabbitTopicConfig.QUEUE_TOPIC1)public void onMessage2(String message){log.info("topic收到的字符串消息:{}",message);}
}

六、Fanout Exchange(扇形交換機)模式

1、 新建 RabbitFanoutConfig類

package com.example.fanout;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author wasin* @version 1.0* @date 2024/6/4* @description: 扇形交換機--這種類型的交換機采用廣播模式,它會將消息發送給所有綁定到該交換機的隊列,* 不管消息的路由鍵是什么。適用于消息需要被多個消費者處理的場景。*/
@Configuration
public class RabbitFanoutConfig {/*** 交換機*/public static final String EXCHANGE = "EXCHANGE_FANOUT";/*** 隊列名稱*/public static final String QUEUE_FANOUT1 = "QUEUE_FANOUT";/*** 隊列名稱*/public static final String QUEUE_FANOUT2 = "QUEUE_FANOUT2";@Beanpublic Queue queueFanout1() {return new Queue(QUEUE_FANOUT1, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic Queue queueFanout2() {return new Queue(QUEUE_FANOUT2, //隊列名稱true, //是否持久化false, //是否排他false //是否自動刪除);}@Beanpublic FanoutExchange exchangeFanout() {return new FanoutExchange(EXCHANGE,true, //是否持久化false //是否排他);}@Beanpublic Binding bindingFanout() {return BindingBuilder.bind(queueFanout1()).to(exchangeFanout());}@Beanpublic Binding bindingFanout2() {return BindingBuilder.bind(queueFanout2()).to(exchangeFanout());}}

2、新建 消息生產者和發送者

  • FanoutProducer類:
package com.example.fanout;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Component
public class FanoutProducer {@ResourceRabbitTemplate rabbitTemplate;/*** @param message 消息*/public void sendMessageByQueue(String message) {rabbitTemplate.convertAndSend(RabbitFanoutConfig.EXCHANGE, "", message);}}
  • FanoutConsumer類
package com.example.fanout;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;import java.io.IOException;/*** @author wasin* @version 1.0* @date 2024/6/4* @description:*/
@Slf4j
@Component
public class FanoutConsumer {/*** 手動提交* @param message* @param channel* @param tag* @throws IOException*/@RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT1)public void onMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {log.info("fanout1收到的字符串消息:{}",message);channel.basicAck(tag,false);}@RabbitListener(queues = RabbitFanoutConfig.QUEUE_FANOUT2)public void onMessage2(String message){log.info("fanout2到的字符串消息:{}",message);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/22249.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/22249.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/22249.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

2021 hnust 湖科大 數字系統設計與VHDL課程 大作業 - 出租車計價器設計

2021 hnust 湖科大 數字系統設計與VHDL課程大作業-出租車計價器設計 描述 大二上的eda考查課的實驗&#xff0c;額外實現了停車等待2分鐘后收費1元/min。內含項目文件&#xff08;實測可運行&#xff09;&#xff0c;代碼&#xff0c;報告&#xff0c;視頻和照片&#xff0c;…

JavaScript函數定義,函數參數,函數調用

JavaScript函數定義&#xff1a; 在JavaScript中&#xff0c;我們可以使用關鍵字function來定義一個函數。函數定義的一般語法如下&#xff1a; function functionName(parameter1, parameter2, ...){// 函數體 }其中&#xff0c;functionName是函數的名稱&#xff0c;可以自定…

功能強大且專業的PDF轉換軟件PDF Shaper Professional 14.2

PDF Shaper Professional是一款適用于Windows的程序&#xff0c;可讓您在計算機上處理PDF文件。 要開始使用PDF Shaper Professional&#xff0c;您需要在Windows計算機上下載并安裝該程序。您還應該有合適的驅動程序和編解碼器來處理計算機上的文本和圖形。 安裝程序后&#…

分享一份糟糕透頂的簡歷,看看跟你寫的一樣不

最近看了一個人的簡歷&#xff0c;怎么說呢&#xff0c;前幾年這么寫沒問題&#xff0c;投出去就有回復&#xff0c;但從現在開始&#xff0c;這么寫肯定不行了。下面我給大家分享一下內容&#xff1a; 目錄 &#x1f926;?♀?這是簡歷文檔截圖 &#x1f937;?♀?這是基本…

淘寶評論API調用指南,讓你購物不再困擾

一、淘寶評論API概述 淘寶評論API是淘寶開放平臺提供的一種服務&#xff0c;它允許開發者通過調用API接口獲取淘寶商品評論數據&#xff0c;聯訊數據從而為用戶提供更加豐富和實用的購物決策信息。通過使用淘寶評論API&#xff0c;開發者可以輕松地實現以下功能&#xff1a; …

SwiftUI 利用 Swizz 黑魔法為系統創建的默認對象插入新協議方法(二)

功能需求 在 SwiftUI 的開發中,我們往往需要借助底層 UIKit 的“上帝之手”來進一步實現額外的定制功能。比如,在可拖放(Dragable)SwiftUI 的實現中,會缺失拖放取消的回調方法讓我們這些禿頭碼農們“欲哭無淚” 如上圖所示,我們在拖放取消時將界面中的一切改變都恢復如初…

slf4j等多個jar包沖突綁定的排查方法使用IDEA的maven help解決

1.安裝 2.使用maven help解決&#xff0c;找到對應包存在的沖突 使用exclude直接解決即可

【人工智能】第四部分:ChatGPT的技術實現

人不走空 &#x1f308;個人主頁&#xff1a;人不走空 &#x1f496;系列專欄&#xff1a;算法專題 ?詩詞歌賦&#xff1a;斯是陋室&#xff0c;惟吾德馨 目錄 &#x1f308;個人主頁&#xff1a;人不走空 &#x1f496;系列專欄&#xff1a;算法專題 ?詩詞歌…

dnf手游版游玩感悟

dnf手游于5月21號正式上線&#xff0c;作為一個dnf端游老玩家&#xff0c;并且偶爾上線ppk&#xff0c;自然下載了手游版&#xff0c;且玩了幾天。 不得不說dnf手游的優化做到了極好的程度。 就玩法系統這塊&#xff0c;因為dnf屬于城鎮地下城模式&#xff0c;相比…

前端工程化工具系列(三)—— Stylelint(v16.6.1):CSS/SCSS 代碼質量工具

Stylelint 是 CSS/SCSS 的靜態分析工具&#xff0c;用于檢查其中的違規和錯誤。 1. 環境要求 v16 以上的 Stylelint&#xff0c;支持 Node.js 的版本為 v18.12.0。 在命令行工具中輸入以下內容后回車&#xff0c;來查看當前系統中 Node.js 的版本。 node -vNode.js 推薦使用…

Shell腳本快速入門

為什么要學shell&#xff1f;能做什么&#xff1f; 答&#xff1a;CI/CD 持續集成&#xff0c;自動化部署作業方式&#xff0c;需要將一系列linux命令程序化&#xff0c;shell 就能做到。

13. 《C語言》——【strlen函數的使用和模擬實現】

文章目錄 前言strlen函數strlen函數的使用strlen函數的3種方法實現方法1方法2方法3 總結 前言 各位老板好~ &#xff0c; 今天我們講解strlen函數如何去使用以及如何去模擬實現strlen函數。希望各位老板能夠給一個點贊和一個大大的關注&#xff0c;感謝各位老板&#xff01;str…

塑料焊接機熔深對激光焊接質量有什么影響

塑料焊接機的熔深對焊接質量具有直接且顯著的影響。以下是熔深對焊接質量影響的詳細解釋&#xff1a; 1. 焊接強度&#xff1a;熔深直接決定了焊縫的截面積&#xff0c;從而影響焊接接頭的強度。較深的熔深意味著焊縫的截面積更大&#xff0c;可以提供更強的結合力&#xff0c;…

OpenStreetMap部署(OSM)

參考&#xff1a;https://github.com/openstreetmap/openstreetmap-website/blob/master/DOCKER.md OpenStreeMap 部署 操作系統建議使用 Ubuntu 22 版本 安裝 Docker # 更新軟件包索引&#xff1a; sudo apt-get update # 允許APT使用HTTPS&#xff1a; sudo apt-get inst…

【計算機組成原理】詳談計算機發展歷程

計算機發展歷程 導讀一、計算機的誕生1.1 歷史背景1.2 計算機的發明 二、計算機硬件的發展1.1 計算機的四代變化1.1.1 第一代計算機bug的由來 1.1.2 第二代計算機1.1.3 第三代計算機半導體存儲器的發展 1.1.4 第四代計算機 1.2 個人計算機的發展1.2.1 微處理器的發展1.2.2 個人…

AIGC之Stable Diffusion Web Ui 初體驗

前言 Stable Diffusion辣么火&#xff0c;同學你確定不嘗試一下嘛&#xff1f; 純代碼學習版本搞啦&#xff0c;Web Ui 也得試試咧 網上有很多安裝Stable Diffusion Web Ui 的介紹了&#xff0c;我在這說一下我的踩坑記錄 想安裝的同學&#xff0c;看這個鏈接 萬字長文&#x…

U-Net: Convolutional Networks for Biomedical Image Segmentation--論文筆記

U-Net: Convolutional Networks for Biomedical Image Segmentation 資料 1.代碼地址 2.論文地址 https://arxiv.org/pdf/1505.04597 3.數據集地址 論文摘要的翻譯 人們普遍認為&#xff0c;深度網絡的成功訓練需要數千個帶注釋的訓練樣本。在本文中&#xff0c;我們提出…

44-5 waf繞過 - SQL注入繞WAF方法

環境準備: 43-5 waf繞過 - 安全狗簡介及安裝-CSDN博客然后安裝sqlilabs靶場:構建完善的安全滲透測試環境:推薦工具、資源和下載鏈接_滲透測試靶機下載-CSDN博客 一、雙寫繞過 打開sql靶場的第一關:http://127.0.0.1/sqli-labs-master/Less-1/?id=1 驗證一下waf是否開啟防…

C\C++內存管理(未完結)

文章目錄 一.C\C內存分布二.C語言中動態內存管理方式&#xff1a;malloc/calloc/realloc/free三.C內存管理方式3.1.new/delete操作內置類型3.2.new和delete操作自定義類型 四.operator new與operator delete函數&#xff08;重要點進行講解&#xff09;4.1. operator new與oper…

npm install 出錯,‘proxy‘ config is set properly. See: ‘npm help config‘

背景 從遠程clone下項目之后&#xff0c;使用命令 npm install 安裝依賴&#xff0c;報錯如下 意為&#xff1a; 報錯&#xff1a; npm犯錯!network與網絡連通性有關的問題。 npm犯錯!網絡在大多數情況下&#xff0c;你背后的代理或有壞的網絡設置。 npm犯錯!網絡 npm犯錯…