文章目錄
- 問題分析
- 解決方案
- 實現原理解析
- 執行流程說明
- 運行實例
- 正常流程執行
- 執行異常流程
- 關鍵優勢
在分布式系統開發中,我們經常會遇到本地事務與遠程服務調用結合的場景。當本地事務包含RPC調用時,如果事務回滾,RPC調用已經執行就會導致數據不一致。本文將介紹如何優雅地解決這個問題。
問題分析
考慮以下場景:有方法A、B、C組成一個大事務,其中B方法需要調用RPC服務。如果C方法執行失敗導致整個事務回滾,但RPC已經調用并提交,就會造成數據不一致。
核心問題是:RPC調用默認會在本地事務提交前執行,無法參與事務回滾。
解決方案
Spring提供了事務同步機制(TransactionSynchronization),允許我們注冊回調函數,在事務完成后執行特定操作。利用這個機制,我們可以確保:
只有當本地事務成功提交后,才會執行RPC調用;如果事務回滾,則不執行RPC
以下是實現代碼:
package cn.bb.mydemo.service;import cn.bb.mydemo.entity.Student;
import cn.bb.mydemo.mapper.StudentMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;import java.time.LocalTime;
import java.util.concurrent.CompletableFuture;@Service
public class StudentService {@Autowiredprivate StudentMapper studentMapper;@Transactional(rollbackFor = Exception.class)public void saveWithRpcAfterTx(String name) {// 1. 執行本地數據庫操作Student student = new Student();student.setName(name);studentMapper.insert(student);// 2. 注冊事務同步回調:在事務提交后異步執行RPCTransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {@Overridepublic void afterCommit() {// 使用CompletableFuture異步執行RPC,避免阻塞事務提交CompletableFuture.runAsync(() -> doRpc(name));}});}@Transactional(rollbackFor = Exception.class)public void run(String name) {Student student = new Student();student.setName(name);studentMapper.insert(student);}@Transactional(rollbackFor = Exception.class)public void allRun() {// 調用不同方法組成一個大事務this.saveWithRpcAfterTx("Alice");this.run("a");this.run("b");System.out.println("本地事務已提交");// int i = 1 / 0; // 取消注釋此行會觸發異常,導致事務回滾}// 模擬RPC調用private void doRpc(String name) {System.out.println(LocalTime.now() + " RPC調用收到:" + name);}
}
測試代碼驗證實現效果:
@SpringBootTest
class TxAsyncTest {@Autowiredprivate StudentService studentService;@Testvoid shouldRpcAfterTxCommit2() throws InterruptedException {// 調用組合事務方法studentService.allRun();// 等待異步RPC執行完成Thread.sleep(20000);}
}
實現原理解析
這個解決方案的核心在于TransactionSynchronizationManager
和TransactionSynchronizationAdapter
的使用:
-
TransactionSynchronizationManager:Spring提供的事務同步管理器,允許注冊回調函數監聽事務生命周期事件
-
TransactionSynchronizationAdapter:事務同步適配器,我們可以重寫其中的關鍵方法:
afterCommit()
:事務成功提交后執行afterCompletion()
:事務完成后執行(無論成功或失敗)beforeCommit()
:事務提交前執行
-
異步執行:使用
CompletableFuture.runAsync()
確保RPC調用不會阻塞事務提交過程
執行流程說明
當調用allRun()
方法時,執行流程如下:
- 開啟數據庫事務
- 執行
saveWithRpcAfterTx()
:插入數據庫記錄并注冊事務同步回調 - 執行
run()
兩次:插入兩條額外記錄 - 如果沒有異常,事務提交
- 事務提交后,觸發注冊的
afterCommit()
回調 - 異步執行RPC調用
- 如果任何步驟發生異常,事務回滾,不會觸發RPC調用
運行實例
正常流程執行
新增了ab 并且 rpc是最后被調用的 雖然這個方法被放在了中間
執行異常流程
數據也沒有添加什么 事務被回滾了 這就保證了我們的需求【當事務被提交 rpc才被調用 否則錯誤就不執行rpc 防止重復數據的生成】
關鍵優勢
- 數據一致性:確保本地事務與RPC調用的最終一致性
- 非侵入性:不需要修改現有事務管理代碼
- 異步執行:不影響事務性能
- 簡單可靠:基于Spring內置機制,無需額外框架
這種方法特別適合那些需要保證本地數據與遠程服務數據一致性的場景,是解決大事務中RPC無法回滾問題的優雅方案。