在高并發的場景下,由于消息產生速度超過消費速度,可能會導致消息積壓的問題。本文將介紹 RocketMQ 消息積壓的原因和如何處理積壓問題。
什么是消息積壓
消息積壓是使用 MQ 消息隊列系統中,最常見的一種性能問題。如下圖所示,當生產端的生產效率大于消費端的消費效率就會造成消息處理不完的情況,也就叫 “消息積壓”。
消息積壓原因
消息積壓的原因可以歸結為以下幾點:
- 消費者處理速度慢:當消息消費者的處理能力跟不上消息產生的速度時,消息將積壓在消息隊列中。
- 消息消費失敗:當消息消費者由于某種原因無法正確消費消息時,消息會一直留在消息隊列中,導致積壓。
- 配置不合理:如果消息隊列的容量設置過小或者消費者的線程數設置過少,都可能導致消息積壓。
處理消息積壓的方法
當發生了消息積壓,這時就得想辦法趕緊把積壓的消息消費完,就得考慮提高消費能力,一般有兩種辦法:
消費者擴容
Consume 實例數量小于 MessageQueue 的數量時,增加 Consume 實例可以對消費者進行擴容,來提高消費能力。比如一個 Topic 有 4 個 MessageQueue,2 個消費者進行消費,如果增加一個消費者,明細可以加快拉取消息的頻率。
消息遷移Queue擴容
Consume 實例數量大于等于 MessageQueue 的數量時,這種情況再擴容 Consume 實例就沒什么用,就得考慮擴容 MessageQueue。
可以新建一個臨時的 Topic,臨時的 Topic 多設置一些 MessageQueue,然后先用一些消費者把消費的數據丟到臨時的 Topic,因為不用業務處理,只是轉發一下消息,還是很快的。接下來用擴容的消費者去消費新的 Topic 里的數據,消費完了之后,恢復原狀。比如一個 Topic 有 4 個 MessageQueue,并且有 4 個消費者進行消費。
通過前期設置提高消費能力
雖然通過擴容可以在一定程度上解決消息積壓問題,但在一些特殊情況下還是會出現消息積壓的問題。
消費者消息拉取的速度也取決于本地消息的消費速度,如果本地消息消費的慢,就會延遲一段時間后再去拉取。又是在什么情況下消費者會延遲一段時間后后再去拉取呢?
增加消息隊列的容量
如果消息隊列的容量設置過小,也有可能會導致消息積壓。
可以通過增加消息隊列的容量來緩解積壓問題。但需要注意,過大的消息隊列容量可能會增加消息處理的延遲。
消費者拉取的消息存在 ProcessQueue,消費者是有流量控制的,如果出現下面三種情況,就不會主動去拉取:
- ProcessQueue 保存的消息數量超過閾值(默認 1000);
- ProcessQueue 保存的消息大小超過閾值(默認 100M);
- 對于非順序消費的場景,ProcessQueue 中保存的最后一條和第一條消息偏移量之差超過閾值(默認 2000)。
優化消息消費的邏輯
檢查消息消費邏輯是否存在性能瓶頸或者不必要的復雜計算。優化消息消費的邏輯可以提高消費速度,減少消息積壓。
對于順序消費的場景,ProcessQueue 加鎖失敗,也會延遲拉取,這個延遲時間是 3s。
設置消息消費失敗的處理機制
當消息消費失敗時,可以根據業務需求選擇合適的處理方式。可以將失敗的消息記錄下來,后續再次消費;或者將失敗的消息發送到死信隊列進行處理。
監控和報警機制
建立監控和報警機制,及時發現消息積壓的情況并采取相應的措施。可以通過監控指標、日志或者專業的監控工具來實現。
消息批量處理
消費者每次一條一條消費會很慢,如果再有事務的情況下會更慢。此時可能通過批量的方式獲取數據,再對數據批量操作,
RocketMQ消息積壓是在高并發場景下常見的問題,需要合理的處理策略來保證消息系統的穩定性和性能。