FutureCompletableFuture實戰

1. Callable&Future&FutureTask介紹

直接繼承Thread或者實現Runnable接口都可以創建線程,但是這兩種方法都有一個問題就是:沒有返回值,也就是不能獲取執行完的結果。因此java1.5就提供了Callable接口來實現這一場景,而Future和FutureTask就可以和Callable接口配合起來使用。

@FunctionalInterface
public interface Runnable {public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}

Runnable 的缺陷:

  • 不能返回一個返回值
  • 不能拋出 checked Exception

Callable的call方法可以有返回值,可以聲明拋出異常。和 Callable 配合的有一個 Future 類,通過 Future 可以了解任務執行情況,或者取消任務的執行,還可獲取任務執行的結果,這些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 強大。

new Thread(new Runnable() {@Overridepublic void run() {System.out.println("通過Runnable方式執行任務");}
}).start();FutureTask task = new FutureTask(new Callable() {@Overridepublic Object call() throws Exception {System.out.println("通過Callable方式執行任務");Thread.sleep(3000);return "返回任務結果";}
});
new Thread(task).start();
System.out.println(task.get());

1.1 Future 的API

Future就是對于具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。

  • boolean cancel (boolean mayInterruptIfRunning) 取消任務的執行。參數指定是否立即中斷任務執行,或者等等任務結束
  • boolean isCancelled () 任務是否已經取消,任務正常完成前將其取消,則返回 true
  • boolean isDone () 任務是否已經完成。需要注意的是如果任務正常終止、異常或取消,都將返回true
  • V get () throws InterruptedException, ExecutionException 等待任務執行結束,然后獲得V類型的結果。InterruptedException 線程被中斷異常, ExecutionException任務執行異常,如果任務被取消,還會拋出CancellationException
  • V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一樣,多了設置超時時間。參數timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關的定義。如果計算超時,將拋出TimeoutException

1.2 FutureTask 使用

Future實際采用FutureTask實現,該對象相當于是消費者和生產者的橋梁,消費者通過 FutureTask 存儲任務的處理結果,更新任務的狀態:未開始、正在處理、已完成等。而生產者拿到的 FutureTask 被轉型為 Future 接口,可以阻塞式獲取任務的處理結果,非阻塞式獲取任務處理狀態。

FutureTask既可以被當做Runnable來執行,也可以被當做Future來獲取Callable的返回結果。

把 Callable 實例當作 FutureTask 構造函數的參數,生成 FutureTask 的對象,然后把這個對象當作一個 Runnable 對象,放到線程池中或另起線程去執行,最后還可以通過 FutureTask 獲取任務執行的結果。

public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Task task = new Task();//構建futureTaskFutureTask<Integer> futureTask = new FutureTask<>(task);//作為Runnable入參new Thread(futureTask).start();System.out.println("task運行結果:"+futureTask.get());}static class Task implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("子線程正在計算");int sum = 0;for (int i = 0; i < 100; i++) {sum += i;}return sum;}}
}

使用案例:促銷活動中商品信息查詢

在維護促銷活動時需要查詢商品信息(包括商品基本信息、商品價格、商品庫存、商品圖片、商品銷售狀態等)。這些信息分布在不同的業務中心,由不同的系統提供服務。如果采用同步方式,假設一個接口需要50ms,那么一個商品查詢下來就需要200ms-300ms,這對于我們來說是不滿意的。如果使用Future改造則需要的就是最長耗時服務的接口,也就是50ms左右。

public class FutureTaskDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> ft1 = new FutureTask<>(new T1Task());FutureTask<String> ft2 = new FutureTask<>(new T2Task());FutureTask<String> ft3 = new FutureTask<>(new T3Task());FutureTask<String> ft4 = new FutureTask<>(new T4Task());FutureTask<String> ft5 = new FutureTask<>(new T5Task());//構建線程池ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.submit(ft1);executorService.submit(ft2);executorService.submit(ft3);executorService.submit(ft4);executorService.submit(ft5);//獲取執行結果System.out.println(ft1.get());System.out.println(ft2.get());System.out.println(ft3.get());System.out.println(ft4.get());System.out.println(ft5.get());executorService.shutdown();}static class T1Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T1:查詢商品基本信息...");TimeUnit.MILLISECONDS.sleep(50);return "商品基本信息查詢成功";}}static class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:查詢商品價格...");TimeUnit.MILLISECONDS.sleep(50);return "商品價格查詢成功";}}static class T3Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T3:查詢商品庫存...");TimeUnit.MILLISECONDS.sleep(50);return "商品庫存查詢成功";}}static class T4Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T4:查詢商品圖片...");TimeUnit.MILLISECONDS.sleep(50);return "商品圖片查詢成功";}}static class T5Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T5:查詢商品銷售狀態...");TimeUnit.MILLISECONDS.sleep(50);return "商品銷售狀態查詢成功";}}}

