異步線程池配置:
/**
 * Spring configuration that switches on {@code @Async} support and registers
 * the thread pool published under the bean name {@code taskExecutor}.
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * Creates and initializes the thread pool exposed as the {@code taskExecutor} bean.
     *
     * <p>The pool keeps 2 core threads, may grow to 5 threads, queues up to
     * 500 pending tasks and names its threads with the {@code Async-} prefix.
     *
     * @return the initialized executor
     */
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Naming and queueing.
        pool.setThreadNamePrefix("Async-");
        pool.setQueueCapacity(500);

        // Sizing: core first, then the upper bound.
        pool.setCorePoolSize(2);
        pool.setMaxPoolSize(5);

        pool.initialize();
        return pool;
    }
}
異步線程服務:
/**
 * Runs caller-supplied work asynchronously. {@link Callable} tasks can be
 * retried with an exponentially growing delay between attempts; the outcome is
 * reported through a {@link CompletableFuture}.
 */
@Component
public class AsyncTaskService {

    private static final Logger logger = LoggerFactory.getLogger(AsyncTaskService.class);

    /** Number of attempts used by the single-argument overload. */
    private static final int DEFAULT_MAX_ATTEMPTS = 3;

    /** Initial back-off in milliseconds used by the single-argument overload. */
    private static final long DEFAULT_DELAY_BETWEEN_ATTEMPTS = 1000;

    /**
     * Runs {@code task} with the default retry settings (3 attempts, 1000 ms initial delay).
     *
     * @param task the work to execute
     * @param <T>  the task's result type
     * @return a future completed with the task's result, or completed exceptionally
     *         once every attempt has failed
     */
    @Async
    public <T> CompletableFuture<T> executeAsyncTaskWithRetry(Callable<T> task) {
        return executeAsyncTaskWithRetry(task, DEFAULT_MAX_ATTEMPTS, DEFAULT_DELAY_BETWEEN_ATTEMPTS);
    }

    /**
     * Runs {@code task}, retrying on failure up to {@code maxAttempts} times in total.
     * The wait before retry number {@code k} (1-based) is
     * {@code initialDelay * 2^(k-1)} milliseconds.
     *
     * @param task         the work to execute
     * @param maxAttempts  total number of attempts allowed; when it is zero or negative
     *                     the future fails immediately without running the task
     * @param initialDelay delay in milliseconds before the first retry
     * @param <T>          the task's result type
     * @return a future completed with the first successful result, or completed
     *         exceptionally with a {@link RuntimeException} that carries every
     *         attempt's failure as a suppressed exception
     */
    @Async
    public <T> CompletableFuture<T> executeAsyncTaskWithRetry(Callable<T> task, int maxAttempts, long initialDelay) {
        CompletableFuture<T> future = new CompletableFuture<>();
        // One single-threaded scheduler per call; it only times the back-off delays
        // and is shut down as soon as the future is completed either way.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        executeTaskWithRetry(task, maxAttempts, initialDelay, future, scheduler, 0, new ArrayList<>());
        return future;
    }

