???🔥個人主頁:?中草藥
🔥專欄:【中間件】企業級中間件剖析
?Spring 框架與 RabbitMQ 的整合主要通過?Spring AMQP(Advanced Message Queuing Protocol)模塊實現,提供了便捷的消息隊列開發能力。
引入依賴
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>
配置
#配置RabbitMQ的基本信息
spring:rabbitmq:host: 110.41.51.65port: 15673 #默認為5672username: studypassword: studyvirtual-host: bite #默認值為#或者以下這種方式rabbitmq:addresses: #amqp://username:password@Ip:port/virtual-host
一、工作隊列模式(Work Queue)
場景:多個消費者共享一個隊列,消息被輪詢分發(Round-Robin),用于任務分發和負載均衡(如耗時任務處理)。
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work(){//使用內置交換機 RoutingKey 和隊列名稱一致for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"Hello World"+i);}return "OK";}
}
@RabbitListener
?是 Spring 框架中用于監聽 RabbitMQ 隊列的注解,通過使用這個注解,可以定義一個方法,以便從 RabbitMQ 隊列中接收消息(相當于消費者)。該注解支持多種參數類型,這些參數類型代表了從 RabbitMQ 接收到的消息和相關信息。
以下是一些常用的參數類型:
String
:返回消息的內容。Message
(org.springframework.amqp.core.Message
):Spring AMQP 的 Message 類,返回原始的消息體以及消息的屬性,如消息 ID、內容、隊列信息等。Channel
(com.rabbitmq.client.Channel
):RabbitMQ 的通道對象,可以用于進行更高級的操作,如手動確認消息 。
@Component
public class WorkListener {@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener1(Message message, Channel channel) {System.out.println("Listener1:["+Constants.WORK_QUEUE+"]"+message+",channel:"+channel);}@RabbitListener(queues = Constants.WORK_QUEUE)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.WORK_QUEUE+"]"+message);}
}
觀察控制臺
輪詢分發:默認按順序將消息分發給不同消費者。
二、發布訂閱模式(Publish/Subscribe)
場景:消息廣播到所有綁定的隊列(如日志廣播、事件通知),使用?Fanout 交換機。
聲明使用交換機和隊列 并綁定
@Configuration
public class RabbitMQConfig {@Bean("fanoutQueue1")public Queue funoutQueue1() {return QueueBuilder.durable(Constants.FUNOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue funoutQueue2() {return QueueBuilder.durable(Constants.FUNOUT_QUEUE2).build();}@Bean("funoutExchange")public FanoutExchange funoutExchange(){return ExchangeBuilder.fanoutExchange(Constants.FUNOUT_EXCHANGE).durable(true).build();}@Bean("funoutQueueBinding1")public Binding funoutQueueBinding1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){return BindingBuilder.bind(queue).to(funoutExchange);}@Bean("funoutQueueBinding2")public Binding funoutQueueBinding2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){return BindingBuilder.bind(queue).to(funoutExchange);}
}
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/funout")public String funout(){rabbitTemplate.convertAndSend(Constants.FUNOUT_EXCHANGE,"","Hello Spring funout");return "OK";}
}
listener
@Component
public class FunoutListener {@RabbitListener(queues = Constants.FUNOUT_QUEUE1)public void queueListener1(String message) {System.out.println("Listener1:["+Constants.FUNOUT_QUEUE1+"]"+message);}@RabbitListener(queues = Constants.FUNOUT_QUEUE2)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.FUNOUT_QUEUE2+"]"+message);}
}
在實際開發之中,一個 listener 監聽一個 queue?
三、路由模式(Routing)
場景:根據?路由鍵(Routing Key)?精準匹配,將消息發送到指定隊列,使用?Direct 交換機。
聲明使用交換機和隊列 并綁定
@Configuration
public class RabbitMQConfig {//direct@Bean("directQueue1")public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}@Bean("directQueueBinding1")public Binding directQueueBinding1(@Qualifier("directQueue1") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("orange");}@Bean("directQueueBinding2")public Binding directQueueBinding2(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("black");}@Bean("directQueueBinding3")public Binding directQueueBinding3(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){return BindingBuilder.bind(queue).to(directExchange).with("orange");}
}
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable String routingKey){rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"Hello Spring direct "+routingKey);return "direct OK";}
}
listener
@Component
public class DirectListener {@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void queueListener1(String message) {System.out.println("Listener1:["+Constants.DIRECT_QUEUE1+"]"+message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.DIRECT_QUEUE2+"]"+message);}
}
如果某一個隊列即綁定了black和orange,將會分別發送到隊列
四、通配符模式(Topics)
場景:根據路由鍵的?通配符規則?匹配隊列,使用?Topic 交換機,支持?*
(匹配一個單詞)和?#
(匹配多個單詞)。
Topics 和 Routing 模式的區別是:
- topics 模式使用的交換機類型為 topic (Routing 模式使用的交換機類型為 direct)
- topic 類型的交換機在匹配規則上進行了擴展,Binding Key 支持通配符匹配
聲明使用交換機和隊列 并綁定
@Configuration
public class RabbitMQConfig {//TOPIC@Bean("topicExchange")public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueue1")public Queue topicQueue1(){return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2(){return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("topicQueueBinding1")public Binding topicQueueBinding1(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue1") Queue topicQueue){return BindingBuilder.bind(topicQueue).to(topicExchange()).with("*.orange.*");}@Bean("topicQueueBinding2")public Binding topicQueueBinding2(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){return BindingBuilder.bind(topicQueue).to(topicExchange).with("*.*.rabbit");}@Bean("topicQueueBinding3")public Binding topicQueueBinding3(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){return BindingBuilder.bind(topicQueue).to(topicExchange).with("lazy.#");}
}
producer
@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable String routingKey){rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"Hello Spring topic "+routingKey);return "topic OK";}
}
listener
@Component
public class TopicListener {@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void queueListener1(String message) {System.out.println("Listener1:["+Constants.TOPIC_QUEUE1+"]"+message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void queueListener2(String message) {System.out.println("Listener2:["+Constants.TOPIC_QUEUE2+"]"+message);}
}
五、完成應用通信
SpringBoot 整合 RabbitMQ 實現應用通信是微服務/分布式系統中常見的異步解耦方案。
以此圖為實例
訂單系統
@Configuration
public class RabbitMQConfig {@Bean("orderQueue")public Queue orderQueue() {return QueueBuilder.durable("order.create").build();}
}@RestController
@RequestMapping("/order")
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/create")public String create(){rabbitTemplate.convertAndSend("","order.create","訂單信息,訂單ID:"+ UUID.randomUUID());//這里僅僅是模擬演示,實際的下單操作比較復雜,包括參數的校驗,數據庫存儲等等 業務代碼省略return "下單成功";}
}
?物流系統
@Component
@RabbitListener(queues = "order.create")
public class OrderListener {@RabbitHandler//該注解根據所識別的數據類型不同自動分配不同的方法public void handleOrder(String orderInfo) {System.out.println("接收到新的訂單消息:"+orderInfo);//接收到訂單消息后,進行相應的業務出路 代碼省略}@RabbitHandler//在此處為方便演示我們將order-service 讓此項目應用//正確的做法是將OrderInfo抽取出來單獨作為一個包,兩個service都引用這個包public void handleOrder2(OrderInfo orderInfo) {System.out.println("接收到新的訂單消息:"+orderInfo);}
}
@RabbitHandler
?注解用于標記方法,這些方法會根據消息的類型來處理接收到的消息。當一個消息監聽器容器接收到消息時,它會根據消息的類型選擇合適的?@RabbitHandler
?注解的方法來處理該消息。?
測試結果
當在發送的是一個對象時,為保證對象的可讀性,我們要保證對象可被序列化,且通過 Jackson2JsonMessageConverter 將其從原生序列化改為Json格式
@Configuration
public class RabbitMQConfig {@Beanpublic Jackson2JsonMessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter());return rabbitTemplate;}
}
對于Listener,為保證同樣具有解讀json的能力,也應該去加上相同的配置?
@Data
public class OrderInfo implements Serializable {//實現序列化接口private String orderId;private String name;
}
?可觀察結果
對時間的慷慨,就等于慢性自殺。——奧斯特洛夫斯基
🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀
以上,就是本期的全部內容啦,若有錯誤疏忽希望各位大佬及時指出💐
? 制作不易,希望能對各位提供微小的幫助,可否留下你免費的贊呢🌸?