?????????官網文檔:How To Do @Async in Spring | Baeldung。
@Async注解
Spring框架基于@Async注解提供了對異步執行流程的支持。
最簡單的例子是:使用@Async注解修飾一個方法,那么這個方法將在一個單獨的線程中被執行,即:從同步執行流程轉換為異步執行流程。?
此外,Spring框架中,事件Event也是支持異步處理操作的。
?@EnableAsync注解|核心接口
通過在配置類上添加@EnableAsync注解,可以為Spring應用程序啟用異步執行流程的支持
@Configuration
@EnableAsync
public class SpringAsyncConfig { ... }
? ? ? ? 該注解提供了一些可配置屬性,
?annotation:默認情況下,@EnableAsync會告訴Spring程序去探查所有被@Async注解修飾的、以及@javax.ejb.Asynchronous.注解;
mode:指定異步流程生效的方式:JDK Proxy還是AspectJ;
proxtTargetClass:只有在mode為AdviceMode.PROXY(JDK動態代理)時才會生效,用于指定要使用的動態代理類型:CGLIB或者JDK;
order:設置AsyncAnnotationBeanPostProcessor執行異步調用的順序。


? ? ? ? 而在AsyncAnnotationBeanPostProcessor類的內部,則是通過TaskExecutor提供了一個線程池,來更加具體的負責執行某一個異步流程的。
? ? ? ? 再往深入查看,就會發現,該接口的父接口Executor,與之相關的就是我們經常談論的和并發編程相關的Executor框架了。
? ? ? ? 再看Spring框架內部提供的TaskExecutor接口的繼承結構,如下圖所示,
? ? ? ? ?因此,要使用@EnableAsync注解開啟異步流程執行的支持,可能就需要去對TaskExecutor接口實例的線程池參數進行配置。
<task:executor id="myexecutor" pool-size="5" ?/>
<task:annotation-driven executor="myexecutor"/>
SpringBoot默認線程池配置
?根據以上解讀,不難發現:其實SpringBoot內部是通過維護線程池的方式去執行異步任務的,那么,這個默認的線程池對應于Exector框架的哪一個實現子類?相關的配置參數又是什么呢?
? ? ? ? 要解決上述疑惑,需要先去找到TaskExecutionAutoConfiguration類,該類定義了默認注入的線程池實例及其配置參數。
默認線程池類型:ThreadPoolTaskExecutor
默認線程池配置參數:TaskExecutionProperties.Pool
? ? ? ? 詳細參數信息,對應于一個靜態內部類Pool,源碼如下,
public static class Pool {/*** Queue capacity. An unbounded capacity does not increase the pool and therefore* ignores the "max-size" property.*/private int queueCapacity = Integer.MAX_VALUE;/*** Core number of threads.*/private int coreSize = 8;/*** Maximum allowed number of threads. If tasks are filling up the queue, the pool* can expand up to that size to accommodate the load. Ignored if the queue is* unbounded.*/private int maxSize = Integer.MAX_VALUE;/*** Whether core threads are allowed to time out. This enables dynamic growing and* shrinking of the pool.*/private boolean allowCoreThreadTimeout = true;/*** Time limit for which threads may remain idle before being terminated.*/private Duration keepAlive = Duration.ofSeconds(60);public int getQueueCapacity() {return this.queueCapacity;}public void setQueueCapacity(int queueCapacity) {this.queueCapacity = queueCapacity;}public int getCoreSize() {return this.coreSize;}public void setCoreSize(int coreSize) {this.coreSize = coreSize;}public int getMaxSize() {return this.maxSize;}public void setMaxSize(int maxSize) {this.maxSize = maxSize;}public boolean isAllowCoreThreadTimeout() {return this.allowCoreThreadTimeout;}public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {this.allowCoreThreadTimeout = allowCoreThreadTimeout;}public Duration getKeepAlive() {return this.keepAlive;}public void setKeepAlive(Duration keepAlive) {this.keepAlive = keepAlive;}}
從中可以找到默認的配置參數,

