如何使用rocketmq實現分布式事務?

什么是rocketmq事務消息

事務消息是 Apache RocketMQ 提供的一種高級消息類型,支持在分布式場景下保障消息生產和本地事務的最終一致性。

RocketMQ的分布式事務又稱為“半消息事務”。

事務消息處理流程

RocketMQ是靠半消息機制實現分布式事務

事務消息:MQ 提供類似 X/Open XA 的分布事務功能,通過 MQ 事務消息能達到分布式事務的最終一致。

半消息:暫不能投遞的消息,發送方已經將消息成功發送到了 MQ 服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處于該種狀態下的消息即半消息。

半消息回查:由于網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,MQ 服務端通過掃描發現某條消息長期處于“半消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該過程即消息回查。

事務消息交互流程如下圖所示。

圖片

1. 生產者將消息發送至Apache RocketMQ服務端。

2. Apache RocketMQ服務端將消息持久化成功之后,向生產者返回Ack確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息。

3. 生產者開始執行本地事務邏輯。

4. 生產者根據本地事務執行結果向服務端提交二次確認結果(Commit或是Rollback),服務端收到確認結果后處理邏輯如下:

??二次確認結果為Commit:服務端將半事務消息標記為可投遞,并投遞給消費者。

??二次確認結果為Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者。

5. 在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到發送者提交的二次確認結果,或服務端收到的二次確認結果為Unknown未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。?說明?服務端回查的間隔時間和最大回查次數,請參見參數限制。

6. 生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。

7. 生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。

事務消息生命周期

圖片

事務消息

??初始化:半事務消息被生產者構建并完成初始化,待發送到服務端的狀態。

??事務待提交:半事務消息被發送到服務端,和普通消息不同,并不會直接被服務端持久化,而是會被單獨存儲到事務存儲系統中,等待第二階段本地事務返回執行結果后再提交。此時消息對下游消費者不可見。

??消息回滾:第二階段如果事務執行結果明確為回滾,服務端會將半事務消息回滾,該事務消息流程終止。

??提交待消費:第二階段如果事務執行結果明確為提交,服務端會將半事務消息重新存儲到普通存儲系統中,此時消息對下游消費者可見,等待被消費者獲取并消費。

??消費中:消息被消費者獲取,并按照消費者本地的業務邏輯進行處理的過程。此時服務端會等待消費者完成消費并提交消費結果,如果一定時間后沒有收到消費者的響應,Apache RocketMQ會對消息進行重試處理。

??消費提交:消費者完成消費處理,并向服務端提交消費結果,服務端標記當前消息已經被處理(包括消費成功和失敗)。Apache RocketMQ默認支持保留所有消息,此時消息數據并不會立即被刪除,只是邏輯標記已消費。消息在保存時間到期或存儲空間不足被刪除前,消費者仍然可以回溯消息重新消費。

??消息刪除:Apache RocketMQ按照消息保存機制滾動清理最早的消息數據,將消息從物理文件中刪除。

示例

下面是使用 RocketMQ 實現事務的一個例子:

生產者實現事務監聽器:

首先,需要實現一個 RocketMQ 的事務監聽器接口RocketMQLocalTransactionListener,這個接口定義了在發送和確認事務消息時的回調方法。您需要根據業務邏輯來實現這些方法。

executeLocalTransaction 方法:

這個方法在發送事務消息時被調用,用于執行本地事務。具體步驟如下:

1. 獲取消息中的事務 ID。

2. 根據事務索引來模擬本地事務執行的狀態。

3.?將事務狀態放入?localTrans?映射中,以備后續?checkLocalTransaction?方法使用。

根據您的代碼,executeLocalTransaction?方法中模擬了三種狀態:

??如果狀態為 0,表示本地事務成功,返回?RocketMQLocalTransactionState.COMMIT,消息將被提交。

??如果狀態為 1,表示本地事務失敗,返回?RocketMQLocalTransactionState.ROLLBACK,消息將被回滾。

??如果狀態為 2,表示本地事務狀態未知,返回?RocketMQLocalTransactionState.UNKNOWN

