Spring Clould 消息隊列 - RabbitMQ

??視頻地址:微服務(SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式)?

?初識MQ-同步通訊的優缺點(P61,P62)

同步和異步通訊

微服務間通訊有同步和異步兩種方式:

同步通訊:就像打電話,需要實時響應。

異步通訊:就像發郵件,不需要馬上回復。

兩種方式各有優劣,打電話可以立即得到響應,但是你卻不能跟多個人同時通話。發送郵件可以同時與多個人收發郵件,但是往往響應會有延遲。

1.同步通訊

我們之前學習的Feign調用就屬于同步方式,雖然調用可以實時得到結果,但存在下面的問題:

總結:

同步調用的優點:

  • 時效性較強,可以立即得到結果

同步調用的問題:

  • 耦合度高

  • 性能和吞吐能力下降

  • 有額外的資源消耗

  • 有級聯失敗問題

初識MQ-異步通訊的優缺點(P63)

2.異步通訊

異步調用則可以避免上述問題:

我們以購買商品為例,用戶支付后需要調用訂單服務完成訂單狀態修改,調用物流服務,從倉庫分配響應的庫存并準備發貨。

在事件模式中,支付服務是事件發布者(publisher),在支付完成后只需要發布一個支付成功的事件(event),事件中帶上訂單id。

訂單服務和物流服務是事件訂閱者(Consumer),訂閱支付成功的事件,監聽到事件后完成自己業務即可。

為了解除事件發布者與訂閱者之間的耦合,兩者并不是直接通信,而是有一個中間人(Broker)。發布者發布事件到Broker,不關心誰來訂閱事件。訂閱者從Broker訂閱事件,不關心誰發來的消息。

Broker 是一個像數據總線一樣的東西,所有的服務要接收數據和發送數據都發到這個總線上,這個總線就像協議一樣,讓服務間的通訊變得標準和可控。

?

好處:

  • 吞吐量提升:無需等待訂閱者處理完成,響應更快速

  • 故障隔離:服務沒有直接調用,不存在級聯失敗問題

  • 調用間沒有阻塞,不會造成無效的資源占用

  • 耦合度極低,每個服務都可以靈活插拔,可替換

  • 流量削峰:不管發布事件的流量波動多大,都由Broker接收,訂閱者可以按照自己的速度去處理事件

缺點:

  • 架構復雜了,業務沒有明顯的流程線,不好管理

  • 需要依賴于Broker的可靠、安全、性能

好在現在開源軟件或云平臺上 Broker 的軟件是非常成熟的,比較常見的一種就是我們今天要學習的MQ技術。

初識MQ-mq常見技術介紹(P64)

MQ,中文是消息隊列(MessageQueue),字面來看就是存放消息的隊列。也就是事件驅動架構中的Broker。

比較常見的MQ實現:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

幾種常見MQ的對比:

RabbitMQActiveMQRocketMQKafka
公司/社區RabbitApache阿里Apache
開發語言ErlangJavaJavaScala&Java
協議支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定義協議自定義協議
可用性一般
單機吞吐量一般非常高
消息延遲微秒級毫秒級毫秒級毫秒以內
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延遲:RabbitMQ、Kafka

kafka性能可靠性一般,吞吐量非常高,一般用于海量數據的操作。

RabbitMQ快速入門-介紹和安裝(P65)

RabbitMQ是基于Erlang語言開發的開源消息通信中間件,官網地址:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ?

安裝RabbitMQ,參考課前資料:

? RabbitMQ安裝鏈接:? RabbitMQ安裝

頁面信息:?

?

MQ的基本結構:

RabbitMQ中的一些角色:

- publisher:生產者
- consumer:消費者
- exchange個:交換機,負責消息路由
- queue:隊列,存儲消息
- virtualHost:虛擬主機,隔離不同租戶的exchange、queue、消息的隔離

總結:

RabbitMQ快速入門-消息模型介紹(P66)

RabbitMQ官方提供了5個不同的Demo示例,對應了不同的消息模型:

基本消息和工作消息是直接基于隊列實現的。?

RabbitMQ快速入門-簡單隊列模型(P67)

