本文來自 Apache Seata官方文檔,歡迎訪問官網,查看更多深度文章。
本文來自 Apache Seata官方文檔,歡迎訪問官網,查看更多深度文章。
一 .導讀
spring 模塊分析中講到,Seata 的 spring 模塊會對涉及到分布式業務的 bean 進行處理。項目啟動時,當 GlobalTransactionalScanner 掃描到 TCC 服務的 reference 時(即tcc事務參與方),會對其進行動態代理,即給 bean 織入 TCC 模式下的 MethodInterceptor 的實現類。tcc 事務發起方依然使用 @GlobalTransactional 注解開啟,織入的是通用的 MethodInterceptor 的實現類。
TCC 模式下的 MethodInterceptor 實現類即 TccActionInterceptor(spring模塊) ,這個類中調用了 ActionInterceptorHandler(tcc模塊) 進行 TCC 模式下事務流程的處理。
TCC 動態代理的主要功能是:生成TCC運行時上下文、透傳業務參數、注冊分支事務記錄。
二 .TCC模式介紹
在2PC(兩階段提交)協議中,事務管理器分兩階段協調資源管理,資源管理器對外提供三個操作,分別是一階段的準備操作,和二階段的提交操作和回滾操作。
public interface TccAction {@TwoPhaseBusinessAction(name = "tccActionForTest" , commitMethod = "commit", rollbackMethod = "rollback")public boolean prepare(BusinessActionContext actionContext,@BusinessActionContextParameter(paramName = "a") int a,@BusinessActionContextParameter(paramName = "b", index = 0) List b,@BusinessActionContextParameter(isParamInProperty = true) TccParam tccParam);public boolean commit(BusinessActionContext actionContext);public boolean rollback(BusinessActionContext actionContext);
}
這是 TCC 參與者實例,參與者需要實現三個方法,第一個參數必須是 BusinessActionContext ,方法返回類型固定,對外發布成微服務,供事務管理器調用。
prepare:資源的檢查和預留。例:扣減賬戶的余額,并增加相同的凍結余額。
commit:使用預留的資源,完成真正的業務操作。例:減少凍結余額,扣減資金業務完成。
cancel:釋放預留資源。例:凍結余額加回賬戶的余額。
其中 BusinessActionContext 封裝了本次事務的上下文環境:xid、branchId、actionName 和被 @BusinessActionContextParam 注解的參數等。
參與方業務有幾個需要注意的地方:
1.控制業務冪等性,需要支持同一筆事務的重復提交和重復回滾。
2.防懸掛,即二階段的回滾,比一階段的 try 先執行。
3.放寬一致性協議,最終一致,所以是讀已修改
三 . remoting 包解析
包中所有的類都是為包中的 DefaultRemotingParser 服務,Dubbo、LocalTCC、SofaRpc 分別負責解析各自RPC協議下的類。
DefaultRemotingParser 的主要方法:
1.判斷 bean 是否是 remoting bean,代碼:
@Overridepublic boolean isRemoting(Object bean, String beanName) throws FrameworkException {//判斷是否是服務調用方或者是否是服務提供方return isReference(bean, beanName) || isService(bean, beanName);}
2.遠程 bean 解析,把 rpc類 解析成 RemotingDesc,,代碼:
@Overridepublic boolean isRemoting(Object bean, String beanName) throws FrameworkException {//判斷是否是服務調用方或者是否是服務提供方return isReference(bean, beanName) || isService(bean, beanName);}
利用 allRemotingParsers 來解析遠程 bean 。allRemotingParsers是在:initRemotingParser() 中調用EnhancedServiceLoader.loadAll(RemotingParser.class) 動態進行 RemotingParser 子類的加載,即 SPI 加載機制。
如果想擴展,比如實現一個feign遠程調用的解析類,只要把RemotingParser相關實現類寫在 SPI 的配置中就可以了,擴展性很強。
RemotingDesc 事務流程需要的遠程 bean 的一些具體信息,比如 targetBean、interfaceClass、interfaceClassName、protocol、isReference等等。
3.TCC資源注冊
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName) {RemotingDesc remotingBeanDesc = getServiceDesc(bean, beanName);if (remotingBeanDesc == null) {return null;}remotingServiceMap.put(beanName, remotingBeanDesc);Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();Method[] methods = interfaceClass.getMethods();if (isService(bean, beanName)) {try {//service bean, registry resourceObject targetBean = remotingBeanDesc.getTargetBean();for (Method m : methods) {TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);if (twoPhaseBusinessAction != null) {TCCResource tccResource = new TCCResource();tccResource.setActionName(twoPhaseBusinessAction.name());tccResource.setTargetBean(targetBean);tccResource.setPrepareMethod(m);tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),new Class[] {BusinessActionContext.class}));tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),new Class[] {BusinessActionContext.class}));//registry tcc resourceDefaultResourceManager.get().registerResource(tccResource);}}} catch (Throwable t) {throw new FrameworkException(t, "parser remoting service error");}}if (isReference(bean, beanName)) {//reference bean, TCC proxyremotingBeanDesc.setReference(true);}return remotingBeanDesc;}
首先判斷是否是事務參與方,如果是,拿到 RemotingDesc 中的 interfaceClass,遍歷接口中的方法,判斷方法上是否有@TwoParserBusinessAction 注解,如果有,把參數封裝成 TCCRecource,通過 DefaultResourceManager 進行 TCC 資源的注冊。
這里 DefaultResourceManager 會根據 Resource 的 BranchType 來尋找對應的資源管理器,TCC 模式下資源管理類,在 tcc 模塊中。
這個 rpc 解析類主要提供給 spring 模塊進行使用。parserRemotingServiceInfo() 被封裝到了 spring 模塊的 TCCBeanParserUtils 工具類中。spring 模塊的 GlobalTransactionScanner 在項目啟動的時候,通過工具類解析 TCC bean,工具類 TCCBeanParserUtils 會調用 TCCResourceManager 進行資源的注冊,并且如果是全局事務的服務提供者,會織入 TccActionInterceptor 代理。這些個流程是 spring 模塊的功能,tcc 模塊是提供功能類給 spring 模塊使用。
三 .tcc 資源管理器
TCCResourceManager 負責管理 TCC 模式下資源的注冊、分支的注冊、提交、和回滾。
1.在項目啟動時, spring 模塊的 GlobalTransactionScanner 掃描到 bean 是 tcc bean 時,會本地緩存資源,并向 server 注冊:
@Overridepublic void registerResource(Resource resource) {TCCResource tccResource = (TCCResource)resource;tccResourceCache.put(tccResource.getResourceId(), tccResource);super.registerResource(tccResource);}
與server通信的邏輯被封裝在了父類 AbstractResourceManage 中,這里根據 resourceId 對 TCCResource 進行緩存。父類 AbstractResourceManage 注冊資源的時候,使用 resourceGroupId + actionName,actionName 就是 @TwoParseBusinessAction 注解中的 name,resourceGroupId 默認是 DEFAULT。
2.事務分支的注冊在 rm-datasource 包下的 AbstractResourceManager 中,注冊時參數 lockKeys 為 null,和 AT 模式下事務分支的注冊還是有些不一樣的。
3.分支的提交或者回滾:
@Overridepublic BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,String applicationData) throws TransactionException {TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);if (tccResource == null) {throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);}Object targetTCCBean = tccResource.getTargetBean();Method commitMethod = tccResource.getCommitMethod();if (targetTCCBean == null || commitMethod == null) {throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);}try {boolean result = false;//BusinessActionContextBusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,applicationData);Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);if (ret != null) {if (ret instanceof TwoPhaseResult) {result = ((TwoPhaseResult)ret).isSuccess();} else {result = (boolean)ret;}}return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;} catch (Throwable t) {LOGGER.error(msg, t);throw new FrameworkException(t, msg);}}
通過參數 xid、branchId、resourceId、applicationData 恢復業務的上下文 businessActionContext。
根據獲取到的上下文通過反射執行 commit 方法,并返回執行結果。回滾方法類似。
這里 branchCommit() 和 branchRollback() 提供給 rm 模塊資源處理的抽象類 AbstractRMHandler 調用,這個 handler 是 core 模塊定義的模板方法的進一步實現類。和 registerResource() 不一樣,后者是 spring 掃描時主動注冊資源。
四 . tcc 模式事務處理
spring 模塊中的 TccActionInterceptor 的 invoke() 方法在被代理的 rpc bean 被調用時執行。該方法先獲取 rpc 攔截器透傳過來的全局事務 xid ,然后 TCC 模式下全局事務參與者的事務流程還是交給 tcc 模塊 ActionInterceptorHandler 處理。
也就是說,事務參與者,在項目啟動的時候,被代理。真實的業務方法,在 ActionInterceptorHandler 中,通過回調執行。
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,Callback<Object> targetCallback) throws Throwable {Map<String, Object> ret = new HashMap<String, Object>(4);//TCC nameString actionName = businessAction.name();BusinessActionContext actionContext = new BusinessActionContext();actionContext.setXid(xid);//set action anmeactionContext.setActionName(actionName);//Creating Branch RecordString branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);actionContext.setBranchId(branchId);//set the parameter whose type is BusinessActionContextClass<?>[] types = method.getParameterTypes();int argIndex = 0;for (Class<?> cls : types) {if (cls.getName().equals(BusinessActionContext.class.getName())) {arguments[argIndex] = actionContext;break;}argIndex++;}//the final parameters of the try methodret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);//the final resultret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());return ret;}
這里有兩個重要操作:
1.doTccActionLogStore() 這個方法中,調用了兩個比較重要的方法:
fetchActionRequestContext(method, arguments),這個方法把被 @BusinessActionContextParam 注解的參數取出來,在下面的 init 方法中塞入 BusinessActionComtext ,同時塞入的還有事務相關參數。
DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,applicationContextStr, null),這個方法執行 TCC 模式下事務參與者事務分支的注冊。
2.回調執行 targetCallback.execute() ,被代理的 bean 具體的業務,即 prepare() 方法。
五 .總結
tcc模塊,主要提供以下功能 :
- 定義兩階段協議注解,提供 tcc 模式下事務流程需要的屬性。
- 提供解析不同 rpc 框架 remoting bean 的 ParserRemoting 實現,供 spring 模塊調用。
- 提供 TCC 模式下資源管理器,進行資源注冊、事務分支注冊提交回滾等。
- 提供 TCC 模式下事務流程的處理類,讓 MethodInterceptor 代理類不執行具體模式的事務流程,而是下放到 tcc 模塊。
五 .相關
作者:趙潤澤,系列地址。