21、MQ常見問題梳理

目錄

??、MQ如何保證消息不丟失

1?、哪些環節可能會丟消息

2?、?產者發送消息如何保證不丟失

2.1、?產者發送消息確認機制

2.2、Rocket?MQ的事務消息機制

2.3?、Broker寫?數據如何保證不丟失?

2.3.1** ?先需要理解操作系統是如何把消息寫?到磁盤的**。

2.3.2然后來看MQ是如何調?fsync的?

2.4?、Broker主從同步如何保證不丟失?

2.5?、消費者消費消息如何不丟失

2.6?、如果MQ服務全部掛了,?如何保證不丟失?

2.7?、MQ消息零丟失?案總結

面試題:說說你的項目RocketMQ如何保證消息不丟失??

??、MQ如何保證消息的順序性

三、MQ如何保證消息冪等性?

1?、?產者發送消息到服務端如何保持冪等?

2?、消費者消費消息如何保持冪等?

?四、MQ如何快速處理積壓的消息

1?、消息積壓會有哪些問題。

2?、怎么處理?量積壓的消息

請問RocketMQ消息積壓一般產生原因是什么?如何解決消息積壓問題呢?

五、Rocket?MQ課程總結


??MQ如何保證消息不丟失

1?、哪些環節可能會丟消息

?先分析下MQ的整個消息鏈路中,有哪些步驟是可能會丟消息的

其中,?1?2?4三個場景都是跨?絡的,?跨?絡就肯定會有丟消息的可能。

然后關于3這個環節,通常MQ存盤時都會先寫?操作系統的緩存page?cache中,然后再由操作系統異步的將??消息寫?硬盤?。這個中間有個時間差,就可能會造成消息丟失?。如果服務掛了,緩存中還沒有來得及寫?硬盤?的消息就會丟失。

2?、?產者發送消息如何保證不丟失

?產者發送消息之所以可能會丟消息,都是因為?絡?。因為?絡的不穩定性,容易造成請求丟失?。怎么解決這?樣的問題呢?其實—個統—的思路就是?產者確?。簡單來說,就是?產者發出消息后,給?產者—個確定的?通知,?這個消息在Broker端是否寫?完成了?。就好?打電話,不確定電話通沒通,那就互相說個???具體確認—下?。只不過基于這個同樣的思路,各個MQ品有不同的實現?式。?

2.1、?產者發送消息確認機制

Rocket?MQ中,提供了三種不同的發送消息的?式:

?異步發送,?不需要Broker確認?。效率很??,但是會有丟消息的可能。?

// 異步發送, 不需要Broker確認 。效率很? ,但是會有丟消息的可能。
producer.sendOneway(msg);
// 同步發送, ?產者等待Broker的確認 。消息最安全 ,但是效率很低。
SendResult sendResult = producer.send(msg, 20 * 1000);
// 異步發送, ?產者另起—個線程等待Broker確認, 收到Broker確認后直接觸發回調?法 。消息安全和效率之間?較均 衡 ,但是會加?客戶端的負擔。
producer.send(msg, new SendCallback() { @Overridepublic void onSuccess(SendResult sendResult) {// do something}@Overridepublic void onException(Throwable e) {// do something}
});

與之類似的,?Kafka也同樣提供了這種同步和異步的發送消息機制。

//直接send發送消息, 返回的是—個Future。這就相當于是異步調? 。
Future<RecordMetadata> future = producer.send(record)
//調?future的get?法才會實際獲取到發送的結果 。?產者收到這個結果后, 就可以知道消息是否成功發到broker了。 這個過程就變成了—個同步的過程。
RecordMetadata recordMetadata = producer.send(record).get();

?在RabbitMQ中,則是提供了—個Publisher?Confirms?產者確認機制?。其思路也是Publiser收到Broker的響?應后再出發對應的回調?法。

//獲取channel
Channel ch = ...;
//添加兩個回調, —個處理ack響應, —個處理nack響應
ch.addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback)

這些各種各樣不同API的背后,都是—個統—的思路,就是給?產者響應,讓?產者知道消息有沒有發送成?。如果沒有發送成功,也由?產者??進?補救?。可以重發,也可以向業務拋異常?。都由?產者??處理。

2.2Rocket?MQ的事務消息機制

Rocket?MQ提出了事務消息機制,其實也是保證?產者安全發送消息的利器?。事務消息機制的基本流程如下:

其實整體上來看,?Rocket?MQ的事務消息機制,?還是基于?產者確認構建的—種實現機制?。其核?思想,?還是??通過Broker主動觸發?產者的回調?法,從?確認消息是否成功發送到了Broker?。只不過,?這?將—次確認變?成了多次確認?。在多次確認的過程中,?除了確認消息的安全性,?還給了?產者反悔的機會?。另外,事務消息?機制進—步將?產者確認與?產者的本地事務結合到了—起,從?讓?產者確認這個機制有了更多的業務屬性。

