目錄
- 一、說明
- 二、TransactionInterceptor開啟事務
- (1)、攔截方法
- (2)、開啟事務綁定數據庫連接
- (3)、mybatis中sql執行數據庫連接獲取
- (4)、事務提交和當前線程ThreadLocal清理,sqlSession關閉
- 三、總結
一、說明
接著上一個博客SpringBoot 聲明式事務 源碼解析,下面看一下事務開啟后把當前數據庫連接綁定到ThreadLocal中,mybatis執行數據庫操作時,從ThreadLocal中獲取連接執行sql,最后攔截器提交或回滾事務,執行sqlsession(一個sqlsession對應一個數據庫連接)提交或回滾,然后清理ThreadLocal,關閉sqlsession,數據庫連接回收到數據庫連接池。
二、TransactionInterceptor開啟事務
(1)、攔截方法
@Override@Nullablepublic Object invoke(MethodInvocation invocation) throws Throwable {// Work out the target class: may be {@code null}.// The TransactionAttributeSource should be passed the target class// as well as the method, which may be from an interface.Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);// Adapt to TransactionAspectSupport's invokeWithinTransaction...return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);}
執行invokeWithinTransaction方法,如下圖,1是獲取目標方法上配置的隔離級別和傳播屬性等屬性,2是開啟事務的具體方法,3是繼續執行后續攔截器最終執行目標方。
下面重點看2方法中的內容。
繼續看status = tm.getTransaction(txAttr);
(2)、開啟事務綁定數據庫連接
下面跟蹤方法時,重點看開啟事務和綁定數據庫連接到TreadLocal中的內容。
在看代碼之前,先看看自動配置類中創建了DataSourceTransactionManager組件,不明白什么時候創建的可以看一下我上個博客。組件中放入了dataSource數據源,若依的框架中添加了動態數據源的配置。
下圖是status = tm.getTransaction(txAttr);方法中開啟事務的內容。doBegin方法時綁定數據庫連接到當前線程。doBegin下面prepareSynchronization里面也很重要,記錄一下往TransactionSynchronizationManager中設置了許多參數后面會用到,看一下 TransactionSynchronizationManager.initSynchronization();
下圖第一個箭頭設置了ActualTransctionActive=true在提交事務的時候會用到。往synchronizations放入了空集合,synchronizations是一個ThreadLocal。
下面看一下doBegin方法。
下面是從數據源中獲取連接,把連接設置到了txObject的ConnectionHolder數據庫連接描述對象中,后續還會從這里面取出。設置了newConnectionHolder=true.
1、先判斷連接是不是自動提交,如果是自動提交會設置成不可以自動提交。2、把事務可用狀態設置成true,后續會用到。3、把當前連接信息綁定到當前線程。
可以看到resources是ThreadLocal,ThreadLocal中放入了Map集合,key是動態數據源,value是數據庫連接描述對象ConnectionHolder。
(3)、mybatis中sql執行數據庫連接獲取
MybatisAutoConfiguration中會自動注入SqlSessionTemplate組件,@MapperScan中引入了ClassPathMapperScanner組件,組件掃描所有mapper.java文件,把接口設置成MapperFactoryBean類型的組件,可以一下我以前的博客Spring如何管理Mapper,在設置bean的描述時,也設置了SqlSessionTemplate組件到Mapper中,執行到Configuration.addMapper時,knownMappers.put(type, new MapperProxyFactory(type));往map中設置了MapperProxyFactory,當Configuration.getMapper時,會調用MapperProxyFactory生成代理類MapperProxy,默認使用的是jdk動態代理。綜上所述,當執行mapper的update方法時,會到MapperProxy代理方法invoke。后面會執行MapperMethod中執行execute方法。
會執行SqlSessionTemplate中的update方法。如下圖當SqlSessionTemplate創建的時候,會設置SqlSessionTemplate的代理類sqlSessionProxy,SqlSessionInterceptor是代理類的攔截方法。執行SqlSessionTemplate中的update方法會進入SqlSessionInterceptor的invoke方法。
從if判斷可以看到如果沒有開啟事務,sqlSession會提交事務。如果開啟了,使用事務攔截器統一提交事務。
看一下if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory))內容,如果ThreadLocal中已經存入了sqlSession并和當前的sqlSession是同一個說明開啟了事務,使用事務攔截器提交。
分析一下getSqlSession方法,主要是獲取sqlSession,先進入getSqlSession方法看ransactionSynchronizationManager.getResource(sessionFactory);
從resources中獲取以sessionFactory為key,值是defaultSqlSession的map,其中resources是ThreadLocal。第一次執行map是null。
請看源碼,我添加了注釋
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,PersistenceExceptionTranslator exceptionTranslator) {notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);//從ThreadLocal中獲取SqlSessionHolder,可以通過SqlSessionHolder獲取生成的sqlSessionSqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);//從SqlSessionHolder中獲取sqlSession,如果獲取都會直接返回SqlSession session = sessionHolder(executorType, holder);if (session != null) {return session;}//通過sessionFactory和執行器類型創建sqlSessionLOGGER.debug(() -> "Creating a new SqlSession");session = sessionFactory.openSession(executorType);//把創建好的sqlSession,放入到ThreadLocal中,只有開啟事務才能放入registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);return session;}
看一下openSession代碼, tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level,boolean autoCommit) {Transaction tx = null;try {final Environment environment = configuration.getEnvironment();final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);//1、創建了SpringManagedTransaction,傳入數據源參數tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);//2、創建執行器,傳入了SpringManagedTransactionfinal Executor executor = configuration.newExecutor(tx, execType);//3、創建DefaultSqlSession,傳入了執行器executor return createSqlSession(configuration, executor, autoCommit);} catch (Exception e) {closeTransaction(tx); // may have fetched a connection so lets call close()throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);} finally {ErrorContext.instance().reset();}}
會創建Transaction類型的組件SpringManagedTransaction,傳入了動態數據源。
public Transaction newTransaction(DataSource dataSource, TransactionIsolationLevel level, boolean autoCommit) {return new SpringManagedTransaction(dataSource);}
繼續往下跟蹤到registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);代碼如下
1、里面代碼判斷synchronizations是不是null,這里上面提到過,在TranscationIntercetor中開啟事務后,prepareSynchronization方法中設置了空集合,所以這里是TransactionSynchronizationManager.isSynchronizationActive()=true,synchronizations也是TreadLocal避免了線程不安全問題。
2、創建了SqlSessionHolder其中包含創建好的DefaultSqlSession。
3、往ThreadLocal中放入了Map,map的key是sessionFactory,value是2中創建的SqlSessionHolder。開啟事務后,執行后面的sql可以從ThreadLocal取出。
4、創建了SqlSessionSynchronization其中包含創建好的sqlsession,放入到了集合中,此集合會放入1中synchronizations中,后面提交事務的時候會用到。
5、設置了SqlSessionHolder中SynchronizedWithTransaction=true。
總上所述,當開啟事務后,在同一個事務中,使用mybatis執行多個sql時,會重復使用同一個DefaultSqlSession,(DefaultSqlSession被綁定到了線程中),也會使用同一個數據庫連接,保證可以使用事務。如果沒有開啟事務,每次執行sql都會重新創建一個DefaultSqlSession。事務的開啟和提交回滾都是mybatis來負責的。
繼續回到MapperMethod.execute->sqlSession.update(command.getName(), param)
->executor.update(ms, wrapCollection(parameter));
->BaseExecutor.update
->SimpleExecutor.doUpdate
->prepareStatement(handler, ms.getStatementLog());
->Connection connection = getConnection(statementLog);–>openConnection();
-> this.connection = DataSourceUtils.getConnection(this.dataSource);
->doGetConnection 如下代碼可以看到從ThreadLocal里面獲取map使用動態數據源做key,獲取數據庫連接描述對象,這個數據庫連接對象在開啟事務后放入的,可以找找上面的內容有提到。SpringManagedTransaction類中
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
public static Connection doGetConnection(DataSource dataSource) throws SQLException {Assert.notNull(dataSource, "No DataSource specified");ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {conHolder.requested();if (!conHolder.hasConnection()) {logger.debug("Fetching resumed JDBC Connection from DataSource");conHolder.setConnection(fetchConnection(dataSource));}return conHolder.getConnection();}// Else we either got no holder or an empty thread-bound holder here.logger.debug("Fetching JDBC Connection from DataSource");Connection con = fetchConnection(dataSource);
獲取數據庫連接后會設置到SpringManagedTransaction類中的connection屬性中,下面看一下SpringManagedTransaction類關系。
每次創建sqlsession的時候會先都會創建SpringManagedTransaction,SpringManagedTransaction當獲取數據庫連接后會設置到本類的屬性connection上,創建執行器Excutor是會把SpringManagedTransaction設置進去,然后把Excutor設置到sqlsession中。這可以理解為同一個sqlsession對應同一個數據庫連接java.sql.Connection。
(4)、事務提交和當前線程ThreadLocal清理,sqlSession關閉
定位到TransactionAspectSupport.invokeWithinTransaction方法,方法內開啟事務,執行攔截器和目標方法最后提交事務。下面看看提交事務方法。
commitTransactionAfterReturning(txInfo);
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus())
->processCommit(defStatus);
->triggerBeforeCompletion
-> TransactionSynchronizationUtils.triggerBeforeCompletion();
遍歷TransactionSynchronization執行beforeCompletion
for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {try {synchronization.beforeCompletion();}catch (Throwable tsex) {logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);}}
public void beforeCompletion() {// Issue #18 Close SqlSession and deregister it now// because afterCompletion may be called from a different threadif (!this.holder.isOpen()) {if (LOGGER.isDebugEnabled()) {LOGGER.debug("Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");}//刪除ThreadLocal中綁定的sessionFactory,和sqlSession
TransactionSynchronizationManager.unbindResource(sessionFactory);this.holderActive = false;if (LOGGER.isDebugEnabled()) {LOGGER.debug("Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");}//關閉sqlSessionthis.holder.getSqlSession().close();}}
主要看一下 TransactionSynchronizationManager.unbindResource(sessionFactory);清理綁定的sessionFactory和sqlSession的map集合
public static Object unbindResource(Object key) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Object value = doUnbindResource(actualKey);if (value == null) {throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");}return value;}
private static Object doUnbindResource(Object actualKey) {Map<Object, Object> map = resources.get();if (map == null) {return null;}Object value = map.remove(actualKey);// Remove entire ThreadLocal if empty...if (map.isEmpty()) {resources.remove();}// Transparently suppress a ResourceHolder that was marked as void...if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {value = null;}if (value != null && logger.isTraceEnabled()) {logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +Thread.currentThread().getName() + "]");}return value;}
最終提交事務,方法在processCommit->triggerBeforeCommit->TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly())遍歷所有的TransactionSynchronization 類型的組件前面添加過SqlSessionSynchronization其中包含了創建好的sqlsession
public static void triggerBeforeCommit(boolean readOnly) {for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {synchronization.beforeCommit(readOnly);}}
獲取當前線程的sqlsession,執行提交操作。可以看一下if條件,就是開啟事務后在prepareSynchronization方法中設置的ThreadLocal類型的屬性actualTransactionActive中是true。
執行到sqlsession中執行器的commit,設置不能自動提交。
繼續執行執行器中的SpringManagedTransaction中的commit,SpringManagedTransaction在創建sqlsession的時候提到了,
最終獲取SpringManagedTransaction中的connection,進行事務提交。
三、總結
1、TransactionInterceptor攔截到目標方法開啟事務設置第一個ThreadLocal放入數據源為key,數據庫連接描述類為value的map集合。
2、執行mybatis的sql時,sqlSession中的excutor中SpringManagedTransaction類會從第一個ThreadLocal中根據動態數據源取出相應的數據庫連接執行sql,保證了開啟的事務和執行sql同一個數據庫連接。
3、在mybatis的sqlSessionTemplate執行增刪改方法時,sqlSession的代理類SqlSession執行getSqlSession,如果開啟了事務,出現第二個ThreadLocal,里面存放以sqlSessionFactory為key,defaultSqlSession為value的map集合,如何在同一個事務中,執行每個sql,defaultSqlSession會使用同一個。如果沒有開啟事務第二個ThreadLocal不生效,每次執行sql都會創建一次defaultSqlSession。事務的開啟和提交都是mybatis控制的。
為何開啟事務后,在事務中每個mapper增刪改查操作都使用同一個sqlsession呢?因為 MyBatis 的 SqlSession 在設計上就是數據庫連接(java.sql.Connection)的一個高級封裝和門面(Facade)對象。一個 SqlSession 實例在其生命周期內,內部始終持有且僅持有一個 Connection 對象,同一個 SqlSession 就是同一個數據庫連接,提交事務時,多個方法使用同一個SqlSession提交方法進而同一個數據庫連接提交,保證了事務一致性
一個 SqlSession 實例 → 包含一個 Executor 實例 → 持有一個 Transaction 對象 → 管理一個唯一的 Connection 對象。
4、當事務提交成功或回滾時,會自動清理掉兩個ThreadLocal中當前線程中的數據關閉SqlSession,回收 Connection 對象到線程池。