文章目錄
- 1、mq(消息隊列)概述
- 2、RabbitMQ環境搭建
- 3、java基于AMQP協議操作RabbitMQ
- 4、基于Spring AMQP操作RabbitMQ
- 5、代碼中創建隊列與交換機
- ①、配置類創建
- ②、基于@RabbitListener注解創建
- 6、RabbitMQ詳解
- ①、work模型
- ②、交換機
- 1、Fanout(廣播)交換機
- 2、Direct(定向)交換機
- 3、Topic(話題)交換機
- 7、消息轉換器
- 總結
1、mq(消息隊列)概述
MQ 是 Message Queue(消息隊列)的簡稱,是一種用于異步通信和解耦的中間件技術。它的核心功能是通過隊列結構存儲和傳輸消息,允許生產者(發送消息的一方)和消費者(接收消息的一方)在不同時間進行數據交換,而無需直接連接
MQ作用:
①、異步調用:
異步調用方式其實就是基于消息通知的方式,一般包含三個角色:
- 消息發送者:投遞消息的人,就是原來的調用方
- 消息代理:管理、管理存儲、轉發消息的中間件
- 消息接收者:接收和處理消息的人,就是原來的服務提供方
優點:
- 異步調用,無需等待,性能好
- 故障隔離,下游服務故障不影響上游業務
缺點:
- 不能立即得到調用結果,時效性差
- 確定下游業務執行是否成功
- 業務安全依賴于Broker的可靠性
②、削峰/降流
在電子商務一些秒殺、促銷活動中,合理使用消息隊列可以有效抵御促銷活動剛開始大量訂單涌入對系統的沖擊。如下圖所示:
③、降低系統耦合性
對于發送方來說,只需要將自己的消息發送到消息隊列就ok了,而對于接收方來說,只需要接收消息即可,而無需關注誰發的,極大降低了發送接收方的耦合性。
④、順序保證
消息隊列保證數據按照特定的順序被處理,適用于那些對數據順序有嚴格要求的場景。大部分消息隊列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持順序消息。
⑤、延時/定時處理
消息發送后不會立即被消費,而是指定一個時間,到時間后再消費。大部分消息隊列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持定時/延時消息。
⑥、即時通訊
MQTT(消息隊列遙測傳輸協議)是一種輕量級的通訊協議,采用發布/訂閱模式,非常適合于物聯網(IoT)等需要在低帶寬、高延遲或不可靠網絡環境下工作的應用。它支持即時消息傳遞,即使在網絡條件較差的情況下也能保持通信的穩定性。RabbitMQ 內置了 MQTT 插件用于實現 MQTT 功能(默認不啟用,需要手動開啟)
四大mq產品對比:
2、RabbitMQ環境搭建
我們在docker環境下通過docker pull來快速獲取RabbitMQ的鏡像。
拉取:
docker pull rabbitmq:3.8-management
運行:
docker run -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -v mq-plugins:/plugins --name mq --hostname mq -p 15672:15672 -p 5672:5672 -d rabbitmq:3.8-management
其中,15672端口是圖形化界面的端口,而5672是發送接收消息的端口。
登錄之后,界面如下:
-
publisher: 消息發送者
-
consumer: 消息的消費者
-
queue: 隊列,存儲消息
-
exchange: 交換機,負責路由消息
-
connectors: 生產者或者消費者和消息隊列建立連接的情況
-
channels: 消息通道,生產者消費者進行通信需要建立一個通道。
-
Admin: 管理虛擬主機,添加和查看已有的用戶
RabbitMQ架構:
3、java基于AMQP協議操作RabbitMQ
AMQP(Advanced Message Queuing Protocol),是用于在應用程序之間傳遞消息的開放標準協議。協議以語言和平臺無關,更符合互聯網的要求。
官網文檔教程:
java中操作RabbitMQ
新建java的maven項目,添加依賴:
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.17</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.17</version></dependency></dependencies>
這幾個依賴必須導入。
發送方(Send.java) :
public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.138.133");factory.setPort(5672);factory.setUsername("root");factory.setPassword("123456");// 建立連接,創建管道try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello RabbitMQ!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");} catch (Exception e) {throw new RuntimeException(e);}}}
接收方(Recv.java) :
public class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.138.133");factory.setPort(5672);factory.setUsername("root");factory.setPassword("123456");// 建立連接,創建管道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}
host和用戶、密碼等依據自己情況修改
先運行Recv,此時接收方會處于等待接收的狀態,隨后Send發送消息,接收成功。
運行結果:
[x] Sent 'Hello RabbitMQ!'[x] Received 'Hello RabbitMQ!'
4、基于Spring AMQP操作RabbitMQ
Spring AMQP是基于AMQP協議定義的一套API規范,提供了模板來發送和接收消息。包含兩部分,其中spring-amqp是基礎抽象,spring-rabbit是底層的默認實現。因此RabbitMQ中我們可以通過Spring進行操作。
<dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依賴,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--單元測試--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>
消息發送者
@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSend() {// 隊列名稱String queueName = "hello";// 消息String message = "hello RabbitMQ!";// 發送消息rabbitTemplate.convertAndSend(queueName, message);}
消息消費者(其實就是通過監聽隊列來獲取信息):
@RabbitListener()中的queues參數里面的值就是隊列的名字。
@Slf4j
@Component
public class Consumer {@RabbitListener(queues = {"hello"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void testConsumer(String msg) {log.info("消費者收到消息:" + msg);}}
如果想要在代碼中創建隊列的話,可以在config類中定義:
@Configuration
public class MQConfig {//代表創建一個叫queue的隊列@Beanpublic Queue queue() {return new Queue("queue");}
}
5、代碼中創建隊列與交換機
①、配置類創建
@Configuration
public class MQConfig {// 聲明交換機@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("test.fanout");}// 聲明隊列1@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}// 聲明隊列2@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}// 綁定隊列1與交換機@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// 綁定隊列2與交換機@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
如上就是通過在一個config類中定義bean實現注入。
②、基于@RabbitListener注解創建
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
該注解用配置類等價于:
@Configuration
public class MQConfig {// 聲明交換機@Beanpublic DirectExchange fanoutExchange() {return new DirectExchange("test.direct");}// 聲明隊列1@Beanpublic Queue fanoutQueue1() {return new Queue("direct.queue1");}// 綁定隊列1與交換機@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, DirectExchange directExchange) {return BindingBuilder.bind(fanoutQueue1).to(directExchange).with("red");}@Beanpublic Binding bindingQueue2(Queue fanoutQueue1, DirectExchange directExchange) {return BindingBuilder.bind(fanoutQueue1).to(directExchange).with("blue");}
}
可以說極大簡化了開發。
6、RabbitMQ詳解
①、work模型
work模型就是多個消費者綁定到一個隊列,加快消息處理速度,通過設置prefech來控制消費者領取消息的數量。
而prefetch默認值為250,這個可以自己來控制調整。
發送方代碼:
@Testpublic void testWorkQueue() throws Exception {String queueName = "work.queue";for (int i = 0; i < 50; i++) {String message = "hello, worker, message_" + i;rabbitTemplate.convertAndSend(queueName, message);Thread.sleep(50);}}
接收方代碼:
@RabbitListener(queues = {"work.queue"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void workConsumer(String msg) throws InterruptedException {System.out.println("work.queue隊列1收到消息:" + msg);Thread.sleep(20);}@RabbitListener(queues = {"work.queue"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void workConsumer2(String msg) throws InterruptedException {System.err.println("work.queue隊列2收到消息:" + msg);Thread.sleep(200);}
最后發現兩個接收方無論處理快還是慢,最后每個都只能處理25個消息,而我們一共發了50條消息,這樣會導致處理效率很低。
解決方法,在properties中加一個配置:
spring:listener:simple:prefetch: 1
運行結果:
這樣處理的話,效率就高多了,基本就是發一條消息,誰有空誰來處理即可。
②、交換機
1、Fanout(廣播)交換機
Fanout交換機會將接收到的消息廣播到每一個與其綁定的queue,也叫廣播模式。
這里我們在隊列中聲名兩個queue,分別叫fanout.queue1和fanout.queue2。
交換機使用amq.fanout,為fanout類型
記得先將交換機與隊列進行綁定:
接收方代碼:
@RabbitListener(queues = {"fanout.queue1"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void fanoutConsumer(String msg) throws InterruptedException {System.out.println("fanout.queue隊列1收到消息:" + msg);}@RabbitListener(queues = {"fanout.queue2"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void fanoutConsumer2(String msg) throws InterruptedException {System.err.println("fanout.queue隊列2收到消息:" + msg);}
發送方代碼:
@Testpublic void testFanoutQueue() throws Exception {String exchange = "amq.fanout";String message = "hello, everyone";rabbitTemplate.convertAndSend(exchange, "", message);}
運行結果:
fanout.queue隊列2收到消息:hello, everyone
fanout.queue隊列1收到消息:hello, everyone
2、Direct(定向)交換機
Direct Exchange 會將接收到的消息根據規則路由到指定的Queue,因此稱為定向路由。
每一個Queue都與Exchange設置一個BindingKey,發布者發送消息時,指定消息的RoutingKey,Exchange將消息路由到BindingKey與消息RoutingKey一致的隊列
當交換機中的key對應的值和queue中的bingdingKey值相同時,消息就發送到對應的消費者手中,同時不同queue的bingdingKey值可以是相同的,同一個bindingKey可以有多個值。
創建兩個direct.queue:
綁定到amq.direct上:
接收消息代碼:
@RabbitListener(queues = {"direct.queue1"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void DirectConsumer(String msg) throws InterruptedException {System.out.println("fanout.queue隊列1收到消息:" + msg);}@RabbitListener(queues = {"direct.queue2"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void DirectConsumer2(String msg) throws InterruptedException {System.err.println("fanout.queue隊列2收到消息:" + msg);}
發送消息代碼:
@Testpublic void testDirectQueue() throws Exception {String exchange = "amq.direct";String message1 = "hello, Red";String message2 = "hello, Blue";String message3 = "hello, Yellow";rabbitTemplate.convertAndSend(exchange, "red", message1);rabbitTemplate.convertAndSend(exchange, "blue", message2);rabbitTemplate.convertAndSend(exchange, "yellow", message3);}
運行結果:
direct.queue隊列1收到消息:hello, Red
direct.queue隊列1收到消息:hello, Blue
direct.queue隊列2收到消息:hello, Red
direct.queue隊列2收到消息:hello, Yellow
3、Topic(話題)交換機
TopicExchange與DirectExchange類似,區別在于routingKey可以是多個單詞的列表,并且以“.”分割。
Queue與Exchange指定BindingKey時可以使用通配符:
#: 代指0個或多個單詞
*: 代指一個單詞
新建兩個隊列:
使用amq.topic綁定:
接收方代碼:
@RabbitListener(queues = {"topic.queue1"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void TopicConsumer(String msg) throws InterruptedException {System.out.println("topic.queue隊列1收到消息:" + msg);}@RabbitListener(queues = {"topic.queue2"}) //這里參數是隊列的名字,填寫的時候按自己情況來。public void TopicConsumer2(String msg) throws InterruptedException {System.err.println("topic.queue隊列2收到消息:" + msg);}
發送方代碼:
@Testpublic void testTopicQueue() throws Exception {String exchange = "amq.topic";String message = "Japan's news";String message2 = "China's news";String message3 = "China's weather";String message4 = "Japan's weather";rabbitTemplate.convertAndSend(exchange, "Japan.news", message);rabbitTemplate.convertAndSend(exchange, "China.news", message2);rabbitTemplate.convertAndSend(exchange, "China.weather", message3);rabbitTemplate.convertAndSend(exchange, "Japan.weather", message4);}
運行結果:
topic.queue隊列2收到消息:Japan's news
topic.queue隊列2收到消息:China's news
topic.queue隊列1收到消息:China's news
topic.queue隊列1收到消息:China's weather
7、消息轉換器
發送java對象代碼:
@Testpublic void testSendObject() {Map<String, Object> msg = new HashMap<>();msg.put("name", "jack");msg.put("age", 18);rabbitTemplate.convertAndSend("object.queue", msg);}
接收消息為亂碼:
解決方法:
我們引入消息轉換器,使用json來處理消息。
引入對應依賴:
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
接收方和消費方都配置消息轉換器:
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;@Configuration
public class messageConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
總結
重要點:
- 搭建環境,熟悉RabbitMQ面板與配置
- 使用SpringBoot集成開發配置
- 重點學會@RabbitListener的使用
- 熟悉常見交換機和隊列
- 配置使用消息轉換器