ThreadPoolTaskExecutor 的使用案例
1. 依賴說明
< dependency> < groupId> org.springframework.retry</ groupId> < artifactId> spring-retry</ artifactId> < version> 1.3.1</ version>
</ dependency>
< dependency> < groupId> org.springframework</ groupId> < artifactId> spring-aspects</ artifactId> < version> 5.3.22</ version>
</ dependency>
< dependency> < groupId> org.slf4j</ groupId> < artifactId> slf4j-api</ artifactId> < version> 1.7.36</ version>
</ dependency>
2. 完整配置代碼
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. scheduling. concurrent. ThreadPoolTaskExecutor ;
import org. springframework. util. concurrent. ListenableFuture ;
import org. springframework. util. backoff. FixedBackOff ;
import org. springframework. retry. RetryCallback ;
import org. springframework. retry. RetryContext ;
import org. springframework. retry. support. RetryTemplate ;
import org. springframework. retry. policy. SimpleRetryPolicy ;
import org. springframework. retry. backoff. FixedBackOffPolicy ;
import org. springframework. core. task. TaskDecorator ; import java. util. concurrent. * ;
import java. util. Map ;
import java. util. concurrent. atomic. AtomicInteger ; import org. slf4j. MDC; @Configuration
public class TaskExecutorConfig { @Bean ( name = "taskExecutor" ) public ThreadPoolTaskExecutor taskExecutor ( ) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ( ) ; executor. setCorePoolSize ( 5 ) ; executor. setMaxPoolSize ( 10 ) ; executor. setKeepAliveSeconds ( 60 ) ; BlockingQueue < Runnable > queue = new ArrayBlockingQueue < > ( 25 ) ; executor. setQueue ( queue) ; executor. setRejectedExecutionHandler ( new ThreadPoolExecutor. CallerRunsPolicy ( ) ) ; executor. setThreadNamePrefix ( "TaskExecutor-" ) ; executor. setTaskDecorator ( new MdcTaskDecorator ( ) ) ; executor. initialize ( ) ; return executor; } public static RetryTemplate createRetryTemplate ( ) { RetryTemplate retryTemplate = new RetryTemplate ( ) ; SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy ( ) ; retryPolicy. setMaxAttempts ( 3 ) ; retryTemplate. setRetryPolicy ( retryPolicy) ; FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy ( ) ; backOffPolicy. setBackOffPeriod ( 1000 ) ; retryTemplate. setBackOffPolicy ( backOffPolicy) ; return retryTemplate; } public static class RetryCallableWrapper implements Runnable { private final Runnable targetTask; private final RetryTemplate retryTemplate; public RetryCallableWrapper ( Runnable targetTask, RetryTemplate retryTemplate) { this . targetTask = targetTask; this . retryTemplate = retryTemplate; } @Override public void run ( ) { retryTemplate. execute ( context -> { targetTask. run ( ) ; return null ; } ) ; } } public static class MdcTaskDecorator implements TaskDecorator { @Override public Runnable decorate ( Runnable runnable) { Map < String , String > contextMap = MDC. getCopyOfContextMap ( ) ; return ( ) -> { try { MDC. setContextMap ( contextMap) ; runnable. run ( ) ; } finally { MDC. clear ( ) ; } } ; } } public void useTaskExecutor ( ) { ThreadPoolTaskExecutor executor = taskExecutor ( ) ; Runnable retryableTask = new RetryCallableWrapper ( ( ) -> { System . out. println ( Thread . currentThread ( ) . getName ( ) + " 執行任務" ) ; throw new RuntimeException ( "模擬任務失敗" ) ; } , createRetryTemplate ( ) ) ; executor. execute ( retryableTask) ; MDC. put ( "traceId" , "123456" ) ; executor. execute ( ( ) -> { System . out. println ( "當前 traceId: " + MDC. get ( "traceId" ) ) ; System . out. println ( Thread . currentThread ( ) . getName ( ) + " 執行任務" ) ; } ) ; }
}
3. 配置項詳解
3.1 線程池核心配置 配置項 說明 推薦值/示例 corePoolSize
核心線程數 CPU密集型:Runtime.getRuntime().availableProcessors()
;IO密集型:2 * CPU核心數
maxPoolSize
最大線程數 根據業務負載調整(如 2 * corePoolSize
) keepAliveSeconds
空閑線程存活時間 與任務平均執行時間匹配(如 60
秒) queue
任務隊列 ArrayBlockingQueue(25)
(有界隊列)或 LinkedBlockingQueue
(無界隊列)rejectedExecutionHandler
拒絕策略 CallerRunsPolicy
(調用線程執行)、AbortPolicy
(拋異常)、DiscardPolicy
(丟棄任務)threadNamePrefix
線程名稱前綴 便于日志追蹤(如 "TaskExecutor-"
)
3.2 上下文傳遞機制
MDC(Mapped Diagnostic Context) :SLF4J 提供的日志上下文工具,用于記錄日志追蹤 ID。TaskDecorator :Spring 提供的裝飾器接口,用于在任務執行前后傳遞上下文。實現方式 :
在主線程中設置 MDC 上下文(如 MDC.put("traceId", "123456")
)。 通過 TaskDecorator
將上下文傳遞給異步任務。 任務執行時恢復上下文,確保日志可追蹤。
3.3 重試機制
RetryTemplate :Spring Retry 提供的重試模板,封裝了重試邏輯。SimpleRetryPolicy :定義最大重試次數(如 3 次)。FixedBackOffPolicy :定義重試間隔(如 1 秒)。使用方式 :
將任務包裝為 RetryCallableWrapper
。 通過 RetryTemplate
執行任務,自動處理重試邏輯。
4. 使用場景建議
日志追蹤 :通過 MDC 傳遞 traceId
,確保異步任務日志與主線程關聯。任務重試 :適用于網絡請求、數據庫操作等需要自動重試的場景。資源控制 :通過隊列和線程數限制,防止系統過載。
5. 注意事項
重試邏輯與異常處理 :
默認對所有異常進行重試,可通過 SimpleRetryPolicy
自定義異常類型。 重試后仍失敗的任務需通過日志或監控告警處理。
上下文傳遞的開銷 :
如果上下文較大(如包含復雜對象),需評估性能影響。 使用 InheritableThreadLocal
替代 MDC
可以更靈活地傳遞上下文。
線程池生命周期 :
確保在應用關閉時正確關閉線程池(調用 executor.shutdown()
)。 Spring 容器會自動關閉線程池,但手動創建時需顯式調用。
隊列容量與拒絕策略 :
有界隊列需合理設置容量,防止任務堆積。 拒絕策略應根據業務需求選擇(如 CallerRunsPolicy
適合輕量任務)。
6. 擴展功能(可選)
動態調整線程池參數 :通過 ThreadPoolTaskExecutor
提供的方法(如 setCorePoolSize()
)動態修改配置。監控與調優 :通過 ThreadPoolTaskExecutor
的統計方法(如 getPoolSize()
、getQueueSize()
)監控線程池狀態。自定義重試策略 :結合 Spring Retry
實現更復雜的重試邏輯(如指數退避、重試條件判斷)。
7. 代碼結構圖
TaskExecutorConfig
├── taskExecutor() // 配置線程池
│ ├── corePoolSize // 核心線程數
│ ├── maxPoolSize // 最大線程數
│ ├── keepAliveSeconds // 空閑線程存活時間
│ ├── queue // 任務隊列
│ ├── rejectedHandler // 拒絕策略
│ └── setTaskDecorator // 上下文傳遞機制
├── createRetryTemplate() // 創建重試模板
├── RetryCallableWrapper // 重試任務包裝器
└── useTaskExecutor() // 使用示例├── execute() // 提交普通任務└── submitListenable()// 提交帶返回值任務
8. 總結
上下文傳遞 :確保異步任務中能訪問到主線程的上下文(如日志追蹤 ID)。任務重試 :對失敗任務自動重試,提升系統魯棒性。靈活的線程池管理 :根據業務需求調整線程池參數,避免資源浪費或過載。