例如,?以最常?的電商訂單場景為例,就可以在下完訂單后,等到?戶?付的過程中使?事務消息機制?。這樣?可以保證本地下單和第三??付平臺?付這兩個業務是事務性的,要么同時成功,就往下游發訂單消息?。要么?就同時失敗,不往下游發訂單消息。

2.3?Broker寫?數據如何保證不丟失?

接下來,?Producer把消息發送到Broker上了之后,?Broker是不是能夠保證消息不丟失呢?這?也有—個核?的問題,那就是PageCache緩存。

數據會優先寫?到緩存,然后過—段時間再寫?到磁盤?。但是緩存中的數據有個特點,就是斷電即丟失,所?以,如果服務器發??正常斷電,?內存中的數據還沒有寫?磁盤,?這時就會造成消息丟失。

怎么解決這個問題呢?

2.3.1** ?先需要理解操作系統是如何把消息寫?到磁盤的**

?

Linux為例, ?戶態的應?程序,不管是什么應?程序, 想要寫?磁盤?件時,都只能調?操作系統提供的?write系統調?,?申請寫磁盤?。?于消息如何經過PageCache再寫?到磁盤中,?這個過程,?這個過程是在內核??態執?的,也就是操作系統??執?的,應?程序?法?預?。這個過程中,應?系統唯—能夠?預的,就是調??操作系統提供的sync系統調?,?申請—次刷盤操作,?主動將PageCache中的數據寫?到磁盤。

>> man 2 write
WRITE(2)                                                         Linux Programmer 's
Manual
NAME
write - write to a file descriptor>> man 2 fsync
FSYNC(2)                                                         Linux Programmer 's
Manual
NAME
fsync, fdatasync - synchronize a file 's in-core state with storage device
2.3.2然后來看MQ是如何調?fsync?

先來看Rocket?MQ

Rocket?MQBroker提供了—個很明確的配置項flush?DiskType?,可以選擇刷盤模式?。有兩個可選項,?SYNC_FLUSH?同步刷盤和ASYNC_FLUSH 異步刷盤。

所謂同步刷盤,?是指broker每往?志?件中寫?—條消息,就調?—次刷盤操?。?異步刷盤,則是指broker?每隔—個固定的時間,才去調?—次刷盤操作?。異步刷盤性能更穩定,但是會有丟消息的可能?。?同步刷盤的?消息安全性就更??,但是操作系統的IO?就會?常??

Rocket?MQ中,就算是同步刷盤,其實也并不是真的寫—次消息就刷盤—次,?這在海量消息的場景下,操作??系統是撐不住的?。所以,我們在之前梳理Rocket?MQ核?源碼的過程中看到,?Rocket?MQ同步刷盤的實現????式其實也是以10毫秒的間隔去調?刷盤操作?。從理論上來說,也還是會有?正常斷電造成消息丟失的可能,???嚴格意義上來說,任何應?程序都不可能完全保證斷電消息不丟失?。但是,?Rocket?MQ的這—套同步刷盤機??制,卻可以通過絕?部分業務場景的驗證?。這其實就是—種平衡。

然后來看Kafka:

Kafka中并沒有明顯的同步刷盤和異步刷盤的區別,不過他暴露了—系列的參數,可以管理刷盤的頻率。

flush.ms?: 多?時間進?—次強制刷盤。

log.flush.interval.messages:表示當同—個Partiton的消息條數積累到這個數量時,?就會申請—次刷盤操作?。默?認是Long.MAX

log.flush.interval.ms?當—個消息在內存中保留的時間,?達到這個數量時,?就會申請—次刷盤操作?。他的默認值是??。如果這個參數配置為空?,則?效的是下—個參數。

log.flush.scheduler.interval.ms:檢查是否有?志?件需要進?刷盤的頻率?。默認也是Long.MAX

其實在這??家可以思考下,對kafka來說,把log.flush.interval.messages參數設置成1 ,就是每寫?—條消?息就調?—次刷盤操作,?這不就是所謂的同步刷盤了嗎?

最后來看RabbitMQ

關于消息刷盤問題,?RabbitMQ官?給了更明確的說法?。那就是對于Classic經典對列,?即便聲明成了持久化對??列,?RabbitMQ的服務端也不會實時調?fsync?因此?法保證服務端消息斷電不丟失?。對于Stream流式對列,?則更加直接,?RabbitMQ明確不會主動調?fsync進?刷盤,?是交由操作系統??刷盤。

??于怎么辦呢?他明確就建議了,如果對消息安全性有更?的要求,可以使?Publisher?Confirms機制來進—?步保證消息安全?。這其實也是對KafkaRocket?MQ同樣適?的建議。

