SpringBoot日常:封裝rabbitmq starter組件

文章目錄

    • 邏輯實現
      • RabbitExchangeEnum
      • RabbitConfig
      • RabbitModuleInfo
      • RabbitModuleInitializer
      • RabbitProperties
      • RabbitProducerManager
      • POM.xml
      • spring.factories
    • 功能測試
      • application.yml配置
      • 生產者:
      • 消費者:
      • 測試結果:
      • 總結

本章內容主要介紹編寫一個rabbitmq starter,能夠通過配置文件進行配置交換機、隊列以及綁定關系等等。項目引用該組件后能夠自動初始化交換機和隊列,并進行簡單通信。
如若有其他需求,可自行擴展,例如消息消費的確認等
參考文章:SpringBoot日常:自定義實現SpringBoot Starter

邏輯實現

下面直接進入主題,介紹整體用到的文件和邏輯內容

RabbitExchangeEnum

交換機枚舉類,四種交換機類型,分別是直連交換機、主題交換機、扇出交換機和標題交換機

/*** @Author 碼至終章* @Version 1.0*/
public enum RabbitExchangeEnum {DIRECT,TOPIC,FANOUT,HEADERS;
}

RabbitConfig

初始化配置文件

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author 碼至終章* @Version 1.0*/
@Configuration
public class RabbitConfig {/*** 通過yaml配置,創建隊列、交換機初始化器*/@Bean@ConditionalOnMissingBeanpublic RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);}
}

RabbitModuleInfo

配置信息的映射的文件,用于接收配置文件中配置的交換機和隊列屬性

import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.Data;import java.util.Map;/*** 隊列和交換機機綁定關系實體對象** @Author 碼至終章* @Version 1.0*/
@Data
public class RabbitModuleInfo {/*** 路由Key*/private String routingKey;/*** 隊列信息*/private Queue queue;/*** 交換機信息*/private Exchange exchange;/*** 交換機信息類*/@Datapublic static class Exchange {/*** 交換機類型* 默認直連交換機*/private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;/*** 交換機名稱*/private String name;/*** 是否持久化* 默認true持久化,重啟消息不會丟失*/private boolean durable = true;/*** 當所有隊綁定列均不在使用時,是否自動刪除交換機* 默認false,不自動刪除*/private boolean autoDelete = false;/*** 交換機其他參數*/private Map<String, Object> arguments;}/*** 隊列信息類*/@Datapublic static class Queue {/*** 隊列名稱*/private String name;/*** 是否持久化* 默認true持久化,重啟消息不會丟失*/private boolean durable = true;/*** 是否具有排他性* 默認false,可多個消費者消費同一個隊列*/private boolean exclusive = false;/*** 當消費者均斷開連接,是否自動刪除隊列* 默認false,不自動刪除,避免消費者斷開隊列丟棄消息*/private boolean autoDelete = false;/*** 綁定死信隊列的交換機名稱*/private String deadLetterExchange;/*** 綁定死信隊列的路由key*/private String deadLetterRoutingKey;private Map<String, Object> arguments;}}

RabbitModuleInitializer

執行初始化邏輯詳情文件,具體的邏輯為根據配置文件信息創建對應的交換機和隊列,并設置其屬性和綁定關系。

