一、異步編程基礎概念
1.1 同步 vs 異步
特性 | 同步 | 異步 |
---|---|---|
執行方式 | 順序執行,阻塞調用 | 非阻塞,調用后立即返回 |
線程使用 | 單線程完成所有任務 | 多線程并行處理 |
響應性 | 較差,需等待前任務完成 | 較好,可立即響應新請求 |
復雜度 | 簡單直觀 | 較復雜,需處理線程安全 |
適用場景 | 簡單流程,短時間任務 | IO密集型,長時間任務 |
通俗理解:同步就像在銀行柜臺排隊辦理業務,必須等前面的人辦完才能輪到你;異步則像取號后可以坐著玩手機,等叫號時再去辦理。
1.2 為什么要使用異步
- 提高吞吐量:服務器能同時處理更多請求
- 增強用戶體驗:避免用戶長時間等待
- 資源優化:合理利用系統資源,避免阻塞主線程
- 解耦:將耗時操作與主流程分離
1.3 Java中的異步編程方式
// 1. 傳統Thread方式
new Thread(() -> {// 異步任務
}).start();// 2. Future模式
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {Thread.sleep(1000);return "Result";
});// 3. CompletableFuture (Java8+)
CompletableFuture.supplyAsync(() -> {// 異步任務return "Result";
}).thenAccept(result -> {// 處理結果
});// 4. Spring @Async (本文重點)
@Async
public void asyncMethod() {// 異步方法體
}
二、@Async基礎使用
2.1 啟用@Async支持
步驟1:在Spring Boot主類或配置類上添加@EnableAsync
@SpringBootApplication
@EnableAsync // 啟用異步支持
public class AsyncApplication {public static void main(String[] args) {SpringApplication.run(AsyncApplication.class, args);}
}
步驟2:創建異步服務類
@Service
public class EmailService {// 無返回值異步方法@Asyncpublic void sendEmail(String to, String content) {// 模擬郵件發送耗時try {Thread.sleep(3000);System.out.println("郵件已發送至: " + to + ", 內容: " + content);} catch (InterruptedException e) {e.printStackTrace();}}// 有返回值異步方法@Asyncpublic Future<String> sendEmailWithResult(String to, String content) {try {Thread.sleep(3000);String result = "郵件已發送至: " + to;return new AsyncResult<>(result);} catch (InterruptedException e) {return new AsyncResult<>("發送失敗");}}
}
2.2 調用異步方法
@RestController
@RequestMapping("/api/email")
public class EmailController {@Autowiredprivate EmailService emailService;@GetMapping("/send")public String sendEmail() {long start = System.currentTimeMillis();// 調用異步方法emailService.sendEmail("user@example.com", "您的訂單已創建");long elapsed = System.currentTimeMillis() - start;return "請求已處理,耗時: " + elapsed + "ms"; // 立即返回,不會等待郵件發送完成}@GetMapping("/send-with-result")public String sendEmailWithResult() throws Exception {Future<String> future = emailService.sendEmailWithResult("user@example.com", "訂單詳情");// 可以在這里做其他事情// 當需要結果時(阻塞等待)String result = future.get();return "處理結果: " + result;}
}
2.3 @Async方法限制
- 必須public修飾:因為基于Spring AOP實現
- 同類調用無效:
this.asyncMethod()
不會異步執行 - 返回值限制:
- void
- Future及其子類(如AsyncResult)
- CompletableFuture (Spring 4.2+)
- ListenableFuture (Spring 4.2+)
三、線程池配置詳解
3.1 默認線程池問題
Spring默認使用SimpleAsyncTaskExecutor
,它不重用線程,每次調用都創建新線程,生產環境不推薦使用。
3.2 自定義線程池配置
方式1:配置類方式(推薦)
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 核心線程數:線程池創建時初始化的線程數executor.setCorePoolSize(5);// 最大線程數:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程executor.setMaxPoolSize(10);// 緩沖隊列:用來緩沖執行任務的隊列executor.setQueueCapacity(50);// 線程名前綴executor.setThreadNamePrefix("Async-Executor-");// 線程池關閉時等待所有任務完成executor.setWaitForTasksToCompleteOnShutdown(true);// 等待時間executor.setAwaitTerminationSeconds(60);// 拒絕策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new CustomAsyncExceptionHandler();}
}// 自定義異常處理器
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {System.err.println("異步任務異常 - 方法: " + method.getName());System.err.println("異常信息: " + ex.getMessage());// 這里可以添加自定義處理邏輯,如記錄日志、發送告警等}
}
方式2:使用@Bean定義多個線程池
@Configuration
public class TaskExecutorConfig {@Bean(name = "emailExecutor")public Executor emailTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(3);executor.setMaxPoolSize(5);executor.setQueueCapacity(30);executor.setThreadNamePrefix("Email-Executor-");executor.initialize();return executor;}@Bean(name = "reportExecutor")public Executor reportTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(4);executor.setQueueCapacity(20);executor.setThreadNamePrefix("Report-Executor-");executor.initialize();return executor;}
}
使用指定線程池:
@Async("emailExecutor")
public void sendEmail(String to) { /*...*/ }@Async("reportExecutor")
public void generateReport() { /*...*/ }
3.3 線程池參數詳解
參數名 | 說明 | 推薦設置建議 |
---|---|---|
corePoolSize | 核心線程數,即使空閑也不會被回收 | CPU密集型:CPU核數+1 IO密集型:2*CPU核數 |
maxPoolSize | 最大線程數,當隊列滿時才會創建新線程直到此值 | 建議為核心線程數的2-3倍 |
queueCapacity | 任務隊列容量,超過核心線程數的任務會進入隊列 | 根據業務量調整,太大可能導致OOM |
keepAliveSeconds | 非核心線程空閑存活時間(秒) | 60-300秒 |
threadNamePrefix | 線程名前綴,便于監控和日志追蹤 | 建議按業務命名,如"Order-Async-" |
allowCoreThreadTimeOut | 是否允許核心線程超時退出 | 默認false,長時間空閑應用可設為true |
waitForTasksToCompleteOnShutdown | 應用關閉時是否等待異步任務完成 | 生產環境建議true |
awaitTerminationSeconds | 等待任務完成的超時時間 | 根據業務最長執行時間設置 |
rejectedExecutionHandler | 拒絕策略,當線程池和隊列都滿時的處理方式 | 根據業務需求選擇 |
拒絕策略選項:
AbortPolicy
:默認,直接拋出RejectedExecutionExceptionCallerRunsPolicy
:由調用者線程執行該任務DiscardPolicy
:直接丟棄任務DiscardOldestPolicy
:丟棄隊列中最老的任務并重試
3.4 線程池監控
@Service
public class ThreadPoolMonitor {@Autowiredprivate ThreadPoolTaskExecutor emailExecutor;@Scheduled(fixedRate = 5000) // 每5秒監控一次public void monitor() {System.out.println("=== 線程池監控 ===");System.out.println("當前線程數: " + emailExecutor.getPoolSize());System.out.println("活躍線程數: " + emailExecutor.getActiveCount());System.out.println("完成任務數: " + emailExecutor.getCompletedTaskCount());System.out.println("隊列剩余容量: " + emailExecutor.getThreadPoolExecutor().getQueue().remainingCapacity());}
}
四、@Async高級特性
4.1 返回值處理
1. Future模式
@Async
public Future<String> processData(String input) {// 模擬處理耗時try {Thread.sleep(2000);return new AsyncResult<>("處理完成: " + input.toUpperCase());} catch (InterruptedException e) {Thread.currentThread().interrupt();return new AsyncResult<>("處理中斷");}
}// 調用方
Future<String> future = service.processData("hello");
// 可以做其他事情...
String result = future.get(3, TimeUnit.SECONDS); // 帶超時的等待
2. CompletableFuture (Java8+)
@Async
public CompletableFuture<String> fetchData(String param) {return CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);return "Data for " + param;} catch (InterruptedException e) {throw new RuntimeException(e);}});
}// 鏈式調用
service.fetchData("user123").thenApply(String::toUpperCase).thenAccept(System.out::println).exceptionally(ex -> {System.err.println("Error: " + ex.getMessage());return null;});
4.2 基于條件的異步執行
1. 使用Spring Expression Language (SpEL)
@Async("#{systemProperties['async.enabled'] ? 'emailExecutor' : 'syncExecutor'}")
public void conditionalAsync() {// 根據系統屬性決定使用哪個執行器
}
2. 基于配置的開關
@Async
@ConditionalOnProperty(name = "app.async.enabled", havingValue = "true")
public void configBasedAsync() {// 當app.async.enabled=true時才異步執行
}
4.3 事務處理
異步方法與事務的特殊關系:
- 事務邊界:
@Async
方法會在新線程中執行,與原方法不在同一事務中 - 傳播行為:需要在異步方法上單獨聲明
@Transactional
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void asyncWithTransaction() {// 這個方法會在新事務中執行userRepository.save(new User("AsyncUser"));// 如果發生異常,只會回滾當前方法內的操作
}
4.4 組合異步操作
場景:需要等待多個異步任務全部完成
@Async
public CompletableFuture<String> fetchUserInfo(String userId) {// 模擬獲取用戶信息return CompletableFuture.completedFuture("UserInfo-" + userId);
}@Async
public CompletableFuture<String> fetchOrderInfo(String userId) {// 模擬獲取訂單信息return CompletableFuture.completedFuture("OrderInfo-" + userId);
}// 組合多個異步任務
public CompletableFuture<Void> combineAsyncTasks(String userId) {return CompletableFuture.allOf(fetchUserInfo(userId),fetchOrderInfo(userId)).thenRun(() -> {// 所有任務完成后的處理System.out.println("所有異步任務已完成");});
}
五、異常處理機制
5.1 異常處理方式對比
處理方式 | 適用場景 | 優點 | 缺點 |
---|---|---|---|
AsyncUncaughtExceptionHandler | 處理void返回類型的異步方法異常 | 集中處理,統一管理 | 無法獲取方法返回值 |
Future.get() | 處理有返回值的異步方法異常 | 可以獲取具體異常信息 | 需要手動調用get() |
CompletableFuture.exceptionally | Java8+的優雅異常處理方式 | 鏈式調用,代碼簡潔 | 僅適用于CompletableFuture |
5.2 實踐示例
1. 全局異常處理器
public class GlobalAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {private static final Logger logger = LoggerFactory.getLogger(GlobalAsyncExceptionHandler.class);@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {logger.error("異步任務異常 - 方法: {}, 參數: {}", method.getName(), Arrays.toString(params), ex);// 可以根據異常類型進行不同處理if (ex instanceof BusinessException) {// 業務異常處理sendAlert("業務異常警報: " + ex.getMessage());} else if (ex instanceof TimeoutException) {// 超時處理retryTask(method, params);}}private void sendAlert(String message) { /*...*/ }private void retryTask(Method method, Object... params) { /*...*/ }
}
2. Future方式的異常處理
@Async
public Future<String> asyncTaskWithException() {try {// 業務邏輯if (someCondition) {throw new BusinessException("業務異常");}return new AsyncResult<>("成功");} catch (BusinessException e) {return new AsyncResult<>("失敗: " + e.getMessage());}
}// 調用方處理
Future<String> future = service.asyncTaskWithException();
try {String result = future.get();if (result.startsWith("失敗")) {// 處理失敗情況}
} catch (ExecutionException e) {// 處理執行時異常
}
3. CompletableFuture的異常處理
@Async
public CompletableFuture<String> asyncProcess(String input) {return CompletableFuture.supplyAsync(() -> {if (input == null) {throw new IllegalArgumentException("輸入不能為空");}return "處理結果: " + input.toUpperCase();});
}// 調用方處理
service.asyncProcess(null).exceptionally(ex -> {System.err.println("發生異常: " + ex.getMessage());return "默認返回值";}).thenAccept(result -> {System.out.println("最終結果: " + result);});
六、性能優化與最佳實踐
6.1 性能優化建議
-
線程池參數調優
- 根據業務類型調整線程池大小
- 監控線程池狀態動態調整參數
- 使用有界隊列防止OOM
-
避免長時間阻塞
- 異步方法內避免同步阻塞操作
- 使用帶超時的阻塞調用
-
資源清理
- 確保異步方法正確釋放資源
- 使用try-with-resources管理資源
-
上下文傳遞
- 注意ThreadLocal變量在異步線程中的傳遞問題
- 使用
TaskDecorator
傳遞上下文
executor.setTaskDecorator(new ContextCopyingDecorator());public class ContextCopyingDecorator implements TaskDecorator {@Overridepublic Runnable decorate(Runnable runnable) {// 獲取當前線程的上下文RequestAttributes context = RequestContextHolder.currentRequestAttributes();return () -> {try {// 在新線程中設置上下文RequestContextHolder.setRequestAttributes(context);runnable.run();} finally {RequestContextHolder.resetRequestAttributes();}};}
}
6.2 最佳實踐清單
-
命名規范
- 異步方法名以
Async
后綴標識,如sendEmailAsync
- 線程池按業務命名,如
orderTaskExecutor
- 異步方法名以
-
日志記錄
- 記錄異步任務開始/結束時間
- 為異步線程設置可追蹤的上下文ID
@Async
public void asyncWithLogging() {String traceId = UUID.randomUUID().toString();MDC.put("traceId", traceId);try {log.info("異步任務開始");// 業務邏輯log.info("異步任務完成");} finally {MDC.clear();}
}
-
防御性編程
- 檢查異步方法參數有效性
- 添加合理的超時控制
-
資源限制
- 限制并發異步任務數量
- 對重要任務設置優先級
-
監控告警
- 監控線程池關鍵指標
- 設置異常告警閾值
七、與其他技術的整合
7.1 與Spring Retry整合
實現異步任務失敗重試:
@Async
@Retryable(value = {RemoteAccessException.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000, multiplier = 2))
public CompletableFuture<String> callExternalService() {// 調用可能失敗的外部服務return CompletableFuture.completedFuture(externalService.call());
}// 重試全部失敗后的處理
@Recover
public CompletableFuture<String> recover(RemoteAccessException e) {return CompletableFuture.completedFuture("默認返回值");
}
7.2 與Spring Cache整合
異步緩存更新:
@Async
@CacheEvict(value = "users", key = "#userId")
public void evictUserCacheAsync(String userId) {// 異步清理緩存
}@Async
@CachePut(value = "users", key = "#user.id")
public CompletableFuture<User> updateUserAsync(User user) {// 異步更新用戶并更新緩存return CompletableFuture.completedFuture(userRepository.save(user));
}
7.3 與WebFlux響應式編程對比
特性 | @Async | WebFlux |
---|---|---|
編程模型 | 命令式 | 響應式 |
線程模型 | 線程池-based | 事件循環 |
資源消耗 | 較高(每個請求一個線程) | 較低(少量線程處理所有請求) |
學習曲線 | 較低 | 較高 |
適用場景 | 傳統Servlet應用 | 高并發IO密集型應用 |
背壓支持 | 不支持 | 支持 |
集成復雜度 | 簡單 | 中等 |
八、常見問題與解決方案
8.1 問題排查表
問題現象 | 可能原因 | 解決方案 |
---|---|---|
@Async方法不異步執行 | 同類調用 | 確保通過Spring代理調用,使用@Autowired注入自身 |
未加@EnableAsync | 在主配置類添加@EnableAsync | |
異步方法拋出異常不顯示 | 未正確處理AsyncUncaughtException | 實現AsyncUncaughtExceptionHandler |
線程池不生效 | 未正確命名或注入 | 確保@Async(“executorName”)與@Bean名稱一致 |
應用關閉時任務丟失 | 未配置優雅關閉 | 設置setWaitForTasksToCompleteOnShutdown(true)和awaitTerminationSeconds |
性能未提升反而下降 | 線程池配置不合理 | 調整核心/最大線程數和隊列容量 |
ThreadLocal值丟失 | 線程切換導致上下文丟失 | 使用TaskDecorator傳遞上下文 |
8.2 實戰問題案例
案例1:數據庫連接泄漏
@Async
public void processData() {// 錯誤示范:未關閉ConnectionConnection conn = dataSource.getConnection();// 使用conn...
}
解決方案:
@Async
public void processData() {try (Connection conn = dataSource.getConnection()) {// 使用conn...} catch (SQLException e) {// 異常處理}
}
案例2:訂單超時未支付取消
@Async
@Scheduled(fixedDelay = 60000) // 每分鐘檢查一次
public void cancelUnpaidOrders() {List<Order> unpaidOrders = orderRepository.findByStatusAndCreateTimeBefore(OrderStatus.UNPAID, LocalDateTime.now().minusMinutes(30));unpaidOrders.forEach(order -> {order.setStatus(OrderStatus.CANCELLED);orderRepository.save(order);notificationService.sendCancelNotice(order);});
}
九、總結
9.1 核心要點總結
-
基礎使用:
@EnableAsync
啟用支持@Async
標注異步方法- 避免同類調用
-
線程池配置:
- 生產環境必須自定義線程池
- 合理設置核心參數
- 監控線程池狀態
-
異常處理:
- 區分返回值類型選擇處理方式
- 實現全局異常處理器
- 日志記錄完整上下文
-
高級特性:
- 組合多個異步操作
- 事務邊界處理
- 條件異步執行
-
最佳實踐:
- 命名規范
- 防御性編程
- 資源清理
- 上下文傳遞
9.2 完整示例代碼
訂單處理服務示例:
@Service
public class OrderProcessingService {private static final Logger logger = LoggerFactory.getLogger(OrderProcessingService.class);@Autowiredprivate OrderRepository orderRepository;@Autowiredprivate PaymentService paymentService;@Autowiredprivate NotificationService notificationService;@Async("orderTaskExecutor")@Transactional(propagation = Propagation.REQUIRES_NEW)public CompletableFuture<OrderResult> processOrderAsync(Order order) {logger.info("開始異步處理訂單: {}", order.getId());long startTime = System.currentTimeMillis();try {// 1. 保存訂單Order savedOrder = orderRepository.save(order);// 2. 處理支付PaymentResult paymentResult = paymentService.processPayment(savedOrder);if (!paymentResult.isSuccess()) {throw new PaymentException("支付處理失敗: " + paymentResult.getErrorMessage());}// 3. 更新訂單狀態savedOrder.setStatus(OrderStatus.PAID);orderRepository.save(savedOrder);// 4. 發送通知notificationService.sendOrderConfirmation(savedOrder);long elapsed = System.currentTimeMillis() - startTime;logger.info("訂單處理完成: {}, 耗時: {}ms", savedOrder.getId(), elapsed);return CompletableFuture.completedFuture(new OrderResult(true, "訂單處理成功", savedOrder));} catch (PaymentException e) {logger.error("訂單支付異常: {}", order.getId(), e);return CompletableFuture.completedFuture(new OrderResult(false, e.getMessage(), order));} catch (Exception e) {logger.error("訂單處理未知異常: {}", order.getId(), e);return CompletableFuture.failedFuture(e);}}// 批量異步處理訂單@Async("batchOrderExecutor")public CompletableFuture<Void> processOrdersBatch(List<Order> orders) {List<CompletableFuture<OrderResult>> futures = orders.stream().map(this::processOrderAsync).collect(Collectors.toList());return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(ex -> {logger.error("批量處理訂單異常", ex);return null;});}
}// 配置類
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(20);executor.setQueueCapacity(100);executor.setThreadNamePrefix("Order-Async-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.setAwaitTerminationSeconds(60);executor.initialize();return executor;}@Bean(name = "batchOrderExecutor")public Executor batchOrderExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(50);executor.setThreadNamePrefix("Batch-Order-");executor.initialize();return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new OrderAsyncExceptionHandler();}
}// 全局異常處理器
public class OrderAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {private static final Logger logger = LoggerFactory.getLogger(OrderAsyncExceptionHandler.class);@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {logger.error("異步訂單處理異常 - 方法: {}, 參數: {}", method.getName(), Arrays.toString(params), ex);// 發送告警郵件if (ex instanceof CriticalOrderException) {sendCriticalAlert(method, ex, params);}}private void sendCriticalAlert(Method method, Throwable ex, Object... params) {// 實現告警邏輯}
}