2.4?Broker主從同步如何保證不丟失?

對于Broker來說,通常Slave的作?就是做—個數據備份?。當Broker服務宕機了,?甚?是磁盤都壞了時,可以?Slave上獲取數據記錄?。但是,如果主從同步失敗了,那Broker的這—層保證就會失效?。因此,?主從同步?也有可能造成消息的丟失。

我們這?重點來討論—下,?Rocket?MQ的普通集群以及Dledger?可?集群。

先來看Rocket?MQ的普通集群?案,在這種?案下,可以指定集群中每個節點的角?,?固定的作為Master或者?Slave

在這種集群機制下,消息的安全性還是?較?的?。但是有—種極端的情況需要考慮?。因為消息需要從Master?Slave同步,?這個過程是跨?絡的,?因此也是有時間延遲的?。所以,如果Master出現?正常崩潰,那么就有可???能有—部分數據是已經寫?到了Master但是還來得及同步到Slave?。這—部分未來得及同步的數據,在Rocket?MQ的這種集群機制下,就會—直記錄在Master節點上?。等到Master重啟后,就可以繼續同步了?。另外?由于Slave并不會主動切換成Master?,所以Master服務崩潰后,也不會有新的消息寫進來,?此也不會有消息??沖突的問題?。所以,?只要Mater的磁盤沒有壞,那么在這種普通集群下,?主從同步通常不會造成消息丟失。

與之形成對?的是Kafka的集群機制?。在Kafka集群中,如Leader?Partition的服務崩潰了,那么,那些Follower?Partition就會選舉產?—個新的Leadr?Partition?。?集群中所有的消息,都以Leader?Partition的為?。即便舊的Leader?Partition重啟了,也是作為Follower?Partition啟動,?主動刪除掉??的HighWater之后的?數據,然后從新的Leader?Partition上重新同步消息?。這樣,就會造成那些已經寫?舊的Leader?Partition但是???還沒來得及同步的消息,就徹底丟失了。

Rocket?MQKafka之間的這種差異,其實還是來?于他們處MQ問題的初衷不同?Rocket?MQ誕?于阿?的??融體系,天?對消息的安全性?較敏感?。?Kafka誕?于LinkedIn的?志收集體系,天?對服務的可?性要??求更??。這也體現了不同產品對業務的取舍。

然后來看下Rocket?MQDledger?可?集群?。在Rocket?MQ中,?直接使?基于Raft協議的Dledger來保存?CommitLog消息?志?。也就是說他的消息會通過DledgerRaft協議,在主從節點之間同步。

?關于Raft協議,?之前章節做給分析,他是—種基于兩階段的多數派同意機制?。每個節點會將客戶端的治指令?Entry的形式保存到??的Log?志當中?。此時Entryuncommited狀態?。當有多數節點統統保存了Entry后,就可以執?Entry中的客戶端指令,提交到StateMachine狀態機中?。此時Entry更新為commited狀態。

他優先保證的是集群內的數據—致性,?并不是保證不丟失?。在某些極端場景下,??如出現?絡分區情況時,?也會丟失—些未經過集群內確認的消息?。不過,基于Rocket?MQ的使?場景,?這種丟失消息的可能性?常???另外,?之前也提到過,?這種服務端?法保證消息安全的問題,其實結合客戶端的?產者確認機制,?是可以得到??較好的處理的?。因此,在Rocket?MQ中使?Dledger集群的話,數據主從同步這個過程,數據安全性還是????較?的?。基本可以認為不會造成消息丟失。

2.5?、消費者消費消息如何不丟失

最后,消費者消費消息的過程中,?需要從Broker上拉取消息,?這些消息也是跨?絡的,所以拉取消息的請求也?可能丟失?。這時,會不會有丟消息的可能呢?

?乎所有的MQ產品都設置了消費狀態確認機制?。也就是消費者處理完消息后,?需要給Broker—個響應,表示?消息被正常處理了?。如果Broker端沒有拿到這個響應,不管是因為Consumer沒有拿到消息,?還是Consumer ?處理完消息后沒有給出相應,?Broker都會認為消息沒有處理成功?。之后,?Broker就會向Consumer重復投遞這?些沒有處理成功的消息?Rocket?MQKafka是根據Offset機制重新投遞,?RabbitMQClassic?Queue經典??對列,則是把消息重新?隊?。因此,正常情況下,?Consumer消費消息這個過程,?是不會造成消息丟失的,相?反,可能需要考慮下消息冪等的問題。

但是,?這也并不是說消費者消費消息不可能丟失?。例如像下?這種情況,?Consumer異步處理消息,就有可能?造成消息丟失。

