文章目錄
- 邏輯實現
- RabbitExchangeEnum
- RabbitConfig
- RabbitModuleInfo
- RabbitModuleInitializer
- RabbitProperties
- RabbitProducerManager
- POM.xml
- spring.factories
- 功能測試
- application.yml配置
- 生產者:
- 消費者:
- 測試結果:
- 總結
本章內容主要介紹編寫一個rabbitmq starter,能夠通過配置文件進行配置交換機、隊列以及綁定關系等等。項目引用該組件后能夠自動初始化交換機和隊列,并進行簡單通信。
如若有其他需求,可自行擴展,例如消息消費的確認等
參考文章:SpringBoot日常:自定義實現SpringBoot Starter
邏輯實現
下面直接進入主題,介紹整體用到的文件和邏輯內容
RabbitExchangeEnum
交換機枚舉類,四種交換機類型,分別是直連交換機、主題交換機、扇出交換機和標題交換機
/*** @Author 碼至終章* @Version 1.0*/
public enum RabbitExchangeEnum {DIRECT,TOPIC,FANOUT,HEADERS;
}
RabbitConfig
初始化配置文件
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @Author 碼至終章* @Version 1.0*/
@Configuration
public class RabbitConfig {/*** 通過yaml配置,創建隊列、交換機初始化器*/@Bean@ConditionalOnMissingBeanpublic RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {return new RabbitModuleInitializer(amqpAdmin, rabbitProperties);}
}
RabbitModuleInfo
配置信息的映射的文件,用于接收配置文件中配置的交換機和隊列屬性
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.Data;import java.util.Map;/*** 隊列和交換機機綁定關系實體對象** @Author 碼至終章* @Version 1.0*/
@Data
public class RabbitModuleInfo {/*** 路由Key*/private String routingKey;/*** 隊列信息*/private Queue queue;/*** 交換機信息*/private Exchange exchange;/*** 交換機信息類*/@Datapublic static class Exchange {/*** 交換機類型* 默認直連交換機*/private RabbitExchangeEnum type = RabbitExchangeEnum.DIRECT;/*** 交換機名稱*/private String name;/*** 是否持久化* 默認true持久化,重啟消息不會丟失*/private boolean durable = true;/*** 當所有隊綁定列均不在使用時,是否自動刪除交換機* 默認false,不自動刪除*/private boolean autoDelete = false;/*** 交換機其他參數*/private Map<String, Object> arguments;}/*** 隊列信息類*/@Datapublic static class Queue {/*** 隊列名稱*/private String name;/*** 是否持久化* 默認true持久化,重啟消息不會丟失*/private boolean durable = true;/*** 是否具有排他性* 默認false,可多個消費者消費同一個隊列*/private boolean exclusive = false;/*** 當消費者均斷開連接,是否自動刪除隊列* 默認false,不自動刪除,避免消費者斷開隊列丟棄消息*/private boolean autoDelete = false;/*** 綁定死信隊列的交換機名稱*/private String deadLetterExchange;/*** 綁定死信隊列的路由key*/private String deadLetterRoutingKey;private Map<String, Object> arguments;}}
RabbitModuleInitializer
執行初始化邏輯詳情文件,具體的邏輯為根據配置文件信息創建對應的交換機和隊列,并設置其屬性和綁定關系。
import cn.hutool.core.convert.Convert;
import cn.seczone.starter.rabbitmq.enums.RabbitExchangeEnum;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;/*** @Author cys* @Date 2024/6/17 14:23* @Version 1.0*/
@Slf4j
public class RabbitModuleInitializer implements SmartInitializingSingleton {AmqpAdmin amqpAdmin;RabbitProperties rabbitProperties;public RabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitProperties rabbitProperties) {this.amqpAdmin = amqpAdmin;this.rabbitProperties = rabbitProperties;}@Overridepublic void afterSingletonsInstantiated() {log.info("初始化rabbitmq交換機、隊列----------------start");declareRabbitModule();log.info("初始化rabbitmq交換機、隊列----------------end");}/*** RabbitMQ 根據配置動態創建和綁定隊列、交換機*/private void declareRabbitModule() {List<RabbitModuleInfo> rabbitModuleInfos = rabbitProperties.getModules();if (CollectionUtils.isEmpty(rabbitModuleInfos)) {return;}for (RabbitModuleInfo rabbitModuleInfo : rabbitModuleInfos) {configParamValidate(rabbitModuleInfo);// 隊列Queue queue = convertQueue(rabbitModuleInfo.getQueue());// 交換機Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());// 綁定關系String routingKey = rabbitModuleInfo.getRoutingKey();String queueName = rabbitModuleInfo.getQueue().getName();String exchangeName = rabbitModuleInfo.getExchange().getName();Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);// 創建隊列if (!isExistQueue(queueName)) {amqpAdmin.declareQueue(queue);}// 創建交換機amqpAdmin.declareExchange(exchange);// 隊列 綁定 交換機amqpAdmin.declareBinding(binding);}}/*** RabbitMQ動態配置參數校驗** @param rabbitModuleInfo 隊列和交換機機綁定關系*/public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {String routingKey = rabbitModuleInfo.getRoutingKey();Assert.isTrue(StringUtils.isNotBlank(routingKey), "RoutingKey 未配置");Assert.isTrue(rabbitModuleInfo.getExchange() != null, String.format("routingKey:%s未配置exchange", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getExchange().getName()), String.format("routingKey:%s未配置exchange的name屬性", routingKey));Assert.isTrue(rabbitModuleInfo.getQueue() != null, String.format("routingKey:%s未配置queue", routingKey));Assert.isTrue(StringUtils.isNotBlank(rabbitModuleInfo.getQueue().getName()), String.format("routingKey:%s未配置exchange的name屬性", routingKey));}/*** 轉換生成RabbitMQ隊列** @param queue 隊列* @return Queue*/public Queue convertQueue(RabbitModuleInfo.Queue queue) {Map<String, Object> arguments = queue.getArguments();// 轉換ttl的類型為longif (arguments != null && arguments.containsKey("x-message-ttl")) {arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));}// 是否需要綁定死信隊列String deadLetterExchange = queue.getDeadLetterExchange();String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();if (StringUtils.isNotBlank(deadLetterExchange) && StringUtils.isNotBlank(deadLetterRoutingKey)) {if (arguments == null) {arguments = new HashMap<>(4);}arguments.put("x-dead-letter-exchange", deadLetterExchange);arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);}return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);}/*** 轉換生成RabbitMQ交換機** @param exchangeInfo 交換機信息* @return Exchange*/public Exchange convertExchange(RabbitModuleInfo.Exchange exchangeInfo) {AbstractExchange exchange = null;RabbitExchangeEnum exchangeType = exchangeInfo.getType();String exchangeName = exchangeInfo.getName();boolean isDurable = exchangeInfo.isDurable();boolean isAutoDelete = exchangeInfo.isAutoDelete();Map<String, Object> arguments = exchangeInfo.getArguments();switch (exchangeType) {case DIRECT:// 直連交換機exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case TOPIC:// 主題交換機exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case FANOUT://扇形交換機exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);break;case HEADERS:// 頭交換機exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);break;}return exchange;}/*** 判斷隊列是否存在** @param queueName 隊列名* @return boolean*/private boolean isExistQueue(String queueName) {if (StringUtils.isBlank(queueName)) {throw new RuntimeException("隊列名稱為空");}boolean flag = true;Properties queueProperties = amqpAdmin.getQueueProperties(queueName);if (queueProperties == null) {flag = false;}return flag;}}
RabbitProperties
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import java.util.List;/*** @Author 碼至終章* @Version 1.0*/
@Component
@ConfigurationProperties(prefix = "cys.rabbit")
@Data
public class RabbitProperties {private List<RabbitModuleInfo> modules;
}
RabbitProducerManager
發送消息的生產者方法
public class RabbitProducerManager {private static final Logger log = LoggerFactory.getLogger(RabbitProducerManager.class);private final RabbitTemplate rabbitTemplate;public void sendMessage(String exchange, String rabbitRouting, Object message) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 發送消息成功:{}", rabbitRouting, message);}public void sendMessage(String exchange, String rabbitRouting, Object message, CorrelationData correlationData) {this.rabbitTemplate.convertAndSend(exchange, rabbitRouting, message);log.info("向路由:{}, 發送消息成功:{}, correlationData:{}", new Object[]{rabbitRouting, message, correlationData});}public RabbitProducerManager(final RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}
}
POM.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.7.18</version></dependency><!--RabbitMQ 依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.18</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency></dependencies>
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.mycomponent.starter.rabbitmq.config.RabbitConfig,\
cn.mycomponent.starter.rabbitmq.client.RabbitProducerManager,\
cn.mycomponent.starter.rabbitmq.config.RabbitProperties
功能測試
application.yml配置
spring:profiles:active: dev## rabbitmq鏈接配置 rabbitmq:host: 192.168.199.199port: 5672username: testpassword: 123456789virtual-host: testcys:rabbit:modules:- exchange:name: mytest#type為RabbitExchangeTypeEnum枚舉中的值。不配置默認為Directtype: DIRECTqueue:name: default.queuearguments:# 隊列中所有消息的最大存活時間。單位毫秒。 1分鐘x-message-ttl: 60000# routing-key可以為空routing-key: default.queue.key
生產者:
@TableName(value ="task",autoResultMap = true)
@Data
public class TaskEntity implements Serializable {/*** 主鍵*/@TableId(type = IdType.AUTO)@TableField(value = "cust_id")private Long custId;
}@RestController
@RequestMapping("/mqtest")
public class MqController {@AutowiredRabbitProducerManager rabbitProducerManager;@AutowiredMailService mailService;@GetMapping("/mqtest")public void test(){TaskEntity taskEntity = new TaskEntity();taskEntity.setCustId(211212L);rabbitProducerManager.sendMessage("mytest","default.queue.key", JSON.toJSONString(taskEntity));}
}
消費者:
@Component
public class MyListener {@RabbitListener(queues = "default.queue")public void handMessage(String message){TaskEntity taskEntity = JSON.parseObject(message, TaskEntity.class);System.out.println("接收到的消息"+taskEntity);}
}
測試結果:
請求接口/mqtest/mqtest
總結
到這為止,關于封裝rabbitmq starter就結束了。當然,本文只是介紹了最基礎的部分,后續大家可以在這基礎上實現擴展,比如統一接受消息再通過事件監聽、同一隊列設置多個消費者線程等等,說到這里,如果只是豐富的小伙伴可能會想到spring-cloud-starter-stream-rabbit,大家也可以參考參考這個是如何實現的。