checkLocalTransaction 方法:

這個方法在消息的確認狀態時被調用,用于檢查本地事務的狀態。具體步驟如下:

  1. 獲取消息中的事務 ID。

  2. 根據之前保存在?localTrans?映射中的事務狀態,決定消息的提交、回滾或未知。

checkLocalTransaction?方法會根據之前在?executeLocalTransaction?方法中保存的狀態來返回相應的事務狀態。

@RocketMQTransactionListener??
public?class?TransactionListenerImpl?implements?RocketMQLocalTransactionListener?{??private?AtomicInteger?transactionIndex?=?new?AtomicInteger(0);??private?ConcurrentHashMap<String,?Integer>?localTrans?=?new?ConcurrentHashMap<String,?Integer>();??@Override??public?RocketMQLocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{??String?transId?=?(String)?msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);??System.out.printf("####?executeLocalTransaction?is?executed,?msgTransactionId=%s?%n",??transId);??int?value?=?transactionIndex.getAndIncrement();??int?status?=?value?%?3;??localTrans.put(transId,?status);??if?(status?==?0)?{??//?Return?local?transaction?with?success(commit),?in?this?case,??//?this?message?will?not?be?checked?in?checkLocalTransaction()??System.out.printf("?#?COMMIT?#?Simulating?msg?%s?related?local?transaction?exec?succeeded!?###?%n",?msg.getPayload());??return?RocketMQLocalTransactionState.COMMIT;??}??if?(status?==?1)?{??//?Return?local?transaction?with?failure(rollback)?,?in?this?case,??//?this?message?will?not?be?checked?in?checkLocalTransaction()??System.out.printf("?#?ROLLBACK?#?Simulating?%s?related?local?transaction?exec?failed!?%n",?msg.getPayload());??return?RocketMQLocalTransactionState.ROLLBACK;??}??System.out.printf("?#?UNKNOW?#?Simulating?%s?related?local?transaction?exec?UNKNOWN!?\n");??return?RocketMQLocalTransactionState.UNKNOWN;??}??@Override??public?RocketMQLocalTransactionState?checkLocalTransaction(Message?msg)?{??String?transId?=?(String)?msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);??RocketMQLocalTransactionState?retState?=?RocketMQLocalTransactionState.COMMIT;??Integer?status?=?localTrans.get(transId);??if?(null?!=?status)?{??switch?(status)?{??case?0:??retState?=?RocketMQLocalTransactionState.COMMIT;??break;??case?1:??retState?=?RocketMQLocalTransactionState.ROLLBACK;??break;??case?2:??retState?=?RocketMQLocalTransactionState.UNKNOWN;??break;??}??}??System.out.printf("------?!!!?checkLocalTransaction?is?executed?once,"?+??"?msgTransactionId=%s,?TransactionState=%s?status=%s?%n",??transId,?retState,?status);??return?retState;??}??
}

消費者

@Service??
@RocketMQMessageListener(topic?=?"${demo.rocketmq.transTopic}",?consumerGroup?=?"string_trans_consumer")??
public?class?StringTransactionalConsumer?implements?RocketMQListener<String>?{??@Override??public?void?onMessage(String?message)?{??System.out.printf("-------?StringTransactionalConsumer?received:?%s?\n",?message);??}??
}

這些步驟基本上涵蓋了使用 RocketMQ 實現事務的主要過程。可以根據具體的業務需求和環境進行調整和配置。

總結

使用半消息實現分布式事務在提供分布式事務支持和保證消息傳遞的原子性方面具有優勢,但需要引入MQ并提供查詢事務接口。在選擇是否使用半消息實現分布式事務時,需要根據具體的業務需求和系統性能要求來進行權衡和選擇。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/696789.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/696789.shtml
英文地址,請注明出處:http://en.pswp.cn/news/696789.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Spring之AOP源碼解析(上)

Aop相關注解 EnableTransactionManagementEnableAspectJAutoProxyEnableAsync... 從注解切入來看看這些注解都干了什么 Import注解作用簡述 注入的類一般繼承ImportSelector或者ImportBeanDefinitionRegistrar接口 繼承ImportSelector接口&#xff1a;selectImports方法返回…

pandas/geopandas 筆記:判斷地點在不在路網上 不在路網的點和路網的距離

0 導入庫 import osimport pandas as pd pd.set_option(display.max_rows,5)import osmnx as oximport geopandas as gpd from shapely.geometry import Point 1 讀取數據 假設我們有 如下的數據&#xff1a; 1.1 新加坡室外基站位置數據 cell_stationpd.read_csv(outdoor…

TSINGSEE青犀AI智能分析網關V4初始配置與算法相關配置介紹

TSINGSEE青犀AI智能分析網關V4內置了近40種AI算法模型&#xff0c;支持對接入的視頻圖像進行人、車、物、行為等實時檢測分析&#xff0c;上報識別結果&#xff0c;并能進行語音告警播放。硬件管理平臺支持RTSP、GB28181協議、以及廠家私有協議接入&#xff0c;可兼容市面上常見…

通過例子學習golang的Goroutine

Go 語言中的 Goroutine 是一種輕量級的并發執行單位。它可以與其他 Goroutine 并發地執行&#xff0c;而不需要顯式地管理線程的創建和銷毀。Goroutine 是 Go 語言并發模型的核心組成部分&#xff0c;它使得編寫并發程序變得更加簡單和高效。 例一 創建兩個function&#xff0…

linux下ffmpeg調用GPU硬件解碼(VDPAU/VAAPI)保存文件

本文講解在linux下面&#xff0c;如何通過ffmpeg調用GPU硬件解碼&#xff0c;并保存解碼完的yuv文件。 其實&#xff0c;ffmpeg自帶的例子hw_decode.c這個文件&#xff0c;就已經能滿足要求了&#xff0c;因此&#xff0c;本文就嘗試講解以下hw_decode這個例子。hw_decode.c可以…

watchpoint

前言 內存被踩&#xff0c;通過 watchpoint 找到真兇 實例 以 smsc911x 網卡驅動為基體&#xff0c;進行實驗&#xff0c;和網卡本身功能無關&#xff0c; 每執行一次 ifconfig eth0 up&#xff0c;就會調用一次 smsc911x_open()&#xff0c;我在這里設計了一段代碼&#xf…

數學知識(四)(容斥原理、博弈論)

一、容斥原理 容斥原理公式 一共加或者減的式子個數 &#xff08;一&#xff09;利用容斥原理解決求能被質數整除的數的個數 890計算能被整除的數的個數 因為一共有2^n-1種選法&#xff0c;可以用位運算的方式枚舉&#xff0c;對于得到的每一種選法&#xff0c;根據存在的數…

六、回歸與聚類算法 - 邏輯回歸與二分類

線性回歸欠擬合與過擬合線性回歸的改進 - 嶺回歸分類算法&#xff1a;邏輯回歸模型保存與加載無監督學習&#xff1a;K-means算法 1、應用場景 2、原理 2.1 輸入 2.2 激活函數 3、損失以及優化 3.1 損失 3.2 優化 4、邏輯回歸API 5、分類的評估方法 5.1 精確率和召回率 5.2…

找出作弊的人

文章目錄 題目描述輸入描述輸出描述樣例1解釋:樣例2代碼 題目描述 公司組織了一次考試,現在考試結果出來了&#xff0c;想看一下有沒人存在作弊行為,但是員工太多了,需要先對員工進行一次過濾,再進一步確定是否存在作弊行為。 過濾的規則為:找到分差最小的員工ID對(p1,p2)列表…

【Spring】IoC容器 控制反轉 與 DI依賴注入 配置類實現版本 第四期

文章目錄 基于 配置類 方式管理 Bean一、 配置類和掃描注解二、Bean定義組件三、高級特性&#xff1a;Bean注解細節四、高級特性&#xff1a;Import擴展五、基于注解配置類方式整合三層架構組件總結 基于 配置類 方式管理 Bean Spring 完全注解配置&#xff08;Fully Annotatio…

Kotlin學習 6

1.接口 interface Movable {var maxSpeed: Intvar wheels: Intfun move(movable: Movable): String}class Car(var name: String, override var wheels: Int 4, _maxSpeed: Int) : Movable {override var maxSpeed: Int _maxSpeedget() fieldset(value) {field value}overr…

C語言讀取 ini 配置文件,修改/添加鍵值對

C語言讀取 ini 配置文件&#xff0c;修改/添加鍵值對 C語言讀取 ini 配置文件&#xff0c;對section中的鍵值對進行修改/添加&#xff0c;如果section不存在&#xff0c;則在末尾將新的section/key/value 添加進去。 一、了解什么是INI文件&#xff1f; ini 文件是Initializ…

【大數據】Flink 之部署篇

Flink 之部署篇 1.概述和參考架構2.可重復的資源清理3.部署模式3.1 Application 模式3.2 Per-Job 模式&#xff08;已廢棄&#xff09;3.3 Session 模式 Flink 是一個多用途框架&#xff0c;支持多種不同的混合部署方案。下面&#xff0c;我們將簡要介紹 Flink 集群的構建模塊、…

流動資金貸款管理辦法

流動資金貸款管理辦法 (2024年1月30日國家金融監督管理總局令2024年第2號公布 自2024年7月1日起施行) 第一章 總 則 第一條 為規范銀行業金融機構流動資金貸款業務經營行為&#xff0c;加強流動資金貸款審慎經營管理&#xff0c;促進流動資金貸款業務健康發展&#xff0c;依…

【html學習筆記】3.表單元素

1.文本框 1.1 語法 <input type "text">表示文本框。且只能寫一行 1.2 屬性 使用屬性size 設置文本框大小 <input type"text" size"10">2. 使用屬性value 來設置文本框的默認文字 <input type"text" size"…

Vue狀態管理庫-Pinia

一、Pinia是什么&#xff1f; Pinia 是 Vue 的專屬狀態管理庫&#xff0c;它允許支持跨組件或頁面共享狀態&#xff0c;即共享數據&#xff0c;他的初始設計目的是設計一個支持組合式API的 Vue 狀態管理庫&#xff08;因為vue3一個很大的改變就是組合式API&#xff09;,當然這…

PFA三角燒瓶實驗室PFA錐形瓶本底純凈耐腐蝕性強

PFA三角燒瓶外觀呈平底圓錐狀&#xff0c;下闊上狹&#xff0c;有一圓柱形頸部&#xff0c;上方有一較頸部闊的開口&#xff0c;可用塞子封閉。PFA三角燒瓶也稱PFA錐形瓶&#xff0c;PFA反應瓶&#xff0c;PFA三角燒瓶、PFA依氏燒瓶、PFA錐形燒瓶&#xff0c;PFA鄂倫麥爾瓶等。…

普中51單片機學習(串口通信)

串口通信 原理 計算機通信是將計算機技術和通信技術的相結合&#xff0c;完成計算機與外部設備或計算機與計算機之間的信息交換 。可以分為兩大類&#xff1a;并行通信與串行通信。并行通信通常是將數據字節的各位用多條數據線同時進行傳送 。控制簡單、傳輸速度快&#xff1…

【大模型】finetune 百川2

使用官網例子finetune百川2&#xff0c;微調腳本如下 模型為baichuan_chat_13B_v1 export CUDA_VISIBLE_DEVICES1 hostfile"" deepspeed --hostfile$hostfile baichuan_fineturn/fine-tune/fine-tune.py \--report_to "none" \--data_path "baichu…

2.22號qt

1.使用信號和槽實現多個界面跳轉 1.1準備兩個界面 1.2第一個界面準備signal 1.3第二個界面準備slot 1.4將第一個界面的信號和槽進行連接 2.qss登錄界面升級優化 2.1概念 Qss是Qt程序界面中用來設置控件的背景圖片、大小、字體顏色、字體類型、按鈕狀態變化等屬性&#xff…