深入剖析 RocketMQ 分布式事務:原理、流程與實踐

Apache RocketMQ 是一種分布式消息隊列系統,支持分布式事務消息,以確保在分布式系統中數據的一致性。它通過一種基于兩階段提交(2PC)的機制結合補償邏輯來實現分布式事務的最終一致性。以下是對 RocketMQ 分布式事務的詳細講解,包括其核心概念、工作原理、流程、實現機制及注意事項。


一、分布式事務背景與問題

在分布式系統中,事務的執行往往涉及多個服務或數據庫。例如,在電商場景中,用戶下單可能需要同時更新訂單狀態、扣減庫存、增加積分等操作,這些操作分布在不同的微服務和數據庫中。由于網絡延遲、服務宕機或事務回滾等原因,很難保證所有操作的原子性和一致性。RocketMQ 的事務消息機制通過將消息發送與本地事務綁定,解決了本地事務執行與消息發送的原子性問題,從而實現分布式系統的最終一致性。

關鍵問題

  • 如果先發送消息后執行本地事務,可能因本地事務失敗導致數據不一致。
  • 如果先執行本地事務后發送消息,可能因服務宕機導致消息未發送。
  • 分布式系統中需要一種機制來確保消息發送和本地事務的原子性。

RocketMQ 的事務消息通過半消息(Half Message)事務狀態檢查機制解決了上述問題。


二、RocketMQ 分布式事務的核心概念

  1. 事務消息(Transactional Message)

    • RocketMQ 提供的一種高級消息類型,用于確保消息發送和本地事務的原子性。
    • 目標是實現分布式系統的最終一致性,即消息的生產和本地事務要么都成功,要么都不執行。
  2. 半消息(Half Message)

    • 半消息是指生產者發送到 RocketMQ Broker 的消息,初始狀態下對消費者不可見。
    • 只有在事務提交(Commit)后,半消息才會變成正常消息,供消費者消費;如果事務回滾(Rollback),半消息會被丟棄。
  3. 事務狀態檢查(Message Checkback)

    • RocketMQ Broker 會定期檢查未確定狀態(Pending)的半消息,向生產者發起回調,查詢本地事務的狀態,以決定是提交(Commit)還是回滾(Rollback)。
  4. 兩階段提交(2PC)

    • RocketMQ 的事務消息基于 2PC 思想:
      • 第一階段:發送半消息,標記為“暫時不可投遞”。
      • 第二階段:根據本地事務的執行結果,提交(Commit)或回滾(Rollback)消息。
  5. 操作消息(Op Message)

    • RocketMQ 使用 Op 消息來記錄半消息的最終狀態(Commit 或 Rollback)。
    • Op 消息用于標識事務消息是否已確定狀態,避免重復處理。

三、RocketMQ 事務消息的工作原理與流程

RocketMQ 事務消息的工作流程可以分為正常消息發送與提交事務補償兩個部分。以下是詳細的流程:

1. 正常事務消息發送與提交流程
  1. 生產者發送半消息

    • 生產者通過 TransactionMQProducer 發送一個事務消息(半消息)到 RocketMQ Broker。
    • Broker 接收到半消息后,將其存儲在事務存儲系統中,但不生成消息索引,因此對消費者不可見。
    • Broker 返回一個確認(ACK)給生產者,表示半消息已接收。
  2. 生產者執行本地事務

    • 生產者在發送半消息成功后,執行本地事務(如數據庫操作)。
    • 本地事務的結果可能是成功(Commit)或失敗(Rollback)。
  3. 生產者提交事務狀態

    • 根據本地事務的結果,生產者向 Broker 發送第二次確認(ACK),通知事務狀態:
      • Commit:Broker 將半消息標記為可投遞,生成消息索引,消費者可以消費該消息。
      • Rollback:Broker 丟棄半消息,消費者不會看到該消息。
    • 如果是 Commit,Broker 會記錄一個 Op 消息,標記該半消息已提交。
  4. 消費者消費消息

    • 如果半消息被提交,消費者可以從 Broker 獲取并處理消息。
    • 如果半消息被回滾,消費者不會收到消息。
2. 事務補償流程

