為什么需要消息隊列?
隨著互聯網快速發展,業務規模不斷擴張,技術架構從單體演進到微服務,服務間調用復雜、流量激增。為了解耦服務、合理利用資源、緩沖流量高峰,「消息隊列」應運而生,常用于異步處理、服務解耦和流量削峰等場景。
- 訂單系統:在電商系統中,訂單的創建、支付、發貨等步驟可以通過消息隊列進行異步處理和解耦。
- 日志處理:使用消息隊列將日志從應用系統傳輸到日志處理系統,實現實時分析和監控。
- 任務調度:在批量任務處理、任務調度系統中,通過消息隊列將任務分發給多個工作節點,進行并行處理。
- 數據同步:在數據同步系統中,消息隊列可以用于將變更的數據異步同步到不同的存儲系統或服務。
消息隊列的模型有哪些?
1、隊列模型(也稱點對點模型)
在隊列模型中,消息從生產者發送到隊列,并且每條消息只能被一個消費者消費一次。消費之后,消息在隊列中被刪除。適用于任務處理類場景,如一個任務只需要一個處理者執行。
2、發布/訂閱模型(Publish/Subscribe)
在發布/訂閱模型中,生產者將消息發布到某個主題(Topic),所有訂閱了該主題的消費者都會接收到該消息。每個訂閱者都會接收到相同的消息,適用于廣播通知、實時推送等場景
RabbitMQ、RocketMQ、Kafka 技術選型總結
對比維度 | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|
定位 | 功能豐富的消息中間件 | 分布式高可用消息中間件 | 高吞吐分布式日志系統 / 消息隊列 |
協議支持 | AMQP,MQTT,STOMP,WebSocket | 自研協議,支持部分 RocketMQ 專屬特性 | 自研 TCP 協議 |
吞吐量 | 中等(萬級 TPS) | 高吞吐(10-20 萬 TPS) | 極高吞吐(百萬級 TPS) |
延遲 | 低延遲,毫秒級 | 毫秒級,RT 性能好 | 延遲相對高,適合大數據場景 |
可靠性 | 高,支持消息確認、持久化 | 高,支持消息可靠投遞,事務消息 | 高,分區副本機制,適合大規模持久化 |
可用性 | 單節點或集群(鏡像隊列),集群復雜 | 自帶 NameServer + Broker 集群,易擴展 | 高可用(分布式架構,Zookeeper/KRaft) |
功能特性 | 豐富(死信隊列、延遲隊列、插件機制) | 支持定時消息、事務消息、順序消費 | 支持回溯消費、批量拉取、分布式存儲 |
典型場景 | 業務解耦、異步削峰、輕量級系統 | 分布式事務、順序消息、電商系統 | 大數據場景、日志收集、流式計算 |
運維復雜度 | 中等,Erlang 特殊運維 | 中等,自研協議,配套完善 | 相對高,依賴 Zookeeper(或 KRaft) |
社區 & 成熟度 | 社區活躍,成熟穩定 | 阿里主導,國內電商常用 | Apache 頂級項目,互聯網公司主流 |
選型建議
- 輕量級、快速開發首選 RabbitMQ:比如內部系統解耦、異步發送郵件等。
- 高可靠、事務型場景推薦 RocketMQ:例如訂單支付、分布式事務。
- 高吞吐、大數據日志場景推薦 Kafka:比如用戶行為日志、埋點數據、日志分析。
需要 秒級延遲+事務性:選 RocketMQ
需要 批量消費+高吞吐:選 Kafka
需要 豐富的協議支持+易用性:選 RabbitMQ
RabbitMQ
一、 RabbitMQ如何實現延遲隊列?
RabbitMQ 本身不支持延遲消息,但是可以通過它提供的兩個特性 TTL(Time-To-Liveand Expiration ,消息存活時間)、DLX(Dead Letter Exchanges,死信交換器)來實現。
還可以利用 RabbitMQ 插件來實現。
使用TTL + 死信隊列:
在 RabbitMQ 中,通過設置消息的 TTL 和死信交換器可以實現延遲隊列。不給原隊列(正常隊列)設置消費者,當消息在原隊列中達到 TTL后,由于還未被消費則會被轉發到綁定的死信交換器,消費者從死信隊列中消費消息,從而實現消息的延遲處理。
使用 RabbitMQ插件:延遲消息插件(rabbitmg-delayed-message-exchange)
通過安裝 RabbitMQ 的延遲消息插件,可以直接創建延遲交換器(DelayedExchange)在發送消息時,指定消息的延遲時間,RabbitMQ 會在消息達到延遲時間后將其轉發到對應的隊列進行消費。
TTL 和 DLX 簡要說明
TTL(Time-To-Live):指消息在隊列中的存活時間。你可以為隊列中的所有消息統-設置TTL,也可以為每條消息單獨設置TTL。當消息超過TTL時,消息會被標記為過期。
死信隊列(DLX):當消息在原隊列中過期、被拒絕(nack/reject)或隊列已滿時消息會被轉發到綁定的死信交換器(DLX)。DLX 可以將消息重新路由到死信隊列(即這里的延遲隊列)。
TTL + DLX 時序問題
因為隊列的特點就是先進先出,如果發送的消息延遲的時間不同,例如第一個延遲 10s、第二個延遲 5s、第三個延遲 1s。
那么后面的消息,需要等 10s 的消息消費完才能消費。當 10s 消息未被消費,則后續的消息都會被阻塞,即使消息設置了更短的延遲。
延遲消息插件原理
插件提供了一種新的交換器類型x-delayed-message 。這種交換器可以像普通的交換器樣,接收消息并根據路由鍵將消息路由到相應的隊列。只不過x-delayed-message 類型的交換機接收消息投遞后,不會直接路由到隊列中,而是存儲到 Mnesia(Mnesia 是 Erlang運行時中自帶的一個數據庫管理系統)中。等到消息達到可投遞時間,消息才會被投遞到目標隊列中。
二、 RabbitMQ中消息什么時候會進入死信交換機?
消息會在以下幾種情況進入死信交換機(Dead-LetterExchange,DLX)
- 消息被拒絕(Rejection):當消費者使用 basic.reject或 basic.nack明確拒絕消息,并且不要求重新投遞(requeue 設置為 false)時,消息會被直接投遞到死信交換機。
- 消息過期(TTL Expiration):RabbitMQ 支持為消息或隊列設置TTL(Time-To-Live),即生存時間。當消息超過指定的存活時間后還未被消費,它會自動變為死信并被發送到死信交換機。
- 隊列達到最大長度(或總大小):如果隊列設置了最大長度(x-max-length 或 x-max-length-bytes),當消息數量或總大小超出限制時,最早進入隊列的消息會被移入死信交換機。這些條件下進入死信交換機的消息,可以再通過死信隊列進行日志記錄、重新處理或監資。
各種場景下的死信隊列應用
- 消費者拒絕(Rejection):當消費者不能處理消息時,使用 basic.reject或將其拒絕并路由到死信隊列。死信隊列可以用來監控和分析消費者無法處basic.nack理的異常消息。
- 過期消息處理:在電商訂單系統中,TTL機制可用來控制支付超時的訂單,將超時未支付的訂單放入死信隊列中,并在后續對這些訂單進行自動取消。
- 隊列限流:如果隊列有容量限制,當消息超出隊列容量時,最早進入的消息會被丟入死信隊列,適用于系統負載較高的場景中進行限流和控制。
死信隊列的注意事項
避免死信循環:在配置死信交換機時,避免將死信交換機和原隊列配置為相互死信的循環,防止消息重復轉發引起的系統資源浪費。
三、 RabbitMQ無法路由的消息會去到哪里?
在 RabbitMQ 中,無法被路由的消息通常有以下幾種處理方式
- 丟棄消息:默認情況下,若消息無法找到符合條件的隊列(即沒有匹配的綁定關系),RabbitMQ 會直接丟棄消息,不會進行特殊處理。
- 備份交換機(Alternate Exchange):可以為交換機配置一個備份交換機,無法被路由的消息將被發送到備份交換機,再由備份交換機根據其綁定關系決定如何處理消息。例如,可以將這些消息發送到指定隊列進行保存或處理。
- 消息回退(Return Listener):在使用 mandatory 參數的情況下,如果消息無法路由,則會觸發返回機制,將消息退回到生產者,這樣生產者可以自行處理未路由的消息
不同策略的應用場景
- 備份交換機:適用于業務場景中需要監控或重處理未路由消息的情況 Return Listener
- 回退機制:適用于生產者對消息流控制嚴格的場景,確保無路由消息被生產者感知。
- 死信交換機:適合管理超時、失敗和未能路由的消息,適用于需要嚴格消息管控的系統。
備份交換機 VS 死信交換機
1、備份交換機(Alternate Exchange)場景:
關鍵詞:交換機沒人理我(沒人綁定),我總得找個地方放消息吧)
假設你有一個訂單服務,向 RabbitMQ 的某個“業務交換機”發訂單通知:
? 你把消息發給了交換機 order-exchange;
? 理論上,它應該會被路由到 order.created.queue;
? 但 你搞錯了路由鍵,或者壓根沒人綁定這個交換機;
? 這時,交換機找不到任何綁定的隊列,它會:
①丟棄消息?NO!不合適!
②轉發給一個你事先配置好的 備份交換機(AE),比如 backup-exchange;
③你可以監聽這個 backup-exchange,做日志記錄 / 告警 / 手動重試。
? 備份交換機適合場景:
? 想監控和記錄“消息發出后卻沒人接”的情況;
? 比如有些關鍵系統要求生產消息不能丟(風控、結算等);
? 但并不一定要馬上消費,只要先收著,慢慢分析也行。
2、死信交換機(DLX)場景:
關鍵詞:隊列能接,但消費失敗 / 異常了,我得另找辦法處理這些“爛尾消息”
比如你有個延遲支付隊列 payment.delay.queue,你設置了 TTL(延遲 30 分鐘):
? 有用戶下單但沒支付,消息進入該隊列;
? 如果用戶 30 分鐘都沒動靜,消息 TTL 到期;
? RabbitMQ 就會把這個消息轉發到你設置好的 死信交換機 DLX;
? DLX 會把這些消息路由到 payment.timeout.queue;
? 然后你再通知用戶 “訂單已失效” or 扣庫存、釋放資源。
? 死信交換機適合場景:
? 延遲隊列(超時處理);
? 消費失敗重試機制(超過次數后扔給 DLX);
? 消費者拒絕消息(如業務異常、數據錯誤);
RocketMQ
一、為什么RocketMQ不使用Zookeeper作為注冊中心?而選擇擇自己實現NameServer?
💡 Zookeeper 是個“重型老大哥”,RocketMQ 想要“簡單自由的小弟”
- Zookeeper 過于嚴謹,RocketMQ覺得“太重了”
Zookeeper 天生就走 強一致性,比如 所有節點必須保持一致再返回結果,適合搞分布式鎖、選主這種嚴肅業務。
但 MQ 場景追求的是 快、實時路由、靈活擴展,服務發現偶爾不一致問題 完全能接受。
👉 RocketMQ 選了一個“輕量級 NameServer”,只負責告訴客戶端 broker 在哪,怎么連,不用動不動就三方投票、同步,延遲自然更低。
- Zookeeper 節點互相“嘮嗑”太頻繁
ZK 節點需要互相同步元數據,三臺五臺節點隔三差五互相“心跳+投票”,集群一復雜網絡抖動就可能導致卡頓或者重選主。
RocketMQ 的 NameServer 完全無狀態、互不干涉,誰活著就提供服務,活著就干活,死了不管,簡單粗暴,天然就高可用。
- RocketMQ 天然“弱一致”,沒必要搞 Zookeeper 的架子
RocketMQ 的消息發送 實時性第一位,即使短時間內拿到的路由信息不是最新的,后面重試也能發出去,系統自己兜底。
👉 “能發出去消息就行,別搞那么復雜” —— RocketMQ 。
- 降低依賴,RocketMQ 想做“獨立大哥”
Zookeeper 依賴 Java + 繁瑣的配置、維護,對 RocketMQ 來說是外部“拖油瓶”,一旦掛了整個集群連消息路由都掛了。
自己造 NameServer,小而美,RocketMQ 完全自給自足,上線簡單、部署靈活,穩得很。
? 1. NameServer 本質是一個 KV 路由中心
Broker 啟動時,主動向所有 NameServer 注冊,發送自己的路由信息(IP、端口、topic 列表、寫隊列數等)。
NameServer 本地內存維護一張路由表(topic -> broker地址列表)。
Client(Producer/Consumer)啟動時,從 NameServer 拉取路由信息,后續自己緩存路由,本地直連 Broker。
Broker 定時上報心跳(默認 30s 一次),如果超過 2 分鐘沒有心跳,NameServer 自動剔除路由。
? 2. NameServer 完全無狀態
NameServer 之間沒有任何通信,每個節點維護自己的內存路由表,互不干涉。
擴容/縮容很簡單:丟幾臺 NameServer,Broker 會自動向新 NameServer 注冊,Client 會自動感知新 NameServer。
📌 為什么 RocketMQ 故意讓 NameServer 之間不通信、路由表冗余?
1、無狀態設計,活著就能用
NameServer 沒有 Leader,沒有互相同步的壓力,掛了隨便重啟,活著的 NameServer 能繼續提供服務。
Producer、Consumer 連多個 NameServer,只要有一個活著就能獲取路由信息,不存在單點問題。
2、極簡架構,強健易擴展
NameServer 無磁盤落盤,全內存維護路由表,數據來源于 Broker 的 定時心跳。
新 NameServer 加入,Broker 會主動注冊,NameServer 重啟后 Broker 也會重新注冊,自動修復,無腦擴容。
3、“數據冗余” 換 “極高可用”
你可以理解為 RocketMQ 追求的不是路由表的存儲效率,而是 “誰都可以提供路由服務”。
就算掛掉一半 NameServer,系統完全不影響正常發消息。
如何理解Broker?
💡 Broker 就是 RocketMQ 的“核心消息存儲+分發中心”
二、RocketMQ中關于事務消息的實現
RocketMQ 中的事務消息通過兩階段提交的方式來確保消息與本地事務的一致性。
第一階段(消息發送):生產者先將消息發送到 RocketMQ 的 Topic,此時消息的狀態為半消息(HalfMessage),消費者不可見。
然后,生產者執行本地事務邏輯,并根據本地事務的執行結果來決定下一步的操作。
第二階段(提交或回查):
如果本地事務成功,生產者會向 RocketMQ 提交 commit 操作,將半消息變為正式消息,消費者可見。
如果本地事務失敗,生產者會向 RocketMQ 提交 Rollback操作,RocketMQ 會丟棄該半消息。
如果生產者沒有及時提交commit或 Rollback操作,RocketMQ 會定時回查生產者本地事務狀態,決定是否提交或回滾消息。
用打車開發票做類比:
RocketMQ 階段 | Producer 類比動作 | Consumer 類比動作 |
---|---|---|
發送半消息 | 顧客說“我要開發票” | 暫時看不到 |
本地事務 | 顧客正在付款 | 不處理(消息未投遞) |
Commit 提交事務 | 付款成功,系統發發票 | 發票打印(消費消息) |
Rollback 回滾事務 | 付款失敗,取消發票 | 沒有任何動作 |
RocketMQ 事務消息的缺點主要有:
- 改造成本高:需要改造業務邏輯,接入特定接口,手動處理回查邏輯。
- 功能有限:只支持單事務消息,不支持跨消息事務。
- 可用性依賴強:MQ 集群掛了,半消息無法發送,事務整體不可用,應用流程中斷。
用本地消息機制替代二階段提交:
- 將發消息操作寫入數據庫本地消息表,與業務操作同一事務提交,保證業務和消息持久化的原子性。
- 通過后臺定時任務異步掃描未發送的消息,負責將消息投遞到 MQ,避免因 MQ 掛掉導致事務阻塞。
- 失敗的消息通過重試機制和死信隊列,保證消息最終一致性,減少人工干預。
簡單說,本地消息機制把“發消息”變成了數據庫事務的一部分,借助異步投遞和重試,繞過了傳統二階段提交對 MQ 可用性的強依賴,實現更可靠的事務補償方案。
Kafka
一、Kafka中Zookeeper的作用
在 Kafka 中,Zookeeper 扮演了集群協調和管理的核心角色。它的主要作用是管理和協調Kafka 集群中的元數據,幫助 Kafka 實現高可用性、負載均衡和容錯性。
以下是 Kafka 中 Zookeeper 的幾個關鍵作用:
- 管理 Broker 元數據:Zookeeper 負責管理 Kafka 集群中 Broker 的注冊、狀態監控。當有新的 Broker加入或離開集群時,Zookeeper能夠及時更新集群狀態
- 協調分區副本 Leader 選舉:當某個分區的 Leader 副本故障時,Zookeeper 協調副本的選舉過程,為該分區選出新的 Leader,確保分區高可用。
- 管理消費者的Offset:在早期版本的 Kafka 中,消費者的 Offset 信息存儲在Zookeeper 中,以便消費者在重啟后可以從上次消費的位置繼續消費。最新的 Kafka版本將 Offset 存儲移至 Kafka自身的內部主題consumer offsets,減少了對Zookeeper 的依賴。
- 動態配置和負載均衡:Zookeeper 保存著 Kafka 配置和拓撲信息,當集群發生變化時(如增加或減少分區、調整副本因子),Zookeeper協助完成負載均衡。
二、Kafka中關于事務消息的實現
Kafka 的事務消息不同于我們理解的分布式事務消息,它的事務消息是實現了消息的Exacty Once 語義,即保證消息在生產、傳輸和消費過程中的“僅一次”傳遞。Kafka 的事務消息主要通過以下幾個核心組件來實現:
- 事務協調器(Transaction Coordinator):負責事務的啟動、提交和中止管理,并將事務狀態記錄到transaction_state 內部主題。
- 冪等生產者(ldempotent Producer):Kafka Producer 通過 Producer ID(PID)識別每個事務的唯一性,確保同一事務的每條消息只寫入一次。
- 事務性消費:在消費過程中,消費者可以選擇隔離未完成事務的數據(通過read_committed 設置),只消費已提交的事務消息,確保數據的最終一致性。
Kafka 事務消息流程
- 啟動事務:事務性生產者向 Transaction coordinator 請求啟動事務
- 生產消息:生產者開始向 Kafka 寫入事務消息,每條消息都帶有唯一的 Producer ID和 Sequence Number ,以保證冪等性。
- 提交事務:在所有消息寫入完成后,生產者向事務調器發送commit或abort請3求,提交或中止事務。
- 事務性消費:消費者可以通過設置 read_committed 隔離級別,僅消費已提交的消息實現最終數據一致性。
Kafka 事務隔離級別
Kafka 提供了以下兩種消費隔離級別:
- read committed:消費者只會消費事務已提交的消息,確保數據的最終一致性。
- read uncommitted:消費者可以消費所有消息,包括尚未提交的事務消息,可能會讀到未提交的數據,適用于對事務一致性要求較低的場景
進一步理解 Kafka 的事務消息的 Exactly Once
Kafka 的事務消息和 RocketMQ 的事務消息是不一樣的。RocketMQ 解決的是本地事務的執行和發消息這兩個動作滿足事務的約束。而 Kafka 事務消息則是用在一次事務中需要發送多個消息的情況,保證多個消息之間的事務約束,即多條消息要么都發送成功,要么都發送失敗,就像下面代碼所演示的。
類別 | RocketMQ 的事務消息 | Kafka 的事務消息 |
---|---|---|
像什么? | 我打車時付款和發票要一起成功 | 我叫了 3 個朋友一起拼車,要么都能上車,要么誰也上不了 |
場景 | 處理本地事務 + 發消息的強一致性 | 保證多條消息發出去的事務性(原子性) |
作用 | 發不發消息,取決于本地操作成功與否 | 多條消息,要么全 commit,要么全 rollback |
例子 | 用戶下單成功才發“扣庫存”消息 | 發“新建訂單”“記賬”“打點日志”這3條消息,要么都成功,要么都失敗 |
三、如何處理重復消息?
只有讓消費者的處理邏輯具有冪等性,保證無論同一條消息被消費多少次,結果都是一樣的,從而避免因重復消費帶來的副作用。
如何冪等處理重復消息?這需要改造業務處理邏輯,使得在重復消息的情況下也不會影響最終的結果
主要是利用唯一標識(ID)去重。
在消息中引入全局唯一 ID,例如 UUID、訂單號等,利用 redis 等緩存,或者數據庫來存儲消息 ID,然后消費者在處理消息時可以檢查該消息 ID 是否存在代表此消息是否已經處理過。
去重緩存的使用
- 使用 Redis 等緩存系統存儲已經處理的消息 ID,以減少數據庫的查詢壓力。
- 可以為 Redis 中的去重記錄設置過期時間,例如7天,以便自動清理歷史消息,減小存儲壓力。
去重表的設計:
- 在數據庫中創建一張去重表,用來存儲已處理消息的 ID 及處理時間。在消費每條消息前,先查詢該表。
- 對于高并發場景,可以結合數據庫的唯一索引來避免多次插入同一個消息ID
如何保證消息的有序性?
保證消息有序性的常見方法如下:
1、單一生產者和單一消費者:
使用單個生產者發送消息到單個隊列,并由單個消費者處理消息。這樣可以確保消息按照生產者的發送順序消費。這種方法簡單但容易成為性能瓶頸無法充分利用并發的優勢
2、分區與順序鍵(Partition Key)
在支持分區(Partition)的消息隊列中(如Kafka、RocketMQ),可以通過Partition Key 將消息發送到特定的分區。每個分區內部是有序的,這樣可以保證相同Partition Key 的消息按順序消費。
例如,在訂單處理系統中,可以使用訂單號作為 Partition Key,將同一個訂單的所有消息路由到同一個分區,確保該訂單的消息順序。
3、順序隊列(Ordered Queue)
一些消息隊列系統(如RabbitMQ)支持順序隊列,消息在隊列中的存儲順序與投遞順序一致。如果使用單個順序隊列,消息將按順序被消費。
可以使用多個順序隊列來提高并發處理能力,并使用特定規則將消息分配到不同的順序隊列中。
不同消息隊列的順序性如何保證?
1、Kafka:Kafka 中的消息在分區內部是有序的。生產者在發送消息時,可以指定分區(Partition)。如果所有相同 Key 的消息都發送到同一個分區,則可以保證這些消息的順序。
通過配置生產者的hash函數,可以將同一類型的消息發送到相同的分區,保證順序。
在消費端,使用單線程消費者從特定分區讀取消息,可以確保消費的順序性。
2、RabbitMQ:RabbitMQ 通過單個隊列可以保證消息的順序,如果消息需要并發消費,則需要將其路由到不同的順序隊列中。
使用Message Grouping技術,將具有相同屬性的消息分組到一個隊列中,以確保組內消息的順序。
通過自定義路由策略,可以將同一業務邏輯的消息發送到相同的隊列,從而保證順序。
3、RocketMQ:RocketMQ 支持順序消息(Ordered Messages),生產者可以使用 send方法將消息發送到指定的分區隊列,并使用Message Queue Selector來選擇目標隊列(本質的實現和 kafka 是一樣的)
消費者端通過順序消費模式,可以從同一個消息隊列中按順序讀取消息,確保消息的順序性。
如何處理消息堆積?
消息堆積是指在消息隊列中,消息的生產速度遠大于消費速度,導致大量消息積壓在隊列
我們需要先定位消費慢的原因,如果是 bug 則處理 bug,同時可以臨時擴容增加消費速率,減少線上的資損。
如果是因為本身消費能力較弱,則可以優化下消費邏輯常見有以下幾種方式提升消費者的消費能力:
1、增加消費者線程數量:提高并發消費能力。
2、增加消費實例:在分布式系統中,可以水平擴展多個消費實例,從而提高消費速率。
3、優化消費者邏輯:檢查消費者的代碼,減少單個消息的處理時間。例如,減少 /O 操作、使用批量處理等.
注意上述的第二點
增加消費實例,一定要注意注意Topic 對應的分區/隊列數需要大于等于消費實例數不然新增加的消費者是沒東西消費的。因為一個Topic中,一個分區/隊列只會分配給一個消費實例。
除此之外還可以進行限流和降級處理
·對消息生產端進行限流,降低生產速率,避免消息積壓進一步惡化。
·對非關鍵消息進行丟棄或延遲處理,只保留高優先級的消息,提高系統的響應速度。
優化消費者邏輯常見做法
1、批量消費:通過一次性從隊列中消費多條消息(如批量讀取 100 條),可以減少每次拉取消息的網絡開銷,提高處理效率。
2、異步消費:在消費的同時不阻塞后續消息的消費。處理完一條消息后立即開使用異步處理方法,始處理下一條消息,提升并發度(但是要注意消息丟失的風險)
3、優化數據庫操作:
- 如果消費者在處理消息時需要頻繁訪問數據庫,可以通過數據庫連接池、SQL優化、緩存等手段減少數據庫操作的時間。
- 使用批量插入或更新操作,而不是逐條處理,可以顯著提升效率,
4、臨時擴展隊列的策略
- 臨時擴展多個消費者隊列:在消息積壓嚴重時,可以通過臨時擴展多個消費者隊列,將積壓的消息分配到不同的隊列中進行消費。消費完成后,可以將這些臨時隊列關閉。例如,在Kafka 中,可以增加分區數(Partition),同時擴展更多的消費者實例,分攤消費壓力。
- 在 RocketMQ 中可以增加隊列數。
5、使用多隊列調度機制:
·例如,使用 RabbitMQ 的Exchange機制,將消息按照特定規則路由到多個隊列中。這樣可以在消息堆積時,將不同類型的消息分開處理。
6、限流與降級的實現方式
生產者限流:
在生產者端增加限流邏輯,使用令牌桶、漏桶算法等限流策略,限制生產者的發送速率,從而避免消息隊列被快速填滿。
來緩解消息發送。例如,在 Kafka 中可以通過配置生產者的 1inger.ms和 batch.size:的速度。
7、消費者降級:
在消息堆積嚴重時,對低優先級的消息進行丟棄或延遲處理。只保留高優先級消息的消費,確保系統核心功能的正常運行。
可以在消費端增加優先級隊列或通過消息屬性區分優先級,先處理高優先級的消息
消息隊列設計成推消息還是拉消息?推拉模式的優缺點?
推拉模式主要討論的是 Consumer 和 Broker 之間的消息交互方式。
Producer 到 Broker:默認是推模式(Push),Producer 主動把消息推給 Broker,理由是:①Broker 可以用多副本等機制保證可靠性;②讓成百上千個 Producer 自己存消息、等 Broker 來拉,可靠性、實現成本太高,不現實。
所以:
- Producer → Broker:推(Producer 主動投遞)
- Consumer ? Broker:推 or 拉(消費方式才區分推拉)
一句話:推拉模式只討論消費端,生產者默認都是推!
1、推模式(Broker 主動推送)優缺點:
? 優點:
- 消息實時性強:Broker 一有消息立刻推送,延遲低;
- 消費者使用簡單:Consumer 被動接收,不用管拉不拉,來就接著。
? 缺點:
- 推送速率不可控:Broker只管推,不關心消費者忙不忙,推多了容易**“消費堆積”**;
- 容易“打爆消費者”:如果生產速度快于消費速度,消費者可能**“被推死”**,相當于被DDoS了。
總結:推模式實時爽,但消費者扛不住容易“爆倉”。
2、拉模式(Consumer 主動拉取)優缺點一圖流:
? 優點:
- 消費者掌控節奏:根據自己消費能力決定是否拉取,能控流、能限流。
- Broker 更輕松:只負責存消息,被動響應請求,沒負擔。
- 批量拉取更靈活:消費者可以按需一次拉多個消息,拉多少自己說了算。
? 缺點:
- 消息延遲高:得靠消費者定期輪詢,消息到達時不能立馬推送。
- 空輪詢浪費資源:消息沒到時,消費者不停拉取等于白請求,浪費 Broker 和網絡資源。
總結:“拉模式控得住節奏,但消息延遲擋不住”
推 or 拉,怎么選?
推模式 vs 拉模式:各有優缺點
- 推模式:實時性高,但容易推爆消費者;
- 拉模式:可控但有延遲。
RocketMQ、Kafka 為啥選拉模式?
- Broker 只管存消息,不依賴消費者狀態,職責單一;
- Broker 作為中心,越輕量越穩定;
- 消費端復雜多樣,主動拉取更靈活。
如何優化拉模式的缺點?
靠 “長輪詢”:
- 沒消息就先掛起請求,有消息立馬返回;
- 兼顧低延遲 + 不空輪詢;
- 消費者不用一直無腦請求,也能第一時間拿到消息。
📌 總結:RocketMQ/Kafka選拉,但用“長輪詢”整出了推的實時性、拉的靈活性。
如何保證消息不丟失?
關鍵看三個環節:
1?? 生產階段:生產者不丟
- 確認機制:發送消息后必須等待 Broker 確認(ACK);
- 異常重試:發送失敗及時 重試 + 報警 + 日志,別悄無聲息丟失;
- 同步/異步都要兜底:異步也得有回調錯誤處理,別掉包不管。
2?? 存儲階段:Broker 不丟
- 刷盤確認:Broker 將消息持久化到磁盤上后再返回ACK,內存級別不算完事;
- 副本機制:集群時,可以配置 多副本(例如寫入2臺才返回ACK),即使主節點掛了,消息依然安全。
3?? 消費階段:消費者不丟
- 消費ACK機制:業務邏輯執行完成后才給 Broker 確認ACK;
- 失敗重試機制:消費失敗可 自動重試 / 進死信隊列(DLQ);
- 避免假消費成功:不能拿到消息直接ACK,得真正處理完再確認。