本文將探討 Java 8 引入的
CompletableFuture
,一個在異步編程中實現非阻塞、可組合操作的強大工具。我們將從CompletableFuture
的基本概念、與傳統Future
的區別、核心 API 用法,到復雜的鏈式調用、組合操作以及異常處理進行全面解析,并通過豐富的代碼示例,幫助 Java 后端開發者更好地理解和應用CompletableFuture
,提升系統性能和響應能力。
1. 為什么需要 CompletableFuture
在現代后端開發中,高并發和低延遲是衡量系統性能的重要指標。傳統的同步編程模型在處理耗時操作(如網絡請求、數據庫查詢)時,會阻塞當前線程,導致系統吞吐量下降,用戶體驗變差。為了解決這一問題,異步編程應運而生。Java 5 引入的 Future
接口是異步編程的初步嘗試,它代表了一個異步計算的結果,但其局限性也日益凸顯。
1.1 傳統 Future 的局限性
Future
接口雖然提供了異步執行任務的能力,但其設計存在以下幾個主要局限性:
- 阻塞式獲取結果:
Future.get()
方法會阻塞當前線程,直到異步任務完成并返回結果。這意味著,如果任務執行時間過長,調用線程將被長時間阻塞,無法執行其他操作,從而降低了系統的響應能力和資源利用率。 - 無法方便地進行鏈式操作和組合:
Future
接口沒有提供直接的方法來將多個異步操作串聯起來,或者將多個異步操作的結果進行組合。當需要執行一系列相互依賴的異步任務時,開發者往往需要手動管理線程和回調,代碼變得復雜且難以維護,容易出現“回調地獄”(Callback Hell)。 - 異常處理不便:
Future
接口的異常處理機制相對簡單。當異步任務拋出異常時,只有在調用get()
方法時才能捕獲到ExecutionException
,這使得異常的傳播和處理變得不靈活,難以在異步流程中進行細粒度的錯誤控制。
1.2 CompletableFuture 的優勢
為了克服 Future
的這些局限性,Java 8 引入了 CompletableFuture
類。CompletableFuture
不僅實現了 Future
接口,還實現了 CompletionStage
接口,這使得它在異步編程方面擁有了前所未有的靈活性和強大功能。CompletableFuture
的主要優勢體現在:
- 非阻塞:
CompletableFuture
通過回調機制實現了非阻塞操作。它允許你注冊一個回調函數,當異步任務完成時,該回調函數會被自動執行,而不會阻塞當前線程。這極大地提高了系統的并發能力和響應速度。 - 可組合:
CompletableFuture
提供了豐富的 API,支持將多個異步操作進行鏈式調用和組合。你可以輕松地將一個異步任務的結果作為另一個異步任務的輸入,或者等待多個異步任務都完成后再執行某個操作。這種可組合性使得復雜的異步流程能夠以聲明式的方式清晰地表達,代碼結構更加簡潔。 - 更靈活的異常處理:
CompletableFuture
提供了exceptionally()
、handle()
等方法,允許開發者在異步任務的任何階段捕獲和處理異常。這使得異常處理變得更加靈活和可控,避免了傳統Future
中異常處理的痛點。 - 更強大的并發控制:
CompletableFuture
內部使用了 ForkJoinPool 作為默認的異步執行線程池,也可以自定義線程池。它能夠更好地利用多核處理器的優勢,實現高效的并發控制。
總之,CompletableFuture
是 Java 異步編程領域的一個里程碑式的改進,它為開發者提供了構建高性能、高響應、易于維護的并發應用程序的強大工具。
2. CompletableFuture 核心概念與基本用法
CompletableFuture
是 Java 8 中引入的一個強大的并發工具,它位于 java.util.concurrent
包中。
2.1 CompletableFuture 是什么
CompletableFuture<T>
是一個類,它實現了 Future<T>
和 CompletionStage<T>
兩個接口。這意味著它既可以作為傳統 Future
的替代品,用于獲取異步計算的結果,又具備了 CompletionStage
接口提供的強大功能,支持鏈式操作和組合多個異步計算步驟。
Future<T>
接口:代表一個異步計算的結果。通過get()
方法可以阻塞地獲取結果,或者通過cancel()
方法取消任務。CompletionStage<T>
接口:定義了一系列用于描述異步計算階段的方法。這些方法允許你在一個異步操作完成后執行另一個操作,而無需阻塞當前線程。這是CompletableFuture
強大之處的核心。
簡單來說,CompletableFuture
代表了一個可能在未來某個時間點完成的異步計算的結果。這個結果可以是成功的值,也可以是計算過程中拋出的異常。
2.2 創建 CompletableFuture
CompletableFuture
提供了多種靜態方法來創建不同類型的異步任務:
2.2.1 CompletableFuture.runAsync(Runnable runnable)
用于執行一個沒有返回值的異步任務。它接受一個 Runnable
類型的參數,并在 ForkJoinPool.commonPool() 中異步執行該任務。如果需要指定線程池,可以使用 CompletableFuture.runAsync(Runnable runnable, Executor executor)
。
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws InterruptedException {System.out.println("主線程開始");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("異步任務執行完成,無返回值");} catch (InterruptedException e) {e.printStackTrace();}});// 主線程可以繼續執行其他任務,無需等待異步任務完成System.out.println("主線程繼續執行其他任務");// 等待異步任務完成(非阻塞方式,通過回調)future.thenRun(() -> System.out.println("異步任務真正完成后的回調"));// 為了演示效果,讓主線程等待一段時間,確保異步任務有時間執行TimeUnit.SECONDS.sleep(3);System.out.println("主線程結束");}
}
2.2.2 CompletableFuture.supplyAsync(Supplier supplier)
用于執行一個有返回值的異步任務。它接受一個 Supplier<T>
類型的參數,并在 ForkJoinPool.commonPool() 中異步執行該任務,返回一個 CompletableFuture<T>
。同樣,也可以指定線程池:CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor)
。
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("異步任務執行完成,有返回值");return "Hello, CompletableFuture!";} catch (InterruptedException e) {throw new IllegalStateException(e);}});System.out.println("主線程繼續執行其他任務");// 阻塞式獲取結果(僅為演示,實際應用中應避免長時間阻塞)String result = future.get(); System.out.println("異步任務返回結果: " + result);System.out.println("主線程結束");}
}
2.2.3 new CompletableFuture()
你可以手動創建一個 CompletableFuture
實例,并在后續通過 complete()
或 completeExceptionally()
方法來手動完成它。這在某些場景下非常有用,例如當你需要將一個非 CompletableFuture
風格的異步操作包裝成 CompletableFuture
時。
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture<String> future = new CompletableFuture<>();// 在另一個線程中執行耗時操作,并手動完成 CompletableFuturenew Thread(() -> {try {TimeUnit.SECONDS.sleep(2);future.complete("手動完成的 CompletableFuture");System.out.println("手動 CompletableFuture 已完成");} catch (InterruptedException e) {future.completeExceptionally(e);}}).start();System.out.println("主線程繼續執行其他任務");String result = future.get(); // 阻塞等待結果System.out.println("獲取到手動 CompletableFuture 的結果: " + result);System.out.println("主線程結束");}
}
2.2.4 CompletableFuture.completedFuture(T value)
如果你已經知道一個異步操作的結果,可以直接使用 completedFuture()
方法創建一個已經完成的 CompletableFuture
。這對于測試或者某些特定場景非常方便,可以避免不必要的異步執行。
示例代碼:
import java.util.concurrent.CompletableFuture;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture<String> future = CompletableFuture.completedFuture("這是一個已完成的 CompletableFuture");System.out.println("主線程繼續執行其他任務");String result = future.get(); // 不會阻塞,立即返回結果System.out.println("獲取到已完成 CompletableFuture 的結果: " + result);System.out.println("主線程結束");}
}
2.3 獲取結果
當 CompletableFuture
完成后,你可以通過以下方法獲取其結果:
2.3.1 get()
get()
方法是 Future
接口中定義的方法,它會阻塞當前線程,直到 CompletableFuture
完成并返回結果。如果任務在完成時拋出異常,get()
方法會拋出 ExecutionException
,其 getCause()
方法可以獲取到原始異常。
注意:在實際應用中,應盡量避免長時間阻塞主線程,get()
方法通常用于測試或在確定異步任務很快完成的場景。
2.3.2 join()
join()
方法與 get()
方法類似,也會阻塞當前線程并等待 CompletableFuture
完成。但不同的是,join()
方法不會拋出受檢異常 ExecutionException
,而是將原始異常包裝成非受檢異常 CompletionException
拋出。這使得在鏈式調用中處理異常更加方便,無需在每個 get()
調用處都進行 try-catch
。
2.3.3 complete(T value)
complete()
方法用于手動完成 CompletableFuture
,并為其設置一個結果值。如果 CompletableFuture
已經完成(無論是正常完成還是異常完成),再次調用 complete()
將無效。
2.3.4 completeExceptionally(Throwable ex)
completeExceptionally()
方法用于手動使 CompletableFuture
異常完成,并為其設置一個異常。這在異步任務執行過程中發生錯誤時非常有用,可以將異常信息傳遞給 CompletableFuture
的消費者。
示例代碼(get()
vs join()
):
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureGetJoin {public static void main(String[] args) {// 正常完成的 CompletableFutureCompletableFuture<String> successFuture = CompletableFuture.supplyAsync(() -> "Success");try {String resultGet = successFuture.get();System.out.println("get() 正常結果: " + resultGet);String resultJoin = successFuture.join();System.out.println("join() 正常結果: " + resultJoin);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 異常完成的 CompletableFutureCompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Something went wrong!");});try {exceptionFuture.get(); // 拋出 ExecutionException} catch (InterruptedException | ExecutionException e) {System.out.println("get() 捕獲到異常: " + e.getCause().getMessage());}try {exceptionFuture.join(); // 拋出 CompletionException} catch (Exception e) {System.out.println("join() 捕獲到異常: " + e.getCause().getMessage());}}
}
3. 鏈式操作:構建異步任務流
CompletableFuture
最強大的特性之一是其支持鏈式操作,允許我們將多個異步任務串聯起來,形成一個有向無環圖(DAG),從而構建復雜的異步任務流。這極大地簡化了異步編程的復雜性,避免了傳統回調模式帶來的“回調地獄”。
3.1 結果轉換:thenApply()
thenApply()
方法用于對上一個 CompletableFuture
的結果進行轉換,并返回一個新的 CompletableFuture
。它接受一個 Function
函數式接口作為參數,該函數接收上一個 CompletableFuture
的結果作為輸入,并返回一個轉換后的新結果。thenApply()
是同步執行的,即轉換操作會在完成上一個 CompletableFuture
的線程中執行。如果需要異步執行轉換操作,可以使用 thenApplyAsync()
。
方法簽名:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> initialFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> transformedFuture = initialFuture.thenApply(s -> {System.out.println("thenApply 在 " + Thread.currentThread().getName() + " 線程中執行");return s + " World";});System.out.println("結果: " + transformedFuture.get()); // 輸出: 結果: Hello World}
}
3.2 消費結果:thenAccept()
thenAccept()
方法用于消費上一個 CompletableFuture
的結果,但不會返回新的結果(即返回 CompletableFuture<Void>
)。它接受一個 Consumer
函數式接口作為參數,該函數接收上一個 CompletableFuture
的結果作為輸入,但沒有返回值。thenAccept()
同樣有異步版本 thenAcceptAsync()
。
方法簽名:
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenAccept(s -> {System.out.println("thenAccept 在 " + Thread.currentThread().getName() + " 線程中執行");System.out.println("消費結果: " + s + ", 無返回值");});voidFuture.get(); // 等待消費完成}
}
3.3 任務完成:thenRun()
thenRun()
方法用于在上一個 CompletableFuture
完成后執行一個不關心結果且沒有返回值的任務。它接受一個 Runnable
函數式接口作為參數。thenRun()
同樣有異步版本 thenRunAsync()
。
方法簽名:
CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenRun(() -> {System.out.println("thenRun 在 " + Thread.currentThread().getName() + " 線程中執行");System.out.println("上一個任務已完成,執行不關心結果的任務");});voidFuture.get(); // 等待任務完成}
}
3.4 異步轉換:thenCompose()
thenCompose()
方法是 CompletableFuture
中非常重要的一個方法,它用于將上一個 CompletableFuture
的結果作為參數,創建并返回一個新的 CompletableFuture
。這與 thenApply()
的區別在于,thenApply()
返回的是一個包含轉換后結果的 CompletableFuture
,而 thenCompose()
返回的是一個扁平化的 CompletableFuture
。當你的轉換函數本身也返回一個 CompletableFuture
時,thenCompose()
可以避免出現 CompletableFuture<CompletableFuture<T>>
這種嵌套結構,從而保持鏈的扁平化。
方法簽名:
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "User ID: 123";} catch (InterruptedException e) {throw new IllegalStateException(e);}});// thenApply 示例 (會產生嵌套)CompletableFuture<CompletableFuture<String>> nestedFuture = future1.thenApply(userId -> {System.out.println("thenApply 內部獲取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Alice";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenApply 結果 (嵌套): " + nestedFuture.get().get()); // 需要兩次 get()// thenCompose 示例 (扁平化)CompletableFuture<String> flatFuture = future1.thenCompose(userId -> {System.out.println("thenCompose 內部獲取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Bob";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenCompose 結果 (扁平化): " + flatFuture.get()); // 只需一次 get()}
}
4. 組合操作:處理多個異步任務
在實際應用中,我們經常需要處理多個獨立的異步任務,并在它們全部完成或其中任意一個完成時執行后續操作。CompletableFuture
提供了強大的組合方法,使得這些場景的處理變得非常優雅和高效。
4.1 組合兩個結果:thenCombine()
thenCombine()
方法用于當兩個 CompletableFuture
都完成后,將它們的結果組合起來,并返回一個新的 CompletableFuture
。它接受另一個 CompletionStage
和一個 BiFunction
作為參數,BiFunction
接收兩個 CompletableFuture
的結果作為輸入,并返回一個組合后的新結果。
方法簽名:
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "World";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> {System.out.println("thenCombine 在 " + Thread.currentThread().getName() + " 線程中執行");return s1 + " " + s2;});System.out.println("組合結果: " + combinedFuture.get()); // 輸出: 組合結果: Hello World}
}
4.2 所有任務完成:allOf()
allOf()
方法用于等待所有給定的 CompletableFuture
都完成。它接受一個 CompletableFuture
數組作為參數,并返回一個 CompletableFuture<Void>
。當所有輸入的 CompletableFuture
都成功完成時,返回的 CompletableFuture
才會完成。如果其中任何一個 CompletableFuture
異常完成,那么 allOf()
返回的 CompletableFuture
也會異常完成。
方法簽名:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);// 等待所有任務完成allFutures.get(); System.out.println("所有任務都已完成!");// 可以通過各自的 future.get() 獲取結果System.out.println(future1.get());System.out.println(future2.get());System.out.println(future3.get());}
}
4.3 任意任務完成:anyOf()
anyOf()
方法用于當任何一個給定的 CompletableFuture
完成時,就完成當前的 CompletableFuture
。它接受一個 CompletableFuture
數組作為參數,并返回一個 CompletableFuture<Object>
。返回的 CompletableFuture
的結果將是第一個完成的 CompletableFuture
的結果。
方法簽名:
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
示例代碼:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("第一個完成的任務結果: " + anyOfFuture.get()); // 輸出: 第一個完成的任務結果: Result from Future 2}
}
5. 異常處理
在異步編程中,異常處理是一個非常重要的環節。CompletableFuture
提供了多種機制來優雅地處理異步任務中可能出現的異常,避免了傳統 Future
中異常處理的繁瑣和不便。
5.1 異常處理:exceptionally()
exceptionally()
方法用于當 CompletableFuture
出現異常時,提供一個替代結果。它接受一個 Function<Throwable, ? extends T>
作為參數,當上一個 CompletableFuture
異常完成時,該函數會被調用,并接收異常作為輸入,然后返回一個替代值作為當前 CompletableFuture
的結果。如果上一個 CompletableFuture
正常完成,exceptionally()
不會執行。
方法簽名:
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
示例代碼:
import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("模擬異常");}return "正常結果";}).exceptionally(ex -> {System.out.println("捕獲到異常: " + ex.getMessage());return "從異常中恢復的默認結果";});System.out.println("最終結果: " + future.get());}
}
5.2 統一處理:handle()
handle()
方法是一個更通用的處理方法,無論 CompletableFuture
是正常完成還是異常完成,它都會執行。它接受一個 BiFunction<? super T, Throwable, ? extends U>
作為參數,該函數接收上一個 CompletableFuture
的結果和可能發生的異常作為輸入。如果正常完成,異常參數為 null
;如果異常完成,結果參數為 null
。handle()
的返回值將作為當前 CompletableFuture
的結果。
方法簽名:
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
示例代碼:
import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情況CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常數據").handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕獲到異常: " + ex.getMessage());return "處理異常后的結果";} else {System.out.println("handle 正常處理結果: " + result);return result + " - 處理完成";}});System.out.println("正常情況最終結果: " + normalFuture.get());// 異常情況CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模擬異常");}).handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕獲到異常: " + ex.getMessage());return "處理異常后的結果";} else {System.out.println("handle 正常處理結果: " + result);return result + " - 處理完成";}});System.out.println("異常情況最終結果: " + exceptionFuture.get());}
}
5.3 完成時回調:whenComplete()
whenComplete()
方法用于當 CompletableFuture
完成時執行一個回調,無論它是正常完成還是異常完成。它接受一個 BiConsumer<? super T, ? super Throwable>
作為參數,該函數接收結果和異常作為輸入。與 handle()
不同的是,whenComplete()
不會修改 CompletableFuture
的結果,主要用于日志記錄、資源清理或觸發后續不依賴結果的操作。如果 whenComplete()
內部拋出異常,該異常會覆蓋原始的異常(如果存在)。
方法簽名:
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
示例代碼:
import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情況CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常數據").whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕獲到異常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,結果: " + result);}});System.out.println("正常情況最終結果: " + normalFuture.get());// 異常情況CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模擬異常");}).whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕獲到異常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,結果: " + result);}});try {exceptionFuture.get(); // 原始異常會被重新拋出} catch (Exception e) {System.out.println("get() 捕獲到原始異常: " + e.getCause().getMessage());}}
}
6. 實際應用場景與最佳實踐
CompletableFuture
在 Java 后端開發中有著廣泛的應用,尤其是在需要處理大量并發請求、優化系統響應時間以及構建高吞吐量服務的場景下。合理地運用 CompletableFuture
可以顯著提升系統性能和用戶體驗。
6.1 實際應用場景
6.1.1 并行調用多個微服務
在微服務架構中,一個業務請求可能需要調用多個下游微服務來獲取數據。如果這些調用是串行的,那么總的響應時間將是所有微服務響應時間之和。通過 CompletableFuture
,我們可以并行地發起對多個微服務的調用,然后使用 allOf()
或 thenCombine()
等方法等待所有結果或組合結果,從而大大縮短響應時間。
示例場景: 用戶下單時,需要同時查詢商品庫存、用戶積分和優惠券信息。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class MicroserviceParallelCall {// 模擬查詢商品庫存的微服務public static CompletableFuture<Integer> getProductStock(String productId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模擬網絡延遲System.out.println("查詢商品 " + productId + " 庫存完成");return 100; // 假設庫存100} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模擬查詢用戶積分的微服務public static CompletableFuture<Integer> getUserPoints(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1.5); // 模擬網絡延遲System.out.println("查詢用戶 " + userId + " 積分完成");return 2000; // 假設積分2000} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模擬查詢優惠券信息的微服務public static CompletableFuture<String> getCouponInfo(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(0.8); // 模擬網絡延遲System.out.println("查詢用戶 " + userId + " 優惠券完成");return "滿100減10"; // 假設優惠券信息} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) throws Exception {long start = System.currentTimeMillis();CompletableFuture<Integer> stockFuture = getProductStock("P001");CompletableFuture<Integer> pointsFuture = getUserPoints("U001");CompletableFuture<String> couponFuture = getCouponInfo("U001");// 等待所有異步任務完成CompletableFuture.allOf(stockFuture, pointsFuture, couponFuture).join();// 獲取結果并處理Integer stock = stockFuture.get();Integer points = pointsFuture.get();String coupon = couponFuture.get();System.out.println("\n所有信息查詢完成:");System.out.println("商品庫存: " + stock);System.out.println("用戶積分: " + points);System.out.println("優惠券信息: " + coupon);long end = System.currentTimeMillis();System.out.println("總耗時: " + (end - start) + " ms");}
}
6.1.2 異步發送通知(郵件、短信)
在用戶注冊、訂單支付成功等場景下,系統通常需要發送郵件、短信或站內信等通知。這些通知操作通常不影響主業務流程,但如果同步執行,可能會增加主業務的響應時間。使用 CompletableFuture
可以將這些通知操作異步化,提升主業務的響應速度。
示例場景: 用戶注冊成功后,異步發送歡迎郵件和短信。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class AsyncNotification {public static CompletableFuture<Void> sendEmail(String email, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模擬郵件發送耗時System.out.println("郵件發送成功到: " + email + ", 內容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static CompletableFuture<Void> sendSms(String phoneNumber, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(0.5); // 模擬短信發送耗時System.out.println("短信發送成功到: " + phoneNumber + ", 內容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static void main(String[] args) {System.out.println("用戶注冊成功,開始處理通知...");CompletableFuture<Void> emailFuture = sendEmail("test@example.com", "歡迎注冊!");CompletableFuture<Void> smsFuture = sendSms("13800138000", "歡迎注冊!");// 主業務流程可以繼續,無需等待通知發送完成System.out.println("主業務流程繼續執行...");// 可以選擇等待所有通知發送完成,或者不等待CompletableFuture.allOf(emailFuture, smsFuture).join();System.out.println("所有通知任務已完成。");}
}
6.1.3 批量數據處理
當需要處理大量數據,并且每個數據的處理是獨立的時,可以使用 CompletableFuture
將數據分成小批次并行處理,從而提高整體處理效率。
示例場景: 批量處理用戶數據,對每個用戶進行數據清洗和存儲。
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class BatchDataProcessing {public static CompletableFuture<String> processUserData(String user) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(200); // 模擬數據處理耗時System.out.println("處理用戶數據: " + user + " 完成");return user.toUpperCase(); // 模擬數據清洗} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) {List<String> users = Arrays.asList("user1", "user2", "user3", "user4", "user5");long start = System.currentTimeMillis();List<CompletableFuture<String>> futures = users.stream().map(BatchDataProcessing::processUserData).collect(Collectors.toList());// 等待所有用戶數據處理完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 獲取所有處理結果List<String> processedUsers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());System.out.println("\n所有用戶數據處理完成,結果: " + processedUsers);long end = System.currentTimeMillis();System.out.println("總耗時: " + (end - start) + " ms");}
}
6.2 最佳實踐
-
合理選擇線程池:
CompletableFuture
默認使用ForkJoinPool.commonPool()
。對于 CPU 密集型任務,默認線程池通常是合適的。但對于 I/O 密集型任務,建議自定義線程池,并根據實際情況調整線程數量,避免線程饑餓或資源浪費。例如,可以使用Executors.newFixedThreadPool()
或ThreadPoolExecutor
。// 自定義線程池示例 ExecutorService customExecutor = Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() -> {// 耗時操作return "Result"; }, customExecutor);
-
避免過度使用
get()
和join()
:雖然get()
和join()
可以獲取CompletableFuture
的結果,但它們是阻塞的。過度使用會導致異步優勢喪失,甚至引入死鎖。應盡量使用thenApply()
、thenAccept()
、thenCompose()
等非阻塞的回調方法來構建異步鏈。 -
善用異常處理機制:
exceptionally()
、handle()
和whenComplete()
提供了靈活的異常處理方式。根據業務需求選擇合適的異常處理策略,確保異步任務的健壯性。exceptionally()
適用于從異常中恢復并提供替代結果的場景,handle()
適用于無論成功失敗都需要統一處理的場景,而whenComplete()
適用于執行一些副作用操作(如日志記錄、資源清理)而不改變結果的場景。 -
鏈式調用與組合的合理運用:充分利用
CompletableFuture
提供的鏈式調用和組合方法,將復雜的異步邏輯拆解成更小、更易管理的部分。這不僅使代碼更具可讀性,也更容易進行測試和維護。特別注意thenApply()
和thenCompose()
的區別,避免不必要的嵌套。 -
超時處理:對于可能長時間運行的異步任務,考慮添加超時機制,避免資源無限期占用。雖然
CompletableFuture
本身沒有直接的超時方法,但可以通過CompletableFuture.orTimeout()
(Java 9+) 或結合CompletableFuture.runAfter()
等方法實現。// Java 9+ 超時處理示例 CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(5); // 模擬長時間任務return "Task Completed";} catch (InterruptedException e) {throw new IllegalStateException(e);} }).orTimeout(2, TimeUnit.SECONDS); // 2秒后超時try {System.out.println(futureWithTimeout.get()); } catch (Exception e) {System.out.println("任務超時或異常: " + e.getMessage()); }
-
日志記錄:在異步任務的關鍵節點添加日志,便于追蹤任務執行狀態和排查問題。可以使用
whenComplete()
或handle()
來記錄任務的成功或失敗。
7. 總結
CompletableFuture
是 Java 8 引入的異步編程利器,它通過提供非阻塞、可組合、靈活的異常處理機制,極大地提升了 Java 在并發編程領域的表現力。本文從 CompletableFuture
的基本概念、創建方式、鏈式操作、組合操作以及異常處理等方面進行了深入解析,并通過豐富的代碼示例展示了其在實際應用中的強大功能。
掌握 CompletableFuture
的核心 API 和最佳實踐,能夠幫助 Java 后端開發者更好地應對高并發、低延遲的挑戰,構建出高性能、高響應、易于維護的現代后端服務。