1. Callable&Future&FutureTask介紹
直接繼承Thread或者實現Runnable接口都可以創建線程,但是這兩種方法都有一個問題就是:沒有返回值,也就是不能獲取執行完的結果。因此java1.5就提供了Callable接口來實現這一場景,而Future和FutureTask就可以和Callable接口配合起來使用。
@FunctionalInterface
public interface Runnable {public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}
Runnable 的缺陷:
- 不能返回一個返回值
- 不能拋出 checked Exception
Callable的call方法可以有返回值,可以聲明拋出異常。和 Callable 配合的有一個 Future 類,通過 Future 可以了解任務執行情況,或者取消任務的執行,還可獲取任務執行的結果,這些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 強大。
new Thread(new Runnable() {@Overridepublic void run() {System.out.println("通過Runnable方式執行任務");}
}).start();FutureTask task = new FutureTask(new Callable() {@Overridepublic Object call() throws Exception {System.out.println("通過Callable方式執行任務");Thread.sleep(3000);return "返回任務結果";}
});
new Thread(task).start();
System.out.println(task.get());
1.1 Future 的API
Future就是對于具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。
- boolean cancel (boolean mayInterruptIfRunning) 取消任務的執行。參數指定是否立即中斷任務執行,或者等等任務結束
- boolean isCancelled () 任務是否已經取消,任務正常完成前將其取消,則返回 true
- boolean isDone () 任務是否已經完成。需要注意的是如果任務正常終止、異常或取消,都將返回true
- V get () throws InterruptedException, ExecutionException 等待任務執行結束,然后獲得V類型的結果。InterruptedException 線程被中斷異常, ExecutionException任務執行異常,如果任務被取消,還會拋出CancellationException
- V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一樣,多了設置超時時間。參數timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關的定義。如果計算超時,將拋出TimeoutException
1.2 FutureTask 使用
Future實際采用FutureTask實現,該對象相當于是消費者和生產者的橋梁,消費者通過 FutureTask 存儲任務的處理結果,更新任務的狀態:未開始、正在處理、已完成等。而生產者拿到的 FutureTask 被轉型為 Future 接口,可以阻塞式獲取任務的處理結果,非阻塞式獲取任務處理狀態。
FutureTask既可以被當做Runnable來執行,也可以被當做Future來獲取Callable的返回結果。
把 Callable 實例當作 FutureTask 構造函數的參數,生成 FutureTask 的對象,然后把這個對象當作一個 Runnable 對象,放到線程池中或另起線程去執行,最后還可以通過 FutureTask 獲取任務執行的結果。
public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Task task = new Task();//構建futureTaskFutureTask<Integer> futureTask = new FutureTask<>(task);//作為Runnable入參new Thread(futureTask).start();System.out.println("task運行結果:"+futureTask.get());}static class Task implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("子線程正在計算");int sum = 0;for (int i = 0; i < 100; i++) {sum += i;}return sum;}}
}
使用案例:促銷活動中商品信息查詢
在維護促銷活動時需要查詢商品信息(包括商品基本信息、商品價格、商品庫存、商品圖片、商品銷售狀態等)。這些信息分布在不同的業務中心,由不同的系統提供服務。如果采用同步方式,假設一個接口需要50ms,那么一個商品查詢下來就需要200ms-300ms,這對于我們來說是不滿意的。如果使用Future改造則需要的就是最長耗時服務的接口,也就是50ms左右。
public class FutureTaskDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> ft1 = new FutureTask<>(new T1Task());FutureTask<String> ft2 = new FutureTask<>(new T2Task());FutureTask<String> ft3 = new FutureTask<>(new T3Task());FutureTask<String> ft4 = new FutureTask<>(new T4Task());FutureTask<String> ft5 = new FutureTask<>(new T5Task());//構建線程池ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.submit(ft1);executorService.submit(ft2);executorService.submit(ft3);executorService.submit(ft4);executorService.submit(ft5);//獲取執行結果System.out.println(ft1.get());System.out.println(ft2.get());System.out.println(ft3.get());System.out.println(ft4.get());System.out.println(ft5.get());executorService.shutdown();}static class T1Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T1:查詢商品基本信息...");TimeUnit.MILLISECONDS.sleep(50);return "商品基本信息查詢成功";}}static class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:查詢商品價格...");TimeUnit.MILLISECONDS.sleep(50);return "商品價格查詢成功";}}static class T3Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T3:查詢商品庫存...");TimeUnit.MILLISECONDS.sleep(50);return "商品庫存查詢成功";}}static class T4Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T4:查詢商品圖片...");TimeUnit.MILLISECONDS.sleep(50);return "商品圖片查詢成功";}}static class T5Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T5:查詢商品銷售狀態...");TimeUnit.MILLISECONDS.sleep(50);return "商品銷售狀態查詢成功";}}}
1.3 Future的局限性
從本質上說,Future表示一個異步計算的結果。它提供了isDone()來檢測計算是否已經完成,并且在計算結束后,可以通過get()方法來獲取計算結果。在異步計算中,Future確實是個非常優秀的接口。但是,它的本身也確實存在著許多限制:
- 并發執行多任務:Future只提供了get()方法來獲取結果,并且是阻塞的。所以,除了等待你別無他法;
- 無法對多個任務進行鏈式調用:如果你希望在計算任務完成后執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
- 無法組合多個任務:如果你運行了10個任務,并期望在它們全部執行結束后執行特定動作,那么在Future中這是無能為力的;
- 沒有異常處理:Future接口中沒有關于異常處理的方法;
2. CompletableFuture使用詳解
簡單的任務,用Future獲取結果還好,但我們并行提交的多個異步任務,往往并不是獨立的,很多時候業務邏輯處理存在串行[依賴]、并行、聚合的關系。如果要我們手動用 Future 實現,是非常麻煩的。
CompletableFuture是Future接口的擴展和增強。CompletableFuture實現了Future接口,并在此基礎上進行了豐富地擴展,完美地彌補了Future上述的種種問題。更為重要的是,CompletableFuture實現了對任務的編排能力。借助這項能力,我們可以輕松地組織不同任務的運行順序、規則以及方式。從某種程度上說,這項能力是它的核心能力。而在以往,雖然通過CountDownLatch等工具類也可以實現任務的編排,但需要復雜的邏輯處理,不僅耗費精力且難以維護。
2.1 應用場景
描述依賴關系:
- thenApply() 把前面異步任務的結果,交給后面的Function
- thenCompose()用來連接兩個有依賴關系的任務,結果由第二個任務返回
描述and聚合關系:
- thenCombine:任務合并,有返回值
- thenAccepetBoth:兩個任務執行完成后,將結果交給thenAccepetBoth消耗,無返回值。
- runAfterBoth:兩個任務都執行完成后,執行下一步操作(Runnable)。
描述or聚合關系:
- applyToEither:兩個任務誰執行的快,就使用那一個結果,有返回值。
- acceptEither: 兩個任務誰執行的快,就消耗那一個結果,無返回值。
- runAfterEither: 任意一個任務執行完成,進行下一步操作(Runnable)。
并行執行:
CompletableFuture類自己也提供了anyOf()和allOf()用于支持多個CompletableFuture并行執行
2.2 創建異步操作
CompletableFuture 提供了四個靜態方法來創建一個異步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)?
這四個方法區別在于:
- runAsync 方法以Runnable函數式接口類型為參數,沒有返回結果,supplyAsync 方法Supplier函數式接口類型為參數,返回結果類型為U;Supplier 接口的 get() 方法是有返回值的(會阻塞)
- 沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執行異步代碼。如果指定線程池,則使用指定的線程池運行。
- 默認情況下 CompletableFuture 會使用公共的 ForkJoinPool 線程池,這個線程池默認創建的線程數是 CPU 的核數(也可以通過 JVM?option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數)。如果所有 CompletableFuture 共享一個線程池,那么一旦有任務執行一些很慢的 I/O 操作,就會導致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓,進而影響整個系統的性能。所以,強烈建議你要根據不同的業務類型創建不同的線程池,以避免互相干擾
runAsync&supplyAsync
Runnable runnable = () -> System.out.println("執行無返回結果的異步任務");CompletableFuture.runAsync(runnable);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("執行有返回值的異步任務");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello World";});String result = future.get();System.out.println(result);
執行無返回結果的異步任務
執行有返回值的異步任務
2.3 獲取結果
join&get
join()和get()方法都是用來獲取CompletableFuture異步之后的返回值。join()方法拋出的是uncheck異常(即未經檢查的異常),不會強制開發者拋出。get()方法拋出的是經過檢查的異常,ExecutionException, InterruptedException?需要用戶手動處理(拋出或者 try catch)
2.4 結果處理
當CompletableFuture的計算結果完成,或者拋出異常的時候,我們可以執行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
- Action的類型是BiConsumer,它可以處理正常的計算結果,或者異常情況。
- 方法不以Async結尾,意味著Action使用相同的線程執行,而Async可能會使用其它的線程去執行(如果使用相同的線程池,也可能會被同一個線程選中執行)。
- 這幾個方法都會返回CompletableFuture,當Action執行完畢后它的結果返回原始的CompletableFuture的計算結果或者返回異常
whenComplete&exceptionally
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}if (new Random().nextInt(10) % 2 == 0) {int i = 12 / 0;}System.out.println("執行結束!");return "test";});future.whenComplete(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String t, Throwable action) {System.out.println(t+" 執行完成!");}});future.exceptionally(new Function<Throwable, String>() {@Overridepublic String apply(Throwable t) {System.out.println("執行失敗:" + t.getMessage());return "異常xxxx";}}).join();
執行結束!
test 執行完成!
或者
執行失敗:java.lang.ArithmeticException: / by zero
null 執行完成!
2.5 結果轉換
所謂結果轉換,就是將上一段任務的執行結果作為下一階段任務的入參參與重新計算,產生新的結果。
thenApply
thenApply 接收一個函數作為參數,使用該函數處理上一個CompletableFuture 調用的結果,并返回一個具有處理結果的Future對象。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("一階段:" + result);return result;}).thenApply(number -> {int result = number * 3;System.out.println("二階段:" + result);return result;});System.out.println("最終結果:" + future.get());
一階段:100
二階段:300
最終結果:300
thenCompose
thenCompose 的參數為一個返回 CompletableFuture 實例的函數,該函數的參數是先前計算步驟的結果。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一階段:" + number);return number;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Overridepublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二階段:" + number);return number;}});}});System.out.println("最終結果: " + future.get());
第一階段:10
第二階段:20
最終結果:20
thenApply 和 thenCompose的區別
- thenApply轉換的是泛型中的類型,并返回一個新的封裝了轉換結果的
CompletableFuture實例;
- thenCompose 將內部的 CompletableFuture 調用展開來并使用上一個CompletableFutre 調用的結果在下一步的 CompletableFuture 調用中進行運算,是生成一個新的CompletableFuture。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> result1 = future.thenApply(param -> param + " World");CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));System.out.println(result1.get());System.out.println(result2.get());
Hello World
Hello World
2.6 結果消費
與結果處理和結果轉換系列函數返回一個新的 CompletableFuture 不同,結果消費系列函數只對結果執行Action,而不返回新的計算值。
根據對結果的處理方式,結果消費函數又分為:
- thenAccept系列:對單個結果進行消費
- thenAcceptBoth系列:對兩個結果進行消費
- thenRun系列:不關心結果,只對結果執行Action
thenAccept
通過觀察該系列函數的參數類型可知,它們是函數式接口Consumer,這個接口只有輸入,沒有返回值。
public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
?
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一階段:" + number);return number;}).thenAccept(number ->System.out.println("第二階段:" + number * 5));System.out.println("最終結果:" + future.get());
第一階段:8
第二階段:40
最終結果:null
thenAcceptBoth
thenAcceptBoth 函數的作用是,當兩個 CompletionStage 都正常完成計算的時候,就會執行提供的action消費兩個異步的結果。
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, ????Executor executor);
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:" + number);return number;}});futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最終結果:" + (x + y));}}).join();
第二階段:1
第一階段:2
最終結果:3
thenRun
thenRun 也是對線程任務結果的一種消費函數,與thenAccept不同的是,thenRun 會在上一階段 CompletableFuture 計算完成的時候執行一個Runnable,Runnable并不使用該 CompletableFuture 計算的結果。
public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一階段:" + number);return number;}).thenRun(() ->System.out.println("thenRun 執行"));System.out.println("最終結果:" + future.get());
第一階段:2
thenRun 執行
最終結果:null
2.7 結果組合
thenCombine
thenCombine 方法,合并兩個線程任務的結果,并進一步處理。
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二階段:" + number);return number;}});CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});System.out.println("最終結果:" + result.get());
第一階段:9
第二階段:5
最終結果:14
2.8 任務交互
所謂線程交互,是指將兩個線程任務獲取結果的速度相比較,按一定的規則進行下一步處理。
applyToEither
兩個線程任務相比較,先獲得執行結果的,就對該結果進行下一步的轉化操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一階段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段end:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二階段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段end:" + number);return number;}});future1.applyToEither(future2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer number) {System.out.println("最快結果:" + number);return number * 2;}}).join();
第一階段start:6
第二階段start:5
第二階段end:5
最快結果:5
acceptEither
兩個線程任務相比較,先獲得執行結果的,就對該結果進行下一步的消費操作。
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:" + number);return number;}});future1.acceptEither(future2, new Consumer<Integer>() {@Overridepublic void accept(Integer number) {System.out.println("最快結果:" + number);}}).join();
第二階段:3
最快結果:3
runAfterEither
兩個線程任務相比較,有任何一個執行完成,就進行下一步操作,不關心運行結果。
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:" + number);return number;}});future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已經有一個任務完成了");}}).join();"); } }).join();
第一階段:3
已經有一個任務完成了
runAfterBoth
兩個線程任務相比較,兩個全部執行完成,才進行下一步操作,不關心運行結果。
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:1");return 1;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:2");return 2;}});future1.runAfterBoth(future2, new Runnable() {@Overridepublic void run() {System.out.println("上面兩個任務都執行完成了。");}}).get();
第一階段:1
第二階段:2
上面兩個任務都執行完成了。
anyOf
anyOf 方法的參數是多個給定的 CompletableFuture,當其中的任何一個完成時,方法返回這個 CompletableFuture。
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
Random random = new Random();CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return "hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return "world";});CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);System.out.println(result.get());
world
allOf
allOf方法用來實現多 CompletableFuture 的同時返回。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1完成!");return "future1完成!";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2完成!");return "future2完成!";});CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);try {combindFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("future1: " + future1.isDone() + ",future2: " + future2.isDone());
future2完成!
future1完成!
future1: true,future2: true
2.9 使用案例:實現最優的“燒水泡茶”程序
著名數學家華羅庚先生在《統籌方法》這篇文章里介紹了一個燒水泡茶的例子,文中提到最優的工序應該是下面這樣:
對于燒水泡茶這個程序,一種最優的分工方案:用兩個線程 T1 和 T2 來完成燒水泡茶程序,T1 負責洗水壺、燒開水、泡茶這三道工序,T2 負責洗茶壺、洗茶杯、拿茶葉三道工序,其中 T1 在執行泡茶這道工序時需要等待 T2 完成拿茶葉的工序。
基于Future實現
public class FutureTaskDemo3{public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建任務T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 創建任務T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 線程T1執行任務ft1Thread T1 = new Thread(ft1);T1.start();// 線程T2執行任務ft2Thread T2 = new Thread(ft2);T2.start();// 等待線程T1執行結果System.out.println(ft1.get());}}// T1Task需要執行的任務:// 洗水壺、燒開水、泡茶class T1Task implements Callable<String> {FutureTask<String> ft2;// T1任務需要T2任務的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@Overridepublic String call() throws Exception {System.out.println("T1:洗水壺...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:燒開水...");TimeUnit.SECONDS.sleep(15);// 獲取T2線程的茶葉String tf = ft2.get();System.out.println("T1:拿到茶葉:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}}// T2Task需要執行的任務:// 洗茶壺、洗茶杯、拿茶葉class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:洗茶壺...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶葉...");TimeUnit.SECONDS.sleep(1);return "龍井";}}
基于CompletableFuture實現
public class CompletableFutureDemo2 {public static void main(String[] args) {//任務1:洗水壺->燒開水CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {System.out.println("T1:洗水壺...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:燒開水...");sleep(15, TimeUnit.SECONDS);});//任務2:洗茶壺->洗茶杯->拿茶葉CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {System.out.println("T2:洗茶壺...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶葉...");sleep(1, TimeUnit.SECONDS);return "龍井";});//任務3:任務1和任務2完成后執行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {System.out.println("T1:拿到茶葉:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任務3執行結果System.out.println(f3.join());}static void sleep(int t, TimeUnit u){try {u.sleep(t);} catch (InterruptedException e) {}}}