import cn.hutool.core.convert.Convert;
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;/*** @Author cys* @Date 2024/6/17 14:23* @Version 1.0*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {AmqpAdmin amqpAdmin;RabbitProperties rabbitProperties;public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {this.amqpAdmin = amqpAdmin;this.rabbitProperties = rabbitProperties;}@Overridepublic void afterSingletonsInstantiated() {log.info("初始化rabbitmq交換機、隊列----------------start");declareRabbitModule();log.info("初始化rabbitmq交換機、隊列----------------end");}/*** RabbitMQ 根據配置動態創建和綁定隊列、交換機*/private void declareRabbitModule() {List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules();if (CollectionUtils.isEmpty(rabbitModuleInfos)) {return;}for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {configParamValidate(rabbitModuleInfo);// 隊列Queue queue = convertQueue(rabbitModuleInfo.getQueue());// 交換機Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());// 綁定關系String routingKey = rabbitModuleInfo.getRoutingKey();String queueName = rabbitModuleInfo.getQueue().getName();String exchangeName = rabbitModuleInfo.getExchange().getName();Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);// 創建隊列if (!isExistQueue(queueName)) {amqpAdmin.declareQueue(queue);}// 創建交換機amqpAdmin.declareExchange(exchange);// 隊列 綁定 交換機amqpAdmin.declareBinding(binding);}}/*** RabbitMQ動態配置參數校驗** @param rabbitModuleInfo 隊列和交換機機綁定關系*/public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {String routingKey = rabbitModuleInfo.getRoutingKey();Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name屬性", routingKey));Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name屬性", routingKey));}/*** 轉換生成RabbitMQ隊列** @param queue 隊列* @return Queue*/public Queue convertQueue(RabbitModuleInfo.Queue queue) {Map<String, Object> arguments = queue.getArguments();// 轉換ttl的類型為longif (arguments != null && arguments.containsKey("x-message-ttl")) {arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));}// 是否需要綁定死信隊列String deadLetterExchange = queue.getDeadLetterExchange();String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {if (arguments == null) {arguments = new HashMap<>(4);}arguments.put("x-dead-letter-exchange", deadLetterExchange);arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);}return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);}/*** 轉換生成RabbitMQ交換機** @param exchangeInfo 交換機信息* @return Exchange*/public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {AbstractExchange exchange = null;RabbitExchangeEnum exchangeType = exchangeInfo.getType();String exchangeName = exchangeInfo.getName();boolean isDurable = exchangeInfo.isDurable();boolean isAutoDelete = exchangeInfo.isAutoDelete();Map<String, Object> arguments = exchangeInfo.getArguments();switch (exchangeType) {case DIRECT:// 直連交換機exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case TOPIC:// 主題交換機exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case FANOUT://扇形交換機exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case HEADERS:// 頭交換機exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);break;}return exchange;}/*** 判斷隊列是否存在** @param queueName 隊列名* @return boolean*/private boolean isExistQueue(String queueName) {if (StringUtils.isBlank(queueName)) {throw new RuntimeException("隊列名稱為空");}boolean flag = true;Properties queueProperties = amqpAdmin.getQueueProperties(queueName);if (queueProperties == null) {flag = false;}return flag;}}

RabbitProperties

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.List;/*** @Author 碼至終章* @Version 1.0*/
@Component
@ConfigurationProperties(prefix = "cys.rabbit")
@Data
public class RabbitProperties {private List<RabbitModuleInfo> modules;
}

RabbitProducerManager

發送消息的生產者方法

public class RabbitProducerManager {private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class);private final RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String rabbitRouting, Object message) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 發送消息成功:{}", rabbitRouting, message);}public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 發送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData});}public RabbitProducerManager(final RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}
}

POM.xml

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.7.18</version></dependency><!--RabbitMQ 依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency></dependencies>

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties

功能測試

application.yml配置

spring:profiles:active: dev## rabbitmq鏈接配置  rabbitmq:host: 192.168.199.199port: 5672username: testpassword: 123456789virtual-host: testcys:rabbit:modules:- exchange:name: mytest#type為RabbitExchangeTypeEnum枚舉中的值。不配置默認為Directtype: DIRECTqueue:name: default.queuearguments:# 隊列中所有消息的最大存活時間。單位毫秒。 1分鐘x-message-ttl: 60000# routing-key可以為空routing-key: default.queue.key

生產者:

@TableName(value ="task",autoResultMap = true)
@Data
public class TaskEntity implements Serializable {/*** 主鍵*/@TableId(type = IdType.AUTO)@TableField(value = "cust_id")private Long custId;
}@RestController
@RequestMapping("/mqtest")
public class MqController {@AutowiredRabbitProducerManager rabbitProducerManager;@AutowiredMailService mailService;@GetMapping("/mqtest")public void test(){TaskEntity taskEntity = new TaskEntity();taskEntity.setCustId(211212L);rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity));}
}

消費者:

@Component
public class MyListener {@RabbitListener(queues = "default.queue")public void handMessage(String message){TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);System.out.println("接收到的消息"+taskEntity);}
}

測試結果:

請求接口/mqtest/mqtest
在這里插入圖片描述

總結