1.3 Future的局限性

從本質上說,Future表示一個異步計算的結果。它提供了isDone()來檢測計算是否已經完成,并且在計算結束后,可以通過get()方法來獲取計算結果。在異步計算中,Future確實是個非常優秀的接口。但是,它的本身也確實存在著許多限制:

  • 并發執行多任務:Future只提供了get()方法來獲取結果,并且是阻塞的。所以,除了等待你別無他法;
  • 無法對多個任務進行鏈式調用:如果你希望在計算任務完成后執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
  • 無法組合多個任務:如果你運行了10個任務,并期望在它們全部執行結束后執行特定動作,那么在Future中這是無能為力的;
  • 沒有異常處理:Future接口中沒有關于異常處理的方法;

2. CompletableFuture使用詳解

簡單的任務,用Future獲取結果還好,但我們并行提交的多個異步任務,往往并不是獨立的,很多時候業務邏輯處理存在串行[依賴]、并行、聚合的關系。如果要我們手動用 Future 實現,是非常麻煩的。

CompletableFuture是Future接口的擴展和增強。CompletableFuture實現了Future接口,并在此基礎上進行了豐富地擴展,完美地彌補了Future上述的種種問題。更為重要的是,CompletableFuture實現了對任務的編排能力。借助這項能力,我們可以輕松地組織不同任務的運行順序、規則以及方式。從某種程度上說,這項能力是它的核心能力。而在以往,雖然通過CountDownLatch等工具類也可以實現任務的編排,但需要復雜的邏輯處理,不僅耗費精力且難以維護。

2.1 應用場景

描述依賴關系:

  1. thenApply() 把前面異步任務的結果,交給后面的Function
  2. thenCompose()用來連接兩個有依賴關系的任務,結果由第二個任務返回

描述and聚合關系:

  1. thenCombine:任務合并,有返回值
  2. thenAccepetBoth:兩個任務執行完成后,將結果交給thenAccepetBoth消耗,無返回值。
  3. runAfterBoth:兩個任務都執行完成后,執行下一步操作(Runnable)。

描述or聚合關系:

  1. applyToEither:兩個任務誰執行的快,就使用那一個結果,有返回值。
  2. acceptEither: 兩個任務誰執行的快,就消耗那一個結果,無返回值。
  3. runAfterEither: 任意一個任務執行完成,進行下一步操作(Runnable)。

并行執行:

CompletableFuture類自己也提供了anyOf()和allOf()用于支持多個CompletableFuture并行執行

2.2 創建異步操作

CompletableFuture 提供了四個靜態方法來創建一個異步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable)public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)?