如果由于網絡中斷或生產者宕機,導致 Broker 未收到第二次 ACK(事務狀態),Broker 會啟動事務狀態檢查機制:

  1. Broker 定期檢查

    • Broker 每隔一段時間(如默認 60 秒)檢查未確定狀態的半消息。
    • Broker 向生產者發送回調請求,查詢對應半消息的本地事務狀態。
  2. 生產者實現回調接口

    • 生產者需要實現 TransactionListener 接口的 checkLocalTransaction 方法,用于響應 Broker 的狀態查詢。
    • 在該方法中,生產者檢查本地事務的狀態(如查詢數據庫),返回 Commit、Rollback 或 Unknown。
  3. Broker 處理回調結果

    • 如果返回 Commit,Broker 標記半消息為可投遞。
    • 如果返回 Rollback,Broker 丟棄半消息。
    • 如果返回 Unknown 或無響應,Broker 會在下一次檢查時繼續查詢,直到達到最大檢查次數(默認 15 次)或超時,之后可能丟棄消息。
流程圖

以下是 RocketMQ 事務消息的流程圖:

生產者                       Broker                       消費者|                            |                            || 1. 發送半消息            |                            ||------------------------->| 2. 存儲半消息(不可見)    ||                          |------------------------->|| 3. 收到ACK               |                            ||<-------------------------|                            || 4. 執行本地事務          |                            ||                          |                            || 5. 發送Commit/Rollback   |                            ||------------------------->| 6. 更新消息狀態            ||                          |   - Commit: 生成索引       ||                          |   - Rollback: 丟棄消息     ||                          |------------------------->||                          | 7. 消費者拉取消息          ||                          |<-------------------------|
3. 事務補償流程圖
Broker                       生產者|                            || 1. 檢查未確定狀態的半消息  ||------------------------->|| 2. 查詢本地事務狀態       ||<-------------------------|| 3. 根據狀態更新消息       ||   - Commit: 生成索引      ||   - Rollback: 丟棄消息    |

四、RocketMQ 事務消息的實現機制

  1. 半消息的存儲與不可見性

    • RocketMQ 通過修改消息的 Topic 和 Queue 屬性來實現半消息的不可見性。
    • 半消息存儲在特殊的 Topic(如 RMQ_SYS_TRANS_OP_HALF_TOPIC)中,消費者無法直接訪問。
    • 在提交(Commit)時,Broker 將消息的 Topic 和 Queue 恢復為原始值,并生成索引,使其對消費者可見。
  2. Op 消息的引入

    • RocketMQ 引入 Op 消息來標記半消息的最終狀態(Commit 或 Rollback)。
    • Op 消息存儲在 Broker 的獨立隊列中,用于記錄事務消息的狀態。
    • 如果半消息沒有對應的 Op 消息,說明事務狀態未確定,Broker 會觸發狀態檢查。
  3. 事務狀態檢查的實現

    • Broker 維護一個事務消息檢查定時任務,默認每 60 秒檢查一次未確定狀態的半消息。
    • 檢查時,Broker 通過生產者的 Group ID 找到對應的生產者實例,調用其 checkLocalTransaction 方法。
    • 生產者需要實現該方法,返回事務狀態。
  4. 異步刷盤的優化

    • RocketMQ 默認使用異步刷盤(Async Flush)來提高性能,但可能導致半消息未及時落盤。
    • 在高吞吐量場景中,RocketMQ 5.0 引入了批量 Op 消息優化,多個半消息可對應一個 Op 消息,減少寫放大問題。

五、代碼示例

以下是一個簡單的 Java 代碼示例,展示如何使用 RocketMQ 的事務消息:

import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;public class TransactionProducer {public static void main(String[] args) throws Exception {// 初始化事務消息生產者TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");producer.setNamesrvAddr("localhost:9876");// 設置事務監聽器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 執行本地事務try {// 模擬數據庫操作System.out.println("Executing local transaction for message: " + msg);// 假設事務成功return LocalTransactionState.COMMIT_MESSAGE;} catch (Exception e) {// 事務失敗return LocalTransactionState.ROLLBACK_MESSAGE;}}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 事務狀態檢查// 檢查本地事務狀態(如查詢數據庫)System.out.println("Checking transaction status for message: " + msg);return LocalTransactionState.COMMIT_MESSAGE; // 或 ROLLBACK_MESSAGE}});// 啟動生產者producer.start();// 發送事務消息Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.println("Send result: " + sendResult);// 關閉生產者producer.shutdown();}
}

六、事務消息的優缺點

優點
  • 原子性保證:確保本地事務和消息發送的原子性,解決了分布式事務中的一致性問題。
  • 最終一致性:通過事務狀態檢查機制,保障消息的可靠投遞。
  • 高性能:RocketMQ 的事務消息機制基于異步刷盤和高可用架構,適合高并發場景。
  • 易用性:生產者只需實現 TransactionListener 接口,簡化分布式事務開發。
缺點
  • 復雜性:需要實現事務狀態檢查邏輯,增加了開發復雜度。
  • 性能開銷:事務消息的兩次提交和狀態檢查會增加一定的性能開銷。
  • 最終一致性:不保證強一致性,僅提供最終一致性,適合對實時性要求不高的場景。
  • 局限性:消費者端的事務一致性需自行處理(如通過重試機制)。

七、應用場景

RocketMQ 的事務消息廣泛應用于需要分布式事務的場景,例如:

  • 電商系統:用戶下單后,訂單系統更新訂單狀態并發送消息通知庫存、積分、物流系統。
  • 金融系統:轉賬操作需要同時扣款和通知目標賬戶,確保一致性。
  • 微服務架構:在多個微服務之間通過消息傳遞實現異步協作。

示例:在電商場景中,用戶支付訂單后:

  1. 訂單服務發送半消息到 RocketMQ,通知積分服務增加積分。
  2. 訂單服務執行本地數據庫更新(如訂單狀態從“未支付”改為“已支付”)。
  3. 如果數據庫更新成功,提交消息;否則,回滾消息。
  4. 積分服務消費消息,更新用戶積分。

八、注意事項

  1. 事務消息的隔離性

    • 事務消息不保證隔離性,消費者可能需要處理重復消息(通過冪等性設計)。
  2. Group ID 的唯一性

    • 事務消息的 Group ID 不能與其他類型的消息共享,Broker 通過 Group ID 定位生產者進行狀態檢查。
  3. 超時與重試

    • 配置合理的檢查間隔(默認 60 秒)和最大檢查次數(默認 15 次)。
    • 過多的檢查可能增加 Broker 負載,過少可能導致消息丟失。
  4. 本地事務的冪等性

    • 確保本地事務的 checkLocalTransaction 方法具有冪等性,以應對重復檢查。
  5. 高可用性

    • RocketMQ 支持主從復制和 Raft 協議(如 DLedger),確保事務消息在 Broker 故障時的高可用性。

九、與其他分布式事務方案的對比

方案描述優點缺點
2PC基于 XA 協議的同步兩階段提交,事務管理器協調所有參與者的提交或回滾。強一致性高延遲,阻塞式,單點故障風險
3PC2PC 的改進,增加預提交階段以減少阻塞時間。減少阻塞時間復雜性高,性能開銷大
TCC應用層事務,Try-Confirm-Cancel 模式,需手動實現補償邏輯。靈活性高,適合復雜業務開發復雜,需手動實現補償邏輯
RocketMQ 事務消息基于消息隊列的異步事務,結合 2PC 和補償邏輯實現最終一致性。異步高性能,易于微服務集成最終一致性,非強一致性
Saga將事務拆分為多個本地事務,通過事件驅動執行后續操作。高吞吐量,易擴展復雜補償邏輯,需處理回滾失敗

RocketMQ 事務消息的優勢

  • 相比 2PC/3PC,RocketMQ 事務消息異步執行,性能更高,適合高并發場景。
  • 相比 TCC,RocketMQ 的事務消息機制更簡單,無需手動實現 Confirm/Cancel 邏輯。
  • 相比 Saga,RocketMQ 的事務消息通過 Broker 的狀態檢查機制,減少了補償邏輯的開發量。

十、總結

RocketMQ 的分布式事務消息通過兩階段提交和事務狀態檢查機制,有效解決了分布式系統中本地事務與消息發送的原子性問題。其核心在于半消息的不可見性和 Broker 的事務狀態檢查,確保消息的可靠投遞和最終一致性。事務消息適用于電商、金融、微服務等場景,能夠簡化分布式事務的開發復雜度,同時提供高性能和高可用性。

關鍵點

  • 使用半消息和 Op 消息實現事務的原子性。
  • 通過定期檢查未確定狀態的半消息,確保事務的最終一致性。
  • 適合需要異步處理和最終一致性的分布式系統。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/91167.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/91167.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/91167.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

具身智能 自動駕駛相關崗位的技術棧與能力地圖

一、硬技能技術棧&#xff08;優先級排序&#xff09; 1. 核心領域技術&#xff08;★★★★★&#xff09;技術方向具體技能學習建議大模型實戰- VLA架構&#xff08;RT-2、PaLM-E&#xff09;開發/微調- 多模態對齊&#xff08;CLIP、Flamingo&#xff09;- 生成式策略&#…

實現了加載 正向 碰撞 雅可比 仿真

""" # 此示例從 URDF 文件中加載一個 UR10 機械臂的模型 # 隨后演示 Pinocchio 庫的基本功能,如正向運動學計算 # 雅可比矩陣計算、碰撞檢測以及動力學仿真 """ # 導入 meshcat 的幾何模塊,用于創建和管理可視化的幾何對象 import meshcat.geo…

【0基礎PS】PS工具詳解--畫筆工具

目錄前言一、畫筆工具的位置與快捷鍵?二、畫筆工具選項欄設置?三、畫筆工具的進階應用?四、常見問題及解決方法?總結前言 在 Photoshop 的眾多工具中&#xff0c;畫筆工具無疑是極具創造力和實用性的工具之一。無論是進行圖像繪制、照片修飾&#xff0c;還是特效制作&…

window10和ubuntu22.04雙系統之卸載ubuntu系統

window10和ubuntu22.04雙系統之卸載ubuntu系統&#xff09;1. 刪除Ubuntu系統占用的磁盤分區&#xff08;在Windows下操作&#xff09;2. 刪除ubuntu開機引導項1. winr出來終端提示框后輸入2. 然后會在命令行中顯示電腦的硬盤列表&#xff0c;輸入命令選擇安裝Windows的那個硬盤…

(C++)C++類和類的方法(基礎教程)(與Python類的區別)

前言&#xff1a; 本篇博客建議搭配&#xff1a;&#xff08;Python&#xff09;類和類的方法&#xff08;基礎教程介紹&#xff09;&#xff08;Python基礎教程&#xff09;-CSDN博客 一起學習使用&#xff1b; 源代碼&#xff1a; #include <iostream> #include &…

【NLP輿情分析】基于python微博輿情分析可視化系統(flask+pandas+echarts) 視頻教程 - 微博文章數據可視化分析-文章分類下拉框實現

大家好&#xff0c;我是java1234_小鋒老師&#xff0c;最近寫了一套【NLP輿情分析】基于python微博輿情分析可視化系統(flaskpandasecharts)視頻教程&#xff0c;持續更新中&#xff0c;計劃月底更新完&#xff0c;感謝支持。今天講解微博文章數據可視化分析-文章分類下拉框實現…

Git命令保姆級教程

Git 入門網站 https://learngitbranching.js.org/?localezh_CN Git 命令 git init // 在本地目錄內部會生成.git文件夾 git initgit clone // 從git服務器拉取代碼 // 代碼下載完成后在當前文件夾中會有一個 shop 的目錄&#xff0c;通過 cd shop 命令進入目錄。 git clone ht…

Java Ai For循環 (day07)

循環結構 for&#xff1a;循環語句的作用&#xff1a;可以將一段代碼重復的執行很多次for 循環語句格式&#xff1a;執行流程&#xff1a; 初始化語句執行條件判斷語句&#xff0c;看結果是 true&#xff0c;還是 false false結束&#xff0c;true繼續執行循環體語句執行條件控…

Directory Opus 使用優化

自定義快捷鍵 Directory Opus 移動標簽到另一欄 設置快捷鍵&#xff1a;ctrl←/→ 設置步驟&#xff1a; 打開【設置】—>選擇【自定義工具欄和快捷鍵】 選擇【新建】—>【新建窗口快捷鍵】 輸入快捷鍵命令 Go TABMOVEother此時可以點擊運行進行測試&#xff0c;…

Qt知識點2『Ubuntu24.04.2安裝Qt5.12.9各種報錯』

問題1&#xff1a;Qt安裝完畢后&#xff0c;新建一個最簡單的測試程序&#xff0c;但是QtCreator左側構建的三個按鈕呈現灰色&#xff0c;無法進行構建操作答&#xff1a;進入QtCreator的Kits界面&#xff08;工具-選項&#xff09;&#xff0c;點擊"自動檢測"下的De…

TS面試題

1.TS有哪些類型&#xff08;對比與js&#xff09;&#xff1f;關鍵字/語法用途示例any關閉類型檢查let a: any 4unknown類型安全的 anylet u: unknown 4; if (typeof u number) …never永不存在的值function err(): never { throw 0; }void無返回值function f(): void {}enu…

借助Early Hints和HarperDB改善網頁性能

對電商網站來說&#xff0c;糟糕的頁面性能可能會增加交易放棄率。一直以來&#xff0c;人們會使用CDN進行緩存從而縮短頁面加載時間&#xff0c;但即便實施了強大的緩存&#xff0c;消費者在通過移動網絡訪問這些網站時可能仍然會需要頻繁等待。最近誕生了一種名為“早期提示”…

MEMS陀螺如何成為無人機穩定飛行的核心?

在無人機自主翱翔、靈活機動并適應多變環境的背后&#xff0c;對其運動狀態——尤其是姿態——的精確感知是基石。作為飛行控制系統&#xff08;飛控&#xff09;的“內耳”&#xff0c;陀螺儀實時捕捉機體繞X、Y、Z三軸的旋轉角速度。這一核心數據是飛控進行姿態解算和維持飛行…

騰訊云拉取docker鏡像失敗怎么辦

ps:我直接按照步驟1和2就解決了 以下內容來自豆包 在騰訊云服務器上拉取 Docker 鏡像失敗&#xff0c;可以按照以下步驟排查和解決&#xff1a; 一、檢查網絡連接 確認服務器網絡正常 bash ping www.baidu.com # 測試公網連通性如果無法 ping 通&#xff0c;檢查服務器防火墻…

Apache FOP實踐——pdf模板引擎

文章目錄 基本概念設計思想具體實踐完整應用 基本概念 Apache FOP&#xff08;Formatting Objects Processor&#xff09;是一個基于Java的開源工具&#xff0c;用于將 XSL-FO&#xff08;XSL Formatting Objects&#xff09; 文檔轉換為PDF、圖像等格式。 設計思想 將內容&…

WebRTC核心組件技術解析:架構、作用與協同機制

引言&#xff1a;WebRTC的技術定位與價值 WebRTC&#xff08;Web Real-Time Communication&#xff09;作為一項開源實時通信標準&#xff0c;已成為瀏覽器原生音視頻交互、P2P數據傳輸的技術基石。自2011年開源以來&#xff0c;其標準化進程由W3C&#xff08;API層&#xff0…

OmniParser:提升工作效率的視覺界面解析工具

OmniParser&#xff1a;基于視覺的用戶界面解析工具在現代軟件開發中&#xff0c;用戶界面的自動化處理變得愈發重要。OmniParser 是一個強大的工具&#xff0c;旨在將用戶界面的截圖解析為結構化的、易于理解的元素&#xff0c;從而顯著提升了大型語言模型&#xff08;如GPT-4…

C#程序員計算器

使用C#語言編寫程序員計算器&#xff0c;使其能夠進行加減乘除和與或非等邏輯運算。 calculator.cs 代碼如下 using System; using System.Numerics; using System.Globalization;namespace Calculator1 {public enum CalcBase { Bin 2, Oct 8, Dec 10, Hex 16 }public en…

國產音頻DA轉換芯片DP7361支持192K六通道24位DA轉換器

產品概述 DP7361 是一款立體聲六通道線性輸出的數模轉換器&#xff0c;內含插值濾波器、Multi-Bit 數模轉換 器、模擬輸出濾波器&#xff0c;支持主流的音頻數據格式。 DP7361 片上集成線性低通模擬濾波器和四階 Multi-Bit Δ-∑調制器&#xff0c;能自動檢測信號頻率和主時鐘頻…

【C51單片機四個按鍵控制流水燈】2022-9-30

緣由C51&#xff0c;四個按鍵控制流水燈-嵌入式-CSDN問答 #include "REG52.h" sbit k1P3^0; sbit k2P3^1; sbit k3P3^2; sbit k4P3^3; unsigned char code lsd[]{127,191,223,239,247,251,253,254};//跑馬燈 void jsys(unsigned char y,unsigned char s){unsigned c…