到這為止,關于封裝rabbitmq starter就結束了。當然,本文只是介紹了最基礎的部分,后續大家可以在這基礎上實現擴展,比如統一接受消息再通過事件監聽、同一隊列設置多個消費者線程等等,說到這里,如果只是豐富的小伙伴可能會想到spring-cloud-starter-stream-rabbit,大家也可以參考參考這個是如何實現的。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/42121.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/42121.shtml
英文地址,請注明出處:http://en.pswp.cn/web/42121.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

stm32 USB CDC類虛擬串口初體驗

1. 目標 本文介紹CubeMX生成 USB CDC類虛擬串口工程的操作步驟。 2. 配置流程 時鐘配置 usb外設需要48M時鐘輸入 stm32405使用外部時鐘源HSE,否則配不出來48M時鐘stm32h750內部有一個48M時鐘 stm32f405時鐘配置 stm32h750時鐘配置 Connectivity ->USB_OTG_FS 和 Connect…

GEE代碼實例教程詳解:植被狀況指數(VCI)與干旱監測

簡介 在本篇博客中&#xff0c;我們將使用Google Earth Engine (GEE) 進行植被狀況指數&#xff08;Vegetation Condition Index, VCI&#xff09;的計算和干旱監測。通過MODIS NDVI數據&#xff0c;我們可以評估2001年至2024年間的植被狀況和干旱等級。 背景知識 MODIS NDV…

C++初階:從C過渡到C++的入門基礎

??所屬專欄&#xff1a;C?? ??作者主頁&#xff1a;嶔某?? C發展歷史 C的起源可以追溯到1979年&#xff0c;當時BjarneStroustrup(本賈尼斯特勞斯特盧普&#xff0c;這個翻譯的名字不同的地?可能有差異)在?爾實驗室從事計算機科學和軟件?程的研究?作。?對項?中復…

第4章 Vite模塊化與插件系統(二)

4.3 常用插件介紹 4.3.1 官方插件 vitejs/plugin-vue 用于支持 Vue.js 開發&#xff1a; npm install vitejs/plugin-vue --save-devimport vue from vitejs/plugin-vueexport default defineConfig({plugins: [vue()] })vitejs/plugin-react 用于支持 React 開發&#xf…

JavaDS —— 順序表ArrayList

順序表 順序表是用一段物理地址連續的存儲單元依次存儲數據元素的線性結構&#xff0c;一般情況下采用數組存儲。在數組上完成數據的增刪查改。在物理和邏輯上都是連續的。 模擬實現 下面是我們要自己模擬實現的方法&#xff1a; 首先我們要創建一個順序表&#xff0c;順序表…

關于Mars3d的入門

關于Mars3d的入門 一. 創建地球&#xff0c;加載瓦片圖層二 矢量圖層2.1 常用矢量圖層2.1.1 GraphicLayer2.1.2 GeoJsonLayer 2.2 矢量圖層的點擊事件 三 矢量數據四 事件機制 一. 創建地球&#xff0c;加載瓦片圖層 // 1. 創建地球let map new mars3d.Map("mars3dContai…

基于openStreetMap的路徑規劃ROS功能包

文章目錄 概要OSM是什么主要特點主要組成部分使用場景如何獲取OSM常規參數配置笛卡爾坐標系原點經緯度設置編譯和運行如何規劃演示效果概要 由于https://github.com/MichalDobis/osm_planner存在一些使用問題,不是那么方便,我對其進行了一些修改,便于進行起點到終點進行路徑…

數據如何查詢

分組查詢 分組查詢&#xff08;Group By&#xff09;是在關系型數據庫中用來對數據進行分組并對每個組應用聚合函數的一種操作。這種查詢通常結合聚合函數&#xff08;如 COUNT、SUM、AVG、MAX、MIN 等&#xff09;使用&#xff0c;用于在查詢結果中生成匯總信息 特點(聚合)&am…

從零開始做題:My_lllp

