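Seata provides the ProxyUtil utility class to create proxy objects for transactional components. In a Spring environment, Seata provides the GlobalTransactionScanner and SeataAutoDataSourceProxyCreator classes to create AOP proxies for components. This article focuses on these two classes.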
ProxyUtil
The class is io.seata.integration.tx.api.util.ProxyUtil.
public class ProxyUtil {

    private static final Map<Object, Object> PROXYED_SET = new HashMap<>();

    public static <T> T createProxy(T target) {
        return createProxy(target, target.getClass().getName());
    }

    public static <T> T createProxy(T target, String beanName) {
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.containsKey(target)) {
                    return (T) PROXYED_SET.get(target);
                }
                // Use the InterfaceParser to obtain the ProxyInvocationHandler that matches target:
                // 1. GlobalTransactionalInterceptorHandler
                // 2. TccActionInterceptorHandler
                // Internally it checks for @GlobalTransactional, @GlobalLock,
                // @TwoPhaseBusinessAction and similar annotations to decide whether a proxy is needed
                ProxyInvocationHandler proxyInvocationHandler =
                    DefaultInterfaceParser.get().parserInterfaceToProxy(target, beanName);
                if (proxyInvocationHandler == null) {
                    return target;
                }
                // Create the proxy object and inject proxyInvocationHandler into the interception logic.
                // DefaultInvocationHandler implements Java's InvocationHandler interface;
                // its invoke method runs the proxy logic of proxyInvocationHandler
                T proxy = (T) new ByteBuddy().subclass(target.getClass())
                    .method(isDeclaredBy(target.getClass()))
                    .intercept(InvocationHandlerAdapter.of(
                        new DefaultInvocationHandler(proxyInvocationHandler, target)))
                    .make()
                    .load(target.getClass().getClassLoader())
                    .getLoaded()
                    .getDeclaredConstructor()
                    .newInstance();
                PROXYED_SET.put(target, proxy);
                return proxy;
            }
        } catch (Throwable t) {
            throw new RuntimeException("error occurs when create seata proxy", t);
        }
    }
}
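The caching and "no handler, no proxy" behaviour of createProxy can be tried out without Seata or ByteBuddy on the classpath. The following is only a minimal sketch: it swaps ByteBuddy subclassing for the JDK's java.lang.reflect.Proxy (which requires an interface), and CachedProxySketch, OrderService and the needsProxy flag are hypothetical names invented for the illustration, not Seata APIs.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; not part of Seata. */
final class CachedProxySketch {

    /** Hypothetical business interface. */
    interface OrderService {
        String create(String item);
    }

    // target -> proxy, guarded by its own monitor like PROXYED_SET
    private static final Map<Object, Object> PROXIES = new HashMap<>();

    // Records what the interception logic saw, so the effect is observable
    static final List<String> LOG = new ArrayList<>();

    @SuppressWarnings("unchecked")
    static <T> T createProxy(T target, Class<T> iface, boolean needsProxy) {
        synchronized (PROXIES) {
            Object cached = PROXIES.get(target);
            if (cached != null) {
                return (T) cached; // same target always yields the same proxy
            }
            if (!needsProxy) {
                // Mirrors "proxyInvocationHandler == null": hand back the raw target
                return target;
            }
            InvocationHandler handler = (proxy, method, args) -> {
                LOG.add("before " + method.getName());
                return method.invoke(target, args);
            };
            T proxy = (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
            PROXIES.put(target, proxy);
            return proxy;
        }
    }
}
```

Calling createProxy twice with the same target returns the same proxy instance, and a target that needs no interception is returned unchanged. Because ByteBuddy generates a subclass, the real ProxyUtil does not need the interface parameter used here.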
DefaultInvocationHandler
public class DefaultInvocationHandler implements InvocationHandler {

    private ProxyInvocationHandler proxyInvocationHandler;

    private Object delegate;

    public DefaultInvocationHandler(ProxyInvocationHandler proxyInvocationHandler, Object delegate) {
        this.proxyInvocationHandler = proxyInvocationHandler;
        this.delegate = delegate;
    }

    // Dynamic proxy logic
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        InvocationWrapper invocation = new DefaultInvocationWrapper(proxy, delegate, method, args);
        Object result;
        if (proxyInvocationHandler != null) {
            // Let the Seata handler decide how and when to call the delegate
            result = proxyInvocationHandler.invoke(invocation);
        } else {
            // No handler: call the delegate directly
            result = invocation.proceed();
        }
        return result;
    }
}
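The dispatch rule in invoke (use the handler when there is one, otherwise proceed straight to the delegate) can be reproduced with plain JDK types. This is a rough sketch under that assumption: Invocation and Interceptor are hypothetical stand-ins for InvocationWrapper and ProxyInvocationHandler, and Greeter is an invented example interface.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/** Hypothetical illustration; not part of Seata. */
final class DelegatingHandlerSketch {

    /** Stand-in for InvocationWrapper: one pending call on the delegate. */
    static final class Invocation {
        private final Object delegate;
        private final Method method;
        private final Object[] args;

        Invocation(Object delegate, Method method, Object[] args) {
            this.delegate = delegate;
            this.method = method;
            this.args = args;
        }

        Object proceed() throws Throwable {
            try {
                return method.invoke(delegate, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the delegate's own exception
            }
        }
    }

    /** Stand-in for ProxyInvocationHandler. */
    interface Interceptor {
        Object invoke(Invocation invocation) throws Throwable;
    }

    static final class Handler implements InvocationHandler {
        private final Interceptor interceptor; // may be null
        private final Object delegate;

        Handler(Interceptor interceptor, Object delegate) {
            this.interceptor = interceptor;
            this.delegate = delegate;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Invocation invocation = new Invocation(delegate, method, args);
            return interceptor != null ? interceptor.invoke(invocation) : invocation.proceed();
        }
    }

    /** Invented example interface. */
    interface Greeter {
        String greet(String name);
    }

    static Greeter wrap(Greeter delegate, Interceptor interceptor) {
        return (Greeter) Proxy.newProxyInstance(
            Greeter.class.getClassLoader(),
            new Class<?>[] {Greeter.class},
            new Handler(interceptor, delegate));
    }
}
```

With a null interceptor the proxy behaves exactly like the delegate; with an interceptor such as `inv -> ((String) inv.proceed()).toUpperCase()` the result of the delegate is post-processed before it is returned.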
GlobalTransactionScanner
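Extends Spring's AbstractAutoProxyCreator class and has two responsibilities:
- Initializing the TM and RM clients
- Creating AOP proxies for Seata's transactional components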
The initClient method creates the TM and RM clients
It uses the TMClient and RMClient utility classes provided by Seata to initialize the TM and RM clients:
private void initClient() {
    // ...
    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
        throw new IllegalArgumentException(
            String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    // Create the TmNettyRemotingClient
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    // Create the RmNettyRemotingClient
    RMClient.init(applicationId, txServiceGroup);
    // Shut the clients down gracefully
    registerSpringShutdownHook();
}
The wrapIfNecessary method creates the AOP proxy
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    // do checkers
    if (!doCheckers(bean, beanName)) {
        return bean;
    }
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            // Same as in the ProxyUtil utility class
            ProxyInvocationHandler proxyInvocationHandler =
                DefaultInterfaceParser.get().parserInterfaceToProxy(bean, beanName);
            // A null proxyInvocationHandler means no AOP proxy is needed
            if (proxyInvocationHandler == null) {
                return bean;
            }
            // AdapterSpringSeataInterceptor implements the MethodInterceptor interface
            // and wraps the interception logic for Spring AOP.
            // This is how Spring AOP works; it is not analyzed further here
            interceptor = new AdapterSpringSeataInterceptor(proxyInvocationHandler);
            if (!AopUtils.isAopProxy(bean)) {
                // The bean is not yet a proxy: let the parent class create one
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                // The bean is already a proxy: add the transaction
                // interception logic to its existing advisor chain
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                int pos;
                for (Advisor avr : advisor) {
                    pos = findAddSeataAdvisorPosition(advised, avr);
                    advised.addAdvisor(pos, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
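The two branches of wrapIfNecessary (create a new proxy, or add an interceptor to a proxy that already exists) can be illustrated with JDK dynamic proxies. The sketch below is an assumption-laden simplification: ChainHandler plays a role loosely comparable to Spring's AdvisedSupport, Interceptor is a hypothetical interface, and a new interceptor is always placed at the front of the chain, whereas the real code computes the position with findAddSeataAdvisorPosition.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/** Hypothetical illustration; not Spring or Seata code. */
final class WrapIfNecessarySketch {

    /** Receives the method name and a handle that continues the chain. */
    interface Interceptor {
        Object around(String methodName, Callable<Object> next) throws Exception;
    }

    /** A target plus a mutable interceptor chain. */
    static final class ChainHandler implements InvocationHandler {
        final Object target;
        final List<Interceptor> chain = new ArrayList<>();

        ChainHandler(Object target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Exception {
            return call(0, method, args);
        }

        private Object call(int index, Method method, Object[] args) throws Exception {
            if (index == chain.size()) {
                return method.invoke(target, args); // end of chain: call the target
            }
            return chain.get(index).around(method.getName(), () -> call(index + 1, method, args));
        }
    }

    static <T> T wrapIfNecessary(T bean, Class<T> iface, Interceptor interceptor) {
        if (Proxy.isProxyClass(bean.getClass())
                && Proxy.getInvocationHandler(bean) instanceof ChainHandler) {
            // Already proxied: extend the existing chain instead of wrapping again
            ((ChainHandler) Proxy.getInvocationHandler(bean)).chain.add(0, interceptor);
            return bean;
        }
        // Not proxied yet: create a proxy whose chain starts with this interceptor
        ChainHandler handler = new ChainHandler(bean);
        handler.chain.add(interceptor);
        return iface.cast(Proxy.newProxyInstance(
            WrapIfNecessarySketch.class.getClassLoader(), new Class<?>[] {iface}, handler));
    }
}
```

Wrapping an already wrapped bean returns the same proxy object with one more interceptor in its chain, which avoids stacking a proxy on top of a proxy.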
AdapterSpringSeataInterceptor
public class AdapterSpringSeataInterceptor implements MethodInterceptor, SeataInterceptor, Ordered {

    private ProxyInvocationHandler proxyInvocationHandler;

    public AdapterSpringSeataInterceptor(ProxyInvocationHandler proxyInvocationHandler) {
        this.proxyInvocationHandler = proxyInvocationHandler;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        AdapterInvocationWrapper adapterInvocationWrapper = new AdapterInvocationWrapper(invocation);
        Object result = proxyInvocationHandler.invoke(adapterInvocationWrapper);
        return result;
    }

    @Override
    public int getOrder() {
        return proxyInvocationHandler.getOrder();
    }

    @Override
    public void setOrder(int order) {
        proxyInvocationHandler.setOrder(order);
    }

    @Override
    public SeataInterceptorPosition getPosition() {
        return proxyInvocationHandler.getPosition();
    }

    @Override
    public String toString() {
        return proxyInvocationHandler.getClass().getName();
    }
}
The SeataAutoConfiguration configuration class
Wires up the GlobalTransactionScanner component.
SeataAutoDataSourceProxyCreator
In a Spring environment, wraps the original DataSource in a SeataDataSourceProxy object.
The SeataDataSourceAutoConfiguration configuration class
Wires up the SeataAutoDataSourceProxyCreator component.
ProxyInvocationHandler
Encapsulates the interception logic of a proxy.
Interface definition
public interface ProxyInvocationHandler extends SeataInterceptor {

    Set<String> getMethodsToProxy();

    // Proxy interception logic
    Object invoke(InvocationWrapper invocation) throws Throwable;

    SeataInterceptorPosition getPosition();
}
Implementation classes
ProxyInvocationHandler
  |-- AbstractProxyInvocationHandler
        |-- GlobalTransactionalInterceptorHandler
        |-- TccActionInterceptorHandler
AbstractProxyInvocationHandler
Uses the template method pattern to encapsulate the common check that decides whether a call should be intercepted:
public abstract class AbstractProxyInvocationHandler implements ProxyInvocationHandler {

    // Subclasses implement the concrete proxy logic
    protected abstract Object doInvoke(InvocationWrapper invocation) throws Throwable;

    protected int order = Integer.MAX_VALUE;

    @Override
    public Object invoke(InvocationWrapper invocation) throws Throwable {
        // If a method set is configured and this method is not in it, skip interception
        if (CollectionUtils.isNotEmpty(getMethodsToProxy())
            && !getMethodsToProxy().contains(invocation.getMethod().getName())) {
            return invocation.proceed();
        }
        return doInvoke(invocation);
    }

    // Other methods omitted
}
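The invoke/doInvoke split can be exercised in isolation. The following is a small sketch of the same template method idea, with hypothetical names (FilteringHandler, RecordingHandler) and a Supplier standing in for InvocationWrapper.proceed(). As in the code above, an empty method set means every call is intercepted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical illustration; not part of Seata. */
final class TemplateMethodSketch {

    /** Base class mirroring the invoke/doInvoke split. */
    abstract static class FilteringHandler {
        private final Set<String> methodsToProxy;

        FilteringHandler(Set<String> methodsToProxy) {
            this.methodsToProxy = methodsToProxy;
        }

        /** Template method: shared filter first, subclass logic second. */
        final Object invoke(String methodName, Supplier<Object> proceed) {
            if (!methodsToProxy.isEmpty() && !methodsToProxy.contains(methodName)) {
                return proceed.get(); // not selected: call straight through
            }
            return doInvoke(methodName, proceed);
        }

        /** Subclasses supply the actual interception logic. */
        protected abstract Object doInvoke(String methodName, Supplier<Object> proceed);
    }

    /** Example subclass that records which calls reached doInvoke. */
    static final class RecordingHandler extends FilteringHandler {
        final List<String> intercepted = new ArrayList<>();

        RecordingHandler(Set<String> methodsToProxy) {
            super(methodsToProxy);
        }

        @Override
        protected Object doInvoke(String methodName, Supplier<Object> proceed) {
            intercepted.add(methodName);
            return proceed.get();
        }
    }
}
```

A handler built with `Set.of("transfer")` records only calls to transfer, while other calls bypass doInvoke and still return the target's result.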
GlobalTransactionalInterceptorHandler
Handles the @GlobalTransactional and @GlobalLock annotations.
TccActionInterceptorHandler
Handles the @TwoPhaseBusinessAction annotation.
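Both handlers are selected by looking for annotations on the target, as the comments in ProxyUtil note. The parser's actual implementation is not shown in this article, so the following is only a guess at the general shape of such a check, using reflection and a hypothetical @DemoTransactional annotation in place of the Seata annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration; not Seata's DefaultInterfaceParser. */
final class AnnotationScanSketch {

    /** Hypothetical marker standing in for @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD, ElementType.TYPE})
    @interface DemoTransactional {}

    /** Example class with one annotated method. */
    static class AccountService {
        @DemoTransactional
        public void transfer() {}

        public void query() {}
    }

    /** Example class without any annotation. */
    static class PlainService {
        public void ping() {}
    }

    /** Returns the names of methods that need interception; empty means no proxy is needed. */
    static Set<String> methodsToProxy(Class<?> type) {
        Set<String> names = new TreeSet<>();
        boolean classLevel = type.isAnnotationPresent(DemoTransactional.class);
        for (Method method : type.getDeclaredMethods()) {
            if (method.isSynthetic()) {
                continue; // ignore compiler-generated methods
            }
            if (classLevel || method.isAnnotationPresent(DemoTransactional.class)) {
                names.add(method.getName());
            }
        }
        return names;
    }
}
```

In this sketch an empty result corresponds to the case where parserInterfaceToProxy returns null and the bean is left unproxied.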