目錄
- 前言
- 一、Springboot項目如何開啟異步?
- 二、存在的問題
- 三、自定義線程池
- 四、自定義線程池使用
- 五、阻塞隊列和拒絕策略
前言
當開發中遇到不影響主流程任務時,使用異步去處理。
如有以下場景:
1、業務需要生成一個季度的數據進行員工排名(涉及到的數據很多),數據查詢、組裝、按規則排名耗時比較長,并且開發方案能接受延時查看具體排名信息數據。在數據變動時,需要調用重新排名的方法,故把排名方法設置為異步。
一、Springboot項目如何開啟異步?
啟動類 上添加或者 自定義線程池 上添加注解:@EnableAsync。
執行方法上加上注解 @Async。
啟動類配置
Controller
Service
打印信息
到這里就可以正常使用異步了。
二、存在的問題
雖然在 Spring 框架中,@Async 注解可以用于異步執行方法。
但是Spring 會自動創建一個默認的線程池用于執行方法。這個默認線程池是由 SimpleAsyncTaskExecutor 管理的,它為每個任務創建一個新的線程。雖然這可以工作,但可能會遇到以下問題:
- 無限制的線程創建:SimpleAsyncTaskExecutor 會為每個任務創建一個新的線程,而沒有最大線程數的限制。如果異步任務的數量非常多,這可能導致大量的線程被創建,消耗大量的系統資源,最終可能導致 OutOfMemoryError 或降低系統性能。
- 線程管理:由于每次調用都會創建新線程,沒有線程復用,這可能會導致線程管理上的開銷,尤其是在高并發場景下。
- 調試和監控困難:默認線程池創建的線程名稱沒有明確的命名規則,這可能會使得在日志中或監控工具中跟蹤異步任務變得困難。
- 資源競爭:大量的線程可能會引起CPU和內存資源的激烈競爭,尤其是在JVM和操作系統層面上的上下文切換。
- 安全性問題:如果異步任務執行的時間過長,而默認線程池沒有適當的管理策略,可能會因為線程過多而影響到系統的穩定性和安全性。
三、自定義線程池
鑒于以上問題,建議使用自定義線程池。
import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;/*** 全局異步任務配置類*/
@Slf4j
@EnableAsync
@Configuration
public class GlobalAsyncConfig implements AsyncConfigurer {// 從 application.yml 中注入配置參數@Value("${async.executor.core-pool-size:10}")private int corePoolSize;@Value("${async.executor.max-pool-size:50}")private int maxPoolSize;@Value("${async.executor.keep-alive-seconds:60}")private int keepAliveSeconds;@Value("${async.executor.queue-capacity:200}")private int queueCapacity;/*** 創建主異步線程池** @return Executor*/@Bean(name = "asyncExecutor")public Executor asyncExecutor() {return createThreadPool("async-pool-", "MainAsyncTask");}/*** 創建郵件發送專用線程池(可選擴展)** @return Executor*/@Bean(name = "emailExecutor")public Executor emailExecutor() {return createThreadPool("email-pool-", "EmailTask");}/*** 創建線程池通用方法** @param namePrefix 線程名稱前綴* @param taskType 任務類型描述(用于日志區分)* @return ThreadPoolTaskExecutor*/private Executor createThreadPool(String namePrefix, String taskType) {// 使用 Spring 提供的 ThreadPoolTaskExecutor,相較于原生 ThreadPoolExecutor,// 更加適合與 Spring 的 @Async 注解配合使用,并且支持更豐富的配置選項。ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 設置核心線程數,線程池初始化時創建的線程數量executor.setCorePoolSize(corePoolSize);// 設置最大線程數,當任務隊列滿時,線程池最多可擴容到的線程數量executor.setMaxPoolSize(maxPoolSize);// 設置非核心線程空閑存活時間(單位為秒)executor.setKeepAliveSeconds(keepAliveSeconds);// 設置任務隊列容量,用于緩存待執行的任務executor.setQueueCapacity(queueCapacity);// 設置線程工廠,用于創建具有指定命名前綴的線程,便于日志追蹤和問題定位executor.setThreadFactory(createThreadFactory(namePrefix));// 設置拒絕策略:當線程池和任務隊列都已滿時,由調用線程(即提交任務的線程)自己執行該任務executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 允許核心線程在空閑時超時并被回收,有助于節省資源(適用于負載波動較大的場景)executor.setAllowCoreThreadTimeOut(true);// 設置線程名稱前綴,方便在日志中識別不同線程池中的線程executor.setThreadNamePrefix("[" + taskType + "] ");// 必須顯式調用 initialize() 來啟動線程池executor.initialize();// 返回配置完成的線程池實例return executor;}/*** 創建線程工廠(統一命名格式)** @param prefix 線程名稱前綴* @return ThreadFactory*/private ThreadFactory createThreadFactory(String prefix) {return new ThreadFactoryBuilder().setNamePrefix(prefix).build();}@Overridepublic Executor getAsyncExecutor() {return asyncExecutor();}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new GlobalAsyncUncaughtExceptionHandler();}/*** 異步任務全局異常處理器,這里可以單獨放一個類文件(這里鑒于篇幅寫在一起)*/@Slf4jpublic static class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {log.error("[Async Task Error] Method: {}, Params: {}", method.getName(), Arrays.deepToString(params), ex);}}
}
application.yml配置(已有默認值,看個人需求配置)
# 全局線程池相關配置
async:executor:core-pool-size: 10max-pool-size: 50keep-alive-seconds: 60queue-capacity: 200
四、自定義線程池使用
在業務開發中,建議每塊業務區分線程池使用,模塊互不影響,便于日志收集。
五、阻塞隊列和拒絕策略
Java 中常用的阻塞隊列(BlockingQueue)
隊列類型 | 特點 | 適用場景 |
---|---|---|
ArrayBlockingQueue | 基于數組、有界、FIFO | 內存敏感、任務量可控的系統 |
LinkedBlockingQueue | 基于鏈表、可有界也可無界、FIFO | 通用型,吞吐量要求較高 |
PriorityBlockingQueue | 支持優先級排序、無界 | 需要按優先級處理的任務(如報警、日志級別) |
SynchronousQueue | 不存儲元素,插入必須等待取出 | 高并發、低延遲場景,任務直接由消費者線程執行 |
🚫 線程池拒絕策略(RejectedExecutionHandler)
策略類名 | 行為說明 | 使用建議 |
---|---|---|
AbortPolicy | 拋出異常 RejectedExecutionException | 默認策略,適用于不能丟失任務的場景 |
CallerRunsPolicy | 由調用線程自己執行任務 | 減緩提交速度,適合臨時過載時 |
DiscardOldestPolicy | 丟棄隊列中最老的任務,嘗試重新提交當前任務 | 可接受部分任務丟失,希望保留最新任務 |
DiscardPolicy | 默默丟棄任務,不做任何處理 | 可容忍任務丟失,如非關鍵日志、監控等任務 |
默認使用的是LinkedBlockingQueue隊列,建議初始化大小,防止內存溢出。