文章目錄
- 工作隊列模式
- 引入依賴
- 配置
- 聲明
- 生產者代碼
- 消費者代碼
- 發布/訂閱模式
- 引入依賴
- 聲明
- 生產者代碼
- 發送消息
- 消費者代碼
- 運行程序
- 路由模式
- 聲明
- 生產者代碼
- 消費者代碼
- 運行程序
- 通配符模式
- 聲明
- 生產者代碼
- 消費者代碼
- 運行程序
工作隊列模式
引入依賴
我們在創建 SpringBoot
項目的時候,選上這兩個依賴即可
或者在依賴中加入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
將配置文件后綴改成 yml
之后,進行配置
#配置 RabbitMQ 的基本信息
spring:rabbitmq: host: 127.0.0.1 #RabbitMQ 服務器的地址 port: 15673 #RabbitMQ的TCP協議的端口號,而不是管理平臺的端口號。默認為5672 username: guest password: guest virtual-host: coding #默認為 /
或者這樣寫
spring:rabbitmq:addresses: amqp://guest:guest@127.0.0.1:5672/coding
- 格式為:
amqp://username:password@ip:port/virtual-host
聲明
注意引入的是這個包
package org.example.rabbitmq.config; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { // 聲明一個隊列,來自第三方包,就是一個對象 @Bean("workQueue") public Queue workQueue(){ return QueueBuilder.durable(Constants.WORK_QUEUE).build(); }
}
生產者代碼
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/work") public String work() { // 使用內置交換機的話,RoutingKey 和隊列名稱一致 rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, "hello spring amqp: work..."); return "發送成功"; }
}
- 在運行程序之后,隊列不會被立馬創建出來
- 需要發送消息之后才會被創建
消費者代碼
消費者是通過實現一個監聽類,來監聽有沒有消息
- 采用一個注解——
@RabbitListener
@RabbitListener
是Spring
框架中用于監聽RabbitMQ
隊列的注解,通過使用這個注解,可以定義一個方法,以便從RabbitMQ
隊列中接收消息。
- 該注解支持多種參數類型,這些參數類型代表了從
RabbitMQ
接收到的消息和相關信息- 以下是一些常用的參數類型:
String
:返回消息的內容Message
(org.spring.framework.ampq.core.Message
):Spring AMPQ
的Message
類,返回原始的消息體以及消息的屬性,如消息ID
,內容,隊列信息等Channel
(com.rabbitmq.client.Channel
):RabbitMQ
的通道對象,可以用于進行高級的操作,如手動確認消息
package org.example.rabbitmq.listener; import org.apache.logging.log4j.message.Message;
import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class WorkListener { @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener1(Message message) { System.out.println("listener 1 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message); } @RabbitListener(queues = Constants.WORK_QUEUE) public void queueListener2(String message) { System.out.println("listener 2 [" + Constants.WORK_QUEUE + "] 接收到消息:" + message); }
}
發布/訂閱模式
在發布/訂閱模式中,多了一個 Exchange
角色。Exchange
常見有三種類型,分別代表不同的路由規則
Fanout
: 廣播,將消息交給所有綁定到交換機的隊列 (Publish/Subscribe
模式)Direct
: 定向,把消息交給符合指定Routing Key
的隊列(Routing
模式)Topic
: 通配符,把消息交給符合Routing pattern
(路由模式) 的隊列(Topics
模式)
引入依賴
我們在創建 SpringBoot
項目的時候,選上這兩個依賴即可
或者在依賴中加入
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
聲明
package org.example.rabbitmq.config; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; @Configuration
public class RabbitMQConfig { /** * 二、發布/訂閱模式 * 聲明隊列、聲明交換機、聲明隊列和交換機的綁定 * @return */ @Bean("fanoutQueue1") // @Bean注解:交給Spring進行管理, 括號里面是指定名稱 public Queue fanoutQueue1() { return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build(); } @Bean("fanoutQueue2") public Queue fanoutQueue2() { return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build(); } @Bean("fanoutExchange") // 聲明交換機有很多種類型:FanoutExchange、DirectExchange、TopicExchange public FanoutExchange fanoutExchange() { return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build(); } @Bean("fanoutQueueBinding1") public Binding fanoutQueueBinding1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue1") Queue queue) { return BindingBuilder.bind(queue).to(fanoutExchange); } @Bean("fanoutQueueBinding2") public Binding fanoutQueueBinding2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange, @Qualifier("fanoutQueue2") Queue queue) { return BindingBuilder.bind(queue).to(fanoutExchange); }
}
生產者代碼
- 聲明隊列
- 聲明交換機
- 聲明交換機和隊列的綁定
- 發送消息
發送消息
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/fanout") public String fanout() { rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE,"","hello spring amqp:fanout..."); return "發送成功"; }
}
消費者代碼
package org.example.rabbitmq.listener; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class FanoutListener { @RabbitListener(queues = Constants.FANOUT_QUEUE1) public void queueListener1(String message) { System.out.println("隊列[" + Constants.FANOUT_QUEUE1 + "] 接收到消息:" + message); } @RabbitListener(queues = Constants.FANOUT_QUEUE2) public void queueListener2(String message) { System.out.println("隊列[" + Constants.FANOUT_QUEUE2 + "] 接收到消息:" + message); }
}
運行程序
- 運行項目,調用接口發送消息
- http://127.0.0.1:8080/producer/fanout
- 監聽類收到消息,并打印
路由模式
交換機類型為 Direct
時,會把消息交給符合指定 Routing Key
的隊列
- 隊列和交換機的綁定,不是任意的綁定了,而是要制定一個
RoutingKey
(路由key
) - 消息的發送方在向
Exchange
發送消息時,也需要指定消息的RoutingKey
Exchange
也不再把消息交給每一個綁定的key
,而是根據消息的RoutingKey
進行判斷,只有隊列的RoutingKey
和消息的RoutingKey
完全一致,才會接收消息
聲明
按照這個圖片,進行綁定
/** * 三、 路由模式 * 聲明隊列、聲明交換機、聲明隊列和交換機的綁定 * @return */
@Bean("directQueue1")
public Queue directQueue1(){ return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
} @Bean("directQueue2")
public Queue directQueue2(){ return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
} @Bean("directExchange")
// 聲明交換機有很多種類型:FanoutExchange、DirectExchange、TopicExchange
public DirectExchange directExchange() { return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
} @Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue1") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("a");
} @Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("a");
} @Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("b");
} @Bean("directQueueBinding4")
public Binding directQueueBinding4(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue2") Queue queue) { return BindingBuilder.bind(queue).to(directExchange).with("c");
}
生產者代碼
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; /** * 三、路由模式 * @param routingKey * @return */ @RequestMapping("/direct/{routingKey}") //從路徑中拿到這個routingKey public String direct(@PathVariable("routingKey") String routingKey) { rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey,"hello spring amqp:direct, my routing key is" + routingKey); return "發送成功"; }
}
消費者代碼
package org.example.rabbitmq.listener; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component; @Component
public class DirectListener { @RabbitListener(queues = Constants.DIRECT_QUEUE1) public void queueListener1(String message) { System.out.println("隊列[" + Constants.DIRECT_QUEUE1 + "] 接收到消息:" + message); } @RabbitListener(queues = Constants.DIRECT_QUEUE2) public void queueListener2(String message) { System.out.println("隊列[" + Constants.DIRECT_QUEUE2 + "] 接收到消息:" + message); }
}
運行程序
-
運行項目
-
調用接口發送
routingKey
為a
的消息- http://127.0.0.1:8080/producer/direct/a
- 觀察后端日志,隊列 1 和 2 都收到消息
-
調用接口發送
routingKey
為b
的消息- http://127.0.0.1:8080/producer/direct/b
- 觀察后端日志,隊列 2 收到消息
-
調用接口發送
routingKey
為c
的消息- http://127.0.0.1:8080/producer/direct/c
- 觀察后端日志,隊列 2 收到消息
通配符模式
Topics
和 Routing
模式的區別是:
topics
模式使用的交換機類型為topic
(Routing
模式使用的是direct
)topic
類型的交換機在匹配規則上進行了擴展,Binding Key
支持通配符匹配
*
表示一個單詞#
表示多個單詞
聲明
/** * 四、通配符模式 * 聲明隊列、聲明交換機、聲明隊列和交換機的綁定 * @return */
@Bean("topicQueue1")
public Queue topicQueue1(){ return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
} @Bean("topicQueue2")
public Queue topicQueue2(){ return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
} @Bean("topicExchange")
public TopicExchange topicExchange() { return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
} @Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue1") Queue queue) { return BindingBuilder.bind(queue).to(topicExchange()).with("*.a.*");
} @Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) { return BindingBuilder.bind(queue).to(topicExchange()).with("*.*.b");
} @Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange,@Qualifier("topicQueue2") Queue queue) { return BindingBuilder.bind(queue).to(topicExchange()).with("c.#");
}
生產者代碼
package org.example.rabbitmq.controller; import org.example.rabbitmq.constant.Constants;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; @RestController
@RequestMapping("/producer")
public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; /** * 四、通配符模式 * @param routingKey * @return */ @RequestMapping("/topic/{routingKey}") public String topic(@PathVariable("routingKey") String routingKey) { rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey, "hello spring amqp:topic, my routing key is " + routingKey); return "發送成功"; }
}
消費者代碼
運行程序
-
運行程序
-
調用接口發送
routingKey
為qqq.a.b
的消息- http://127.0.0.1:8080/producer/topic/qqq.a.b
- 觀察后端日志,隊列 1 和隊列 2 均收到消息
-
調用接口發送
routingKey
為c.abc.fff
的消息- http://127.0.0.1:8080/producer/topic/c.abc.fff
- 觀察后端日志,隊列 2 收到信息
-
調用接口發送
routingKey
為g.h.j
的消息- http://127.0.0.1:8080/producer/topic/g.h.j
- 觀察后端日志,沒有隊列收到消息