這四個方法區別在于:

  1. runAsync 方法以Runnable函數式接口類型為參數,沒有返回結果,supplyAsync 方法Supplier函數式接口類型為參數,返回結果類型為U;Supplier 接口的 get() 方法是有返回值的(會阻塞
  2. 沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執行異步代碼。如果指定線程池,則使用指定的線程池運行。
  3. 默認情況下 CompletableFuture 會使用公共的 ForkJoinPool 線程池,這個線程池默認創建的線程數是 CPU 的核數(也可以通過 JVM?option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數)。如果所有 CompletableFuture 共享一個線程池,那么一旦有任務執行一些很慢的 I/O 操作,就會導致線程池中所有線程都阻塞在 I/O 操作上,從而造成線程饑餓,進而影響整個系統的性能。所以,強烈建議你要根據不同的業務類型創建不同的線程池,以避免互相干擾

runAsync&supplyAsync

Runnable runnable = () -> System.out.println("執行無返回結果的異步任務");CompletableFuture.runAsync(runnable);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("執行有返回值的異步任務");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello World";});String result = future.get();System.out.println(result);

執行無返回結果的異步任務

執行有返回值的異步任務

2.3 獲取結果

join&get

join()和get()方法都是用來獲取CompletableFuture異步之后的返回值。join()方法拋出的是uncheck異常(即未經檢查的異常),不會強制開發者拋出。get()方法拋出的是經過檢查的異常,ExecutionException, InterruptedException?需要用戶手動處理(拋出或者 try catch)

2.4 結果處理

當CompletableFuture的計算結果完成,或者拋出異常的時候,我們可以執行特定的 Action。主要是下面的方法:

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
  • Action的類型是BiConsumer,它可以處理正常的計算結果,或者異常情況。
  • 方法不以Async結尾,意味著Action使用相同的線程執行,而Async可能會使用其它的線程去執行(如果使用相同的線程池,也可能會被同一個線程選中執行)。
  • 這幾個方法都會返回CompletableFuture,當Action執行完畢后它的結果返回原始的CompletableFuture的計算結果或者返回異常

whenComplete&exceptionally

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}if (new Random().nextInt(10) % 2 == 0) {int i = 12 / 0;}System.out.println("執行結束!");return "test";});future.whenComplete(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String t, Throwable action) {System.out.println(t+" 執行完成!");}});future.exceptionally(new Function<Throwable, String>() {@Overridepublic String apply(Throwable t) {System.out.println("執行失敗:" + t.getMessage());return "異常xxxx";}}).join();

執行結束!

test 執行完成!

或者

執行失敗:java.lang.ArithmeticException: / by zero

null 執行完成!

2.5 結果轉換

所謂結果轉換,就是將上一段任務的執行結果作為下一階段任務的入參參與重新計算,產生新的結果。

thenApply

thenApply 接收一個函數作為參數,使用該函數處理上一個CompletableFuture 調用的結果,并返回一個具有處理結果的Future對象。

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("一階段:" + result);return result;}).thenApply(number -> {int result = number * 3;System.out.println("二階段:" + result);return result;});System.out.println("最終結果:" + future.get());

一階段:100

二階段:300

最終結果:300

thenCompose

thenCompose 的參數為一個返回 CompletableFuture 實例的函數,該函數的參數是先前計算步驟的結果。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一階段:" + number);return number;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Overridepublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二階段:" + number);return number;}});}});System.out.println("最終結果: " + future.get());

第一階段:10

第二階段:20

最終結果:20

thenApply 和 thenCompose的區別

  • thenApply轉換的是泛型中的類型,并返回一個新的封裝了轉換結果的

CompletableFuture實例;

  • thenCompose 將內部的 CompletableFuture 調用展開來并使用上一個CompletableFutre 調用的結果在下一步的 CompletableFuture 調用中進行運算,是生成一個新的CompletableFuture。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> result1 = future.thenApply(param -> param + " World");CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));System.out.println(result1.get());System.out.println(result2.get());

Hello World

Hello World

2.6 結果消費

與結果處理和結果轉換系列函數返回一個新的 CompletableFuture 不同,結果消費系列函數只對結果執行Action,而不返回新的計算值。

根據對結果的處理方式,結果消費函數又分為:

  • thenAccept系列:對單個結果進行消費
  • thenAcceptBoth系列:對兩個結果進行消費
  • thenRun系列:不關心結果,只對結果執行Action

thenAccept

通過觀察該系列函數的參數類型可知,它們是函數式接口Consumer,這個接口只有輸入,沒有返回值。

public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);


?

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一階段:" + number);return number;}).thenAccept(number ->System.out.println("第二階段:" + number * 5));System.out.println("最終結果:" + future.get());

第一階段:8

第二階段:40

最終結果:null

thenAcceptBoth

thenAcceptBoth 函數的作用是,當兩個 CompletionStage 都正常完成計算的時候,就會執行提供的action消費兩個異步的結果。

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, ????Executor executor);
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:" + number);return number;}});futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最終結果:" + (x + y));}}).join();

第二階段:1

第一階段:2

最終結果:3

thenRun

thenRun 也是對線程任務結果的一種消費函數,與thenAccept不同的是,thenRun 會在上一階段 CompletableFuture 計算完成的時候執行一個Runnable,Runnable并不使用該 CompletableFuture 計算的結果。

