使用SpringMVC異步處理Servlet解決的問題
- 可以不阻塞有限的tomcat 線程(默認是200~250個,springboot3是200個),確保網絡請求可以持續響應
- 特定業務使用自定義線程池,可以處理的業務量更大
- 對上層業務完全無感知(但如果中間鏈路有超時,則需要注意,比如 nginx 代理60秒超時)
- SpringMVC 的包裝比較好,可以完全無感 Servlet 的底層實現
注意事項
- springboot3里面會默認開啟異步,雖然官方文檔說,如果使用了AbstractAnnotationConfigDispatcherServletInitializer則會開啟異步支持,但我看代碼并沒有使用,實際也是開啟的(可能還有我不懂的,比如使用了@EnableAsync注解,先保留意見)。SpringMVC 會默認將所有的 Servlet 和 Filter 都注冊為支持異步,但是否需要開啟異步是根據返回值的類型判斷的。
- 官方文檔說返回結果支持DeferredResult和Callable,實測其實還有其他的類型可以支持,比如CompletableFuture。
- 對于 Callable 的返回結果,處理流程如下:
- 對于這個類型的返回值,Spring 會采用CallableMethodReturnValueHandler進行處理異步結果,注意這里的結果類型都是Callable。
- CallableMethodReturnValueHandler會調用 org.springframework.web.context.request.async.WebAsyncManager#startCallableProcessing(org.springframework.web.context.request.async.WebAsyncTask<?>, java.lang.Object…)處理結果,Callable 是包裝成WebAsyncTask執行的。
- 其中的攔截器之類的都忽略,但這有非常重要的一點this.taskExecutor.submit(() -> {});,這里是 SpringMVC 會將異步的任務提交到線程池處理,也就是調用Callable.call()來獲取結果,也就意味著這個線程池是阻塞的。而 Servlet 線程會在此提交到線程池后釋放(假定此處的 servlet 線程是http-nio-7001-exec-1)。
- 所謂釋放線程,其實就是 servlet 執行結束了,然后 Filter 也會執行結束,相當于整個請求的線程執行全部結束,僅僅是,沒有給 response 結果而已。
- 這一步的線程池taskExecutor,默認是每次new Thread 的,存在風險,需要替換為執行線程池,可以實現 MVC 配置,并指定線程池org.springframework.web.servlet.config.annotation.WebMvcConfigurer#configureAsyncSupport。
- 在異步執行完成后,SpringMVC 會再次觸發DispatcherServlet中doDispatch(這也是個坑),在這里直接將結果返回,不再執行 Controller。這是因為 Request 中已經標記request.getDispatcherType()是ASYNC值。
- HandlerInterceptor攔截器的實現類需要注意,因為DispatcherServlet中doDispatch會被再次調用,所以preHandle方法在一開始會調用一次,異步執行完成后,發送結果給客戶端的時候會重復調用一次,如果不希望執行,可以用DispatcherType.ASYNC.equals(request.getDispatcherType())判斷,并跳過執行,注意返回結果是 true即可。
- 再次觸發的doDispatch,通常是一個新的線程(如:http-nio-7001-exec-2),因為線程不同,所以在 ThreadLocal 中存儲的全鏈路跟蹤 TraceId 會丟失,需要采用其他方式,比如放到 Request的Attribute 里面。
- WebAsyncTask 和 Callable 一樣,也存在線程池問題。
- 對于DeferredResult的返回結果,處理流程如下:
- DeferredResultMethodReturnValueHandler
- org.springframework.web.context.request.async.WebAsyncManager#startDeferredResultProcessing
- 這里沒有線程池,因為DeferredResult 需要你在業務執行的地方setResult,Spring 會在setResult后觸發后續鏈路。
- CompletableFuture、Mono、Flux 返回值類型:會被認為是 Reactive 類型的返回值,通過ResponseBodyEmitterReturnValueHandler處理,最終會包裝為DeferredResultSubscriber,執行到org.springframework.web.context.request.async.WebAsyncManager#startDeferredResultProcessing中,和DeferredResult的處理方式一樣。
- 注意,在
handleReturnValue
中會有判斷,如果異步的返回 Reactive 對象是同步完成態,比如 Mono 只有一個確定的結果,里面沒有采用訂閱的模式進行異步處理,或者 Future 是完結態,那么handleReturnValue
會直接同步返回結果,而不是異步處理。所以開發的時候要注意:- 一定要自己啟動異步線程,在Future 中做業務邏輯,比如直接用
CompletableFuture.supplyAsync(()->{}, threadPool)
- 在
Publisher.subscribe
訂閱的時候,啟動異步線程threadPool.submit()->{ subscriber.onSubscribe(subscription); --> Subscription.request(x) --> subscriber.onNext(T); }
,在異步線程中onNext
時輸出結果,才能確保在 Servlet 層面是異步的,非阻塞的。
- 一定要自己啟動異步線程,在Future 中做業務邏輯,比如直接用
- 注意,在
結論
只有 Callable 用了簡單線程池,存在線程問題,如果使用需要指定線程池,但也只有Callable使用最簡單,return 即可。
另外,推薦采用DeferredResult和CompletableFuture類型的返回值,需要自己在線程池中處理業務并賦值結果。
代碼示例
package com.test;import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import reactor.core.publisher.Mono;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Slf4j
@RestController
@RequestMapping("/restapi/testAsync")
public class TestAsyncToolboxController {private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,100,60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(10000),new ThreadPoolExecutor.AbortPolicy());@RequestMapping("/test")public DeferredResult<Object> test(HttpServletRequest request, HttpServletResponse response) {log.info("TestAsyncToolboxController.test 1 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());DeferredResult<Object> deferredResult = new DeferredResult<>();threadPoolExecutor.submit(() -> {try {log.info("TestAsyncToolboxController.test 2 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());TimeUnit.SECONDS.sleep(5);log.info("TestAsyncToolboxController.test 3 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());} catch (InterruptedException e) {deferredResult.setResult("exception");throw new RuntimeException(e);}log.info("TestAsyncToolboxController.test 4 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());deferredResult.setResult("success");});return deferredResult;}@RequestMapping("/test3")public CompletableFuture<String> test3(HttpServletRequest request, HttpServletResponse response) {log.info("TestAsyncToolboxController.test3 1 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {try {log.info("TestAsyncToolboxController.test3 2 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());TimeUnit.SECONDS.sleep(5);log.info("TestAsyncToolboxController.test3 3 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("TestAsyncToolboxController.test3 4 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());return "success";}, threadPoolExecutor);return stringCompletableFuture;}@RequestMapping("/test4")public Mono<String> test4(HttpServletRequest request, HttpServletResponse response) {log.info("TestAsyncToolboxController.test4 1 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());try {return Mono.from(new Publisher<String>() {@Overridepublic void subscribe(Subscriber<? super String> subscriber) {threadPoolExecutor.submit(() -> {try {log.info("TestAsyncToolboxController.test4 2 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());TimeUnit.SECONDS.sleep(5);log.info("TestAsyncToolboxController.test4 3 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("TestAsyncToolboxController.test4 4 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());Subscription subscription = new Subscription() {@Overridepublic void request(long n) {log.info("TestAsyncToolboxController.test4 7 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());subscriber.onNext("success");}@Overridepublic void cancel() {log.info("TestAsyncToolboxController.test4 8 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());}};log.info("TestAsyncToolboxController.test4 10 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());subscriber.onSubscribe(subscription);log.info("TestAsyncToolboxController.test4 9 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());});log.info("TestAsyncToolboxController.test4 6 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());}});} finally {log.info("TestAsyncToolboxController.test4 5 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());}}
}
其他
Servlet 的異步性能和 Reactor 的性能是否存在較大差異?為什么 Servlet 依然健在,且沒有明顯的被 Reactor 模式替換的跡象?
參考
https://docs.spring.io/spring-framework/reference/web/webmvc/mvc-ann-async.html
https://blog.csdn.net/sinat_33543436/article/details/88971367