題目 給出一張png圖片 解題 ┌──(holyeyes?kali2023)-[~/Misc/題目/zulu/My_lllp] └─$ python2 lsb.py extract my_lllp.png out.txt my_lllp [] Image size: 1080x1079 pixels. [] Written extracted data to out.txt. ┌──(holyeyes?kali2023)-[~/Misc/題目/zul…

python的線程池和進程池

Python 3.2 就已經引入了 concurrent.futures 模塊&#xff0c;提供了線程池&#xff08;ThreadPoolExecutor&#xff09;和進程池&#xff08;ProcessPoolExecutor&#xff09;&#xff0c;用于簡化并發編程的管理和調度。 ThreadPoolExecutor 在ThreadPoolExecutor 是 conc…

簡易Qt串口助手

界面顯示如下 關于串口類 初始化 設置串口號 設置波特率 打開串口 發送按鈕功能實現 接收數據顯示在控件中 關閉串口

使用 MFA 保護對企業應用程序的訪問

多因素身份驗證&#xff08;MFA&#xff09;是在授予用戶訪問特定資源的權限之前&#xff0c;使用多重身份驗證來驗證用戶身份的過程&#xff0c;僅使用單一因素&#xff08;傳統上是用戶名和密碼&#xff09;來保護資源&#xff0c;使它們容易受到破壞&#xff0c;添加其他身份…

springboot非物質文化遺產管理系統-計算機畢業設計源碼16087

目錄 摘要 1 緒論 1.1 選題背景與意義 1.2國內外研究現狀 1.3論文結構與章節安排 2系統分析 2.1 可行性分析 2.2 系統流程分析 2.2.1系統開發流程 2.2.2 用戶登錄流程 2.2.3 系統操作流程 2.2.4 添加信息流程 2.2.5 修改信息流程 2.2.6 刪除信息流程 2.3 系統功能…

前端開發過程中經常遇到的問題以及對應解決方法 (持續更新)

我的朋友已經工作了 3 年&#xff0c;他過去一直擔任前端工程師。 不幸的是&#xff0c;他被老板批評了&#xff0c;因為他在工作中犯了一個錯誤&#xff0c;這是一個非常簡單但容易忽視的問題&#xff0c;我想也是很多朋友容易忽視的一個問題。 今天我把它分享出來&#xff…

Linux三劍客(grep、awk和sed)操作及與管道結合使用

1. 總覽 grep、sed和awk被稱為Linux三劍客&#xff0c;是因為它們在文本處理和數據操作方面極其強大且常用。 Linux三劍客在文件處理中的作用&#xff1a; grep&#xff08;數據查找定位&#xff09;&#xff1a;文本搜索工具&#xff0c;在文件中搜索符合正則表達式的文本內容…

Redis原理-數據結構

Redis原理篇 1、原理篇-Redis數據結構 1.1 Redis數據結構-動態字符串 我們都知道Redis中保存的Key是字符串&#xff0c;value往往是字符串或者字符串的集合。可見字符串是Redis中最常用的一種數據結構。 不過Redis沒有直接使用C語言中的字符串&#xff0c;因為C語言字符串存…

【大模型LLM面試合集】大語言模型架構_attention

1.attention 1.Attention 1.1 講講對Attention的理解&#xff1f; Attention機制是一種在處理時序相關問題的時候常用的技術&#xff0c;主要用于處理序列數據。 核心思想是在處理序列數據時&#xff0c;網絡應該更關注輸入中的重要部分&#xff0c;而忽略不重要的部分&…

BJT的結構(晶體管電壓/電流+β+晶體管特性曲線/截止與飽和+直流負載線(Q點))+單片機數碼管基礎

2024-7-8&#xff0c;星期一&#xff0c;20:23&#xff0c;天氣&#xff1a;晴&#xff0c;心情&#xff1a;晴。今天沒有什么特殊的事情發生&#xff0c;周末休息了兩天&#xff0c;周一回來繼續學習啦&#xff0c;加油加油&#xff01;&#xff01;&#xff01; 今日完成模電…

視頻號矩陣管理系統:短視頻內容營銷的智能助手

隨著短視頻行業的蓬勃發展&#xff0c;視頻號矩陣管理系統應運而生&#xff0c;為內容創作者和品牌提供了一站式的短視頻管理和營銷解決方案。本文將深入探討視頻號矩陣管理系統的核心功能&#xff0c;以及它如何助力用戶在短視頻營銷領域取得成功。 視頻號矩陣管理系統概述 …

在PyTorch中使用TensorBoard

文章目錄 在PyTorch中使用TensorBoard1.安裝2.TensorBoard使用2.1創建SummaryWriter實例2.2利用add_scalar()記錄metrics2.3關閉Writer2.4啟動TensorBoard 3.本地連接服務器使用TensorBoard3.1方法一&#xff1a;使用SSH命令進行本地端口轉發3.2方法二&#xff1a;啟動TensorBo…