public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一階段:" + number);return number;}).thenRun(() ->System.out.println("thenRun 執行"));System.out.println("最終結果:" + future.get());

第一階段:2

thenRun 執行

最終結果:null

2.7 結果組合

thenCombine

thenCombine 方法,合并兩個線程任務的結果,并進一步處理。

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二階段:" + number);return number;}});CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});System.out.println("最終結果:" + result.get());

第一階段:9

第二階段:5

最終結果:14

2.8 任務交互

所謂線程交互,是指將兩個線程任務獲取結果的速度相比較,按一定的規則進行下一步處理。

applyToEither

兩個線程任務相比較,先獲得執行結果的,就對該結果進行下一步的轉化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一階段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段end:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二階段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段end:" + number);return number;}});future1.applyToEither(future2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer number) {System.out.println("最快結果:" + number);return number * 2;}}).join();

第一階段start:6

第二階段start:5

第二階段end:5

最快結果:5

acceptEither

兩個線程任務相比較,先獲得執行結果的,就對該結果進行下一步的消費操作。

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:" + number);return number;}});future1.acceptEither(future2, new Consumer<Integer>() {@Overridepublic void accept(Integer number) {System.out.println("最快結果:" + number);}}).join();

第二階段:3

最快結果:3

runAfterEither

兩個線程任務相比較,有任何一個執行完成,就進行下一步操作,不關心運行結果。

public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:" + number);return number;}});future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已經有一個任務完成了");}}).join();"); } }).join();

第一階段:3

已經有一個任務完成了

runAfterBoth

兩個線程任務相比較,兩個全部執行完成,才進行下一步操作,不關心運行結果。

public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一階段:1");return 1;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二階段:2");return 2;}});future1.runAfterBoth(future2, new Runnable() {@Overridepublic void run() {System.out.println("上面兩個任務都執行完成了。");}}).get();

第一階段:1

第二階段:2

上面兩個任務都執行完成了。

anyOf

anyOf 方法的參數是多個給定的 CompletableFuture,當其中的任何一個完成時,方法返回這個 CompletableFuture。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
Random random = new Random();CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return "hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return "world";});CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);System.out.println(result.get());

world

allOf

