高性能異步通信組件。
同步調用
以支付為例:
可見容易發生雪崩。
異步調用
以支付為例:
支付服務當甩手掌柜了,不管后面的幾個服務的結果。只管庫庫發,后面那幾個服務想取的時候就取,因為消息代理里可以一直裝,緩存消息。
消息代理(英文Broker)
消息代理相關的技術MQ技術
Erlang面向并發的語言
?
RabbitMQ安裝部署
可直接采用Docker,方便。
RabbitMQ消息流轉的過程(整體架構)
消費者監聽隊列,發送者不直接發給隊列,而是發給exchange交換機,交換機會根據規則把消息路由給不同隊列。
因為RabbitMQ的性能很強,每秒鐘可以達到數萬的并發,所以企業有多個項目的話,往往部署一套RabbitMQ就夠了,多個項目可以共享RabbitMQ服務。但是大家一起的話,很可能交換機出現沖突,所以引出RabbitMQ的新概念virtual-host(虛擬主機),類似與MySQL里的database(眾所周知,MySQL里可以創建多個database,每個database的表和其他database的表直接是相互隔離的),每個項目都創一個自己的virtual-host,就可以相互隔離開了。
例子:MQ入門-06.RabbitMQ-快速入門_嗶哩嗶哩_bilibili
交換機不存只負責轉發。交換機和隊列必須有一個關系,才能給隊列發消息。binding綁定關系。
數據隔離
虛擬主機是實現了數據隔離。
不同的項目創建不同的用戶。為新建用戶建一個虛擬主機。
視頻舉例:MQ入門-07.RabbitMQ-數據隔離_嗶哩嗶哩_bilibili
RabbitMQ的java客戶端
這里我們不采用Rabbit官方提供的java客戶端,而是Spring AMQP,它是基于AMQP協議(消息收發與語言和平臺無關),官方提供的java客戶端使用起來繁瑣,所以使用Spring AMQP。
4:00??MQ入門-08.Java客戶端-快速入門_嗶哩嗶哩_bilibili
因為是簡單入門案例,可以省去交換機這個步驟。
控制臺創建隊列->pom里引入spring-amqp依賴->yaml里配置RabbitMQ服務端信息(如地址主機名、端口、虛擬主機名、用戶、密碼),這樣微服務才能連接到RabbitMQ->發送消息(SpringAMQP提供了RabbitTemplate工具類,方便我們發送消息)->接受消息
接收者接受消息的代碼:加一個@Comonent把它注冊成Spring的一個Bean,這個類的內部要有一個方法,這個方法要加上@RabbitListener(也就是消費者的監聽者)的注解后面再帶上隊列名,現在只要隊列有消息,方法就可以拿到了。方法參數自己設。
WorkQueues
任務模型,讓多個消費者綁定到一個隊列,共同消費隊列中的消息。
那么隊列中的消息會被哪個消費者收到呢?
模擬WorkQueue(實現一個隊列綁定多個消費者)
MQ入門-09.Java客戶端-WorkQueue_嗶哩嗶哩_bilibili
在RabbitMQ控制臺建立一個隊列->一個發消息的->2個消費者
下圖兩個消費者,一個發送者。
?
隊列同一個消息只能被一個消費者處理,很多條消息的話均勻分配(默認輪詢,1人1條)。(上圖2個消費者結果有奇偶規律)
把很多消息平均分給消費者,可以加快消息處理速度。
每個消費者能力不一樣(通過sleep(ms)修改),均勻分配消息(默認)肯定是不合理的,只需要修改application.yml,設置preFetch值為1,實現能者多勞。
Fanout交換機(廣播交換機)
再看一下以前學的這個發消息的過程。帶有交換機的完整模型。
交換機的作用是接受發送者發送的消息,并將消息路由到與其綁定的隊列。
那么什么是Fanout交換器?
特征:把接收到的消息路由到每一個和他綁定的隊列。(隊列中的消息只能被一個消費者處理,有了Fanout交換機,發的消息就可以被多個消費者處理了。我們完全可以給每個微服務創建一個隊列,然后隊列綁到交換機上,fanout交換機想廣播一樣,給每個隊列群發/復制消息。)
案例步驟:聲明2個隊列,一個交換機exhanges,然后banding綁定,第一種方法可以直接用控制臺發消息然后查看,還有一種方法java代碼,2段接受代碼,1段發送代碼(這回調3個參數的api,exchange,null,message,之前寫的例子用到的都是兩個參數,隊列名和消息)。
Direct交換機(定向)
MQ入門-11.Java客戶端-Direct交換機_嗶哩嗶哩_bilibili
和原來差不多,這回交換機選direct,隊列設置bindingkey,發送時發送者的參數分別設好exchange,routingkey值,message。其余的步驟和原來一樣。
所以兩個key值一樣,direct交換機也能實現Fanout交換機的功能。一個queue可以設多個bindingkey。
Topic交換機(話題)
?隊列bindingkey可以通過通配符簡易設置。
topic和direct比除了多了一個統配符,功能差不多。
聲明隊列交換機
之前隊列和交換機的創建都是依靠控制臺,這次學習用代碼聲明隊列交換機,這樣項目一啟動就會自己創建隊列和交換機了。
可以用new的方式。(更簡單)
也可以用builder方式。
發送者只管發,什么也不關心,所以通常我們在消費者這一端,聲明隊列交換機及綁定關系。
步驟(以Fanout交換機為例):創建FanoutConfiguration類,聲明交換機,隊列,綁定關系。可以new(比較簡單),也可以builder構建(比較專業)。
上述基于JavaBean綁定太麻煩了,這回學習注解@RatbbitListener。
基于JavaBean還是基于注解,完全個人喜好了。(但是如果基于javabean,聲明direct交換機好像沒法寫很多bindingKey)
消息轉換器
以前我們發消息一直用到rabbitTemplate的convertAndSend方法。我們發消息是可以傳任何java對象作為消息,網絡傳輸其實是以字節傳的,直接對象不行,因此傳的時候要把java對象轉換成字節。這個轉換就是由消息轉換器轉的。
java里有一種JDK自帶的序列化的一個工具(能把任一java對象序列化成字節的形式),所以這個消息轉換的過程就是采用JDK自帶的序列化方式。
ObjectOutputStream:對象流,jdk自帶的序列化工具,能把任意java對象序列化成字節。
但是推薦使用JSON的消息轉換器。
發送方和接收方一定要用相同的消息轉換器。
pom配置好之后到MAVEN里刷新一下。
給發送方和接收方都配置一個消息轉換類。
對比圖(下邊的JSON的消息轉換器)
業務改造
把業務從OpenFeign的同步調用改成基于MQ的異步調用。
OpenFeign:02-基本概念_嗶哩嗶哩_bilibili
nacos可以抽取共享配置,不用重復進行同樣的配置。
共同代碼可以寫到common里
消息可靠性
發送者的可靠性
發送者重連
yml
發送者確認
路由失敗的兩種原因:
exchange2交換機沒連隊列,路由失敗。返回ACK(因為消息確實發出去了)
routingkey和bandingkey沒有匹配上的。
如果返回NACK要重發消息。
none是關閉(默認)
simple是同步阻塞
correlated異步的
至此,ConfirmCallback和ReturnCallback全部寫完了。
唯一,uuid,最簡單的隨機算法
MQ的可靠性
消息傳到MQ也不一樣可靠,因為MQ本身也可能把消息弄丟。
MQ內存儲存的方式,可能弄丟消息,還有可能導致MQ性能下降和阻塞。
為此,產生兩個方案:數據持久化(把數據提前持久化存到磁盤,提前的意思:不是等到滿了再存到磁盤,提前就開始往磁盤存了)、LazyQueue
數據持久化
持久化的一種方法:寫出到磁盤,這樣就永久保存了。
交換機的持久化(默認都是持久的交換機、durable屬性)(Spring的AMQP代碼生成的交換機默認也是持久化的)
隊列持久化(默認持久化durable屬性)(Spring的AMQP代碼生成的默認也是持久化的)
消息持久化(手動設置為Persistent)
LazyQueue
上節課寫了數據持久化,當我們把交換機、隊列、消息持久化了以后,就不用再擔心MQ宕機而導致消息丟失了,不僅如此,RabbitMQ也不會再因為消息堆積配置out而出現阻塞了。但是數據持久化后,不僅要在內存里寫,還要再磁盤里寫一份,這樣每條消息處理的耗時就增加了,這也就導致它的整體并發能力有點下降。為了解決這個問題,引入了一種新的隊列模式,lazy Queue(消息直接寫磁盤里)。它不僅僅具有數據持久化的優勢,同時還解決了并發能力下降的問題。
那么怎樣去設置一個隊列變成Lazy Queue模式呢
第一種方法,控制臺添加
第二種方法,代碼添加(聲明bean的方式或者注解都可以)
消費者的可靠性
消費者確認機制
三種狀態(ack,nack,reject):不管哪種狀態,都不能在剛收到消息的時候就返回,我們應該根據處理結果去做判斷,也就是consumer處理完的時候。
注意:這回配置的是消費者下的application文件。
有個缺點:消息異常了就會像踢球一樣反復被踢來踢去,下面學的失敗重試機制就可以解決這個問題(設置最大嘗試次數)
失敗重試機制
第一種:重試次數耗盡后直接reject,不要了
第二種:重試次數耗盡后,返回nack,消息回到隊列,再重試
第三種:重試次數耗盡后,消費者將失敗消息扔進另一個交換機,這個交換機連著另一個隊列
下面以第三種方式為例(相比于前兩種更復雜,多了一個交換機和一個新隊列)
業務冪等性
解決重復消費問題(比如事務發到消費者那了,本來消費者要給MQ返回一個ACK,因為斷電沒發送出去,MQ以為消費者宕機了,于是恢復電的時候,MQ又會重新給消費者發送一次,可消費者已經處理完這個事務了,如果這個事務涉及到支付,那消費者就會白白消費兩次。)這時就用到業務冪等性了。
下面以消息重復提交導致業務被重復執行的這個場景為例的幾個方案:
1.唯一id(我們可以給消息帶上唯一的id)
這種方案存在業務侵入的問題:本來是沒有id這個屬性的,現在的消息都帶了id屬性,并且還放在了數據庫里!!!!!
2.業務判斷
加了一個業務判斷,當訂單來了后,不急著標記訂單為已支付,而是每次都在標記前查詢一下訂單狀態。
延遲消息
在上面三個可靠性都失敗下的兜底方案——延遲消息
死信交換機
一旦我們通過dead-letter-exchange屬性指定了一個交換機,那么死信就都會被投到這個交換機里,不會被丟棄了。
我們已經有了一組交換機和隊列,再準備一組特殊的交換機和隊列,給上面的隊列通過dead-letter-exchange屬性指定下面的交換機,現在下面的交換機就變成上面的隊列的死信交換機了。我們不會給上面的隊列綁定消費者,通常是給死信交換機的隊列綁定消費者。發送者向上面的交換機發消息,發消息時給消息設置過期時間TTL=30s,然后消息就發到上面的隊列,因為上面的隊列沒有消費者,就在這里停下來了開始等待,有個計時器開始等待數30秒,30秒后消息變成死信進入死信交換機。這樣就通過死信交換機的形式實現了延遲效果。
延遲消息插件
用了這個插件,只要簡單的把delay屬性配置成true就好了,然后就可以實現延遲消息。
步驟:首先下載配置好插件->聲明交換機并設置其delay屬性為true(可用注解或bean方式)->發消息的時候要設置一個過期時間
利用延遲消息解決用戶訂單超時未支付問題
可以分成幾步:下單之后發消息,完成延遲消息的發送。編寫消息監聽,它要干的事是去修改訂單狀態。利用延遲消息實現延時任務的效果。
MQ高級-13.延遲消息-取消超時訂單_嗶哩嗶哩_bilibili