一、核心特性
Springboot 集成
支持 @Async 注解,簡化異步方法調用。
參數可配置化
核心線程數、最大線程數、隊列容量、拒絕策略等均可通過配置調整。
生命周期管理
實現 Lifecycle 接口,支持線程池的啟動和關閉(如應用關閉時優雅終止任務)。
任務裝飾器
支持通過 TaskDecorator 對任務進行裝飾(如傳遞上下文信息)
二、添加依賴
在 pom.xml
文件中添加 Spring Boot Starter AOP 依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
三、參數詳解
通過 Spring 配置文件或 @Bean 定義線程池時,需設置以下關鍵參數:
參數名稱 | 說明 | 默認值 |
---|---|---|
corePoolSize | 核心線程數,即使空閑也不會被回收 | 1 |
maxPoolSize | 最大線程數,當隊列滿時創建新線程直到達到此值 | Integer.MAX_VALUE |
queueCapacity | 任務隊列容量(使用 LinkedBlockingQueue 或 ArrayBlockingQueue) | Integer.MAX_VALUE |
keepAliveSeconds | 非核心線程的空閑存活時間(秒) | 60 |
threadNamePrefix | 線程名前綴,便于日志追蹤 | "task-executor-" |
allowCoreThreadTimeOut | 是否允許核心線程超時回收 | false |
rejectedExecutionHandler | 拒絕策略(如 AbortPolicy、CallerRunsPolicy) | AbortPolicy(直接拋出異常) |
四、配置線程池
@Configuration
@EnableAsync
public class ExecutorConfig {private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);@Value("${async.executor.thread.core_pool_size}")private int corePoolSize;@Value("${async.executor.thread.max_pool_size}")private int maxPoolSize;@Value("${async.executor.thread.queue_capacity}")private int queueCapacity;@Value("${async.executor.thread.name.prefix}")private String namePrefix;@Bean(name = "asyncServiceExecutor")public Executor asyncServiceExecutor() {logger.info("start asyncServiceExecutor");ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心線程數executor.setCorePoolSize(corePoolSize);//配置最大線程數executor.setMaxPoolSize(maxPoolSize);//配置隊列大小executor.setQueueCapacity(queueCapacity);//配置線程池中的線程的名稱前綴executor.setThreadNamePrefix(namePrefix);// rejection-policy:當pool已經達到max size的時候,如何處理新任務// CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//執行初始化executor.initialize();return executor;}
}
@Value
是我配置在?application.yml
,可以參考配置,自由定義?
# 異步線程配置
# 配置核心線程數
async.executor.thread.core_pool_size = 5
# 配置最大線程數
async.executor.thread.max_pool_size = 5
# 配置隊列大小
async.executor.thread.queue_capacity = 99999
# 配置線程池中的線程的名稱前綴
async.executor.thread.name.prefix = async-service-
五、應用實踐
1、異步任務處理
創建一個服務類 AsyncService
,并在其方法上使用 @Async
注解來定義異步任務:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;@Service
public class AsyncService {private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);@Async("taskExecutor")public void asyncTask(String taskName) {logger.info(Thread.currentThread().getName() + " 開始執行任務: " + taskName);try {Thread.sleep(2000); // 模擬耗時操作} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error("任務執行被中斷", e);} finally {logger.info(Thread.currentThread().getName() + " 任務執行完成: " + taskName);}}
}
創建一個控制器類 AsyncController
,用于觸發異步任務(線程安全的)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.Future;@RestController
public class AsyncController {private static final Logger logger = LoggerFactory.getLogger(AsyncController.class);@Autowiredprivate AsyncService asyncService;@GetMapping("/trigger")public String triggerAsyncTasks() {logger.info("開始觸發異步任務");for (int i = 0; i < 10; i++) {asyncService.asyncTask("任務 " + i);}return "異步任務已觸發";}
}
創建一個監控組件 ThreadPoolMonitor
,用于定期監控線程池的狀態
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@Component
public class ThreadPoolMonitor {private static final Logger logger = LoggerFactory.getLogger(ThreadPoolMonitor.class);@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;@Scheduled(fixedRate = 60000) // 每分鐘執行一次public void monitorThreadPool() {int activeCount = taskExecutor.getActiveCount();int poolSize = taskExecutor.getPoolSize();int corePoolSize = taskExecutor.getCorePoolSize();int maxPoolSize = taskExecutor.getMaxPoolSize();int queueSize = taskExecutor.getThreadPoolExecutor().getQueue().size();int completedTaskCount = taskExecutor.getThreadPoolExecutor().getCompletedTaskCount();logger.info("線程池狀態 - 活動線程數: {}, 當前線程數: {}, 核心線程數: {}, 最大線程數: {}, 隊列大小: {}, 已完成任務數: {}",activeCount, poolSize, corePoolSize, maxPoolSize, queueSize, completedTaskCount);// 檢查線程池是否接近飽和if (activeCount >= maxPoolSize * 0.8 || queueSize >= taskExecutor.getQueueCapacity() * 0.8) {logger.warn("線程池負載過高!請考慮優化配置或檢查任務執行情況");}}
}
?確保在啟動類上添加 @EnableAsync
注解,以啟用異步任務支持
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@SpringBootApplication
@EnableAsync
public class AsyncDemoApplication {public static void main(String[] args) {SpringApplication.run(AsyncDemoApplication.class, args);}
}
測試:
啟動 Spring Boot 應用后,訪問 http://localhost:8080/trigger
,即可看到異步任務在線程池中執行的情況,同時線程池的狀態也會定期輸出到日志中。
代碼說明
-
@EnableAsync 注解 :用于啟用 Spring 的異步方法執行支持,確保 Spring 容器能夠識別和處理帶有
@Async
注解的方法。 -
@Async 注解 :用于標注希望異步執行的方法,需指定所使用的線程池 Bean 的名稱,在本例中為 “taskExecutor”。當該方法被調用時,Spring 會將其提交到指定的線程池中執行。
-
ThreadPoolTaskExecutor :是 Spring 提供的一個線程池任務執行器,通過設置核心線程數、最大線程數、隊列容量等參數,可以根據應用的需求靈活地配置線程池。
-
異步任務失敗處理 :通過自定義的拒絕策略,在線程池滿時記錄詳細信息并拋出異常,以便及時發現任務執行失敗的情況。
-
線程池監控 :使用
@Scheduled
注解定期監控線程池的狀態,包括活動線程數、當前線程數、核心線程數、最大線程數、隊列大小和已完成任務數等,幫助開發者了解線程池的運行情況,以便及時進行優化和調整
2、高并發請求處理
在 Web 應用中處理大量并發請求,避免阻塞主線程
@RestController
public class MyController {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;@GetMapping("/process")public CompletableFuture<String> handleRequest() {return CompletableFuture.supplyAsync(() -> {// 耗時操作return "Result";}, taskExecutor);}}
3、定時任務調度
@EnableScheduling
@Configuration
public class SchedulerConfig {@Beanpublic ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(5);scheduler.setThreadNamePrefix("Scheduler-");return scheduler;}}@Service
public class ScheduledService {@Scheduled(fixedRate = 5000)public void scheduledTask() {// 定時任務邏輯}}
?
拒絕策略(Rejected Policies)
當線程池和隊列均滿時,處理新任務的策略:
策略類 | 行為描述 |
---|---|
AbortPolicy | 直接拋出 RejectedExecutionException(默認) |
CallerRunsPolicy | 由提交任務的線程直接執行任務(同步阻塞提交者) |
DiscardPolicy | 靜默丟棄新任務,不拋異常 |
DiscardOldestPolicy | 丟棄隊列中最舊的任務,然后重試提交新任務 |
如下給出不同拒絕策略的配置類,請結合上面的配置類整合使用
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolConfig {@Bean(name = "abortPolicyExecutor")public ThreadPoolTaskExecutor abortPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.AbortPolicy());}@Bean(name = "callerRunsPolicyExecutor")public ThreadPoolTaskExecutor callerRunsPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.CallerRunsPolicy());}@Bean(name = "discardPolicyExecutor")public ThreadPoolTaskExecutor discardPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.DiscardPolicy());}@Bean(name = "discardOldestPolicyExecutor")public ThreadPoolTaskExecutor discardOldestPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.DiscardOldestPolicy());}private ThreadPoolTaskExecutor createExecutor(ThreadPoolExecutor.RejectedExecutionHandler rejectedExecutionHandler) {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5); // 核心線程數executor.setMaxPoolSize(10); // 最大線程數executor.setQueueCapacity(100); // 隊列容量executor.setThreadNamePrefix("Task-Executor-"); // 線程名前綴executor.setRejectedExecutionHandler(rejectedExecutionHandler);executor.initialize();return executor;}
}
創建一個服務類 TaskService
,用于執行任務
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;@Service
public class TaskService {@Async("abortPolicyExecutor")public void executeWithAbortPolicy(String taskName) {executeTask(taskName);}@Async("callerRunsPolicyExecutor")public void executeWithCallerRunsPolicy(String taskName) {executeTask(taskName);}@Async("discardPolicyExecutor")public void executeWithDiscardPolicy(String taskName) {executeTask(taskName);}@Async("discardOldestPolicyExecutor")public void executeWithDiscardOldestPolicy(String taskName) {executeTask(taskName);}private void executeTask(String taskName) {try {System.out.println(Thread.currentThread().getName() + " 開始執行任務: " + taskName);Thread.sleep(2000); // 模擬任務執行時間System.out.println(Thread.currentThread().getName() + " 任務執行完成: " + taskName);} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("任務執行被中斷: " + taskName);}}
}
創建一個控制器類 TaskController
,用于觸發任務執行
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TaskController {@Autowiredprivate TaskService taskService;@GetMapping("/trigger/abort")public String triggerAbortPolicy(@RequestParam String taskName) {taskService.executeWithAbortPolicy(taskName);return "任務已提交到使用 AbortPolicy 的線程池";}@GetMapping("/trigger/caller")public String triggerCallerRunsPolicy(@RequestParam String taskName) {taskService.executeWithCallerRunsPolicy(taskName);return "任務已提交到使用 CallerRunsPolicy 的線程池";}@GetMapping("/trigger/discard")public String triggerDiscardPolicy(@RequestParam String taskName) {taskService.executeWithDiscardPolicy(taskName);return "任務已提交到使用 DiscardPolicy 的線程池";}@GetMapping("/trigger/discardoldest")public String triggerDiscardOldestPolicy(@RequestParam String taskName) {taskService.executeWithDiscardOldestPolicy(taskName);return "任務已提交到使用 DiscardOldestPolicy 的線程池";}
}
啟動 Spring Boot 應用后,分別訪問以下 URL 來測試不同拒絕策略的行為:
-
http://localhost:8080/trigger/abort?taskName=任務1
-
http://localhost:8080/trigger/caller?taskName=任務2
-
http://localhost:8080/trigger/discard?taskName=任務3
-
http://localhost:8080/trigger/discardoldest?taskName=任務4
-
代碼說明
-
線程池配置:
-
使用
ThreadPoolTaskExecutor
創建線程池。 -
配置了 4 個不同的線程池,每個線程池使用不同的拒絕策略。
-
每個線程池的核心線程數為 5,最大線程數為 10,隊列容量為 100。
-
-
拒絕策略:
-
AbortPolicy:直接拋出
RejectedExecutionException
。 -
CallerRunsPolicy:由提交任務的線程直接執行任務。
-
DiscardPolicy:靜默丟棄新任務,不拋異常。
-
DiscardOldestPolicy:丟棄隊列中最舊的任務,然后重試提交新任務。
-
任務執行:
-
TaskService
類中的每個方法都使用@Async
注解,并指定使用的線程池。 -
executeTask
方法模擬任務執行,包含一個 2 秒的睡眠時間。 -
通過這個示例,你可以觀察不同拒絕策略在任務被拒絕時的行為。例如,當線程池滿時,
AbortPolicy
會拋出異常,CallerRunsPolicy
會讓提交任務的線程執行任務,DiscardPolicy
會靜默丟棄任務,而DiscardOldestPolicy
會丟棄最舊的任務并嘗試提交新任務
-
-
6、最佳配置
-
· 合理設置線程池參數
CPU 密集型任務:核心線程數 ≈ CPU 核心數
I/O 密集型任務:核心線程數 ≈ CPU 核心數 * 2,并增大隊列容量。
·?避免隊列無限堆積
設置合理的 queueCapacity,防止內存溢出(OOM)。
· 統一異常處理
通過 AsyncUncaughtExceptionHandler 捕獲異步任務中的異常: -
@Configurationpublic class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// ... 配置參數return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return (ex, method, params) -> {// 處理異常};}}
應用退出時,調用?shutdown()?并等待剩余任務執行完畢
-
executor.shutdown();try {if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();}
總結:
-
ThreadPoolTaskExecutor?是?Spring?生態中管理線程任務的利器,通過靈活的配置和與?Spring?的無縫集成,能夠高效處理異步任務、高并發請求和定時調度。合理設置參數、選擇拒絕策略,并結合監控手段,可顯著提升系統性能和穩定性。
-