初識MQ
同步調用
目前我們采用的是基于OpenFeign的同步調用,也就是說業務執行流程是這樣的:
-
支付服務需要先調用用戶服務完成余額扣減
-
然后支付服務自己要更新支付流水單的狀態
-
然后支付服務調用交易服務,更新業務訂單狀態為已支付
三個步驟依次執行。
這其中就存在3個問題:
第一,拓展性差
但是隨著業務規模擴大,產品的功能也在不斷完善,最終支付業務會越來越臃腫。
第二,性能下降?
采用了同步調用,調用者需要等待服務提供者執行完返回結果后,才能繼續向下執行,也就是說每次遠程調用,調用者都是阻塞等待狀態。最終整個業務的響應時長就是每次遠程調用的執行時長之和:
第三,級聯失敗?
由于我們是基于OpenFeign調用交易服務、通知服務。當交易服務、通知服務出現故障時,整個事務都會回滾,交易失敗。
這其實就是同步調用的級聯失敗問題。
異步調用
異步調用方式其實就是基于消息通知的方式,一般包含三個角色:
-
消息發送者:投遞消息的人,就是原來的調用方
-
消息Broker:管理、暫存、轉發消息,你可以把它理解成微信服務器
-
消息接收者:接收和處理消息的人,就是原來的服務提供方
在異步調用中,發送者不再直接同步調用接收者的業務接口,而是發送一條消息投遞給消息Broker。然后接收者根據自己的需求從消息Broker那里訂閱消息。每當發送方發送消息后,接受者都能獲取消息并處理。
這樣,發送消息的人和接收消息的人就完全解耦了。
異步調用的優勢包括:
-
耦合度更低
-
性能更好
-
業務拓展性強
-
故障隔離,避免級聯失敗
當然,異步通信也并非完美無缺,它存在下列缺點:
-
完全依賴于Broker的可靠性、安全性和性能
-
架構復雜,后期維護和調試麻煩
常見的消息隊列(MessageQueue)
目比較常見的MQ實現:
-
ActiveMQ
-
RabbitMQ
-
RocketMQ
-
Kafka
RabbitMQ ActiveMQ RocketMQ Kafka 公司/社區 Rabbit Apache 阿里 Apache 開發語言 Erlang Java Java Scala&Java 協議支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協議 自定義協議 可用性 高 一般 高 高 單機吞吐量 一般 差 高 非常高 消息延遲 微秒級 毫秒級 毫秒級 毫秒以內 消息可靠性 高 一般 高 一般
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延遲:RabbitMQ、Kafka
RabbitMQ
RabbitMQ是基于Erlang語言開發的開源消息通信中間件,官網地址:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ對應的架構如圖:
其中包含幾個概念:
-
publisher
:生產者,也就是發送消息的一方 -
consumer
:消費者,也就是消費消息的一方 -
queue
:隊列,存儲消息。生產者投遞的消息會暫存在消息隊列中,等待消費者處理 -
exchange
:交換機,負責消息路由。生產者發送的消息由交換機決定投遞到哪個隊列。 -
virtual host
:虛擬主機,起到數據隔離的作用。每個虛擬主機相互獨立,有各自的exchange、queue
交換機
首先展示交換機項下的創建交換機:
我們點擊任意交換機,即可進入交換機詳情頁面。仍然會利用控制臺中的publish message 發送一條消息:
隊列
我們打開Queues
選項卡,新建一個隊列:
數據隔離
用戶管理
點擊Admin
選項卡,首先會看到RabbitMQ控制臺的用戶管理界面:
virtual host
SpringAMQP
???RabbitMQ
采用了AMQP協議,因此它具備跨語言的特性。任何語言只要遵循AMQP協議收發消息,都可以與RabbitMQ
交互。并且RabbitMQ
官方也提供了各種不同語言的客戶端。
SpringAmqp的官方地址:
Spring AMQP
SpringAMQP提供了三個功能:
-
自動聲明隊列、交換機及其綁定關系
-
基于注解的監聽器模式,異步接收消息
-
封裝了RabbitTemplate工具,用于發送消息
快速入門
-
publisher直接發送消息到隊列
-
消費者監聽并處理隊列中的消息
導入依賴
<!--AMQP依賴,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
工程中就可以直接使用SpringAMQP了。
消息發送
首先配置MQ地址,在publisher
服務的application.yml
中添加配置:
spring:rabbitmq:host: 192.168.100.128 # 你的虛擬機IPport: 5672 # 端口virtual-host: /hmall # 虛擬主機username: hmall # 用戶名password: 123 # 密碼
然后在publisher
服務中編寫測試類SpringAmqpTest
,并利用RabbitTemplate
實現消息發送:
消息接收
首先配置MQ地址,在consumer
服務的application.yml
中添加配置:
spring:rabbitmq:host: 192.168.150.101 # 你的虛擬機IPport: 5672 # 端口virtual-host: /hmall # 虛擬主機username: hmall # 用戶名password: 123 # 密碼
然后在consumer
服務的com.itheima.consumer.listener
包中新建一個類SpringRabbitListener
,代碼如下:
測試
啟動consumer服務,然后在publisher服務中運行測試代碼,發送MQ消息。最終consumer收到消息:
WorkQueues模型
Work queues,任務模型。簡單來說就是讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
當消息處理比較耗時的時候,可能生產消息的速度會遠遠大于消息的消費速度。長此以往,消息就會堆積越來越多,無法及時處理。
此時就可以使用work 模型,
多個消費者共同處理消息處理,消息處理的速度就能大大提高了。
但消息是平均分配給每個消費者,并沒有考慮到消費者的處理能力。
導致1個消費者空閑,另一個消費者忙的不可開交。沒有充分利用每一個消費者的能力
能者多勞
在spring中有一個簡單的配置,可以解決這個問題。我們修改consumer服務的application.yml文件,添加配置:
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能獲取一條消息,處理完成才能獲取下一個消息
?這樣充分利用了每一個消費者的處理能力,可以有效避免消息積壓問題。
總結
Work模型的使用:
-
多個消費者綁定到一個隊列,同一條消息只會被一個消費者處理
-
通過設置prefetch來控制消費者預取的消息數量
交換機類型
-
Publisher:生產者,不再發送消息到隊列中,而是發給交換機
-
Exchange:交換機,一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
-
Queue:消息隊列也與以前一樣,接收消息、緩存消息。不過隊列一定要與交換機綁定。
-
Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
交換機的類型有四種:
-
Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機
-
Direct:訂閱,基于RoutingKey(路由key)發送給訂閱了消息的隊列
-
Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符
-
Headers:頭匹配,基于MQ的消息頭匹配,用的較少。
Fanout交換機
Fanout,英文翻譯是扇出,我們學過的廣播,發出消息任何綁定的隊列都可以收到。
-
1) 可以有多個隊列
-
2) 每個隊列都要綁定到Exchange(交換機)
-
3) 生產者發送的消息,只能發送到交換機
-
4) 交換機把消息發送給綁定過的所有隊列
-
5) 訂閱隊列的消費者都能拿到消息
消息發送
在有交換機參與時,發送方調用的參數時要注意參數類型
rabbitTemplate.convertAndSend( exchangeName,? "",? message );
第一個參數:交換機名稱
第二個參數:交換機與隊列綁定的RoutingKey值
第三個參數:消息對象
總結
交換機的作用:
-
接收publisher發送的消息
-
將消息按照規則路由到與之綁定的隊列
-
不能緩存消息,路由失敗,消息丟失
-
FanoutExchange的會將消息路由到每個綁定的隊列
Direct交換機
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
-
隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) -
消息的發送方在 向 Exchange發送消息時,也必須指定消息的
RoutingKey
。 -
Exchange不再把消息交給每一個綁定的隊列,而是根據消息的
Routing Key
進行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
總結
描述下Direct交換機與Fanout交換機的差異
-
Fanout交換機將消息路由給每一個與之綁定的隊列
-
Direct交換機根據RoutingKey判斷路由給哪個隊列
-
如果多個隊列具有相同的RoutingKey,則與Fanout功能類似
Topic交換機
說明
Topic
類型的Exchange
與Direct
相比,都是可以根據RoutingKey
把消息路由到不同的隊列。
只不過Topic
類型Exchange
可以讓隊列在綁定BindingKey
的時候使用通配符!
BindingKey
一般都是有一個或多個單詞組成,多個單詞之間以.
分割,例如: item.insert
通配符規則:
-
#
:匹配一個或多個詞 -
*
:匹配不多不少恰好1個詞
總結
描述下Direct交換機與Topic交換機的差異?
-
Topic交換機接收的消息RoutingKey必須是多個單詞,以
.
分割 -
Topic交換機與隊列綁定時的bindingKey可以指定通配符
-
#
:代表0個或多個詞 -
*
:代表1個詞
聲明隊列和交換機
? ? ? ? ?通過編寫代碼的方式來聲明創建交換機和隊列
? ? ? ? ?程序啟動時檢查隊列和交換機是否存在,如果不存在自動創建。
基本API
SpringAMQP提供了一個Queue類,用來創建隊列:
SpringAMQP還提供了一個Exchange接口,來表示所有不同類型的交換機:
我們可以自己創建隊列和交換機,不過SpringAMQP還提供了ExchangeBuilder來簡化這個過程:
而在綁定隊列和交換機時,則需要使用BindingBuilder來創建Binding對象:
示例:
基于注解聲明
基于@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基于注解方式來聲明。
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消費者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消費者2接收到direct.queue2的消息:【" + msg + "】");
}
消息轉換器
Spring的消息發送代碼接收的消息體是一個Object:
而在數據傳輸時,它會把你發送的消息序列化為字節發送給MQ,接收消息的時候,還會把字節反序列化為Java對象。
只不過,默認情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
-
數據體積過大
-
有安全漏洞
-
可讀性差
配置JSON轉換器
顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
在publisher
和consumer
兩個服務中都引入依賴:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
注意,如果項目中引入了spring-boot-starter-
web
依賴,則無需再次引入Jackson
依賴。
配置消息轉換器,在publisher
和consumer
兩個服務的啟動類中添加一個Bean即可:
@Bean
public MessageConverter messageConverter(){// 1.定義消息轉換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自動創建消息id,用于識別不同消息,也可以在業務中基于ID判斷是否是重復消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
注意: publisher用什么類型的消息傳遞,接收者也要用什么類型來接收