allOf方法用來實現多 CompletableFuture 的同時返回。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1完成!");return "future1完成!";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2完成!");return "future2完成!";});CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);try {combindFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("future1: " + future1.isDone() + ",future2: " + future2.isDone());

future2完成!

future1完成!

future1: true,future2: true

2.9 使用案例:實現最優的“燒水泡茶”程序

著名數學家華羅庚先生在《統籌方法》這篇文章里介紹了一個燒水泡茶的例子,文中提到最優的工序應該是下面這樣:

對于燒水泡茶這個程序,一種最優的分工方案:用兩個線程 T1 和 T2 來完成燒水泡茶程序,T1 負責洗水壺、燒開水、泡茶這三道工序,T2 負責洗茶壺、洗茶杯、拿茶葉三道工序,其中 T1 在執行泡茶這道工序時需要等待 T2 完成拿茶葉的工序。

基于Future實現

public class FutureTaskDemo3{public static void main(String[] args) throws ExecutionException, InterruptedException {// 創建任務T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 創建任務T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 線程T1執行任務ft1Thread T1 = new Thread(ft1);T1.start();// 線程T2執行任務ft2Thread T2 = new Thread(ft2);T2.start();// 等待線程T1執行結果System.out.println(ft1.get());}}// T1Task需要執行的任務:// 洗水壺、燒開水、泡茶class T1Task implements Callable<String> {FutureTask<String> ft2;// T1任務需要T2任務的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@Overridepublic String call() throws Exception {System.out.println("T1:洗水壺...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:燒開水...");TimeUnit.SECONDS.sleep(15);// 獲取T2線程的茶葉String tf = ft2.get();System.out.println("T1:拿到茶葉:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}}// T2Task需要執行的任務:// 洗茶壺、洗茶杯、拿茶葉class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:洗茶壺...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶葉...");TimeUnit.SECONDS.sleep(1);return "龍井";}}

基于CompletableFuture實現

public class CompletableFutureDemo2 {public static void main(String[] args) {//任務1:洗水壺->燒開水CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {System.out.println("T1:洗水壺...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:燒開水...");sleep(15, TimeUnit.SECONDS);});//任務2:洗茶壺->洗茶杯->拿茶葉CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {System.out.println("T2:洗茶壺...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶葉...");sleep(1, TimeUnit.SECONDS);return "龍井";});//任務3:任務1和任務2完成后執行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {System.out.println("T1:拿到茶葉:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任務3執行結果System.out.println(f3.join());}static void sleep(int t, TimeUnit u){try {u.sleep(t);} catch (InterruptedException e) {}}}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/63333.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/63333.shtml
英文地址,請注明出處:http://en.pswp.cn/web/63333.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

什么是MyBatis

MyBatis是一款優秀的持久層框架&#xff0c;它支持定制化SQL、存儲過程以及高級映射。以下是關于MyBatis的詳細介紹&#xff1a; 一、MyBatis的起源與發展 MyBatis本是Apache的一個開源項目iBATIS&#xff0c;2010年這個項目由Apache遷移到了Google Code&#xff0c;并且改名…

阿爾茨海默癥數據集,使用yolo,voc,coco格式對2013張原始圖片進行標注,可識別輕微,中等和正常的癥狀

阿爾茨海默癥數據集,使用yolo&#xff0c;voc&#xff0c;coco格式對2013張原始圖片進行標注&#xff0c;可識別輕微&#xff0c;中等&#xff0c;嚴重和正常的癥狀 數據集分割 訓練組100&#xff05; 2013圖片 有效集&#xff05; 0圖片 測試集&#xf…

[代碼隨想錄21二叉樹]二叉樹的修改和改造,修剪二叉樹,將有序數組轉為二叉搜索樹

前言 二叉樹章節最后的題目了&#xff0c;就是對搜索二叉樹的改造&#xff0c; 題目鏈接 669. 修剪二叉搜索樹 - 力扣&#xff08;LeetCode&#xff09; 108. 將有序數組轉換為二叉搜索樹 - 力扣&#xff08;LeetCode&#xff09; 一、修剪二叉搜索樹 思路&#xff1a;等會…

Android 13 Aosp SystemServer功能裁剪(PackageManager.hasSystemFeature())

系統定制,裁剪Wifi,bt等模塊 UI部分可參考: SystemUI 隱藏下拉快捷面板部分模塊(wifi,bt,nfc等)入口 Android系統啟動后Zygote進程會forkSystemServer進程。SystemServer啟動Andorid服務. frameworks/base/services/java/com/android/server/SystemServer.java if (contex…

Scala的惰性求值:深入理解與實踐

在編程中&#xff0c;我們經常需要處理那些計算成本高昂或者可能永遠不會用到的值。在這種情況下&#xff0c;惰性求值&#xff08;Lazy Evaluation&#xff09;是一種非常有用的策略。它允許我們推遲計算&#xff0c;直到這些值真正需要被使用。Scala&#xff0c;作為一種多功…

事務-介紹與操作四大特性

一.數據準備&#xff1a; 1.員工表&#xff1a; -- 員工管理 create table tb_emp (id int unsigned primary key auto_increment comment ID,username varchar(20) not null unique comment 用戶名,password varchar(32) default 123456 comment 密碼,n…

Golang學習歷程【第一篇 入門】

Golang學習歷程【第一篇 入門Hello World】 1. 學習文檔2. Window 本地安裝Go2.1 安裝2.2 驗證 3. 開發環境——VsCode3.1 VsCode 安裝3.2 安裝插件3.2.1 language 語言漢化插件安裝3.2.2 Go插件安裝 4. Hello World 入門4.1 建工程4.2 創建項目文件4.3 編寫Hello World程序4.4…

微積分復習筆記 Calculus Volume 2 - 4.3 Separable Equations

4.3 Separable Equations - Calculus Volume 2 | OpenStax

Day43 動態規劃part10

300.最長遞增子序列 今天開始正式子序列系列,本題是比較簡單的,感受感受一下子序列題目的思路。 視頻講解:動態規劃之子序列問題,元素不連續!| LeetCode:300.最長遞增子序列_嗶哩嗶哩_bilibili 代碼隨想錄 class Solution {public int lengthOfLIS(int[] nums) {int[] …

Doris SQL 特技

group_concat description Syntax VARCHAR GROUP_CONCAT([DISTINCT] VARCHAR str[, VARCHAR sep] [ORDER BY { col_name | expr} [ASC | DESC]) 該函數是類似于 sum() 的聚合函數&#xff0c;group_concat 將結果集中的多行結果連接成一個字符串。第二個參數 sep 為字符串之…

Metaploit-永恒之藍漏洞利用

1&#xff1a;Metaploit介紹   本次測試主要是利用永恒之藍漏洞對windows7進行控制利用&#xff0c;掌握Metaploit工具的使用&#xff0c;知道永恒之藍的漏洞利用原理。永恒之藍是在Windows的SMB服務處理SMB v1請求時發生的漏洞&#xff0c;這個漏洞導致攻擊者在目標系統上可…

電容Q值、損耗角、應用

電容發熱的主要原因&#xff1a;紋波電壓 當電容兩端施加紋波電壓時&#xff0c;電容承受的是變化的電壓&#xff0c;由于電容內部存在寄生電阻&#xff08;ESR&#xff09;和寄生電感&#xff08;ESL&#xff09;.因此電容會有能量損耗&#xff0c;從而產生熱量&#xff0c;這…

css三角形源碼

效果圖 如下圖所示&#xff0c;讓一個 div 變成三角形&#xff0c;并且可隨意更改大小&#xff0c; 本文提供了可運行示例源碼&#xff0c;直接復制即可。 實現源碼 建議創建一個 demo.html 文件&#xff0c;一鍵復制代碼運行。 <style> .div{width: 0px;height: 0p…

pyparsing restOfLine

在 pyparsing 中&#xff0c;restOfLine 是一個解析器&#xff08;parser&#xff09;&#xff0c;用于匹配當前位置到行尾的所有內容&#xff0c;通常在解析文件或處理逐行數據時非常有用。 restOfLine 的特性 匹配內容&#xff1a;從當前位置一直匹配到換行符 \n 或字符串結…

【附源碼】Electron Windows桌面壁紙開發中的 CommonJS 和 ES Module 引入問題以及 Webpack 如何處理這種兼容

背景 在嘗試讓 ChatGPT 自動開發一個桌面壁紙更改的功能時&#xff0c;發現引入了一個 wallpaper 庫&#xff0c;這個庫的入口文件是 index.js&#xff0c;但是 package.json 文件下的 type:"module"&#xff0c;這樣造成了無論你使用 import from 還是 require&…

【計算機網絡篇】計算機網絡期末復習題庫詳解

&#x1f9f8;安清h&#xff1a;個人主頁 &#x1f3a5;個人專欄&#xff1a;【計算機網絡】【Mybatis篇】 &#x1f6a6;作者簡介&#xff1a;一個有趣愛睡覺的intp&#xff0c;期待和更多人分享自己所學知識的真誠大學生。 目錄 &#x1f3af;單選 &#x1f3af;填空 &am…

JS使用random隨機數實現簡單的四則算數驗證

1.效果圖 2.代碼實現 index.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</ti…

GIN中間件

感覺中間件是gin中挺重要的內容&#xff0c;就拿出來單獨講講吧&#xff01; 什么是中間件&#xff1f; Gin框架允許開發者在處理請求的過程中&#xff0c;加入用戶自己的 HandlerFunc 函數。 它適合處理一些公共的業務邏輯&#xff0c;比如登錄認證、權限校驗、數據分頁、記…

SLM510A系列——24V,15到150mA單通道可調電流線性恒流LED驅動芯片

SLM510A 系列產品是單通道、高精度、可調電流線性恒流源的 LED 驅動芯片&#xff0c;在各種 LED 照明產品中非常簡單易用。其在寬電壓輸入范圍內&#xff0c;能保證極高的輸出電流精度&#xff0c;從而在大面積的光源照明中&#xff0c;都能讓 LED 照明亮度保持均勻一致。 由于…

回歸預測 | MATLAB實現SVM-Adaboost集成學習結合支持向量機多輸入單輸出回歸預測

回歸預測 | MATLAB實現SVM-Adaboost集成學習結合支持向量機多輸入單輸出回歸預測 目錄 回歸預測 | MATLAB實現SVM-Adaboost集成學習結合支持向量機多輸入單輸出回歸預測基本介紹程序設計基本介紹 SVM-Adaboost集成學習是一種將支持向量機(SVM)與AdaBoost算法相結合的集成學習…