consumer.registerMessageListener(new MessageListenerConcurrently{ @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>msgs,ConsumeConcurrentlyContext context) {new Thread(){public void run(){//處理業務邏輯System.out.printf("%s Receive New Messages: &s %n", Thread.currentThread()     .getN }};return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; });

這?需要注意的是,通常在開發過程中,不太會這么直?的使?多線程異步機制去處理問題?。但是,很有可能?在處理業務時,使?—些第三?框架來處理消息?。他們是不是使?的多線程異步機制,那就不能確定了?。所以,線程并發,在任何業務場景下,都是必不可少的基本功。

2.6?、如果MQ服務全部掛了,?如何保證不丟失?

最后有—種?概率的極端情況,就是MQ的服務全部掛掉了,?這時,要如何保證業務能夠繼續穩定進?,?同時?業務數據不會丟失呢?

通常的做法是設計—個降級緩存?ProducerMQ發消息失敗了,就往降級緩存中寫,然后,依然正常去進?后續的業務。

此時,再啟動—個線程,不斷嘗試將降級緩存中的數據往MQ中發送?。這樣,??少當MQ服務恢復過來后,?這些?消息可以盡快進?到MQ中,繼續往下游Conusmer推送,?不?于造成消息丟失。

2.7?MQ消息零丟失?案總結

最后要注意到,?這?討論到的各種MQ消息防?丟失的?案,其實都是以增加集群負載,?降低吞吐為代價的。

這必然會造成集群效率下降?。因此,?這些保證消息安全的?案通常都需要根據業務場景進?靈活取舍,?不是?—股腦的直接?上。

這?希望你能夠理解到,?這些消息零丟失?案,其實是沒有最優解的?。因為如果有最優解,那么這些MQ品,就不需要保留各種各樣的設計了?。這和很多?試?股?是有沖突的?。?試?股?強調標準答案,?實際業?務中,?這個問題是沒有標準答案的,—切,都需要根據業務場景去調整

面試題:說說你的項目RocketMQ如何保證消息不丟失??


RocketMQ通過多層面的機制來確保消息的可靠性,包括生產者端、broker端和消費者端。
1. 生產者端保證
????????a. 同步發送
????????????????同步發送是最可靠的發送方式,它會等待broker的確認響應。
????????b. 異步發送 + 重試機制
????????????????異步發送通過回調來處理發送結果,并可以設置重試次數。
2.Broker端保證
????????a. 同步刷盤,通過配置broker.conf文件,可以啟用同步刷盤:
????????????????brokerRole = SYNC_MASTER?
3. 消費者端保證
????????a. 手動提交消費位移,使用手動提交可以確保消息被正確處理后再提交位移。
????????b. 冪等性消費,在消費端實現冪等性處理,確保重復消費不會導致業務問題。
通過這些機制的組合,RocketMQ能夠在各個環節保證消息的可靠性,極大地降低了消息丟失的風險。在實際應用中,可以根據業務需求選擇合適的配置和實現方式,以在可靠性和性能之間取得平衡。

??MQ如何保證消息的順序性

這??先需要明確的是,通常討論MQ的消息順序性,其實是在強調局部有序,?不是全局有序?。就好?QQ?微信的聊天消息,通常只要保證同—個聊天窗?內的消息是嚴格有序的?。?于不同窗口之間的消息,順序出了點偏差,其實是?所謂的?。所謂全局有序,通常在業務上沒有太多的使?場景?。在Rocket?MQKafka中把Topic的分區數設置成1?這類強?保證消息全局有序的?案,純屬思維體操。

那么怎么保證消息局部有序呢?最典型的還是Rocket?MQ的順序消費機制。

這個機制需要兩個??的保障。

  1. 1 Producer將—組有序的消息寫?到同—個MessageQueue中。
  2. 2?Consumer每次集中從—個MessageQueue中拿取消息。

Producer端,?Rocket?MQKafka都提供了分區計算機制,可以讓應?程序??決定消息寫?到哪—個分?。所以這—塊,?是由業務??決定的?。只要通過定制數據分?算法,把—組局部有序的消息發到同—個對列?當中,就可以通過對列的FI?FO特性,保證消息的處理順序?。對于RabbitMQ?,則可以通過維護ExchangeQueue之間的綁定關系,將這—組局部有序的消息轉發到同—個對列中,從?保證這—組有序的消息,在?RabbitMQ內部保存時,?是有序的。

Conusmer端,?Rocket?MQ是通過讓Consumer注?不同的消息監聽器來進?區分的?。?具體的實現機制,在 之前章節分析過,核?是通過對Consumer的消費線程進?并發控制,來保證消息的消費順序的?。類?到Kafka ?Kafka中并沒有這樣的并發控制?。?實際上,?KafkaConsumer對某—個Partition拉取消息時,天?就是???單線程的,所以,參照Rocket?MQ的順序消費模型,?KafkaConsumer天?就是能保證局部順序消費的。

?于RabbitMQ?以他的Classic Queue經典對列為例,他的消息被—個消費者從隊列中拉取后,就直接從隊列?中把消息刪除了?。所以,基本不存在資源競爭的問題?。那就簡單的是—個隊列只對應—個Consumer?,那就是??能保證順序消費的?。如果—個隊列對應了多個Consumer?同—批消息,可能會進?不同的Consumer處理,所以也就沒法保證消息的消費順序

三、MQ如何保證消息冪等性?

1?、?產者發送消息到服務端如何保持冪等?

Producer發送消息時,如果采?發送者確認的機制,那么Producer發送消息會等待Broker的響應?。如果沒有?收到Broker的響應,?Producer就會發起重試?。但是,?Producer沒有收到Broker的響應,也有可能是Broker?經正常處理完了消息,?只不過發給Producer的響應請求丟失了?。這時候Producer再次發起消息重試,就有可能造成消息重復。

Rocket?MQ的處理?式,?是會在發送消息時,給每條消息分配一個唯一的ID

//org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl
//for MessageBatch,ID has been set in the generating processif ( !(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}public static void setUniqID(final Message msg) {if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null){msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,createUniqID());}
}

通過這個ID?,就可以判斷消息是否重復投遞。

?對于Kafka?,則會通過他的冪等性配置,?防??產者重復投遞消息造成的冪等性問題。

Kafka中,?需要打開idempotence冪等性控制后(默認是打開的,但是如果其他配置有沖突,會影響冪等性配?)?Kafka為了保證消息發送的Exactly-once語義,增加了?個概念:

  • . ?PID:每個新的Producer在初始化的過程中就會被分配—個唯—的PID?。這個PID對?戶是不可?的。
  • . ?Sequence?Numer: 對于每個PID?這個Producer針對Partition會維護—個sequenceNumber?。這是—個從?0開始單調遞增的數字?。當Producer要往同—個Partition發送消息時,?這個Sequence?Number就會加1?然后會隨著消息—起發往Broker
  • . ?Broker端則會針對每個<PID,Partition>維護—個序列號(?SN),?只有當對應的SequenceNumber = SN+1 時,?Broker才會接收消息,?同時將SN更新為SN+1 否則,SequenceNumber過?就認為消息已經寫?了,不需要再重復寫??。?如果SequenceNumber過??,就會認為中間可能有數據丟失了?對?產者就會?拋出—個OutOfOrderSequenceException

?

2?、消費者消費消息如何保持冪等?

這?以Rocket?MQ來討論如何防?消費者多次重復消費同—條消息。

?先,關于消息會如何往消費者投遞?Rocket?MQ官?明確做了回答:?

也就是說,在?多數情況下,不需要單獨考慮消息重復消費的問題?。但是,?同樣,?這個回答?也說明了,存在?—些?概率情況,?需要單獨考慮消費者的消息冪等問題。

?于有哪些?概率情況呢?最典型的情況就是?絡出現波動的時候?Rocket?MQ是通過消費者的響應機制來推offset的,如果consumerbroker上獲取了消息,正常處理之后,他要往broker返回—個響應,但是如果??絡出現波動,consumerbroker上拿取到了消息,但是等到他向broker發響應時,發??絡波動,?這個響應???丟失了,那么就會造成消息的重復消費?。因為broker沒有收到響應,就會向這個Consumer所在的Group重復投遞消息。

然后,消費者如何防?重復消費呢?

防?重復消費,?最主要是要找到—個唯—性的指標?。在Rocket?MQ中,?Producer發出—條消息后,?Rocket?MQ?內部會給每—條消息分配—個唯—的messageId?。?這個messageIdConsumer中是可以獲取到的?。所以??多數情況下,?這個messageId就是—個很好的唯—性指標?Consumer只要將處理過的messageId記錄下來, 就可以判斷這條消息之前有沒有處理過。

但是同樣也有—些特殊情況?。如果Producer是采?批量發送,或者是事務消息機制發送,那么這個messageId 就沒有那么好控制了?。所以,如果在真實業務中,更建議根據業務場景來確定唯—指標?。例如,在電商下單的??場景,訂單ID就是—個很好的帶有業務屬性的唯—指標?。在使?Rocket?MQ時,可以使?messagekey屬性???來傳遞訂單ID?。這樣Consumer就能夠?較好的防?重復消費。

最后,對于冪等性問題,?除了要防?重復消費外,?還需要防?消費丟失?。也就Consumer—直沒有正常消費?消息的情況。

Rocket?MQ中,?重復投遞的消息,會單獨放到以消費者組為維度構建的重試對列中?。如果經過多次重試后還?是?法被正常消費,那么最終消息會進?到消費者組對應的死信對列中?。也就是說,如果Rocket?MQ中出現了?死信對列,那么就意味著有—批消費者的邏輯是—直有問題的,?這些消息始終?法正常消費?。這時就需要針對?死信對列,?單獨維護—個消費者,對這些錯誤的業務消息進?補充處理?。這?需要注意—下的是,?Rocket?MQ??中的死信對列,默認權限是?法消費的,?需要?動調整權限才能正常消費。

?四、MQ如何快速處理積壓的消息

1?、消息積壓會有哪些問題。

Rocket?MQKafka來說,他們的消息積壓能?本來就是很強的,?因此,短時間的消息積壓,?是沒有太多問題??。但是需要注意,如果消息積壓問題—直得不到解決,?Rocket?MQKafka在?志?件過期后,就會直接刪除?過期的?志?件?。?這些?志?件上未消費的消息,就會直接丟失。

?對RabbitMQ來說, Classic?Queue經典對列和Quorum?Queue仲裁對列,如果有?量消息積壓,未被消費,就會嚴重影響服務端的性能,?因此需要重點關注?。??于Stream Queue流式對列,整體的處理機制已經?Rocket?MQKafka??較相似了,對消息積壓的承受能?就會?較強?。但是還是需要注意和Rocket?MQKafka相同的問題。

2?、怎么處理?量積壓的消息

產?消息積壓的根本原因還是Consumer處理消息的效率太低,所以最核?的?標還是要提升Consumer消費消息的效率?。如果不能從業務上提升Consumer消費消息的性能,那么最直接的辦法就是針對處理消息?較慢?的消費者組,增加更多的Consumer實例?。但是這?需要注意—下,增加Consumer實例是不是會有上限。

對于RabbitMQ?,如果是Classic Queue經典對列,那么針對同—個Queue的多個消費者,?是按照Work Queue 的模式,在多個Consuemr之間依次分配消息的?。所以這時,如Consumer消費能?不夠,那么直接加更多 ?Consumer實例就可以了?。這?需要注意下的是如果各個Consumer實例他們的運?環境,或者是處理消息 ?的速度有差別?。那么可以優化—下每個Consumer的?重(Qos屬性)?,從?盡量?的發揮Consumer實例的性能。

?對于Rocket?MQ?因為同—個消費者組下的多個Cosumer需要和對應Topic下的MessageQueue建?對應關 ??系,?—個MessageQueue最多只能被—個Consumer消費,?因此,增加的Consumer實例最多也只能和Topic 下的MessageQueue個數相同?。如果此時再繼續增加Consumer的實例,那么就會有些Consumer實例是沒有??MessageQueue去消費的,?因此也就沒有?了。

這時,如果Topic下的MessageQueue配置本來就不夠多的話,那就?法—直增加Consumer節點個數了?。這時 怎么處理呢?如果要快速處理積壓的消息,可以創建—個新的Topic?,配置?夠多的MessageQueue?。然后把???Consumer實例的Topic轉向新的Topic?,并緊急上線—組新的消費者,?只負責消費舊Topic的消息,并轉存到?新的Topic?。這個速度明顯會?普通Consumer處理業務邏輯要快很多?。然后在新的Topic上,就可以通過添???加消費者個數來提?消費速度了?。之后再根據情況考慮是否要恢復成正常情況。

其實這種思路和Rocket?MQ內部很多特殊機制的處理?式是—樣的?。例如固定級別的延遲消息機制,也是?把消息臨時轉到—個系統內部的Topic下,處理過后,再轉回來。?

?于Kafka?,也可以采?和Rocket?MQ相似的處理?式。

請問RocketMQ消息積壓一般產生原因是什么?如何解決消息積壓問題呢?


一般消息出現堆積原因有:


●?消費者消息處理邏輯異常,導致消息無法正常消費。
●?消息生產應用出現突發流量,消息生產速度遠大于消費速度,消息來不及消費出現堆積。
●?消費者依賴的下游服務耗時變長,消費線程阻塞等。
●?消費線程不夠,消費并發度較小,消費速度跟不上生產速度。
?
解決方案有:
(1)確認消息的消費耗時是否合理,通過打印堆棧日志信息分析如果是消費耗時較長,可以參考出來解決方案:
????????1. 分析和優化業務邏輯
????????????????●?簡化邏輯:仔細分析業務邏輯,去除不必要的步驟和復雜計算。
????????????????●?分解任務:將復雜的任務分解為多個簡單的子任務,逐步處理。
????????????????●?異步處理:對于不需要立即完成的任務,考慮使用異步處理,將其放到后臺執行。
????????2. 使用并行和并發技術
????????????????●?多線程處理:在消費者內部使用多線程來并行處理消息。
????????????????●?批量處理:如果業務允許,合并多條消息進行批量處理,減少處理次數。
????????3. 優化I/O操作
????????????????●?數據庫優化:優化數據庫查詢,使用索引、減少查詢次數或使用批量操作。
????????????????●?緩存使用:對于頻繁訪問的數據,使用緩存來減少數據庫或外部服務的訪問次數。
????????????????●?網絡優化:減少網絡請求的次數和延遲,使用更高效的協議或配置
(2)如果消費耗時正常,則有可能是因為消費并發度不夠導致消息堆積,需要逐步調大消費線程或擴容節點來解決。
(3)設置消息過期時間
在消息發送時設置TTL,消息在隊列中超過一定時間后自動過期并被丟棄。這樣可以確保系統不會處理過期的消息。這個要看具體的業務場景。

五、Rocket?MQ課程總結

所有的MQ?,其實處理的都是—類相似的問題?。但是,?互聯?卻誕?了不下??種MQ的產品?。為什么都做著差?不多的功能,但是卻有這么多不同的產品呢?這就需要對MQ的業務場景進?逐步的深度挖掘?。把業務問題理???解得越深刻,那么對這些不同產品的理解才會更深刻,??后處理各種各樣的業務問題,也才會有更多的可選???案,或者,換種說法,就是經驗?。這才是程序員最?的價值。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89613.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89613.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89613.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL數據庫--SQL DDL語句

SQL--DDL語句 1&#xff0c;DDL-數據庫操作2&#xff0c;DDL-表操作-查詢3&#xff0c;DDL-表操作-創建4&#xff0c;DDL-表操作-數據類型4.1&#xff0c;DDL-表操作-數值類型4.2&#xff0c;DDL-表操作-字符串類型4.3&#xff0c;DDL-表操作-日期時間類型4.4&#xff0c;實例 …

Spring Cloud 服務追蹤實戰:使用 Zipkin 構建分布式鏈路追蹤

Spring Cloud 服務追蹤實戰&#xff1a;使用 Zipkin 構建分布式鏈路追蹤 在分布式微服務架構中&#xff0c;一個用戶請求往往需要經過多個服務協作完成&#xff0c;如果出現性能瓶頸或異常&#xff0c;排查會非常困難。此時&#xff0c;分布式鏈路追蹤&#xff08;Distributed…

Linux云計算基礎篇(6)

一、IO重定向和管道 stdin&#xff1a;standard input 標準輸入 stdout&#xff1a;standard output 標準輸出 stderr&#xff1a; standard error 標準錯誤輸出 舉例 find /etc/ -name passwd > find.out 將正確的輸出重定向在這個find.ou…

Python將COCO格式分割標簽繪制到對應的圖片上

Python將COCO格式分割標簽繪制到對應的圖片上 前言前提條件相關介紹COCO 格式簡介&#xff08;實例分割&#xff09;&#x1f4c1; 主要目錄結構&#xff1a;&#x1f4c4; JSON 標注文件結構示例&#xff1a;? 特點&#xff1a; 實驗環境Python將COCO格式分割標簽繪制到對應的…

光纖(FC)交換機與以太網(網絡)交換機的區別

光纖通道交換機&#xff08;FC交換機&#xff09;與普通以太網交換機&#xff08;網絡交換機&#xff09;在用途、協議、性能、可靠性等方面存在顯著差異&#xff0c;主要區別如下&#xff1a; 1. 用途與網絡類型 FC交換機 主要用于存儲區域網絡&#xff08;SAN&#xff09;&a…

電磁場有限元方法EX2.2-里茲法求解泊松方程控制的邊值問題

電磁場有限元方法EX2.2-里茲法求解泊松方程控制的邊值問題 簡單學習一下有限元法的基礎理論&#xff0c;書本為電磁場有限元經典教材&#xff1a; THE FINITE ELEMENT METHOD IN ELECTROMAGNETICS, JIAN-MING JIN 目錄 電磁場有限元方法EX2.2-里茲法求解泊松方程控制的邊值問…

云端備份與恢復策略:企業如何選擇最安全的備份解決方案

更多云服務器知識&#xff0c;盡在hostol.com 想象一下&#xff0c;某個凌晨&#xff0c;你突然發現公司所有重要數據都被加密&#xff0c;系統崩潰&#xff0c;業務停擺。有人給你打來電話說&#xff1a;“一切都被勒索了&#xff0c;恢復費用可能需要幾百萬。”這時&#xf…

OSPF高級特性之FRR

一、概述 眾所周知,IGP當中鏈路狀態路由協議(OSPF、ISIS)之所以可以代替我們的矢量路由協議(RIP),就是因為鏈路狀態路由協議可以根據某些特性快速的感知到路由的變化從而改變路徑。 前面我們已經介紹過了OSPF的其中一個快速收斂的機制,SPF算法,本章節將介紹另一個快速收斂機制,…

多元化國產主板,滿足更高性能、更高安全的金融發展

在金融行業數字化轉型的浪潮中&#xff0c;對于核心硬件的性能與安全需求達到了前所未有的高度。國產主板應運而生&#xff0c;憑借其卓越的多元化特性&#xff0c;為金融領域帶來了高性能運算與高安全防護的雙重保障&#xff0c;成為推動金融行業發展的關鍵力量。以高能計算機…

數據庫分布式架構:ShardingSphere 實踐

一、數據庫分布式架構概述 1.1 分布式架構概念 在當今數字化時代&#xff0c;隨著業務的不斷拓展和數據量的爆炸式增長&#xff0c;傳統的單機數據庫架構逐漸暴露出諸多局限性。例如&#xff0c;在電商大促期間&#xff0c;海量的訂單數據和用戶訪問請求會讓單機數據庫不堪重…

【WRFDA教程第二期】運行WRFDA 3DVAR/4DVAR數據同化

目錄 一、準備階段&#xff1a;下載并解壓測試數據二、運行 3DVAR 教學實驗日志分析&#xff08;wrfda.log&#xff09;進階實驗建議&#xff1a;對比不同設置的影響輸出文件說明 三、運行 4DVAR 教學實驗步驟1&#xff1a;準備工作目錄與環境變量步驟2&#xff1a;鏈接可執行文…

redis緩存三大問題分析與解決方案

什么是緩存&#xff1f; 緩存&#xff08;Cache&#xff09;是一種將熱點數據緩存在內存中&#xff08;如 Redis&#xff09;以加快訪問速度、減輕數據庫壓力的技術。 但引入緩存后可能出現 三大核心問題&#xff1a; 緩存穿透&#xff08;Cache Penetration&#xff09;緩存…

李宏毅機器學習筆記——梯度下降法

深度學習介紹 基于仿生學的一種自成體系的機器學習算法&#xff0c;包括但不限于圖像識別、語音、文本領域。 梯度下降法 作為深度學習算法種常用的優化算法 梯度下降法&#xff0c;是一種基于搜索的最優化方法&#xff0c;最用是最小化一個損失函數。梯度下降是迭代法的一…

day50/60

浙大疏錦行 DAY 50 預訓練模型CBAM模塊 知識點回顧&#xff1a; resnet結構解析CBAM放置位置的思考針對預訓練模型的訓練策略 差異化學習率三階段微調 ps&#xff1a;今日的代碼訓練時長較長&#xff0c;3080ti大概需要40min的訓練時長 作業&#xff1a; 好好理解下resnet18的…

Vue3 之vite.config.js配置

一、示例 import { defineConfig } from vite import vue from vitejs/plugin-vue import path from path // https://vitejs.dev/config/ export default defineConfig({plugins: [vue()],base: ./,build: {assetsDir: static, //指定靜態資源目錄rollupOptions: {input: {mai…

利用Gpu訓練

方法一&#xff1a; 分別對網絡模型&#xff0c;數據&#xff08;輸入&#xff0c;標注&#xff09;&#xff0c;損失函數調用.cuda() 網絡模型&#xff1a; if torch.cuda.is_available():netnet.cuda() 數據&#xff08;訓練和測試&#xff09;&#xff1a; if torch.cud…

使用excel中的MATCH函數進行匹配數據

一、背景 在平日處理數據時&#xff0c;經常需要將給定數據按照制定的數據進行排序&#xff0c;數量比較大時&#xff0c;逐個處理有點費事費力且容易出錯&#xff0c;這時可借助excel表格中match函數進行精確匹配。 二、使用match函數–精確排序操作步驟 主要工作步驟&#xf…

SpringCloud系列(41)--SpringCloud Config分布式配置中心簡介

前言&#xff1a;微服務意味著要將單體應用中的業務拆分成一個個子服務&#xff0c;每個服務的粒度相對較小&#xff0c;因此系統中會出現大量的服務&#xff0c;但由于每個服務都需要必要的配置信息才能運行&#xff0c;所以—套集中式的、動態的配置管理設施是必不可少的&…

wireshark介紹和使用

Wireshark 介紹 Wireshark 是一款開源的 網絡協議分析工具&#xff08;Packet Sniffer&#xff09;&#xff0c;用于捕獲和分析網絡數據包。它支持多種協議解析&#xff0c;適用于網絡調試、安全分析、網絡教學等場景。 官網&#xff1a;https://www.wireshark.org/ 特點&#…

【甲方安全建設】敏感數據檢測工具 Earlybird 安裝使用詳細教程

文章目錄 背景工具介紹安裝方法一、Linux 與 macOS 安裝流程二、Windows 系統安裝流程(一)三、Windows 系統安裝流程(二)四、錯誤處理使用說明模塊與規則機制集成與運維建議結語背景 隨著源代碼泄露、配置誤提交、密碼硬編碼等風險頻發,企業源代碼庫中潛在的敏感信息泄漏…