在使用Spring AMQP基於註解聲明監聽者時,可通過抽取常量來避免硬編碼:
/**
 * Logs each message delivered through the annotation-declared binding.
 * The queue, exchange and routing key are all taken from {@code MQConstant}.
 *
 * @param message the received message payload
 */
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(MQConstant.USER_QUEUE),
        exchange = @Exchange(MQConstant.USER_EXCHANGE),
        key = MQConstant.USER_REDIS_BINDING))
public void deleteUserInfoRedisByExchange(String message) {
    log.info("監聽到消息message:{}", message);
}
// Publish the message using the exchange and routing key held in the shared MQConstant class.
rabbitTemplate.convertAndSend(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, message);
這種方式方便快捷,以後需要修改時只需修改常量類即可。下面介紹另一種方式:基於編程式聲明監聽者,並通過配置文件(yml)進行修改,同樣可以避免硬編碼。
配置yml文件(或properties文件),自定義交換機、隊列:
# Custom RabbitMQ exchanges, queues and bindings.
hl:
  amqp:
    # RabbitMQ exchange names
    exchanges:
      userExchange:
        name: css.user.exchange
        type: direct
        durable: true
    # Queue names
    queues:
      userQueue:
        name: css.user.redis.loginUser.queue
        durable: true
        exclusive: false
        autoDelete: false
    # Bindings; exchange and queue reuse the names defined above via placeholders
    bindings:
      userRedisBinding:
        exchange: ${hl.amqp.exchanges.userExchange.name}
        queue: ${hl.amqp.queues.userQueue.name}
        routingKey: user.redis.loginUser.del
創建properties讀取yml文件:
import java.util.Map;import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;import lombok.Data;/*** RabbitMQ配置屬性類*/
/**
 * RabbitMQ configuration properties bound from the {@code hl.amqp} prefix.
 *
 * <p>Each map is keyed by the logical name used in the configuration file
 * (for example {@code userExchange}); the value holds the settings for that entry.
 * A map is {@code null} when its section is absent from the configuration.
 */
@Component
@ConfigurationProperties(prefix = "hl.amqp")
@Data
public class AmqpConfigProperties {

    /** Exchange definitions, keyed by logical name. */
    private Map<String, ExchangeConfig> exchanges;

    /** Queue definitions, keyed by logical name. */
    private Map<String, QueueConfig> queues;

    /** Binding definitions, keyed by logical name. */
    private Map<String, BindingConfig> bindings;

    /** Settings for a single exchange. */
    @Data
    public static class ExchangeConfig {
        /** Exchange name. */
        private String name;
        /** Exchange type, e.g. {@code direct} as used in the sample configuration. */
        private String type;
        private boolean durable = true;
        private boolean autoDelete = false;
        private boolean internal = false;
    }

    /** Settings for a single queue. */
    @Data
    public static class QueueConfig {
        /** Queue name. */
        private String name;
        private boolean durable = true;
        private boolean exclusive = false;
        private boolean autoDelete = false;
    }