@Async注解使用
使用時的兩個限制
1.它只能應用于公共方法
2.從同一個類中調用異步方法將不起作用(會繞過代理,而直接去調用底層方法本身)
注解修飾對象
查看@Async注解的源碼,可看到:它用于修飾class、interface、method,并且提供了一個value屬性,用于在@Autowired和@@Qualifier注解組合自動裝配時,指定要使用哪一個線程池。因為原則上來講,我們是可以通過@Bean注解,在一個Spring容器中注入多個線程池實例的。
package org.springframework.scheduling.annotation;import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** Annotation that marks a method as a candidate for <i>asynchronous</i> execution.** <p>Can also be used at the type level, in which case all the type's methods are* considered as asynchronous. Note, however, that {@code @Async} is not supported* on methods declared within a* {@link org.springframework.context.annotation.Configuration @Configuration} class.** <p>In terms of target method signatures, any parameter types are supported.* However, the return type is constrained to either {@code void} or* {@link java.util.concurrent.Future}. In the latter case, you may declare the* more specific {@link org.springframework.util.concurrent.ListenableFuture} or* {@link java.util.concurrent.CompletableFuture} types which allow for richer* interaction with the asynchronous task and for immediate composition with* further processing steps.** <p>A {@code Future} handle returned from the proxy will be an actual asynchronous* {@code Future} that can be used to track the result of the asynchronous method* execution. However, since the target method needs to implement the same signature,* it will have to return a temporary {@code Future} handle that just passes a value* through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},* or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.*/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {/*** A qualifier value for the specified asynchronous operation(s).* <p>May be used to determine the target executor to be used when executing* the asynchronous operation(s), matching the qualifier value (or the bean* name) of a specific {@link java.util.concurrent.Executor Executor} or* {@link org.springframework.core.task.TaskExecutor TaskExecutor}* bean definition.* <p>When specified on a class-level {@code @Async} annotation, indicates that the* given executor should be used for all methods within the class. Method-level use* of {@code Async#value} always overrides any value set at the class level.* @since 3.1.2*/String value() default "";}
使用方式1:修飾方法method
根據上面的使用限制,被@Async注解修飾的方法,和主調方法不能位于同一個類中,并且也必須是public類型的公共方法。
package com.example.soiladmin;import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;//@Async注解測試類
@Component
public class AsyncTestMethod {@Asyncpublic void asyncTest(){try {System.out.println(String.format("start:%s",Thread.currentThread().getName()));Thread.sleep(1500);System.out.println(String.format("end:%s",Thread.currentThread().getName()));} catch (InterruptedException e) {e.printStackTrace();}}
}
? ? ? ? 調用方法,
注意到:被@Async修飾的方法線程睡眠了1.5s,如果它是異步執行的,那么就不會阻塞后面for循環的執行。
@Autowiredprivate AsyncTestMethod asyncTestMethod;@Testpublic void asyncTestMethod_1(){asyncTestClass.asyncTest();for (int i = 0; i < 10; i++) {try {Thread.sleep(300);System.out.println(String.format("i=%d\n",i));} catch (InterruptedException e) {e.printStackTrace();}}}
? ? ? ? 打印結果,
使用方式2:修飾類class
?為了方便,我們直接將使用方式1中的類拷貝一份,然后使用@Async注解修飾class類,而非Method方法。
package com.example.soiladmin;import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Component
@Async
public class AsyncTestClass {public void asyncTest(){try {System.out.println(String.format("start:%s",Thread.currentThread().getName()));Thread.sleep(1500);System.out.println(String.format("end:%s",Thread.currentThread().getName()));} catch (InterruptedException e) {e.printStackTrace();}}
}
? ? ? ? 主調方法,
@Autowiredprivate AsyncTestClass asyncTestClass;@Testpublic void asyncTestClass_2(){asyncTestClass.asyncTest();for (int i = 0; i < 10; i++) {try {Thread.sleep(300);System.out.println(String.format("i=%d\n",i));} catch (InterruptedException e) {e.printStackTrace();}}}
? ? ? ? 打印結果,
使用方式3:帶返回值的方法
以上測試案例都是不帶返回值的,但是一般情況下,我們可能還希望獲取異步執行的結果,然后對結果進行合并、分析等,那么就可以為@Async注解修飾的方法聲明一java.util.concurrent接口類型的返回類型。
但是這里有一個注意點:就是要Future是一個接口,我們沒辦法直接去new一個接口,所以還要找到Future接口的實現子類。
比較常用的是Spring框架提供的實現子類AsyncResult,?
? ? ? ? 我們繼續簡單看一下AsyncResult實現子類的基本結構,
基本上提供了獲取異步執行結果的get方法、對成功/失敗情況進行處理的回調函數addCallBack、將返回結果繼續進行封裝為AsyncResult類型值的forValue,簡單來講,就是對jdk原生的concurrent包下的Future接口進行了功能拓展和增強。
? ? ? ? ?異步方法如下,
/*** 數列求和: An = 2 ^ n(n>=0),求累加和S(n)---這里為了測試效果(出于增加耗時考慮),不直接使用求和公式* @param n* @return*/@Asyncpublic ListenableFuture<Double> asyncSequenceSum(int n){Double sum = 0.0;for (int i = 1; i <= n; i++) {sum += Math.pow(2,i);}return new AsyncResult<>(sum);}
? ? ? ? 測試方法如下,
以下兩種方式都可以拿到執行結果,調用回調函數。
官網給的是第二種寫法,
@Testpublic void asyncTestMethodWithReturns_nor(){System.out.println("開始執行...");ListenableFuture<Double> asyncResult = asyncTestMethod.asyncSequenceSum(10);//添加回調函數asyncResult.addCallback(new SuccessCallback<Double>() {@Overridepublic void onSuccess(Double result) {System.out.println("執行成功:" + result.doubleValue());}},new FailureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("執行失敗:"+ex.getMessage());}});//直接嘗試獲取結果-[只能拿到結果,如果執行出錯,就會拋出異常]try {Double aDouble = asyncResult.get();System.out.println("計算結果:"+aDouble.doubleValue());} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}}@Testpublic void asyncTestMethodWithReturns_normal(){System.out.println("開始執行...");ListenableFuture<Double> asyncResult = asyncTestMethod.asyncSequenceSum(10);//等待執行結果while (true){if (asyncResult.isDone()){//直接嘗試獲取結果-[只能拿到結果,如果執行出錯,就會拋出異常]try {Double aDouble = asyncResult.get();System.out.println("計算結果:"+aDouble.doubleValue());//添加回調函數asyncResult.addCallback(new SuccessCallback<Double>() {@Overridepublic void onSuccess(Double result) {System.out.println("執行成功:" + result.doubleValue());}},new FailureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("執行失敗:"+ex.getMessage());}});} catch (ExecutionException | InterruptedException e) {e.printStackTrace();}//終止循環break;}System.out.println("Continue doing something else. ");}}
? ? ? ? 執行結果,
自定義線程池配置參數
上面提到,SpringBoot內置了1個ThreadPoolTaskExecutor線程池實例,在實際開發中,根據需要,我們也可以結合@Configuration注解自定義新的線程池,也可以通過通過實現AsyncConfigurer接口直接替換掉原有的線程池。
定義新的線程池
這種情況下,Spring容器就會出現多個線程池實例,所以在使用@Async注解時,要通過value屬性指定具體要使用哪一個線程池實例。
@Configuration
@EnableAsync
public class SpringAsyncConfig {@Bean(name = "threadPoolTaskExecutor")public Executor threadPoolTaskExecutor() {return new ThreadPoolTaskExecutor();}
}
? ? ? ? 使用示例,
@Async("threadPoolTaskExecutor")
public void asyncMethodWithConfiguredExecutor() {System.out.println("Execute method with configured executor - "+ Thread.currentThread().getName());
}
替換默認線程池?
替換默認線程池需要實現AsyncConfigurer接口,通過重寫getAsyncExecutor()?,從而讓自定義的線程池變為Spring框架默認使用的線程池。
? ? ? ? 示例代碼如下,
@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}
}
配置線程池參數
除了上述自定義新的線程池的方法,也可以通過SpringBoot配置文件,重新對默認線程池的參數進行修改。
?
異步處理流程的異常處理
內置異常處理類:SimpleAsyncUncaughtExceptionHandler
SpringBoot框架內置的異常處理類為SimpleAsyncUncaughtExceptionHandler,僅僅是對異
常信息進行了打印處理。
package org.springframework.aop.interceptor;import java.lang.reflect.Method;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;/*** A default {@link AsyncUncaughtExceptionHandler} that simply logs the exception.** @author Stephane Nicoll* @author Juergen Hoeller* @since 4.1*/
public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {if (logger.isErrorEnabled()) {logger.error("Unexpected exception occurred invoking async method: " + method, ex);}}}
當我們不做任何處理時,默認就是上述異常處理類在起作用。
繼續向上扒拉源碼,會發現它的父接口AsyncUncaughtExceptionHandler,其作用就是:指定異步方法執行過程中,拋出異常時的因對策略。
?
自定義異常處理類|配置
我們也可以通過實現接口AsyncUncaughtExceptionHandler,來自定義異常處理邏輯。
如下所示,為自定義的異常處理類:CustomAsyncExceptionHandler。
package com.example.soilcommon.core.async;import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;import java.lang.reflect.Method;/*** 異步處理流程異常處理類:* [1]對于返回值為Future類型的異步執行方法,異常會被拋出給主調方法* [2]對于返回值為void類型的異步執行方法,異常不會被拋出,即:在主調方法中沒辦法通過try...catch捕獲到異常信息* 當前配置類針對情況[2]進行統一的異常處理*/
@Component("customAsyncExceptionHandler")
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {/*** Handle the given uncaught exception thrown from an asynchronous method.* @param throwable the exception thrown from the asynchronous method* @param method the asynchronous method* @param params the parameters used to invoke the method*/@Overridepublic void handleUncaughtException(Throwable throwable, Method method, Object... params) {System.out.println("Exception message - " + throwable.getMessage());System.out.println("Method name - " + method.getName());for (Object param : params) {System.out.println("Parameter value - " + param);}}
}
接下來我們對其進行配置,使其生效,需要重寫AsyncConfigurer接口getAsyncUncaughtExceptionHandler方法,
package com.example.soilcommon.core.async;import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {/*** 指定要使用哪一個具體的異常處理類* 原因:SpringBoot框架默認使用內置的SimpleAsyncUncaughtExceptionHandler進行異常處理*/@Autowired@Qualifier("customAsyncExceptionHandler")private AsyncUncaughtExceptionHandler asyncUncaughtExceptionHandler;/*** The {@link AsyncUncaughtExceptionHandler} instance to be used* when an exception is thrown during an asynchronous method execution* with {@code void} return type.*/@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return this.asyncUncaughtExceptionHandler;}
}
? ? ? ? 最終異步方法執行拋出異常時,打印的信息就是我們自定義的了,
????????參考文章:How To Do @Async in Spring | Baeldung