CompletableFuture工具類可以幫助實現Java并發編程中的任務編排
以上除了join用于阻塞調用該方法的線程并且接受CompletableFuture的返回值以外其它方法皆有Async異步和Executor指定線程池選項
對于supply,run,apply,accept的區別在于函數式編程的接口類型不同:
- supply: Supplier<T> supplier =>?T get()
- run:??Runnable runnable => void run()
- apply:??Function<T, R> =>?apply(T t)
- accept:?Consumer<T> =>?void accept(T t)
對于以上方法,根據接受參數不同分為兩類,一類參數接受如supply,run,apply,accept接口的實現類,另一類參數接受新的CompletableFuture:
FunctionInterface | CompletableFuture |
---|---|
supplyAsync | thenCompose?? |
runAsync | thenCombine? |
thenApply | thenAcceptBoth |
thenAccept | runAfterBoth |
thenRun | applyToEither |
acceptEither | |
runAfterEither |
thenCompose用于先執行A再執行B
thenCombine,thenAcceptBoth,runAfterBoth用于A,B同時執行
applyToEither,acceptEither,runAfterEither用于A,B誰先執行完就跳到C
?exceptionally,handle,whenComplete用于在并發編程中處理結果和異常:
exceptionally:
該方法僅處理異常情況:發生異常時,不會把異常拋出,而是由exceptionally處理結果返回
public static CompletableFuture exceptionally(int a, int b){return CompletableFuture.supplyAsync(() -> a/b).exceptionally(ex -> {System.out.println("ex:\t"+ex.getMessage());return 0;});
}
handle:
處理上一階段返回值和異常,不會把異常拋出,而是由handle處理結果返回
public class CreateThread_FutureTask {public static void main(String[] args) {CompletableFuture<String> ctf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);return "1";} catch (InterruptedException e) {throw new RuntimeException(e);}}).applyToEither(CompletableFuture.supplyAsync(() -> {try {Thread.sleep(50);return String.valueOf(1/0);} catch (InterruptedException e) {throw new RuntimeException(e);}}), str -> {return str;}).handle((str,ex)->{if(ex!=null){return ex.getMessage();}else{return str;}});System.out.println(ctf.join());System.out.println("主線程能繼續執行");}
}
whenComplete:
處理上一階段返回值和異常,會把異常拋出
public static CompletableFuture whenComplete(int a, int b){return CompletableFuture.supplyAsync(() -> a/b).whenComplete((result, ex) -> {if (null != ex) {System.out.println("whenComplete error:\t"+ex.getMessage());}});
}
CompletableFuture.allOf和anyOf
allOf:接受CompletableFuture<?>...ctfs,等待全部執行完畢
anyOf:接受CompletableFuture<?>...ctfs,等待任意一個執行完畢
CompletableFuture[] dishes = IntStream.rangeClosed(1, 10).mapToObj(i -> new Dish("菜" + i, 1L)).map(dish -> CompletableFuture.runAsync(dish::make)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(dishes).join();
System.out.println("菜全都做好了");
使用@Async指定線程池+CompletableFuture.completedFuture實現異步
CompletableFuture.completedFuture用于在已經知道返回值時生成CompletableFuture對象,直接使用當前方法的線程,可以與SpringBoot的@Async配合實現異步
@Configuration
public class ExecutorConfig {@Beanpublic Executor myAsyncTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(2);executor.setMaxPoolSize(2);executor.setQueueCapacity(500);executor.setThreadNamePrefix("GithubLookup-");executor.initialize();return executor;}
}
@Async("myAsyncTaskExecutor")
@Override
public CompletableFuture<String> completeTask(int i) throws InterruptedException {log.info("當前線程為",Thread.currentThread().getName());Thread.sleep(1000*i);String value=String.valueOf(i);return CompletableFuture.completedFuture(value);
}