課前資料提供了一個Demo工程,mq-demo:

導入后可以看到結構如下:

包括三部分:

  • mq-demo:父工程,管理項目依賴

  • publisher:消息的發送者

  • consumer:消息的消費者

入門案例

簡單隊列模式的模型圖:

官方的HelloWorld是基于最基礎的消息隊列模型來實現的,只包括三個角色:

  • publisher:消息發布者,將消息發送到隊列queue

  • queue:消息隊列,負責接受并緩存消息

  • consumer:訂閱隊列,處理隊列中的消息

1.publisher實現

思路:

  • 建立連接

  • 創建Channel

  • 聲明隊列

  • 發送消息

  • 關閉連接和channel

代碼實現:

package cn.itcast.mq.helloworld;
?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立連接ConnectionFactory factory = new ConnectionFactory();// 1.1.設置連接參數,分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立連接Connection connection = factory.newConnection();
?// 2.創建通道ChannelChannel channel = connection.createChannel();
?// 3.創建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);
?// 4.發送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("發送消息成功:【" + message + "】");
?// 5.關閉通道和連接channel.close();connection.close();
?}
}

2.consumer實現

代碼思路:

  • 建立連接

  • 創建Channel

  • 聲明隊列

  • 訂閱消息

代碼實現:

package cn.itcast.mq.helloworld;
?
import com.rabbitmq.client.*;
?
import java.io.IOException;
import java.util.concurrent.TimeoutException;
?
public class ConsumerTest {
?public static void main(String[] args) throws IOException, TimeoutException {// 1.建立連接ConnectionFactory factory = new ConnectionFactory();// 1.1.設置連接參數,分別是:主機名、端口號、vhost、用戶名、密碼factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立連接Connection connection = factory.newConnection();
?// 2.創建通道ChannelChannel channel = connection.createChannel();
?// 3.創建隊列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);
?// 4.訂閱消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.處理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

總結

基本消息隊列的消息發送流程:

  1. 建立connection

  2. 創建channel

  3. 利用channel聲明隊列

  4. 利用channel向隊列發送消息

基本消息隊列的消息接收流程:

  1. 建立connection

  2. 創建channel

  3. 利用channel聲明隊列

  4. 定義consumer的消費行為handleDelivery()

  5. 利用channel將消費者與隊列綁定

SpringAMQP-基本介紹(P68)

SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實現了自動裝配,使用起來非常方便。

SpringAmqp的官方地址:Spring AMQP

SpringAMQP提供了三個功能:

  • 自動聲明隊列、交換機及其綁定關系

  • 基于注解的監聽器模式,異步接收消息

  • 封裝了RabbitTemplate工具,用于發送消息

Basic Queue 簡單隊列模型

在父工程mq-demo中引入依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.消息發送

首先配置MQ地址,在publisher服務的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: itcast # 用戶名password: 123321 # 密碼

然后在publisher服務中編寫測試類SpringAmqpTest,并利用RabbitTemplate實現消息發送:

package cn.itcast.mq.spring;
?
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
?
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
?@Autowiredprivate RabbitTemplate rabbitTemplate;
?@Testpublic void testSimpleQueue() {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 發送消息rabbitTemplate.convertAndSend(queueName, message);}
}

2.消息接收

首先配置MQ地址,在consumer服務的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: itcast # 用戶名password: 123321 # 密碼

然后在consumer服務的cn.itcast.mq.listener包中新建一個類SpringRabbitListener,代碼如下:

package cn.itcast.mq.listener;
?
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
?
@Component
public class SpringRabbitListener {
?@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消費者接收到消息:【" + msg + "】");}
}

3.測試

啟動consumer服務,然后在publisher服務中運行測試代碼,發送MQ消息

SpringAMQP-入門案例的消息發送(P69)

?

?

?

Basic Queue 簡單隊列模型

在父工程mq-demo中引入依賴

<!--AMQP依賴,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

1.消息發送

首先配置MQ地址,在publisher服務的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: itcast # 用戶名password: 123321 # 密碼

然后在publisher服務中編寫測試類SpringAmqpTest,并利用RabbitTemplate實現消息發送:

package cn.itcast.mq.spring;
?
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
?
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
?@Autowiredprivate RabbitTemplate rabbitTemplate;
?@Testpublic void testSimpleQueue() {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 發送消息rabbitTemplate.convertAndSend(queueName, message);}
}