    /** Settings for a single queue-to-exchange binding. */
    @Data
    public static class BindingConfig {
        /** Name of the exchange to bind to. */
        private String exchange;
        /** Name of the queue being bound. */
        private String queue;
        /** Routing key used for the binding. */
        private String routingKey;
    }
}
創建動態加載配置類,進行交換機,隊列和綁定關系的注冊:
import com.hl.campusservicesys.properties.AmqpConfigProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class DynamicAmqpConfig {private final AmqpAdmin amqpAdmin;private final AmqpConfigProperties amqpConfigProperties;// 改為構造器注入或方法注入public DynamicAmqpConfig(AmqpAdmin amqpAdmin, AmqpConfigProperties amqpConfigProperties) {this.amqpAdmin = amqpAdmin;this.amqpConfigProperties = amqpConfigProperties;}// 創建交換機@Beanpublic Map<String, Exchange> amqpExchanges() {Map<String, Exchange> exchanges = new HashMap<>();amqpConfigProperties.getExchanges().forEach((key, config) -> {Exchange exchange = switch (config.getType()) {case ExchangeTypes.DIRECT -> new DirectExchange(config.getName(), config.isDurable(), config.isAutoDelete());case ExchangeTypes.TOPIC -> new TopicExchange(config.getName(), config.isDurable(), config.isAutoDelete());case ExchangeTypes.FANOUT -> new FanoutExchange(config.getName(), config.isDurable(), config.isAutoDelete());default -> throw new IllegalArgumentException("Unsupported exchange type: " + config.getType());};exchanges.put(key, exchange);// 直接通過amqpAdmin聲明交換機到RabbitMQamqpAdmin.declareExchange(exchange);});log.info("RabbitMQ交換機初始化完成!");return exchanges;}// 創建隊列@Beanpublic Map<String, Queue> amqpQueues() {Map<String, Queue> queues = new HashMap<>();amqpConfigProperties.getQueues().forEach((key, config) -> {Queue queue = new Queue(config.getName(),config.isDurable(),config.isExclusive(),config.isAutoDelete());queues.put(key, queue);// 直接通過amqpAdmin聲明隊列到RabbitMQamqpAdmin.declareQueue(queue);});log.info("RabbitMQ隊列初始化完成!");return queues;}// 創建綁定關系@Beanpublic Map<String, Binding> amqpBindings(Map<String, Exchange> amqpExchanges, Map<String, Queue> amqpQueues) {Map<String, Binding> bindings = new HashMap<>();amqpConfigProperties.getBindings().forEach((key, config) -> {// 查找對應的交換機和隊列Exchange exchange = null;for (Exchange ex : amqpExchanges.values()) {if (ex.getName().equals(config.getExchange())) {exchange = ex;break;}}Queue queue = null;for (Queue q : amqpQueues.values()) {if (q.getName().equals(config.getQueue())) {queue = q;break;}}if (exchange != 
null && queue != null) {Binding binding = BindingBuilder.bind(queue).to(exchange).with(config.getRoutingKey()).noargs();bindings.put(key, binding);// 直接通過amqpAdmin聲明綁定關系到RabbitMQamqpAdmin.declareBinding(binding);}});log.info("RabbitMQ綁定關系初始化完成!");return bindings;}@Beanpublic Jackson2JsonMessageConverter jackson2JsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置消息轉換器@Beanpublic MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}// 配置 RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}}
配置消息監聽器:
/**
 * Listener for user-service messages.
 */
@Slf4j
@Component
public class UserListener {

    @Resource
    private RedisCache redisCache;

    /**
     * Handles a message from the queue named by {@code hl.amqp.queues.userQueue.name}:
     * logs the received id and calls {@code redisCache.deleteObject} with the key
     * {@code RedisConstant.USER_INFO_KEY + userId}.
     *
     * @param userId id of the user whose cached entry is targeted; a {@code null} payload is ignored
     */
    @RabbitListener(queues = "${hl.amqp.queues.userQueue.name}")
    public void deleteUserInfoRedis(Long userId) {
        log.info("監聽到消息message:{}", userId);
        if (userId == null) {
            // Without an id the key would end in the literal text "null"; skip instead.
            log.warn("Ignoring message without a user id");
            return;
        }
        redisCache.deleteObject(RedisConstant.USER_INFO_KEY + userId);
    }
}
創建消息發送者:
/*** RabbitMQ工具類* */
@Component
public class AmqpMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate AmqpConfigProperties amqpConfigProperties;/*** 發送消息* @param exchange 交換機配置鍵名* @param routingKey 路由鍵* @param message 消息內容*/public void sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(amqpConfigProperties.getExchanges().get(exchange).getName(),amqpConfigProperties.getBindings().get(routingKey).getRoutingKey(), message);rabbitTemplate.convertAndSend(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, message);}}
創建常量類:
/**
 * MQ constants: the configuration keys of the exchange, queue and binding.
 */
public final class MQConstant {

    /** Configuration key of the user exchange. */
    public static final String USER_EXCHANGE = "userExchange";

    /** Configuration key of the user queue. */
    public static final String USER_QUEUE = "userQueue";

    /** Configuration key of the user Redis binding. */
    public static final String USER_REDIS_BINDING = "userRedisBinding";

    /** Constants holder; not meant to be instantiated. */
    private MQConstant() {
    }
}
使用:
// Send a message to update the cache; the first two arguments are MQConstant configuration keys.
amqpMessageSender.sendMessage(MQConstant.USER_EXCHANGE, MQConstant.USER_REDIS_BINDING, userDTO.getUserId());
示例:
啟動時完成初始化:
發送消息,監聽者接收并處理:
在啟動時,Spring會自動加載並註冊配置的交換機、隊列,並完成綁定。對比下來,這種方式肯定沒有直接使用常量類方便,感興趣的可以玩玩。