目錄
??、MQ如何保證消息不丟失
1?、哪些環節可能會丟消息
2?、?產者發送消息如何保證不丟失
2.1、?產者發送消息確認機制
2.2、Rocket?MQ的事務消息機制
2.3?、Broker寫?數據如何保證不丟失?
2.3.1** ?先需要理解操作系統是如何把消息寫?到磁盤的**。
2.3.2然后來看MQ是如何調?fsync的?
2.4?、Broker主從同步如何保證不丟失?
2.5?、消費者消費消息如何不丟失
2.6?、如果MQ服務全部掛了,?如何保證不丟失?
2.7?、MQ消息零丟失?案總結
面試題:說說你的項目RocketMQ如何保證消息不丟失??
??、MQ如何保證消息的順序性
三、MQ如何保證消息冪等性?
1?、?產者發送消息到服務端如何保持冪等?
2?、消費者消費消息如何保持冪等?
?四、MQ如何快速處理積壓的消息
1?、消息積壓會有哪些問題。
2?、怎么處理?量積壓的消息
請問RocketMQ消息積壓一般產生原因是什么?如何解決消息積壓問題呢?
五、Rocket?MQ課程總結
??、MQ如何保證消息不丟失
1?、哪些環節可能會丟消息
?先分析下MQ的整個消息鏈路中,有哪些步驟是可能會丟消息的
其中,?1,?2?,4三個場景都是跨?絡的,?跨?絡就肯定會有丟消息的可能。
然后關于3這個環節,通常MQ存盤時都會先寫?操作系統的緩存page?cache中,然后再由操作系統異步的將??消息寫?硬盤?。這個中間有個時間差,就可能會造成消息丟失?。如果服務掛了,緩存中還沒有來得及寫?硬盤?的消息就會丟失。
2?、?產者發送消息如何保證不丟失
?產者發送消息之所以可能會丟消息,都是因為?絡?。因為?絡的不穩定性,容易造成請求丟失?。怎么解決這?樣的問題呢?其實—個統—的思路就是?產者確認?。簡單來說,就是?產者發出消息后,給?產者—個確定的?通知,?這個消息在Broker端是否寫?完成了?。就好?打電話,不確定電話通沒通,那就互相說個“?喂?”,?具體確認—下?。只不過基于這個同樣的思路,各個MQ產品有不同的實現?式。?
2.1、?產者發送消息確認機制
在Rocket?MQ中,提供了三種不同的發送消息的?式:
?異步發送,?不需要Broker確認?。效率很??,但是會有丟消息的可能。?
// 異步發送, 不需要Broker確認 。效率很? ,但是會有丟消息的可能。
producer.sendOneway(msg);
// 同步發送, ?產者等待Broker的確認 。消息最安全 ,但是效率很低。
SendResult sendResult = producer.send(msg, 20 * 1000);
// 異步發送, ?產者另起—個線程等待Broker確認, 收到Broker確認后直接觸發回調?法 。消息安全和效率之間?較均 衡 ,但是會加?客戶端的負擔。
producer.send(msg, new SendCallback() { @Overridepublic void onSuccess(SendResult sendResult) {// do something}@Overridepublic void onException(Throwable e) {// do something}
});
與之類似的,?Kafka也同樣提供了這種同步和異步的發送消息機制。
//直接send發送消息, 返回的是—個Future。這就相當于是異步調? 。
Future<RecordMetadata> future = producer.send(record)
//調?future的get?法才會實際獲取到發送的結果 。?產者收到這個結果后, 就可以知道消息是否成功發到broker了。 這個過程就變成了—個同步的過程。
RecordMetadata recordMetadata = producer.send(record).get();
?在RabbitMQ中,則是提供了—個Publisher?Confirms?產者確認機制?。其思路也是Publiser收到Broker的響?應后再出發對應的回調?法。
//獲取channel
Channel ch = ...;
//添加兩個回調, —個處理ack響應, —個處理nack響應
ch.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)
這些各種各樣不同API的背后,都是—個統—的思路,就是給?產者響應,讓?產者知道消息有沒有發送成功?。如果沒有發送成功,也由?產者??進?補救?。可以重發,也可以向業務拋異常?。都由?產者??處理。
2.2、Rocket?MQ的事務消息機制
Rocket?MQ提出了事務消息機制,其實也是保證?產者安全發送消息的利器?。事務消息機制的基本流程如下:
其實整體上來看,?Rocket?MQ的事務消息機制,?還是基于?產者確認構建的—種實現機制?。其核?思想,?還是??通過Broker主動觸發?產者的回調?法,從?確認消息是否成功發送到了Broker?。只不過,?這?將—次確認變?成了多次確認?。在多次確認的過程中,?除了確認消息的安全性,?還給了?產者“反悔”的機會?。另外,事務消息?機制進—步將?產者確認與?產者的本地事務結合到了—起,從?讓?產者確認這個機制有了更多的業務屬性。
例如,?以最常?的電商訂單場景為例,就可以在下完訂單后,等到?戶?付的過程中使?事務消息機制?。這樣?可以保證本地下單和第三??付平臺?付這兩個業務是事務性的,要么同時成功,就往下游發訂單消息?。要么?就同時失敗,不往下游發訂單消息。
2.3?、Broker寫?數據如何保證不丟失?
接下來,?Producer把消息發送到Broker上了之后,?Broker是不是能夠保證消息不丟失呢?這?也有—個核?的問題,那就是PageCache緩存。
數據會優先寫?到緩存,然后過—段時間再寫?到磁盤?。但是緩存中的數據有個特點,就是斷電即丟失,所?以,如果服務器發??正常斷電,?內存中的數據還沒有寫?磁盤,?這時就會造成消息丟失。
怎么解決這個問題呢?
2.3.1** ?先需要理解操作系統是如何把消息寫?到磁盤的**。
?
以Linux為例, ?戶態的應?程序,不管是什么應?程序, 想要寫?磁盤?件時,都只能調?操作系統提供的?write系統調?,?申請寫磁盤?。?于消息如何經過PageCache再寫?到磁盤中,?這個過程,?這個過程是在內核??態執?的,也就是操作系統??執?的,應?程序?法?預?。這個過程中,應?系統唯—能夠?預的,就是調??操作系統提供的sync系統調?,?申請—次刷盤操作,?主動將PageCache中的數據寫?到磁盤。
>> man 2 write
WRITE(2) Linux Programmer 's
Manual
NAME
write - write to a file descriptor>> man 2 fsync
FSYNC(2) Linux Programmer 's
Manual
NAME
fsync, fdatasync - synchronize a file 's in-core state with storage device
2.3.2然后來看MQ是如何調?fsync的?
先來看Rocket?MQ:
Rocket?MQ的Broker提供了—個很明確的配置項flush?DiskType?,可以選擇刷盤模式?。有兩個可選項,?SYNC_FLUSH?同步刷盤和ASYNC_FLUSH 異步刷盤。
所謂同步刷盤,?是指broker每往?志?件中寫?—條消息,就調?—次刷盤操作?。?異步刷盤,則是指broker?每隔—個固定的時間,才去調?—次刷盤操作?。異步刷盤性能更穩定,但是會有丟消息的可能?。?同步刷盤的?消息安全性就更??,但是操作系統的IO壓?就會?常??。
在Rocket?MQ中,就算是同步刷盤,其實也并不是真的寫—次消息就刷盤—次,?這在海量消息的場景下,操作??系統是撐不住的?。所以,我們在之前梳理Rocket?MQ核?源碼的過程中看到,?Rocket?MQ的同步刷盤的實現????式其實也是以10毫秒的間隔去調?刷盤操作?。從理論上來說,也還是會有?正常斷電造成消息丟失的可能,?甚??嚴格意義上來說,任何應?程序都不可能完全保證斷電消息不丟失?。但是,?Rocket?MQ的這—套同步刷盤機??制,卻可以通過絕?部分業務場景的驗證?。這其實就是—種平衡。
然后來看Kafka:
Kafka中并沒有明顯的同步刷盤和異步刷盤的區別,不過他暴露了—系列的參數,可以管理刷盤的頻率。
flush.ms?: 多?時間進?—次強制刷盤。
log.flush.interval.messages:表示當同—個Partiton的消息條數積累到這個數量時,?就會申請—次刷盤操作?。默?認是Long.MAX。
log.flush.interval.ms:?當—個消息在內存中保留的時間,?達到這個數量時,?就會申請—次刷盤操作?。他的默認值是?空?。如果這個參數配置為空?,則?效的是下—個參數。
log.flush.scheduler.interval.ms:檢查是否有?志?件需要進?刷盤的頻率?。默認也是Long.MAX。
其實在這??家可以思考下,對kafka來說,把log.flush.interval.messages參數設置成1 ,就是每寫?—條消?息就調?—次刷盤操作,?這不就是所謂的同步刷盤了嗎?
最后來看RabbitMQ:
關于消息刷盤問題,?RabbitMQ官?給了更明確的說法?。那就是對于Classic經典對列,?即便聲明成了持久化對??列,?RabbitMQ的服務端也不會實時調?fsync,?因此?法保證服務端消息斷電不丟失?。對于Stream流式對列,?則更加直接,?RabbitMQ明確不會主動調?fsync進?刷盤,?是交由操作系統??刷盤。
??于怎么辦呢?他明確就建議了,如果對消息安全性有更?的要求,可以使?Publisher?Confirms機制來進—?步保證消息安全?。這其實也是對Kafka和Rocket?MQ同樣適?的建議。
2.4?、Broker主從同步如何保證不丟失?
對于Broker來說,通常Slave的作?就是做—個數據備份?。當Broker服務宕機了,?甚?是磁盤都壞了時,可以?從Slave上獲取數據記錄?。但是,如果主從同步失敗了,那么Broker的這—層保證就會失效?。因此,?主從同步?也有可能造成消息的丟失。
我們這?重點來討論—下,?Rocket?MQ的普通集群以及Dledger?可?集群。
先來看Rocket?MQ的普通集群?案,在這種?案下,可以指定集群中每個節點的角?,?固定的作為Master或者?Slave。
在這種集群機制下,消息的安全性還是?較?的?。但是有—種極端的情況需要考慮?。因為消息需要從Master往?Slave同步,?這個過程是跨?絡的,?因此也是有時間延遲的?。所以,如果Master出現?正常崩潰,那么就有可???能有—部分數據是已經寫?到了Master但是還來得及同步到Slave?。這—部分未來得及同步的數據,在Rocket?MQ的這種集群機制下,就會—直記錄在Master節點上?。等到Master重啟后,就可以繼續同步了?。另外?由于Slave并不會主動切換成Master?,所以Master服務崩潰后,也不會有新的消息寫進來,?因此也不會有消息??沖突的問題?。所以,?只要Mater的磁盤沒有壞,那么在這種普通集群下,?主從同步通常不會造成消息丟失。
與之形成對?的是Kafka的集群機制?。在Kafka集群中,如果Leader?Partition的服務崩潰了,那么,那些Follower?Partition就會選舉產?—個新的Leadr?Partition?。?集群中所有的消息,都以Leader?Partition的為準?。即便舊的Leader?Partition重啟了,也是作為Follower?Partition啟動,?主動刪除掉??的HighWater之后的?數據,然后從新的Leader?Partition上重新同步消息?。這樣,就會造成那些已經寫?舊的Leader?Partition但是???還沒來得及同步的消息,就徹底丟失了。
Rocket?MQ和Kafka之間的這種差異,其實還是來?于他們處理MQ問題的初衷不同?。Rocket?MQ誕?于阿?的??融體系,天?對消息的安全性?較敏感?。?Kafka誕?于LinkedIn的?志收集體系,天?對服務的可?性要??求更??。這也體現了不同產品對業務的取舍。
然后來看下Rocket?MQ的Dledger?可?集群?。在Rocket?MQ中,?直接使?基于Raft協議的Dledger來保存?CommitLog消息?志?。也就是說他的消息會通過Dledger的Raft協議,在主從節點之間同步。
?關于Raft協議,?之前章節做給分析,他是—種基于兩階段的多數派同意機制?。每個節點會將客戶端的治指令?以Entry的形式保存到??的Log?志當中?。此時Entry是uncommited狀態?。當有多數節點統統保存了Entry后,就可以執?Entry中的客戶端指令,提交到StateMachine狀態機中?。此時Entry更新為commited狀態。
他優先保證的是集群內的數據—致性,?并不是保證不丟失?。在某些極端場景下,??如出現?絡分區情況時,?也會丟失—些未經過集群內確認的消息?。不過,基于Rocket?MQ的使?場景,?這種丟失消息的可能性?常??。?另外,?之前也提到過,?這種服務端?法保證消息安全的問題,其實結合客戶端的?產者確認機制,?是可以得到??較好的處理的?。因此,在Rocket?MQ中使?Dledger集群的話,數據主從同步這個過程,數據安全性還是????較?的?。基本可以認為不會造成消息丟失。
2.5?、消費者消費消息如何不丟失
最后,消費者消費消息的過程中,?需要從Broker上拉取消息,?這些消息也是跨?絡的,所以拉取消息的請求也?可能丟失?。這時,會不會有丟消息的可能呢?
?乎所有的MQ產品都設置了消費狀態確認機制?。也就是消費者處理完消息后,?需要給Broker—個響應,表示?消息被正常處理了?。如果Broker端沒有拿到這個響應,不管是因為Consumer沒有拿到消息,?還是Consumer ?處理完消息后沒有給出相應,?Broker都會認為消息沒有處理成功?。之后,?Broker就會向Consumer重復投遞這?些沒有處理成功的消息?。Rocket?MQ和Kafka是根據Offset機制重新投遞,?RabbitMQ的Classic?Queue經典??對列,則是把消息重新?隊?。因此,正常情況下,?Consumer消費消息這個過程,?是不會造成消息丟失的,相?反,可能需要考慮下消息冪等的問題。
但是,?這也并不是說消費者消費消息不可能丟失?。例如像下?這種情況,?Consumer異步處理消息,就有可能?造成消息丟失。
consumer.registerMessageListener(new MessageListenerConcurrently{ @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContext context) {new Thread(){public void run(){//處理業務邏輯System.out.printf("%s Receive New Messages: &s %n", Thread.currentThread() .getN }};return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });
這?需要注意的是,通常在開發過程中,不太會這么直?的使?多線程異步機制去處理問題?。但是,很有可能?在處理業務時,使?—些第三?框架來處理消息?。他們是不是使?的多線程異步機制,那就不能確定了?。所以,線程并發,在任何業務場景下,都是必不可少的基本功。
2.6?、如果MQ服務全部掛了,?如何保證不丟失?
最后有—種?概率的極端情況,就是MQ的服務全部掛掉了,?這時,要如何保證業務能夠繼續穩定進?,?同時?業務數據不會丟失呢?
通常的做法是設計—個降級緩存?。Producer往MQ發消息失敗了,就往降級緩存中寫,然后,依然正常去進?后續的業務。
此時,再啟動—個線程,不斷嘗試將降級緩存中的數據往MQ中發送?。這樣,??少當MQ服務恢復過來后,?這些?消息可以盡快進?到MQ中,繼續往下游Conusmer推送,?不?于造成消息丟失。
2.7?、MQ消息零丟失?案總結
最后要注意到,?這?討論到的各種MQ消息防?丟失的?案,其實都是以增加集群負載,?降低吞吐為代價的。
這必然會造成集群效率下降?。因此,?這些保證消息安全的?案通常都需要根據業務場景進?靈活取舍,?不是?—股腦的直接?上。
這?希望你能夠理解到,?這些消息零丟失?案,其實是沒有最優解的?。因為如果有最優解,那么這些MQ產品,就不需要保留各種各樣的設計了?。這和很多?試?股?是有沖突的?。?試?股?強調標準答案,?實際業?務中,?這個問題是沒有標準答案的,—切,都需要根據業務場景去調整
面試題:說說你的項目RocketMQ如何保證消息不丟失??
RocketMQ通過多層面的機制來確保消息的可靠性,包括生產者端、broker端和消費者端。
1. 生產者端保證
????????a. 同步發送
????????????????同步發送是最可靠的發送方式,它會等待broker的確認響應。
????????b. 異步發送 + 重試機制
????????????????異步發送通過回調來處理發送結果,并可以設置重試次數。
2.Broker端保證
????????a. 同步刷盤,通過配置broker.conf文件,可以啟用同步刷盤:
????????????????brokerRole = SYNC_MASTER?
3. 消費者端保證
????????a. 手動提交消費位移,使用手動提交可以確保消息被正確處理后再提交位移。
????????b. 冪等性消費,在消費端實現冪等性處理,確保重復消費不會導致業務問題。
通過這些機制的組合,RocketMQ能夠在各個環節保證消息的可靠性,極大地降低了消息丟失的風險。在實際應用中,可以根據業務需求選擇合適的配置和實現方式,以在可靠性和性能之間取得平衡。
??、MQ如何保證消息的順序性
這??先需要明確的是,通常討論MQ的消息順序性,其實是在強調局部有序,?不是全局有序?。就好?QQ和?微信的聊天消息,通常只要保證同—個聊天窗?內的消息是嚴格有序的?。?于不同窗口之間的消息,順序出了點偏差,其實是?所謂的?。所謂全局有序,通常在業務上沒有太多的使?場景?。在Rocket?MQ和Kafka中把Topic的分區數設置成1,?這類強?保證消息全局有序的?案,純屬思維體操。
那么怎么保證消息局部有序呢?最典型的還是Rocket?MQ的順序消費機制。
這個機制需要兩個??的保障。
- 1 、Producer將—組有序的消息寫?到同—個MessageQueue中。
- 2?、Consumer每次集中從—個MessageQueue中拿取消息。
在Producer端,?Rocket?MQ和Kafka都提供了分區計算機制,可以讓應?程序??決定消息寫?到哪—個分區?。所以這—塊,?是由業務??決定的?。只要通過定制數據分?算法,把—組局部有序的消息發到同—個對列?當中,就可以通過對列的FI?FO特性,保證消息的處理順序?。對于RabbitMQ?,則可以通過維護Exchange與Queue之間的綁定關系,將這—組局部有序的消息轉發到同—個對列中,從?保證這—組有序的消息,在?RabbitMQ內部保存時,?是有序的。
在Conusmer端,?Rocket?MQ是通過讓Consumer注?不同的消息監聽器來進?區分的?。?具體的實現機制,在 之前章節分析過,核?是通過對Consumer的消費線程進?并發控制,來保證消息的消費順序的?。類?到Kafka 呢?。Kafka中并沒有這樣的并發控制?。?實際上,?Kafka的Consumer對某—個Partition拉取消息時,天?就是???單線程的,所以,參照Rocket?MQ的順序消費模型,?Kafka的Consumer天?就是能保證局部順序消費的。
?于RabbitMQ,?以他的Classic Queue經典對列為例,他的消息被—個消費者從隊列中拉取后,就直接從隊列?中把消息刪除了?。所以,基本不存在資源競爭的問題?。那就簡單的是—個隊列只對應—個Consumer?,那就是??能保證順序消費的?。如果—個隊列對應了多個Consumer,?同—批消息,可能會進?不同的Consumer處理,所以也就沒法保證消息的消費順序
三、MQ如何保證消息冪等性?
1?、?產者發送消息到服務端如何保持冪等?
Producer發送消息時,如果采?發送者確認的機制,那么Producer發送消息會等待Broker的響應?。如果沒有?收到Broker的響應,?Producer就會發起重試?。但是,?Producer沒有收到Broker的響應,也有可能是Broker已?經正常處理完了消息,?只不過發給Producer的響應請求丟失了?。這時候Producer再次發起消息重試,就有可能造成消息重復。
Rocket?MQ的處理?式,?是會在發送消息時,給每條消息分配一個唯一的ID。
//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
//for MessageBatch,ID has been set in the generating processif ( !(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}public static void setUniqID(final Message msg) {if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null){msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,createUniqID());}
}
通過這個ID?,就可以判斷消息是否重復投遞。
?對于Kafka?,則會通過他的冪等性配置,?防??產者重復投遞消息造成的冪等性問題。
在Kafka中,?需要打開idempotence冪等性控制后(默認是打開的,但是如果其他配置有沖突,會影響冪等性配?置)?。Kafka為了保證消息發送的Exactly-once語義,增加了?個概念:
- . ?PID:每個新的Producer在初始化的過程中就會被分配—個唯—的PID?。這個PID對?戶是不可?的。
- . ?Sequence?Numer: 對于每個PID,?這個Producer針對Partition會維護—個sequenceNumber?。這是—個從?0開始單調遞增的數字?。當Producer要往同—個Partition發送消息時,?這個Sequence?Number就會加1?。然后會隨著消息—起發往Broker。
- . ?Broker端則會針對每個<PID,Partition>維護—個序列號(?SN),?只有當對應的SequenceNumber = SN+1 時,?Broker才會接收消息,?同時將SN更新為SN+1 。否則,SequenceNumber過?就認為消息已經寫?了,不需要再重復寫??。?如果SequenceNumber過??,就會認為中間可能有數據丟失了?。對?產者就會?拋出—個OutOfOrderSequenceException。
?
2?、消費者消費消息如何保持冪等?
這?以Rocket?MQ來討論如何防?消費者多次重復消費同—條消息。
?先,關于消息會如何往消費者投遞?。Rocket?MQ官?明確做了回答:?
也就是說,在?多數情況下,不需要單獨考慮消息重復消費的問題?。但是,?同樣,?這個回答?也說明了,存在?—些?概率情況,?需要單獨考慮消費者的消息冪等問題。
?于有哪些?概率情況呢?最典型的情況就是?絡出現波動的時候?。Rocket?MQ是通過消費者的響應機制來推進offset的,如果consumer從broker上獲取了消息,正常處理之后,他要往broker返回—個響應,但是如果??絡出現波動,consumer從broker上拿取到了消息,但是等到他向broker發響應時,發??絡波動,?這個響應???丟失了,那么就會造成消息的重復消費?。因為broker沒有收到響應,就會向這個Consumer所在的Group重復投遞消息。
然后,消費者如何防?重復消費呢?
防?重復消費,?最主要是要找到—個唯—性的指標?。在Rocket?MQ中,?Producer發出—條消息后,?Rocket?MQ?內部會給每—條消息分配—個唯—的messageId?。?這個messageId在Consumer中是可以獲取到的?。所以??多數情況下,?這個messageId就是—個很好的唯—性指標?。Consumer只要將處理過的messageId記錄下來, 就可以判斷這條消息之前有沒有處理過。
但是同樣也有—些特殊情況?。如果Producer是采?批量發送,或者是事務消息機制發送,那么這個messageId 就沒有那么好控制了?。所以,如果在真實業務中,更建議根據業務場景來確定唯—指標?。例如,在電商下單的??場景,訂單ID就是—個很好的帶有業務屬性的唯—指標?。在使?Rocket?MQ時,可以使?message的key屬性???來傳遞訂單ID?。這樣Consumer就能夠?較好的防?重復消費。
最后,對于冪等性問題,?除了要防?重復消費外,?還需要防?消費丟失?。也就是Consumer—直沒有正常消費?消息的情況。
在Rocket?MQ中,?重復投遞的消息,會單獨放到以消費者組為維度構建的重試對列中?。如果經過多次重試后還?是?法被正常消費,那么最終消息會進?到消費者組對應的死信對列中?。也就是說,如果Rocket?MQ中出現了?死信對列,那么就意味著有—批消費者的邏輯是—直有問題的,?這些消息始終?法正常消費?。這時就需要針對?死信對列,?單獨維護—個消費者,對這些錯誤的業務消息進?補充處理?。這?需要注意—下的是,?Rocket?MQ??中的死信對列,默認權限是?法消費的,?需要?動調整權限才能正常消費。
?四、MQ如何快速處理積壓的消息
1?、消息積壓會有哪些問題。
對Rocket?MQ和Kafka來說,他們的消息積壓能?本來就是很強的,?因此,短時間的消息積壓,?是沒有太多問題?的?。但是需要注意,如果消息積壓問題—直得不到解決,?Rocket?MQ和Kafka在?志?件過期后,就會直接刪除?過期的?志?件?。?這些?志?件上未消費的消息,就會直接丟失。
?對RabbitMQ來說, Classic?Queue經典對列和Quorum?Queue仲裁對列,如果有?量消息積壓,未被消費,就會嚴重影響服務端的性能,?因此需要重點關注?。??于Stream Queue流式對列,整體的處理機制已經?和Rocket?MQ與Kafka??較相似了,對消息積壓的承受能?就會?較強?。但是還是需要注意和Rocket?MQ與Kafka相同的問題。
2?、怎么處理?量積壓的消息
產?消息積壓的根本原因還是Consumer處理消息的效率太低,所以最核?的?標還是要提升Consumer消費消息的效率?。如果不能從業務上提升Consumer消費消息的性能,那么最直接的辦法就是針對處理消息?較慢?的消費者組,增加更多的Consumer實例?。但是這?需要注意—下,增加Consumer實例是不是會有上限。
對于RabbitMQ?,如果是Classic Queue經典對列,那么針對同—個Queue的多個消費者,?是按照Work Queue 的模式,在多個Consuemr之間依次分配消息的?。所以這時,如果Consumer消費能?不夠,那么直接加更多 ?的Consumer實例就可以了?。這?需要注意下的是如果各個Consumer實例他們的運?環境,或者是處理消息 ?的速度有差別?。那么可以優化—下每個Consumer的?重(Qos屬性)?,從?盡量?的發揮Consumer實例的性能。
?對于Rocket?MQ,?因為同—個消費者組下的多個Cosumer需要和對應Topic下的MessageQueue建?對應關 ??系,?—個MessageQueue最多只能被—個Consumer消費,?因此,增加的Consumer實例最多也只能和Topic 下的MessageQueue個數相同?。如果此時再繼續增加Consumer的實例,那么就會有些Consumer實例是沒有??MessageQueue去消費的,?因此也就沒有?了。
這時,如果Topic下的MessageQueue配置本來就不夠多的話,那就?法—直增加Consumer節點個數了?。這時 怎么處理呢?如果要快速處理積壓的消息,可以創建—個新的Topic?,配置?夠多的MessageQueue?。然后把???Consumer實例的Topic轉向新的Topic?,并緊急上線—組新的消費者,?只負責消費舊Topic中的消息,并轉存到?新的Topic中?。這個速度明顯會?普通Consumer處理業務邏輯要快很多?。然后在新的Topic上,就可以通過添???加消費者個數來提?消費速度了?。之后再根據情況考慮是否要恢復成正常情況。
其實這種思路和Rocket?MQ內部很多特殊機制的處理?式是—樣的?。例如固定級別的延遲消息機制,也是?把消息臨時轉到—個系統內部的Topic下,處理過后,再轉回來。?
?于Kafka?,也可以采?和Rocket?MQ相似的處理?式。
請問RocketMQ消息積壓一般產生原因是什么?如何解決消息積壓問題呢?
一般消息出現堆積原因有:
●?消費者消息處理邏輯異常,導致消息無法正常消費。
●?消息生產應用出現突發流量,消息生產速度遠大于消費速度,消息來不及消費出現堆積。
●?消費者依賴的下游服務耗時變長,消費線程阻塞等。
●?消費線程不夠,消費并發度較小,消費速度跟不上生產速度。
?
解決方案有:
(1)確認消息的消費耗時是否合理,通過打印堆棧日志信息分析如果是消費耗時較長,可以參考出來解決方案:
????????1. 分析和優化業務邏輯
????????????????●?簡化邏輯:仔細分析業務邏輯,去除不必要的步驟和復雜計算。
????????????????●?分解任務:將復雜的任務分解為多個簡單的子任務,逐步處理。
????????????????●?異步處理:對于不需要立即完成的任務,考慮使用異步處理,將其放到后臺執行。
????????2. 使用并行和并發技術
????????????????●?多線程處理:在消費者內部使用多線程來并行處理消息。
????????????????●?批量處理:如果業務允許,合并多條消息進行批量處理,減少處理次數。
????????3. 優化I/O操作
????????????????●?數據庫優化:優化數據庫查詢,使用索引、減少查詢次數或使用批量操作。
????????????????●?緩存使用:對于頻繁訪問的數據,使用緩存來減少數據庫或外部服務的訪問次數。
????????????????●?網絡優化:減少網絡請求的次數和延遲,使用更高效的協議或配置
(2)如果消費耗時正常,則有可能是因為消費并發度不夠導致消息堆積,需要逐步調大消費線程或擴容節點來解決。
(3)設置消息過期時間
在消息發送時設置TTL,消息在隊列中超過一定時間后自動過期并被丟棄。這樣可以確保系統不會處理過期的消息。這個要看具體的業務場景。
五、Rocket?MQ課程總結
所有的MQ?,其實處理的都是—類相似的問題?。但是,?互聯?卻誕?了不下??種MQ的產品?。為什么都做著差?不多的功能,但是卻有這么多不同的產品呢?這就需要對MQ的業務場景進?逐步的深度挖掘?。把業務問題理???解得越深刻,那么對這些不同產品的理解才會更深刻,??后處理各種各樣的業務問題,也才會有更多的可選???案,或者,換種說法,就是經驗?。這才是程序員最?的價值。