    /**
     * Performs attempt number {@code attempt} (0-based) and, on failure, either
     * schedules the next attempt or fails {@code future} when none is left.
     */
    private <T> void executeTaskWithRetry(Callable<T> task, int maxAttempts, long initialDelay,
                                          CompletableFuture<T> future, ScheduledExecutorService scheduler,
                                          int attempt, List<Throwable> failures) {
        if (attempt >= maxAttempts) {
            failAfterMaxAttempts(maxAttempts, future, scheduler, failures);
            return;
        }

        // No executor is passed, so the attempt runs on CompletableFuture's default async executor.
        CompletableFuture<T> attemptFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        });

        attemptFuture.whenComplete((result, throwable) -> {
            if (throwable == null) {
                future.complete(result);
                scheduler.shutdown();
                return;
            }

            // Keep the failure as a Throwable: the cause may be an Error, and casting it to
            // Exception would throw inside this callback and leave the future incomplete forever.
            // Fall back to the wrapper itself so a null is never handed to addSuppressed later.
            Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
            failures.add(cause);

            int nextAttempt = attempt + 1;
            if (nextAttempt >= maxAttempts) {
                // Nothing left to retry: report now instead of sleeping through one more back-off.
                failAfterMaxAttempts(maxAttempts, future, scheduler, failures);
                return;
            }

            long delay = (long) (initialDelay * Math.pow(2, attempt));
            logger.info("任務失敗,將在 {} 毫秒后重試,嘗試第:{} 次", delay, nextAttempt);
            scheduler.schedule(
                    () -> executeTaskWithRetry(task, maxAttempts, initialDelay, future, scheduler, nextAttempt, failures),
                    delay, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Completes {@code future} exceptionally with every recorded failure attached as a
     * suppressed exception, releases the scheduler and logs the final error.
     */
    private <T> void failAfterMaxAttempts(int maxAttempts, CompletableFuture<T> future,
                                          ScheduledExecutorService scheduler, List<Throwable> failures) {
        RuntimeException exception = new RuntimeException("任務失敗,已達最大重試次數: " + maxAttempts);
        failures.forEach(exception::addSuppressed);
        future.completeExceptionally(exception);
        scheduler.shutdown();
        logger.error("任務失敗,已達最大重試次數: {}", maxAttempts, exception);
    }

    /**
     * Runs {@code task} once; no result is returned and no retry is attempted.
     *
     * @param task the work to execute
     */
    @Async
    public void executeAsyncRunnableTask(Runnable task) {
        task.run();
    }
}
執行測試:
/**
 * Demo REST endpoints that exercise {@code AsyncTaskService}: a fire-and-forget
 * callable, a fire-and-forget runnable, and a randomly failing task that is retried.
 */
@RestController
public class AsyncController {

    /** Source of the random numbers that decide whether the retry demo task fails. */
    private static final Random RANDOM = new Random();

    // The annotation and the modifier must be separate tokens; fused together the
    // field carries an unknown annotation and is never injected.
    @Resource
    private AsyncTaskService genericAsyncService;

    /**
     * Starts a task that prints its thread name, sleeps for 5 seconds and prints again.
     * The future returned by the service is not observed here.
     *
     * @return a fixed confirmation message
     */
    @GetMapping("/startAsyncTask")
    @ApiOperation("執行異步任務")
    public String startAsyncTask() {
        genericAsyncService.executeAsyncTaskWithRetry(() -> {
            System.out.println("執行異步任務: " + Thread.currentThread().getName());
            Thread.sleep(5000);
            System.out.println("異步任務完成: " + Thread.currentThread().getName());
            return "任務結果";
        });
        return "異步任務已啟動";
    }

    /**
     * Starts a runnable that only prints its thread name at start and at completion.
     *
     * @return a fixed confirmation message
     */
    @GetMapping("/startAsyncRunnableTask")
    @ApiOperation("不需要返回值的操作的任務")
    public String startAsyncRunnableTask() {
        genericAsyncService.executeAsyncRunnableTask(() -> {
            System.out.println("執行異步Runnable任務: " + Thread.currentThread().getName());
            System.out.println("異步Runnable任務完成: " + Thread.currentThread().getName());
        });
        return "異步Runnable任務已啟動";
    }

    /**
     * Submits a task that draws a number in [0, 10) and throws when it is below 5,
     * so the service's retry path is exercised.
     *
     * @return a 200 response whose body is the future returned by the service
     */
    @GetMapping("/executeTask")
    @ApiOperation("執行異步任務[重試]")
    public ResponseEntity<CompletableFuture<String>> executeTask() {
        Callable<String> myTask = () -> {
            int randomNumber = RANDOM.nextInt(10);
            System.out.println("我是任務 ====== 隨機數" + randomNumber);
            if (randomNumber < 5) {
                System.out.println("隨機數: " + randomNumber + ",任務將失敗。");
                throw new Exception("隨機失敗");
            }
            return "Task executed successfully";
        };
        // NOTE(review): the future itself is used as the response body; confirm the MVC
        // setup resolves it asynchronously rather than serializing the future object.
        CompletableFuture<String> resultFuture = genericAsyncService.executeAsyncTaskWithRetry(myTask);
        return new ResponseEntity<>(resultFuture, HttpStatus.OK);
    }
}