SpringAMQP-入門案例的消息接收(P70)

2.消息接收

首先配置MQ地址,在consumer服務的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 主機名port: 5672 # 端口virtual-host: / # 虛擬主機username: itcast # 用戶名password: 123321 # 密碼

然后在consumer服務的cn.itcast.mq.listener包中新建一個類SpringRabbitListener,代碼如下:

package cn.itcast.mq.listener;
?
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
?
@Component
public class SpringRabbitListener {
?@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消費者接收到消息:【" + msg + "】");}
}

3.測試

啟動consumer服務,然后在publisher服務中運行測試代碼,發送MQ消息

?

SpringAMQP-WorkQueue模型(P71)

Work queues,也被稱為(Task queues),任務模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息

當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。

此時就可以使用work 模型,多個消費者共同處理消息處理,速度就能大大提高了。

1.消息發送

這次我們循環發送,模擬大量消息堆積現象。

在publisher服務中的SpringAmqpTest類中添加一個測試方法:

/*** workQueue* 向隊列中不停發送消息,模擬消息堆積。*/
@Test
public void testWorkQueue() throws InterruptedException {// 隊列名稱String queueName = "simple.queue";// 消息String message = "hello, message_";for (int i = 0; i < 50; i++) {// 發送消息rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}
}

2.消息接收

要模擬多個消費者綁定同一個隊列,我們在consumer服務的SpringRabbitListener中添加2個新的方法:

@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消費者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);
}
?
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消費者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);
}

注意到這個消費者sleep了1000秒,模擬任務耗時。

3.測試

啟動ConsumerApplication后,在執行publisher服務中剛剛編寫的發送測試方法testWorkQueue。

可以看到消費者1很快完成了自己的25條消息。消費者2卻在緩慢的處理自己的25條消息。

也就是說消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。這樣顯然是有問題的。

4.能者多勞

在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息

5.總結

Work模型的使用:

  • 多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理

  • 通過設置prefetch來控制消費者預取的消息數量

在廣播模式下,消息發送流程是這樣的:

  • 1) 可以有多個隊列

  • 2) 每個隊列都要綁定到Exchange(交換機)

  • 3) 生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定

  • 4) 交換機把消息發送給綁定過的所有隊列

  • 5) 訂閱隊列的消費者都能拿到消息

SpringAMQP-發布訂閱模型介紹(P72)

發布訂閱的模型如圖:

可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:

  • Publisher:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有以下3種類型:
    • ? Fanout:廣播,將消息交給所有綁定到交換機的隊列
    • ? Direct:定向,把消息交給符合指定routing key 的隊列
    • ? Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
  • Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
  • Queue:消息隊列也與以前一樣,接收消息、緩存消息。

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!

SpringAMQP-FanoutExchange(P73)

Fanout,英文翻譯是扇出,我覺得在MQ中叫廣播更合適。

在廣播模式下,消息發送流程是這樣的:

  • 1) ?可以有多個隊列
  • 2) ?每個隊列都要綁定到Exchange(交換機)
  • 3) ?生產者發送的消息,只能發送到交換機,交換機來決定要發給哪個隊列,生產者無法決定
  • 4) ?交換機把消息發送給綁定過的所有隊列
  • 5) ?訂閱隊列的消費者都能拿到消息

我們的計劃是這樣的:

  • 創建一個交換機 itcast.fanout,類型是Fanout

  • 創建兩個隊列fanout.queue1和fanout.queue2,綁定到交換機itcast.fanout

1.聲明隊列和交換機

Spring提供了一個接口Exchange,來表示所有不同類型的交換機:

在consumer中創建一個類,聲明隊列和交換機:

package cn.itcast.mq.config;
?
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
?
@Configuration
public class FanoutConfig {/*** 聲明交換機* @return Fanout類型交換機*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}
?/*** 第1個隊列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}
?/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}
?/*** 第2個隊列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}
?/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

2.消息發送

在publisher服務的SpringAmqpTest類中添加測試方法:

@Test
public void testFanoutExchange() {// 隊列名稱String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3.消息接收

在consumer服務的SpringRabbitListener中添加兩個方法,作為消費者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}
?
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}

4.總結

交換機的作用是什么?

  • 接收publisher發送的消息

  • 將消息按照規則路由到與之綁定的隊列

  • 不能緩存消息,路由失敗,消息丟失

  • FanoutExchange的會將消息路由到每個綁定的隊列

聲明隊列、交換機、綁定關系的Bean是什么?

  • Queue

  • FanoutExchange

  • Binding

SpringAMQP-DirectExchange(P74)

在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。

在Direct模型下:

  • 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)

  • 消息的發送方在 向 Exchange發送消息時,也必須指定消息的 RoutingKey

  • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的 Routing key完全一致,才會接收到消息

案例需求如下

  1. 利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務中,編寫兩個消費者方法,分別監聽direct.queue1和direct.queue2

  3. 在publisher中編寫測試方法,向itcast. direct發送消息

1.基于注解聲明隊列和交換機

基于@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基于注解方式來聲明。

在consumer的SpringRabbitListener中添加兩個消費者,同時基于注解來聲明隊列和交換機:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消費者接收到direct.queue1的消息:【" + msg + "】");
}
?
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消費者接收到direct.queue2的消息:【" + msg + "】");
}

2.消息發送

在publisher服務的SpringAmqpTest類中添加測試方法:

@Test
public void testSendDirectExchange() {// 交換機名稱String exchangeName = "itcast.direct";// 消息String message = "紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!";// 發送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}

3.總結

描述下Direct交換機與Fanout交換機的差異?

  • Fanout交換機將消息路由給每一個與之綁定的隊列

  • Direct交換機根據RoutingKey判斷路由給哪個隊列

  • 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似

基于@RabbitListener注解聲明隊列和交換機有哪些常見注解?

  • @Queue

  • @Exchange

SpringAMQP-TopicExchange(P75)

1.說明

Topic類型的ExchangeDirect相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過Topic類型Exchange可以讓隊列在綁定Routing key 的時候使用通配符!

Routingkey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如: item.insert

通配符規則:

#:匹配一個或多個詞

*:匹配不多不少恰好1個詞

舉例:

item.#:能夠匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

圖示:

解釋:

  • Queue1:綁定的是china.# ,因此凡是以 china.開頭的routing key 都會被匹配到。包括china.news和china.weather

  • Queue2:綁定的是#.news ,因此凡是以 .news結尾的 routing key 都會被匹配。包括china.news和japan.news

案例需求:

實現思路如下:

  1. 并利用@RabbitListener聲明Exchange、Queue、RoutingKey

  2. 在consumer服務中,編寫兩個消費者方法,分別監聽topic.queue1和topic.queue2

  3. 在publisher中編寫測試方法,向itcast. topic發送消息

2.消息發送

在publisher服務的SpringAmqpTest類中添加測試方法:

/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交換機名稱String exchangeName = "itcast.topic";// 消息String message = "喜報!孫悟空大戰哥斯拉,勝!";// 發送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}

3.消息接收

在consumer服務的SpringRabbitListener中添加方法:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消費者接收到topic.queue1的消息:【" + msg + "】");
}
?
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消費者接收到topic.queue2的消息:【" + msg + "】");
}

4.總結

描述下Direct交換機與Topic交換機的差異?

  • Topic交換機接收的消息RoutingKey必須是多個單詞,以 **.** 分割

  • Topic交換機與隊列綁定時的bindingKey可以指定通配符

  • #:代表0個或多個詞

  • *:代表1個詞

SpringAMQP-消息轉換器(P76)

之前說過,Spring會把你發送的消息序列化為字節發送給MQ,接收消息的時候,還會把字節反序列化為Java對象。

只不過,默認情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:

  • 數據體積過大

  • 有安全漏洞

  • 可讀性差

我們來測試一下。

1.測試默認轉換器

我們修改消息發送的代碼,發送一個Map對象:

@Test
public void testSendMap() throws InterruptedException {// 準備消息Map<String,Object> msg = new HashMap<>();msg.put("name", "Jack");msg.put("age", 21);// 發送消息rabbitTemplate.convertAndSend("simple.queue","", msg);
}

停止consumer服務

發送消息后查看控制臺:

2.配置JSON轉換器

顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。

在publisher和consumer兩個服務中都引入依賴:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

配置消息轉換器。

在啟動類中添加一個Bean即可:

@Bean
public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/42117.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/42117.shtml
英文地址,請注明出處:http://en.pswp.cn/news/42117.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

數據庫名字添加中文

Jetbrains 可以呀&#xff0c;這個ui 相當棒 from database import Sqlite3Database from googletrans import Translator import csvif __name__ "__main__":TRANS_EN2ZH Falsetranslator Translator()sqlite Sqlite3Database("./drurmu.db")sqlite.r…

x.view(a,b)及x = x.view(x.size(0), -1) 的理解說明

x.view()就是對tensor進行reshape&#xff1a; 我們在創建一個網絡的時候&#xff0c;會在Foward函數內看到view的使用。 首先這里是一個簡單的網絡&#xff0c;有卷積和全連接組成。它的foward函數如下&#xff1a; class NET(nn.Module):def __init__(self,batch_size):sup…

小米交卷大模型,全新小愛同學實測來了

本文源自&#xff1a;量子位 果然只有雷軍和小米&#xff0c;能搶走風口上大模型的熱度。 在雷軍的年度演講分享中&#xff0c;講武大求學經歷&#xff0c;分享學霸4年大學2年完課經驗&#xff1b;講被《硅谷之火》點燃&#xff0c;勤奮練習寫最好的代碼&#xff0c;開啟第一…

armbian使用1panel快速部署部署springBoot項目后端

文章目錄 前言環境準備實現步驟第一步&#xff1a;Armbian安裝1panel第二步&#xff1a;安裝數據庫第三步&#xff1a;查看數據庫容器重要信息【重要】查看容器所在的網絡查看容器連接地址 第四步&#xff1a;項目配置和打包第五步:構建項目鏡像 前言 這里只是簡單記錄部署spr…

一次性解決office部署問題(即點即用等)

前言 因為之前電腦安裝了office2019&#xff0c;后面需要安裝Visio&#xff0c;下載安裝時報錯30204-44,查看發現之前安裝的office版本是即點即用版&#xff0c;可能這兩者不兼容。網上搜索教程等&#xff0c;最后發現一個工具&#xff1a;Office Tool Plus&#xff0c;可以方便…

【水文學法總結】河道內生態流量計算方法(含MATLAB實現代碼)

生態流量&#xff08;Ecological Flow, EF&#xff09; 是指維持河道內生態環境所需要的水流流量。生態流量計算方法眾多&#xff0c;主要分為水文學方法、棲息地模擬法、水力學方法、整體法等&#xff0c;各方法多用于計算維持河道生態平衡的最小生態流量&#xff08;Minimum …

LeetCode 141.環形鏈表

文章目錄 &#x1f4a1;題目分析&#x1f4a1;解題思路&#x1f514;接口源碼&#x1f4a1;深度思考?思考1?思考2 題目鏈接&#x1f449; LeetCode 141.環形鏈表&#x1f448; &#x1f4a1;題目分析 給你一個鏈表的頭節點 head &#xff0c;判斷鏈表中是否有環。 如果鏈表中…

【ES6】—let 聲明方式

一、不屬于頂層對象window let 關鍵字聲明的變量&#xff0c;不會掛載到window的屬性 var a 5 console.log(a) console.log(window.a) // 5 // 5 // 變量a 被掛載到window屬性上了 &#xff0c; a window.alet b 6 console.log(b) console.log(window.b) // 6 // undefin…

原生js獲取今天、昨天、近7天的時間(年月日時分秒)

有的時候我們需要將今天,昨天,近7天的時間(年月日時分秒)作為參數傳遞給后端,如下圖: 那怎么生成這些時間呢?如下代碼里,在methods里的toDay方法、yesterDay方法、weekDay方法分別用于生成今天、昨天和近7天的時間: <template><div class="box"&…

暫停Windows更新的方法,可延后數十萬年,簡單且有手就行

前言 近年來&#xff0c;Windows更新頻率過快&#xff0c;最大只能暫停更新5周&#xff0c;導致用戶不厭其煩&#xff0c;從網上找到的暫停更新的方法不是過于繁瑣就是毫無效果&#xff0c;或者是暫停的時間有限&#xff0c;無意中發現一個大神的帖子可以通過修改注冊表信息以達…

Java定時任務方案

一、Timer import java.util.Timer; import java.util.TimerTask;public class TimerExample {public static void main(String[] args) {Timer timer new Timer();TimerTask task new TimerTask() {Overridepublic void run() {System.out.println("Task executed at:…

uni-app自定義多環境配置,動態修改appid

背景 在企業級項目開發中&#xff0c;一般都會分為開發、測試、預發布、生產等多個環境&#xff0c;在工程化中使用不同的打包命令改變環境變量解決不同環境各種變量需要手動修改的問題&#xff0c;比如接口請求地址&#xff0c;不同環境的請求路徑前綴都是不同的。在使用uni-…

Docker中為RabbitMQ安裝rabbitmq_delayed_message_exchange延遲隊列插件

1、前言 rabbitmq_delayed_message_exchange是一款向RabbitMQ添加延遲消息傳遞&#xff08;或計劃消息傳遞&#xff09;的插件。 插件下載地址&#xff1a;https://www.rabbitmq.com/community-plugins.html 1、下載插件 首先需要確定我們當前使用的RabbitMQ的版本&#xff0c…

Android隱藏輸入法

1、方法一(如果輸入法在窗口上已經顯示&#xff0c;則隱藏&#xff0c;反之則顯示) InputMethodManager imm (InputMethodManager) getSystemService(Context.INPUT_METHOD_SERVICE); imm.toggleSoftInput(0, InputMethodManager.HIDE_NOT_ALWAYS); 2、方法二(view為接受軟…

實踐教程|基于 pytorch 實現模型剪枝

PyTorch剪枝方法詳解&#xff0c;附詳細代碼。 一&#xff0c;剪枝分類 1.1&#xff0c;非結構化剪枝 1.2&#xff0c;結構化剪枝 1.3&#xff0c;本地與全局修剪 二&#xff0c;PyTorch 的剪枝 2.1&#xff0c;pytorch 剪枝工作原理 2.2&#xff0c;局部剪枝 2.3&#…

前端如何安全的渲染HTML字符串?

在現代的Web 應用中&#xff0c;動態生成和渲染 HTML 字符串是很常見的需求。然而&#xff0c;不正確地渲染HTML字符串可能會導致安全漏洞&#xff0c;例如跨站腳本攻擊&#xff08;XSS&#xff09;。為了確保應用的安全性&#xff0c;我們需要采取一些措施來在安全的環境下渲染…

QString常用函數介紹

此篇博客核心介紹QT中的QString類型的常用函數&#xff0c;介紹到的函數均從幫助手冊或其他博客中看到 QString 字符串類 Header: #include qmake: QT core 一、QString字符串轉換 1、QString類字符串轉換為整數 int toInt(bool *ok Q_NULLPTR, int base 10) cons…

Python 基礎 -- Tutorial(二)

5、數據結構 本章更詳細地描述了一些你已經學過的東西&#xff0c;并添加了一些新的東西。 5.1. 更多關于Lists 列表(list)數據類型有更多的方法。下面是列表對象的所有方法: list.append(x) 在列表末尾添加一項。相當于a[len(a):] [x]。 list.extend(iterable) 通過添加可…

如何使用SpringBoot 自定義轉換器

&#x1f600;前言 本篇博文是關于SpringBoot 自定義轉換器的使用&#xff0c;希望你能夠喜歡&#x1f60a; &#x1f3e0;個人主頁&#xff1a;晨犀主頁 &#x1f9d1;個人簡介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章可以幫助到大家&#xff0c;您的…