1、背景
最近接受到接口優化的任務,查看代碼邏輯后發現在批量處理數據耗時長,想到使用多線程處理批量數據,又要保持原來的事務一致性。
2、實現方法
(1)、創建多線程事務管理
@Component
@Slf4j
public class MultiThreadingTransactionManager {/*** 數據源事務管理器*/@Autowiredprivate DataSourceTransactionManager dataSourceTransactionManager;@Autowiredprivate ThreadPoolTaskExecutor executorService;private long timeout = 120;/*** 用于判斷子線程業務是否處理完成* 處理完成時threadCountDownLatch的值為0*/private CountDownLatch threadCountDownLatch;/*** 用于等待子線程全部完成后,子線程統一進行提交和回滾* 進行提交和回滾時mainCountDownLatch的值為0*/private final CountDownLatch mainCountDownLatch = new CountDownLatch(1);/*** 是否提交事務,默認是true,當子線程有異常發生時,設置為false,回滾事務*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);public boolean execute(List<Runnable> runnableList,String factorySchema) {isSubmit.set(true);setThreadCountDownLatch(runnableList.size());runnableList.forEach(runnable -> executorService.execute(() -> executeThread(factorySchema,runnable, threadCountDownLatch, mainCountDownLatch, isSubmit)));// 等待子線程全部執行完畢try {// 若計數器變為零了,則返回 trueboolean isFinish = threadCountDownLatch.await(timeout, TimeUnit.SECONDS);if (!isFinish) {// 如果還有為執行完成的就回滾isSubmit.set(false);log.info("存在子線程在預期時間內未執行完畢,任務將全部回滾");}} catch (Exception exception) {log.info("主線程發生異常,異常為: " + exception.getMessage());} finally {// 計數器減1,代表該主線程執行完畢mainCountDownLatch.countDown();}// 返回結果,是否執行成功,事務提交即為執行成功,事務回滾即為執行失敗return isSubmit.get();}private void executeThread(String factorySchema,Runnable runnable, CountDownLatch threadCountDownLatch, CountDownLatch mainCountDownLatch, AtomicBoolean isSubmit) {log.info("子線程: [" + Thread.currentThread().getName() + "]");// 判斷別的子線程是否已經出現錯誤,錯誤別的線程已經出現錯誤,那么所有的都要回滾,這個子線程就沒有必要執行了if (!isSubmit.get()) {log.info("整個事務中有子線程執行失敗需要回滾, 子線程: [" + Thread.currentThread().getName() + "] 終止執行");// 計數器減1,代表該子線程執行完畢threadCountDownLatch.countDown();return;}//動態數據源切換SchemaContextHolder.setSchema(factorySchema);// 開啟事務DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);try {// 執行業務邏輯runnable.run();} catch (Exception exception) {// 發生異常需要進行回滾,設置isSubmit為falseisSubmit.set(false);log.info("子線程: [" + Thread.currentThread().getName() + "]執行業務發生異常,異常為: " + exception.getMessage());} finally {// 計數器減1,代表該子線程執行完畢threadCountDownLatch.countDown();}try {// 等待主線程執行mainCountDownLatch.await();} catch (Exception exception) {log.info("子線程: [" + Thread.currentThread().getName() + "]等待提交或回滾異常,異常為: " + exception.getMessage());}try {// 提交if (isSubmit.get()) {dataSourceTransactionManager.commit(transactionStatus);log.info("子線程: [" + Thread.currentThread().getName() + "]進行事務提交");} else {dataSourceTransactionManager.rollback(transactionStatus);log.info("子線程: [" + Thread.currentThread().getName() + "]進行事務回滾");}} catch (Exception exception) {log.info("子線程: [" + Thread.currentThread().getName() + "]進行事務提交或回滾出現異常,異常為:" + exception.getMessage());}}private void setThreadCountDownLatch(int num) {this.threadCountDownLatch = new CountDownLatch(num);}
}
(2)、測試類
@RestController
@RequestMapping("test")
public class TestController {@AutowiredTestService testService;@AutowiredMultiThreadingTransactionManager multiThreadingTransactionManager;@RequestMapping("test")public String test(){List<TestBean> list = new ArrayList<>();list.add(new TestBean("2",1));list.add(new TestBean("3",2));List<Runnable> runnableList = new ArrayList<>();list.forEach(testBean -> runnableList.add(() -> {testService.insert(testBean);}));boolean isSuccess = multiThreadingTransactionManager.execute(runnableList,"db9771");System.out.println(isSuccess);return "ok";};
}
3、總結
大體思路,就是所有子線程在各自線程內開啟事務,執行業務邏輯后,判斷是否拋錯,一旦拋錯,會把全局AtomicBoolean置為false,因為其具有原子性所以不會有線程不安全問題。所有子線程完業務代碼會等待主線程,全部子線程執行業務結束后,主線程等待結束,判斷AtomicBoolean是什么狀態,一旦false,所有子線程回滾,否則提交。