摘要
本文主要介紹了Kafka的架構原理、消息訂閱模式以及在金融風控等領域的應用。Kafka作為數據中轉站,可同步不同系統數據,支持事件驅動架構,廣泛應用于金融支付與風控場景。其架構包括Producer、Broker、Topic、Partition、Replication、Message、Consumer和Consumer Group等組件,依賴Zookeeper保存元信息。Kafka的消息訂閱模式包括點對點、發布-訂閱、分區級訂閱、靜態訂閱和動態訂閱等,每種模式都有其特點和適用場景。此外,還探討了Kafka與RocketMQ的對比、消息丟失問題及解決方案、消息存儲與刪除策略等。
1. Kafka原理介紹
1.1. Kafka基本特點
Kafka 是一個分布式發布-訂閱消息系統。是大數據領域消息隊列中唯一的王者。最初由 linkedin 公司使用 scala 語言開發,在2010年貢獻給了Apache基金會并成為頂級開源項目。至今已有十余年,仍然是大數據領域不可或缺的并且是越來越重要的一個組件。Kafka適合離線和在線消息,消息保留在磁盤上,并在集群內復制以防止數據丟失。kafka構建在zookeeper同步服務之上。它與Flink和Spark有非常好的集成,應用于實時流式數據分析。
Kafka特點:
- 可靠性:具有副本及容錯機制。
- 可擴展性:kafka無需停機即可擴展節點及節點上線。
- 持久性:數據存儲到磁盤上,持久性保存。
- 性能:kafka具有高吞吐量。達到TB級的數據,也有非常穩定的性能。
- 速度快:順序寫入和零拷貝技術使得kafka延遲控制在毫秒級。
1.2. Kafka架構應用場景
Kafka 的使用場景其實非常廣泛,它本質上是一個 高吞吐、可擴展的分布式消息流處理平臺,常用于消息隊列(MQ)+ 日志系統 + 流式處理 三類場景。結合實際金融支付、風控和互聯網場景,我給你梳理下常見的應用:
1.2.1. 消息解耦與削峰填谷
- 場景:上下游系統解耦,生產者只需要把消息寫入 Kafka,消費者根據自身處理能力來消費。
- 示例:
- 訂單系統下單后,支付系統、庫存系統、營銷系統都訂閱訂單消息,異步處理,避免耦合。
- 雙十一、618 秒殺場景,流量洪峰先寫入 Kafka,消費者按能力消費,防止系統被壓垮。
1.2.2. 日志收集與統一處理
- 場景:收集分布式系統中的日志,統一存儲、分析。
- 示例:
- 用戶行為日志(點擊、瀏覽、下單)發送到 Kafka,再進入 大數據平臺(Hadoop、Hive、Spark、Flink) 分析。
- 風控系統采集交易流水日志,實時分析欺詐行為。
1.2.3. 實時流式計算
- 場景:Kafka + Flink / Spark Streaming / Storm,做實時數據處理。
- 示例:
- 實時監控:金融支付中的交易反欺詐系統,實時監控交易特征,秒級攔截可疑交易。
- 實時推薦:電商網站根據 Kafka 中的用戶行為數據實時更新推薦結果。
1.2.4. 數據總線(Data Pipeline)
- 場景:作為數據中轉站,把不同系統的數據可靠地同步到另一個系統。
- 示例:
- Kafka Connect 將數據庫 binlog(MySQL、PostgreSQL) 同步到 ES、HDFS、ClickHouse。
- Kafka MirrorMaker 實現跨數據中心的數據復制(異地多活)。
1.2.5. 事件驅動架構(EDA)
- 場景:通過事件驅動系統協作,而不是強耦合調用。
- 示例:
- 銀行風控:用戶發起交易 → Kafka 發布事件 → 多個風控模塊(黑名單校驗、設備指紋校驗、額度校驗)并行消費。
- 電商業務:下單事件觸發 → 物流系統、優惠券系統、積分系統都能訂閱同一個事件。
1.2.6. 金融支付與風控場景
這是你熟悉的領域,可以更貼近實際:
- 支付交易流水存儲:支付核心系統把交易流水寫入 Kafka,用于清算、對賬、監管報送。
- 實時風控:交易請求先進入 Kafka,由多個風控服務并行消費,判斷風險。
- 賬務/清結算:通過 Kafka 進行賬務流水的實時同步,避免單點瓶頸。
- 監控告警:Kafka 采集交易異常、延時、錯誤日志,實時觸發告警。
1.3. Kafka架構原理
- Producer:Producer即生產者,消息的產生者,是消息的入口。
- Broker:Broker是kafka實例,每個服務器上有一個或多個kafka的實例,我們姑且認為每個broker對應一臺服務器。每個kafka集群內的broker都有一個不重復的編號,如圖中的broker-0、broker-1等……
- Topic:消息的主題,可以理解為消息的分類,kafka的數據就保存在topic。在每個broker上都可以創建多個topic。
- Partition:Topic的分區,每個topic可以有多個分區,分區的作用是做負載,提高kafka的吞吐量。同一個topic在不同的分區的數據是不重復的,partition的表現形式就是一個一個的文件夾!
- Replication:每一個分區都有多個副本,副本的作用是做備胎。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成為Leader。在kafka中默認副本的最大數量是10個,且副本的數量不能大于Broker的數量,follower和leader絕對是在不同的機器,同一機器對同一個分區也只可能存放一個副本(包括自己)。
- Message:每一條發送的消息主體。
- Consumer:消費者,即消息的消費方,是消息的出口。
- Consumer Group:我們可以將多個消費組組成一個消費者組,在kafka的設計中同一個分區的數據只能被消費者組中的某一個消費者消費。同一個消費者組的消費者可以消費同一個topic的不同分區的數據,這也是為了提高kafka的吞吐量!
- Zookeeper:kafka集群依賴zookeeper來保存集群的的元信息,來保證系統的可用性。
1.4. Kafka 消息訂閱模式
Kafka 的消息訂閱模式,其實就是 Consumer 如何訂閱 Topic 并消費消息 的方式。不同于傳統 MQ(例如 RabbitMQ 的交換機 + 路由模式),Kafka的訂閱模式比較簡潔,但圍繞Topic/Partition/ConsumerGroup 有幾種典型方式。
模式 | 特點 | 消費次數 | 典型場景 |
點對點(Queue-like) | 組內消息只給一個 Consumer | 每消息 1 次(組內) | 負載均衡任務 |
發布-訂閱(Pub/Sub) | 多個組可獨立消費 | 每消息 N 次(組數) | 事件通知,多系統訂閱 |
分區級訂閱 | 精確分區分配策略 | 與分區數量相關 | 順序消費、流量均衡 |
靜態訂閱 | 手動指定分區 | 可控 | 用戶交易順序處理 |
動態訂閱 | 訂閱整個 Topic | 自動分配 | 常見業務消費 |
1.4.1. 點對點(Queue-like 模式)
機制:多個 Consumer 組成一個 Consumer Group;每條消息只會被組內一個 Consumer 消費一次;Partition 會被分配給組內某個 Consumer。類似于傳統 MQ 的“隊列模式”。
應用場景:下游服務需要做 負載均衡,如支付流水入賬任務,多個實例分攤任務量。
點對點模式通常是基于拉取或者輪詢的消息傳送模型,這個模型的特點是發送到隊列的消息被一個且只有一個消費者進行處理。生產者將消息放入消息隊列后,由消費者主動的去拉取消息進行消費。點對點模型的的優點是消費者拉取消息的頻率可以由自己控制。但是消息隊列是否有消息需要消費,在消費者端無法感知,所以在消費者端需要額外的線程去監控。
1.4.2. 發布-訂閱(Pub/Sub 模式)
機制:一個Topic可以被多個Consumer Group 訂閱;不同組的消費者都能收到完整的消息副本;Broker不會為單個消息維護“已消費狀態”,只管存消息。類似于傳統 MQ 的“廣播模式”。
應用場景:訂單事件同時通知 庫存系統、營銷系統、風控系統。支付交易流水同時寫入 清算系統、風控系統、監控系統。
發布訂閱模式是一個基于消息送的消息傳送模型,發布訂閱模型一個消息可以有多種不同的訂閱者同時消費。生產者將消息放入消息隊列后,隊列會將消息推送給訂閱過該類消息的消費者。由于是消費者被動接收推送,所以無需感知消息隊列是否有待消費的消息!但是consumer1、consumer2、consumer3由于機器性能不一樣,所以處理消息的能力也會不一樣,但消息隊列卻無法感知消費者消費的速度!所以推送的速度成了發布訂閱模模式的一個問題!假設三個消費者處理速度分別是8M/s、5M/s、2M/s,如果隊列推送的速度為5M/s,則consumer3無法承受!如果隊列推送的速度為2M/s,則consumer1、consumer2會出現資源的極大浪費!
1.4.3. 分區級訂閱(Partition Assignment)
Kafka中,Topic → Partition → Consumer 的綁定方式,有幾種策略:
- Range(范圍分配)按照分區號順序分配給消費者(可能導致負載不均)。
- RoundRobin(輪詢分配)輪流分配分區,更均勻。
- Sticky(粘性分配,推薦)保證分區盡可能穩定地分配給同一個 Consumer,減少 rebalance 開銷。
1.4.4. 靜態訂閱 vs 動態訂閱
- 靜態訂閱(Assign 模式):Consumer 手動指定訂閱的 Topic 和 Partition。適合嚴格順序消費的場景,比如一個用戶的交易記錄必須按順序處理。
- 動態訂閱(Subscribe 模式):Consumer 訂閱整個 Topic,具體分區由 Kafka 自動分配。常見于一般業務消費,擴展性更好。
1.4.5. 多播/單播對比
- 單播(Unicast):同一個 Consumer Group 內,消息只會被其中一個 Consumer 消費。
- 多播(Multicast):不同 Consumer Group 都能消費相同的消息,各自獨立。
2. kafka實戰開發經驗總結
2.1. 什么時候使用Kafka、MQ中間件
Kafka和傳統消息隊列(RabbitMQ、ActiveMQ、RocketMQ 等)雖然都是消息中間件,但定位、架構和適用場景不一樣,所以“什么時候用 Kafka、什么時候用 MQ”主要取決于你的業務需求、性能要求和系統架構。
2.1.1. 核心定位對比
特性 | Kafka | MQ(RabbitMQ / RocketMQ / ActiveMQ 等) |
設計目標 | 高吞吐、分布式日志存儲、流處理 | 可靠消息傳遞、業務解耦、事件驅動 |
消費模式 | 發布-訂閱(Pub/Sub),支持多消費組,消息可重復消費 | 隊列(Queue)+ 發布訂閱,通常一次消費即刪除 |
消息存儲 | 長期存儲(按時間/大小保留),可回溯消費 | 消費后一般刪除(RocketMQ 可配置保留) |
性能 | 百萬級 TPS | 一般萬級 TPS(RocketMQ 高性能除外) |
順序性 | 分區內有序 | 隊列內有序 |
事務支持 | 基本事務(Kafka 0.11+),但弱于 MQ | 較完善的事務消息(RocketMQ 事務消息較成熟) |
2.1.2. Kafka、MQ使用場景對比?
場景類型 | 典型需求 | Kafka 適合 | MQ 適合(RabbitMQ / RocketMQ) |
高吞吐日志采集 | 每秒百萬級日志/埋點采集,寫入存儲或分析系統 | ?(ELK、ClickHouse、Hadoop 等) | ?(吞吐不夠) |
實時流計算 | 需要實時處理數據流,窗口聚合、風控計算 | ?(Kafka Streams、Flink、Spark Streaming) | ?(延遲較大) |
大數據中間總線 | 系統間批量傳輸數據到數據湖、倉庫 | ? | ? |
消息回溯 / 重放 | 消費端需反復讀取歷史數據 | ?(可配置保留期) | ?(消費后刪除) |
系統解耦 | 系統間異步調用,降低耦合 | ??(可做但功能簡單) | ?(隊列 + 訂閱模式) |
事務一致性 | 確保消息與數據庫操作同時成功 | ??(Kafka事務弱) | ?(RocketMQ事務消息成熟) |
延遲/定時任務 | 定時發消息、延遲隊列 | ? | ?(RocketMQ 原生延遲,RabbitMQ TTL) |
復雜路由 | 消息根據規則路由到不同隊列 | ? | ?(RabbitMQ交換機靈活) |
輕量異步任務 | 低并發的任務調度,如發郵件/短信 | ?(過重) | ?(RabbitMQ / RocketMQ) |
多消費者獨立消費 | 多個系統獨立消費同一份消息 | ?(多消費組獨立) | ??(需額外配置) |
2.1.3. Kafka、MQ選型建議
需求 | 推薦 |
高吞吐(百萬級 TPS)、實時流、可回溯 | Kafka |
分布式事務、消息必達 | RocketMQ |
復雜路由、輕量級消息通信 | RabbitMQ |
實時流 + 事務一致性 | Kafka + 事務補償 / RocketMQ |
大數據日志采集 | Kafka |
延遲消息 / 定時任務 | RocketMQ / RabbitMQ(TTL) |
2.2. 點對點(Queue-like 模式) 中消息是否標記為已經消費?
在 Kafka 的點對點模式(Consumer Group 內單消費)下:消息不會被標記已消費;Broker 只管存消息,消費狀態由 Consumer Group 的 offset 記錄;
2.2.1. Kafka的設計和傳統 MQ 不一樣
在 RabbitMQ、ActiveMQ 等傳統 MQ 里,Broker 會記錄消息是否已經被某個消費者確認(ACK),已消費的消息就會被標記或刪除。但在 Kafka 中:Broker 不關心消費狀態,只負責存儲消息。Consumer 自己維護消費進度(offset)。
2.2.2. 消費進度的管理方式
每個Consumer Group在Kafka里會有一個 offset,指向該組在某個 Partition 中的消費進度(即下一個要消費的消息位置)。這個 offset 默認存儲在 __consumer_offsets
內置主題中,而不是存在消息本身。所以消息不會有“已消費標記”,只是有“消費到哪里了”的進度信息。
2.2.3. 結果表現
消息被消費后,仍然會在 Broker 的日志文件里保留(直到達到過期時間或超過配置的存儲大小才刪除)。如果 Consumer 提交 offset 出錯:可能會 重復消費(offset 沒提交成功,下次還會讀到同一條消息)。或者消息丟失(offset 提交過早,但實際還沒處理完)。
2.3. ?Kafka的消息保證不重復消費是依賴于消費者來實現?
Kafka本身的 設計哲學就是:Broker 只負責存儲和分發消息,不保證“只消費一次”,消息是否會被重復消費,要靠 Consumer(消費者端)自己來保證。
2.3.1. 為什么Kafka不能天然保證不重復消費?
- Offset提交機制決定的
- 消費者在拉取消息后,通常會在處理完成后提交offset。
- 如果處理成功但 offset 提交失敗 → 下次重啟會從舊 offset 繼續消費 → 重復消費。
- 如果offset 提交成功但處理失敗 → 消息“跳過”了 → 消息丟失。
- Kafka Broker 不維護消費狀態
- Broker不記錄某個消息是否已被消費。它只維護partition 的日志文件 + 保留策略。
2.3.2. Kafka 官方給出的消費語義有三種:
- At most once(至多一次):offset 先提交,再處理消息。風險:消息可能丟失,但絕不會重復。
- At least once(至少一次)(默認):先處理消息,再提交 offset。風險:可能重復消費,但不會丟
- Exactly once(恰好一次):Kafka 0.11 引入的特性,需要 冪等 Producer + 事務性寫入 配合,在特定場景(Kafka → Kafka,Kafka → 支持事務的存儲系統)可以實現。對 Consumer 來說,仍需保證冪等(例如業務處理可重試)。
2.3.3. 費端保證不重復消費的常見手段
- 冪等性處理:業務層面保證,即使同一條消息被重復處理,結果也是一樣。例如:數據庫
INSERT ... ON DUPLICATE KEY UPDATE
,支付扣款時先判斷“訂單是否已扣款”。 - 唯一鍵去重:給每條消息一個全局唯一 ID(如訂單號、流水號、事務 ID)。Consumer 處理前先檢查該 ID 是否已經處理過。
- 事務:Kafka 提供
read-process-write
的事務性 API,確保消費和結果寫入要么都成功,要么都失敗。
2.4. 金融風控場景中什么場景使用kafka什么時候使用RocketMQ?
2.4.1. 金融風控中Kafka的典型使用場景
Kafka 的強項是 高吞吐、低延遲、可回溯的流數據處理,在風控里更多用于 數據實時流轉和大數據計算。總結:Kafka 在風控中是 數據總線,負責把交易、埋點、外部數據等實時送到決策引擎、大數據平臺,強調吞吐、實時、可回溯。
場景 | 說明 | 為什么用 Kafka |
實時交易數據流 | 實時獲取交易流水、支付請求、賬戶變動信息,推送到風控計算引擎 | 高吞吐(百萬級 TPS),延遲低,可多消費組并行處理 |
用戶行為埋點 & 反欺詐 | 埋點收集登錄、設備指紋、IP 地址、地理位置、操作行為等 | 可與 Flink / Spark Streaming 做實時風險評分 |
外部數據接入 | 接入黑名單、征信、第三方風控數據,匯聚到數據湖 | 數據量大,可持久化保存并回放 |
實時監控 & 預警 | 將風控決策事件推送到監控系統(如 Grafana、Prometheus) | 多消費者訂閱同一數據流,不互相影響 |
批量模型重算 / 回測 | 保存歷史交易數據,用于風控模型回測和調優 | Kafka 可按時間窗口讀取歷史消息 |
2.4.2. 金融風控中RocketMQ的典型使用場景
RocketMQ在風控里更多用于可靠事務消息、事件驅動和任務調度,強調可靠性和一致性。
場景 | 說明 | 為什么用 RocketMQ |
風控決策結果落庫 | 決策結果(放行、攔截、人工復核)與交易落庫保持事務一致 | 事務消息保證數據庫與消息一致提交 |
反欺詐補救動作 | 如凍結賬戶、攔截支付、通知客服,這些動作必須可靠執行 | 消息必達,支持重試機制 |
延遲風控處理 | 某些規則需要延遲檢查(如 24 小時內重復提現) | 原生延遲消息支持 |
風控事件驅動 | 觸發下游系統(反洗錢 AML 系統、合規審核)執行任務 | 可靠投遞保證事件不會丟失 |
批量風險任務調度 | 夜間批量跑風控規則、信用評估任務 | 結合延遲消息 / 定時任務執行 |
2.4.3. 對比總結表
需求 | Kafka | RocketMQ |
高吞吐交易流采集 | ? | ? |
用戶行為埋點 | ? | ? |
實時流計算(Flink) | ? | ? |
歷史數據回放 | ? | ? |
決策結果落庫事務一致 | ? | ? |
延遲/定時風控規則 | ? | ? |
風控處置任務必達 | ??(需自己實現) | ? |
異步事件驅動 | ?? | ? |
2.5. kafka消息可能丟失呢?
是的,Kafka 消息是可能丟失的,而且丟失的原因和環節都比較多,尤其在金融風控這種高可靠場景下,需要特別注意。
2.5.1. 生產端(Producer)丟消息的原因
原因 | 說明 | 解決方式 |
acks=0 / acks=1 | - - | 生產環境用 |
未處理發送異常 | 發送時網絡異常/超時,應用沒做 | 對 send() 結果做異常處理或重試 |
未開啟重試機制 | Producer 默認 retries=0,發送失敗就丟 | 設置 |
緩沖區未刷盤 | Producer 進程掛了,緩沖區里的數據沒發出去 | 開啟 |
2.5.2. Kafka Broker丟消息的原因
原因 | 說明 | 解決方式 |
ISR(同步副本集合)不完整 | Leader 崩潰時,Follower 未完全同步數據,被選為 Leader 后數據丟 | 開啟 (比如 2),配合 |
日志未落盤 | Kafka 默認批量刷盤( | 關鍵主題可調低刷盤間隔(會降低性能) |
unclean.leader.election.enable=true | 允許不完整副本選為 Leader,會導致數據丟失 | 生產環境設為 |
磁盤損壞/數據目錄丟失 | 物理損壞、磁盤滿等異常 | 監控磁盤、配 RAID / 云存儲快照 |
2.5.3. 消費端(Consumer)丟消息的原因
原因 | 說明 | 解決方式 |
自動提交 offset(enable.auto.commit=true) | Kafka 默認 5s 自動提交 offset,如果消費未完成就宕機,下次會跳過未處理的消息 | 改為手動提交 offset(commitSync/commitAsync)在處理完成后提交 |
先提交 offset 后處理 | 提交成功但處理失敗,消息丟失 | 先處理,再提交 offset |
批量消費未處理完 | 一批數據部分成功部分失敗,但 offset 已提交到最大位置 | 使用精細化 offset 控制或單條提交 |
消費端丟棄異常消息 | 消費代碼中 try-catch 后直接忽略異常數據 | 對異常消息入死信隊列(DLQ) |
2.5.4. Kafka 可靠性最佳實踐(防止丟消息)
如果你在金融風控系統中用 Kafka,建議這樣配置:
2.5.4.1. 生產端(Producer)
acks=all
retries=Integer.MAX_VALUE
enable.idempotence=true
max.in.flight.requests.per.connection=1
2.5.4.2. Broker
min.insync.replicas=2
unclean.leader.election.enable=false
2.5.4.3. 消費端(Consumer)
enable.auto.commit=false
# 業務處理完成后手動 commit
并且:
- 對關鍵消息使用 多副本 + 事務消息(Kafka 0.11+)
- 對異常消息使用 死信隊列(Dead Letter Queue)
- 定期做 Kafka 數據校驗和對賬
2.6. kafka 只負責存儲消息,消息存儲后怎么刪除?
Kafka 的設計理念就是 Broker 只負責存儲消息,不管消費狀態,所以消息是否被“消費”跟消息刪除沒有關系。Kafka 刪除消息主要依賴 保留策略(Retention Policy) 來決定。Kafka 的消息刪除機制與消費無關,主要靠 保留策略 來實現:
- 基于時間(默認 7 天)
- 基于大小(超過磁盤上限)
- 按 segment 文件批量刪除(不是逐條刪除)
- 日志壓縮(compact):保留每個 key 的最新值
👉 所以 Kafka 可以保證消息存儲高效,同時也能靈活應對“短期日志存儲”和“長期狀態存儲”兩類需求。
2.6.1. 基于時間的保留(Log Retention by Time)
- 配置:
log.retention.hours
(默認 168 小時 = 7 天)也可以用log.retention.minutes
或log.retention.ms
精確控制 - 含義:超過指定時間的消息會被刪除(底層是刪除舊的 segment 文件)。
- 應用場景:日志采集場景,保留最近 7 天的日志即可。
2.6.2. 基于大小的保留(Log Retention by Size)
- 配置:
log.retention.bytes
,每個 partition 保留的最大數據量。 - 含義:超過大小后,舊數據被刪除。
- 應用場景:磁盤資源有限時,通過大小控制日志量。
2.6.3. 基于日志段(Log Segment)的刪除
- Kafka 底層不是一條條刪除,而是按 segment 文件 刪除:每個 partition 的日志會被切分成多個 segment 文件(默認 1GB 或
log.segment.bytes
配置)。當一個 segment 文件中的數據都過期后,整個文件會被刪除。 - 好處:批量刪除,效率高。避免逐條消息管理,保持簡單高效。
2.6.4. 基于日志壓縮(Log Compaction)
除了“保留多久/多大”,Kafka 還支持 Log Compaction(日志壓縮):
- 配置:
cleanup.policy=compact
- 含義:只保留每個 key 的最新一條記錄,舊的記錄會被清理。
- 應用場景:適合保存狀態類數據,比如“用戶的最新余額”“賬戶狀態”。
對比:
delete
策略:過期消息整體刪除(常用)。compact
策略:按 key 只保留最新值(狀態存儲用)。- 也可以同時啟用:
cleanup.policy=delete,compact
。
2.6.5. 手動刪除(不常用)
- 可以通過 管理工具 刪除整個 Topic 或 Partition:
kafka-topics.sh --delete --topic test --zookeeper zk1:2181
- 或者修改 Topic 配置,強制縮短保留時間,然后觸發清理。
2.7. RockMQ存在消息丟失的情況嗎?
是的,RocketMQ 也可能丟消息,只是它的設計在可靠性上比 Kafka強一些(尤其是事務消息、消息必達機制),但是如果配置不當或使用不正確,依然會丟。
2.7.1. 生產端(Producer)丟消息的原因
RocketMQ 的 Producer 如果配置不合理,丟消息概率也不低。
原因 | 說明 | 解決方式 |
發送失敗未重試 | 網絡抖動、Broker 不可用時,Producer | 開啟 |
單向發送(Oneway) |
| 金融風控禁止用單向發送 |
異步發送沒回調檢查 | 異步模式下 | 異步發送要有失敗重試邏輯 |
事務消息回查失敗 | 事務半消息未提交且回查失敗,消息被丟棄 | 保證事務回查邏輯可用且冪等 |
2.7.2. Broker(服務端)丟消息的原因
RocketMQ 在 Broker 存儲階段,如果磁盤或刷盤機制配置不當,也可能丟。
原因 | 說明 | 解決方式 |
刷盤方式為異步(ASYNC_FLUSH) | Broker 先寫內存,稍后刷盤,宕機前的數據會丟 | 對金融交易/風控類主題用 |
主從復制為異步(ASYNC_MASTER) | 主節點宕機時,未同步到從節點的數據丟失 | 金融場景用 (同步雙寫) |
磁盤損壞 | 物理損壞、文件系統異常 | RAID、云存儲快照、磁盤監控 |
Broker 崩潰導致 CommitLog 未落盤 | 異步刷盤下,崩潰會丟部分消息 | 關鍵業務同步刷盤 |
2.7.3. 消費端(Consumer)丟消息的原因
RocketMQ 消費模式是“至少一次”(At-Least-Once),所以不會無故丟消息,但如果 offset 管理不當,也會丟。
原因 | 說明 | 解決方式 |
自動提交 offset | 消費端默認自動提交,如果提交后處理失敗,消息會被“認為已消費”而丟失 | 金融場景用手動 ACK,處理完成后再提交 |
消費失敗但未重試 | 默認會重試,但如果配置了 | 確保開啟重試,并配置死信隊列(DLQ) |
順序消息消費阻塞 | 順序消費模式下,長時間阻塞會導致 Broker 認為消費成功 | 在業務超時前主動返回 RECONSUME_LATER |
消息被丟到 DLQ | 重試超過限制次數會進入死信隊列,不被正常消費 | 定期掃描并人工處理 DLQ |
2.7.4. RocketMQ 防丟最佳實踐(金融風控場景)
如果是風控、交易、清算等強一致性場景,建議這樣配置:
2.7.4.1. 生產端
producer.setRetryTimesWhenSendFailed(5);
producer.setRetryTimesWhenSendAsyncFailed(5);
# 避免 Oneway 模式
2.7.4.2. Broker
properties復制編輯
flushDiskType=SYNC_FLUSH
brokerRole=SYNC_MASTER
2.7.4.3. 消費端
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMaxReconsumeTimes(16); // 超過進入DLQ
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式
并且:
- 對異常消息統一進入死信隊列(DLQ),人工或自動補償。
- 對事務消息保證回查邏輯冪等且可用。
- 對關鍵主題開啟消息軌跡(Message Trace)方便排查丟失點。
2.8. RocketMQ 中記錄消息被消費嗎?
2.8.1. RocketMQ與Kafka的區別
- Kafka:Broker 不關心消息是否消費,只存消息;offset 在 Consumer Group 端維護,存儲在
__consumer_offsets
主題中;所以消息不會有“消費標記”。 - RocketMQ:Broker 會記錄消費進度(消費隊列 + 消費進度);通過 ConsumeQueue + Offset 機制來追蹤消息是否被消費。
2.8.2. RocketMQ 的消費隊列(ConsumeQueue)
RocketMQ 的存儲模型:
- CommitLog:存儲所有原始消息(順序寫入,類似 Kafka 的 log)。
- ConsumeQueue:存儲消費隊列(索引文件),記錄消息在 CommitLog 中的位置 + 消息 Tag 等。
- ConsumerOffset:存儲消費者組消費到哪條消息。
所以 RocketMQ Broker 能知道:某個消息被投遞給哪些消費者組。某個消費者組消費到了哪條消息(offset)。
2.8.3. RocketMQ消費確認(ACK)
消息隊列 | offset 存儲位置 | 維度 | 說明 |
Kafka | Broker 內部主題 | Consumer Group | 由消費者提交,Kafka 統一管理 |
RocketMQ (集群模式) | Broker (ConsumerOffsetManager) | Consumer Group + Queue | 由 Broker 統一管理 |
RocketMQ (廣播模式) | Consumer 本地文件 | Consumer 實例 | 每個消費者獨立維護 |
- RocketMQ 默認是“至少一次(At least once)”語義:Consumer 拉取消息 → 成功處理 → 返回 ACK 給 Broker;Broker更新該 Consumer Group的offset。
- 如果處理失敗:消息會被重新投遞(支持延時重試 / 死信隊列 DLQ)。
這意味著RocketMQ確實在 Broker層面知道一條消息有沒有被消費確認。
2.8.4. RocketMQ消息刪除
- 與 Kafka 類似,RocketMQ 也不會因為“消息被消費”就刪除消息。
- 消息刪除仍然依賴 過期時間(默認 3 天,可配置
fileReservedTime
)。 - 所以歷史消息依然會在磁盤上保留一段時間,方便回溯。
2.9. 防止消息丟失解決方案有哪些?
防止消息丟失的方案可以分為 端到端三個環節:生產端 → 消息中間件 → 消費端。不同環節丟失的原因不一樣,解決方案也不同。
2.9.1. 1?? 生產端防丟
方法 | 作用 | 適用 MQ |
可靠 ACK 機制( | 確保消息寫入多個副本后才算成功 | Kafka / RocketMQ |
冪等發送( | 避免重試導致重復數據 | Kafka |
發送異常處理與重試 | 網絡異常、超時必須重發 | 所有 MQ |
事務消息 | 與數據庫操作原子性一致 | Kafka / RocketMQ |
Outbox + CDC | 數據庫先落地消息,再異步投遞 MQ | 所有 MQ |
批量 flush 控制 | 防止 Producer 崩潰時緩沖區數據丟失 | Kafka / RocketMQ |
2.9.2. 2?? 消息中間件防丟
方法 | 作用 | MQ 特性 |
多副本存儲( | 避免單點故障丟消息 | Kafka / RocketMQ |
同步刷盤( | 避免 Broker 崩潰時丟內存數據 | RocketMQ |
ISR 同步策略( | 僅在副本足夠時才確認消息 | Kafka |
禁止 Unclean Leader 選舉 | 避免落后副本當 Leader 導致丟數據 | Kafka |
磁盤健康監控 | 防止物理故障丟消息 | 所有 MQ |
延遲隊列 + DLQ | 遇到處理異常可暫存或丟到死信隊列 | 所有 MQ |
2.9.3. 3?? 消費端防丟
方法 | 作用 | MQ 特性 |
手動 ACK / 手動提交 offset | 確保消費成功后才確認 | Kafka / RocketMQ |
先處理,再提交 offset | 防止處理失敗但 offset 已提交 | Kafka |
消費重試機制 | 消費失敗可重新投遞 | 所有 MQ |
死信隊列(DLQ) | 多次失敗的消息單獨存儲 | 所有 MQ |
冪等消費 | 防止重試導致重復處理 | 所有 MQ |
消費進度持久化 | 確保宕機恢復后能從上次位置繼續 | Kafka / RocketMQ |
2.9.4. 4?? 端到端防丟組合方案
根據業務重要性選擇:
- 高一致性(金融交易、風控決策)
- RocketMQ 事務消息 / Kafka 事務 + 冪等發送
- 消費端手動 ACK + 冪等消費
- Broker 多副本 + 同步刷盤
- 高吞吐實時流(埋點、日志分析)
- Kafka
acks=all
+ ISR 策略 - 消費端自動提交 offset + 異步批量處理(可容忍極少丟失)
- Kafka
- 可補償業務(異步任務、通知)
- 先寫數據庫 + 定時補償任務
- 或 Outbox + CDC
一句話總結:防丟必須從生產端可靠發送、MQ 高可用存儲、消費端可靠確認 三方面同時保障,單靠“先寫數據庫再發 MQ”并不是唯一解。
2.10. Outbox + CDC 是什么方案?
Outbox + CDC 是一種業界常用的防止消息丟失 & 保證數據庫與消息一致性的架構模式,尤其在金融、支付、風控等對一致性要求高的系統里很常見。
2.10.1. 為什么會有 Outbox + CDC
在普通架構里,常見有兩種做法:
- 先寫數據庫,再發 MQ → 如果發 MQ 失敗,需要補償邏輯,否則不一致。
- 先發MQ,再寫數據庫 → 如果寫庫失敗,就會產生無效消息。
這兩個都不是完美方案,尤其金融風控場景下,數據和消息必須100%一致。于是出現了 Outbox(發件箱)模式 + CDC(變更數據捕獲) 的組合。
2.10.2. Outbox 模式
核心思想:
- 在業務數據庫中增加一個消息表(outbox table)
- 在同一個本地事務中同時寫入:
- 業務數據(比如風控結果、交易流水)
- 消息數據(寫入消息表)
這樣,業務數據和消息數據要么一起成功,要么一起失敗,天然一致。
例子:
BEGIN;
INSERT INTO risk_decision (id, user_id, decision) VALUES (123, 456, 'REJECT');
INSERT INTO outbox (id, topic, payload, status) VALUES (999, 'risk_event', '{...}', 'NEW');
COMMIT;
2.10.3. CDC(Change Data Capture)
核心思想:
- 用一個獨立的消息發送服務(或中間件)實時監聽 outbox 表變化
- 通過讀取數據庫 binlog(如 MySQL binlog、PostgreSQL WAL)來捕獲新插入的消息
- 把消息發送到 MQ(Kafka / RocketMQ)
- 發送成功后更新 outbox 表的狀態(比如
status=PROCESSED
)
常用的 CDC 工具:
- Debezium(最常用,支持 Kafka、Pulsar、RabbitMQ)
- Canal(阿里開源)
- Maxwell
2.10.4. 方案優缺點
優點 | 缺點 |
強一致:數據庫和 MQ 消息一定一致 | 增加一張 outbox 表,額外占用數據庫存儲 |
無分布式事務:只用本地事務 | 需要部署 CDC 服務,運維復雜度增加 |
可重放:消息表是天然的“重發記錄” | 消息延遲略高(取決于 CDC 頻率) |
可審計:可以追蹤每條消息的生命周期 | 適合異步業務,不適合極低延遲場景 |
2.10.5. 在金融風控中的典型用法
- 風控結果寫庫 + 發消息:確保風控決策落庫成功才發消息給下游(凍結、人工復核、黑名單同步等)
- 交易流水落庫 + 事件廣播:先保證交易落庫,再異步廣播到風控、賬務、營銷等系統
- 反欺詐事件記錄 + 數據分析:確保反欺詐事件既能寫庫留痕,也能推送到 Kafka 做實時分析
2.10.6. 流程圖
[業務服務]| 本地事務寫兩份數據|----------------------------| 業務表 + Outbox 消息表↓
[數據庫]↓ CDC監聽 binlog
[CDC 服務 (Debezium/Canal)]↓
[MQ (Kafka/RocketMQ)]↓
[下游消費者]
💡 總結
Outbox + CDC 本質是:
- Outbox:保證業務數據和消息的原子性
- CDC:負責異步、可靠地把消息送到 MQ
這樣既避免了分布式事務的復雜性,又能保證數據與消息強一致,是金融風控里非常穩妥的方案。