參考:CompletableFuture 詳解 | JavaGuide
實際項目中,一個接口可能需要同時獲取多種不同的數據,然后再匯總返回,舉個例子:用戶請求獲取訂單信息,可能需要同時獲取用戶信息、商品詳情、物流信息、等數據。
如果是串行(按順序依次執行每個任務)執行的話,接口的響應速度會非常慢。
考慮到這些任務之間有大部分都是 無前后順序關聯 的,可以 并行執行 ,就比如說調用獲取商品詳情的時候,可以同時調用獲取物流信息。通過并行執行多個任務的方式,接口的響應速度會得到大幅優化。
Future介紹
Future是Java5引入的接口,提供了基本的異步處理功能,但它的局限性在于只能通過 get()方法阻塞獲取結果,無法鏈式調用多個任務,也缺 少異常處理機制。
CompletableFuture 是 Future 的增強版,提供了非阻塞的結果處理、任務組合和異常處理,使得異步編程更加靈活和強大。
在 Java 中,Future
只是一個泛型接口,位于 java.util.concurrent
包下,其中定義了 5 個方法,主要包括下面這 4 個功能:
-
取消任務;
-
判斷任務是否被取消;
-
判斷任務是否已經執行完成;
-
獲取任務執行結果。
// V 代表了Future執行的任務返回值的類型 public interface Future<V> {// 取消任務執行// 成功取消返回 true,否則返回 falseboolean cancel(boolean mayInterruptIfRunning);// 判斷任務是否被取消boolean isCancelled();// 判斷任務是否已經執行完成boolean isDone();// 獲取任務執行結果V get() throws InterruptedException, ExecutionException;// 指定時間內沒有返回計算結果就拋出 TimeOutException 異常V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutExceptio }
CompletableFuture
是 java 8引入的一個強大的異步工具類。允許非阻塞地處理異步任務,并且可以通過鏈式調用組合多個異步操作。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { }
核心特性如下:
-
異步執行
通過
runAsync
和supplyAsync
方法,可以異步地執行任務。 -
任務完成回調
使用
thenApply、thenAccept、thenRun
等方法,可以在任務完成后執行回調。 -
任務組合
可以將多個 CompletableFuture 組合在一起,通過 thenCombine、thenCompose 等方法,處理多個異步任務之間的依賴關系。
-
異常處理
提供了
exceptionally、handle
等方法,可以在異步任務發生異常時進行處理。 -
并行處理
可以通過
allOf
和anyOf
方法,并行地執行多個異步任務,并在所有任務完成或任意一個任務完成時執行回調。
擴展
創建異步任務
-
通過 new 關鍵字
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>(); //獲取異步計算的結果,調用 get() 方法的線程會 阻塞 直到 CompletableFuture 完成運算 rpcResponse = completableFuture.get(); ? // complete() 方法只能調用一次,后續調用將被忽略。 resultFuture.complete(rpcResponse); ? //如果你已經知道計算的結果的話,可以使用靜態方法 completedFuture() 來創建 CompletableFuture CompletableFuture<String> future = CompletableFuture.completedFuture("hello!"); assertEquals("hello!", future.get());
-
runAsync: 創建異步任務,不返回結果
-
supplyAsync: 創建異步任務并返回結果
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> { ?//前面的這個future1不是用來接收數據的,僅僅表示完成狀態// 異步任務 }); ? //future2接收異步執行的結果,即里面返回出來的字符串,后續可以利用thenAccept等方法處理這個結果 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { // 異步任務并返回結果return "Hello, nihao.com!"; });
任務完成回調
-
thenApply: 在任務完成后可以對任務結果進行轉換返回
-
thenAccept: 在任務完成后對結果進行消費,但不返回新結果
-
thenRun: 在任務完成后執行一個操作,但不需要使用任務結果
// 轉換任務結果 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").thenApply(result -> result + " nihao.com");// 消費任務結果,不返回新結果 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello").thenAccept(result -> System.out.println(result));// 不消費任務結果,也不返回新結果 CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello") .thenRun(() -> System.out.println("Task finished"));
任務組合
-
thenCombine: 合并兩個CompletableFuture 的結果
-
thenCompose: 將一個CompletableFuture 的結果作為另一個 CompletableFuture 的輸入。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); ? //1 CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "nihao"); ? //2 ? //future1.thenComebine(fu2,...) CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2); ? //將前者的result 傳給 后者 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " nihao"));
異常處理
-
exceptionally: 在任務發生異常時提供默認值。
-
handle: 在任務完成或發生異常時進行處理。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) {throw new RuntimeException("Exception");}return "Hello"; }).exceptionally(ex -> "nihao"); ? ? //提供發生異常時的默認值 ? ? CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) {throw new RuntimeException("Exception");}return "Hello"; }).handle((result, ex) -> { ? ? ? //提供發生異常后的處理if (ex != null) {return "Default Value";}return result; });
并行處理
-
allOf: 等待多個CompletableFuture 全部完成
-
anyOf: 任意一個 CompletableFuture 完成時執行操作
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2); allFutures.thenRun(() -> System.out.println("nihao tasks finished")); ? ?//全部完成在執行 ? ? CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2); anyFuture.thenAccept(result -> System.out.println("nihao task finished with result: " + result)); ?//任意一個完成即可執行
使用建議
-
CompletableFuture
默認使用全局共享的ForkJoinPool.commonPool()
作為執行器,所以應用程序、多個庫或框架(如 Spring、第三方庫)若都依賴CompletableFuture
,默認情況下它們都會共享同一個線程池。 -
為避免這些問題,建議為
CompletableFuture
提供自定義線程池,根據任務特性調整線程池大小和隊列類型,也更好地處理線程中的異常情況 -
CompletableFuture
的get()
方法是阻塞的,盡量避免使用。如果必須要使用的話,需要添加超時時間,否則可能會導致主線程一直等待,無法執行其他任務。