CompletableFuture 深度解析

本文將探討 Java 8 引入的 CompletableFuture,一個在異步編程中實現非阻塞、可組合操作的強大工具。我們將從 CompletableFuture 的基本概念、與傳統 Future 的區別、核心 API 用法,到復雜的鏈式調用、組合操作以及異常處理進行全面解析,并通過豐富的代碼示例,幫助 Java 后端開發者更好地理解和應用 CompletableFuture,提升系統性能和響應能力。

1. 為什么需要 CompletableFuture

在現代后端開發中,高并發和低延遲是衡量系統性能的重要指標。傳統的同步編程模型在處理耗時操作(如網絡請求、數據庫查詢)時,會阻塞當前線程,導致系統吞吐量下降,用戶體驗變差。為了解決這一問題,異步編程應運而生。Java 5 引入的 Future 接口是異步編程的初步嘗試,它代表了一個異步計算的結果,但其局限性也日益凸顯。

1.1 傳統 Future 的局限性

Future 接口雖然提供了異步執行任務的能力,但其設計存在以下幾個主要局限性:

  • 阻塞式獲取結果Future.get() 方法會阻塞當前線程,直到異步任務完成并返回結果。這意味著,如果任務執行時間過長,調用線程將被長時間阻塞,無法執行其他操作,從而降低了系統的響應能力和資源利用率。
  • 無法方便地進行鏈式操作和組合Future 接口沒有提供直接的方法來將多個異步操作串聯起來,或者將多個異步操作的結果進行組合。當需要執行一系列相互依賴的異步任務時,開發者往往需要手動管理線程和回調,代碼變得復雜且難以維護,容易出現“回調地獄”(Callback Hell)。
  • 異常處理不便Future 接口的異常處理機制相對簡單。當異步任務拋出異常時,只有在調用 get() 方法時才能捕獲到 ExecutionException,這使得異常的傳播和處理變得不靈活,難以在異步流程中進行細粒度的錯誤控制。

1.2 CompletableFuture 的優勢

為了克服 Future 的這些局限性,Java 8 引入了 CompletableFuture 類。CompletableFuture 不僅實現了 Future 接口,還實現了 CompletionStage 接口,這使得它在異步編程方面擁有了前所未有的靈活性和強大功能。CompletableFuture 的主要優勢體現在:

  • 非阻塞CompletableFuture 通過回調機制實現了非阻塞操作。它允許你注冊一個回調函數,當異步任務完成時,該回調函數會被自動執行,而不會阻塞當前線程。這極大地提高了系統的并發能力和響應速度。
  • 可組合CompletableFuture 提供了豐富的 API,支持將多個異步操作進行鏈式調用和組合。你可以輕松地將一個異步任務的結果作為另一個異步任務的輸入,或者等待多個異步任務都完成后再執行某個操作。這種可組合性使得復雜的異步流程能夠以聲明式的方式清晰地表達,代碼結構更加簡潔。
  • 更靈活的異常處理CompletableFuture 提供了 exceptionally()handle() 等方法,允許開發者在異步任務的任何階段捕獲和處理異常。這使得異常處理變得更加靈活和可控,避免了傳統 Future 中異常處理的痛點。
  • 更強大的并發控制CompletableFuture 內部使用了 ForkJoinPool 作為默認的異步執行線程池,也可以自定義線程池。它能夠更好地利用多核處理器的優勢,實現高效的并發控制。

總之,CompletableFuture 是 Java 異步編程領域的一個里程碑式的改進,它為開發者提供了構建高性能、高響應、易于維護的并發應用程序的強大工具。

2. CompletableFuture 核心概念與基本用法

CompletableFuture 是 Java 8 中引入的一個強大的并發工具,它位于 java.util.concurrent 包中。

2.1 CompletableFuture 是什么

CompletableFuture<T> 是一個類,它實現了 Future<T>CompletionStage<T> 兩個接口。這意味著它既可以作為傳統 Future 的替代品,用于獲取異步計算的結果,又具備了 CompletionStage 接口提供的強大功能,支持鏈式操作和組合多個異步計算步驟。

  • Future<T> 接口:代表一個異步計算的結果。通過 get() 方法可以阻塞地獲取結果,或者通過 cancel() 方法取消任務。
  • CompletionStage<T> 接口:定義了一系列用于描述異步計算階段的方法。這些方法允許你在一個異步操作完成后執行另一個操作,而無需阻塞當前線程。這是 CompletableFuture 強大之處的核心。

簡單來說,CompletableFuture 代表了一個可能在未來某個時間點完成的異步計算的結果。這個結果可以是成功的值,也可以是計算過程中拋出的異常。

2.2 創建 CompletableFuture

CompletableFuture 提供了多種靜態方法來創建不同類型的異步任務:

2.2.1 CompletableFuture.runAsync(Runnable runnable)

用于執行一個沒有返回值的異步任務。它接受一個 Runnable 類型的參數,并在 ForkJoinPool.commonPool() 中異步執行該任務。如果需要指定線程池,可以使用 CompletableFuture.runAsync(Runnable runnable, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws InterruptedException {System.out.println("主線程開始");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("異步任務執行完成,無返回值");} catch (InterruptedException e) {e.printStackTrace();}});// 主線程可以繼續執行其他任務,無需等待異步任務完成System.out.println("主線程繼續執行其他任務");// 等待異步任務完成(非阻塞方式,通過回調)future.thenRun(() -> System.out.println("異步任務真正完成后的回調"));// 為了演示效果,讓主線程等待一段時間,確保異步任務有時間執行TimeUnit.SECONDS.sleep(3);System.out.println("主線程結束");}
}

2.2.2 CompletableFuture.supplyAsync(Supplier supplier)

用于執行一個有返回值的異步任務。它接受一個 Supplier<T> 類型的參數,并在 ForkJoinPool.commonPool() 中異步執行該任務,返回一個 CompletableFuture<T>。同樣,也可以指定線程池:CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);System.out.println("異步任務執行完成,有返回值");return "Hello, CompletableFuture!";} catch (InterruptedException e) {throw new IllegalStateException(e);}});System.out.println("主線程繼續執行其他任務");// 阻塞式獲取結果(僅為演示,實際應用中應避免長時間阻塞)String result = future.get(); System.out.println("異步任務返回結果: " + result);System.out.println("主線程結束");}
}

2.2.3 new CompletableFuture()

你可以手動創建一個 CompletableFuture 實例,并在后續通過 complete()completeExceptionally() 方法來手動完成它。這在某些場景下非常有用,例如當你需要將一個非 CompletableFuture 風格的異步操作包裝成 CompletableFuture 時。

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture<String> future = new CompletableFuture<>();// 在另一個線程中執行耗時操作,并手動完成 CompletableFuturenew Thread(() -> {try {TimeUnit.SECONDS.sleep(2);future.complete("手動完成的 CompletableFuture");System.out.println("手動 CompletableFuture 已完成");} catch (InterruptedException e) {future.completeExceptionally(e);}}).start();System.out.println("主線程繼續執行其他任務");String result = future.get(); // 阻塞等待結果System.out.println("獲取到手動 CompletableFuture 的結果: " + result);System.out.println("主線程結束");}
}

2.2.4 CompletableFuture.completedFuture(T value)

如果你已經知道一個異步操作的結果,可以直接使用 completedFuture() 方法創建一個已經完成的 CompletableFuture。這對于測試或者某些特定場景非常方便,可以避免不必要的異步執行。

示例代碼:

import java.util.concurrent.CompletableFuture;public class CompletableFutureCreation {public static void main(String[] args) throws Exception {System.out.println("主線程開始");CompletableFuture<String> future = CompletableFuture.completedFuture("這是一個已完成的 CompletableFuture");System.out.println("主線程繼續執行其他任務");String result = future.get(); // 不會阻塞,立即返回結果System.out.println("獲取到已完成 CompletableFuture 的結果: " + result);System.out.println("主線程結束");}
}

2.3 獲取結果

CompletableFuture 完成后,你可以通過以下方法獲取其結果:

2.3.1 get()

get() 方法是 Future 接口中定義的方法,它會阻塞當前線程,直到 CompletableFuture 完成并返回結果。如果任務在完成時拋出異常,get() 方法會拋出 ExecutionException,其 getCause() 方法可以獲取到原始異常。

注意:在實際應用中,應盡量避免長時間阻塞主線程,get() 方法通常用于測試或在確定異步任務很快完成的場景。

2.3.2 join()

join() 方法與 get() 方法類似,也會阻塞當前線程并等待 CompletableFuture 完成。但不同的是,join() 方法不會拋出受檢異常 ExecutionException,而是將原始異常包裝成非受檢異常 CompletionException 拋出。這使得在鏈式調用中處理異常更加方便,無需在每個 get() 調用處都進行 try-catch

2.3.3 complete(T value)

complete() 方法用于手動完成 CompletableFuture,并為其設置一個結果值。如果 CompletableFuture 已經完成(無論是正常完成還是異常完成),再次調用 complete() 將無效。

2.3.4 completeExceptionally(Throwable ex)

completeExceptionally() 方法用于手動使 CompletableFuture 異常完成,并為其設置一個異常。這在異步任務執行過程中發生錯誤時非常有用,可以將異常信息傳遞給 CompletableFuture 的消費者。

示例代碼(get() vs join()):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;public class CompletableFutureGetJoin {public static void main(String[] args) {// 正常完成的 CompletableFutureCompletableFuture<String> successFuture = CompletableFuture.supplyAsync(() -> "Success");try {String resultGet = successFuture.get();System.out.println("get() 正常結果: " + resultGet);String resultJoin = successFuture.join();System.out.println("join() 正常結果: " + resultJoin);} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}// 異常完成的 CompletableFutureCompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Something went wrong!");});try {exceptionFuture.get(); // 拋出 ExecutionException} catch (InterruptedException | ExecutionException e) {System.out.println("get() 捕獲到異常: " + e.getCause().getMessage());}try {exceptionFuture.join(); // 拋出 CompletionException} catch (Exception e) {System.out.println("join() 捕獲到異常: " + e.getCause().getMessage());}}
}

3. 鏈式操作:構建異步任務流

CompletableFuture 最強大的特性之一是其支持鏈式操作,允許我們將多個異步任務串聯起來,形成一個有向無環圖(DAG),從而構建復雜的異步任務流。這極大地簡化了異步編程的復雜性,避免了傳統回調模式帶來的“回調地獄”。

3.1 結果轉換:thenApply()

thenApply() 方法用于對上一個 CompletableFuture 的結果進行轉換,并返回一個新的 CompletableFuture。它接受一個 Function 函數式接口作為參數,該函數接收上一個 CompletableFuture 的結果作為輸入,并返回一個轉換后的新結果。thenApply() 是同步執行的,即轉換操作會在完成上一個 CompletableFuture 的線程中執行。如果需要異步執行轉換操作,可以使用 thenApplyAsync()

方法簽名:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> initialFuture = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> transformedFuture = initialFuture.thenApply(s -> {System.out.println("thenApply 在 " + Thread.currentThread().getName() + " 線程中執行");return s + " World";});System.out.println("結果: " + transformedFuture.get()); // 輸出: 結果: Hello World}
}

3.2 消費結果:thenAccept()

thenAccept() 方法用于消費上一個 CompletableFuture 的結果,但不會返回新的結果(即返回 CompletableFuture<Void>)。它接受一個 Consumer 函數式接口作為參數,該函數接收上一個 CompletableFuture 的結果作為輸入,但沒有返回值。thenAccept() 同樣有異步版本 thenAcceptAsync()

方法簽名:

CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenAccept(s -> {System.out.println("thenAccept 在 " + Thread.currentThread().getName() + " 線程中執行");System.out.println("消費結果: " + s + ", 無返回值");});voidFuture.get(); // 等待消費完成}
}

3.3 任務完成:thenRun()

thenRun() 方法用于在上一個 CompletableFuture 完成后執行一個不關心結果且沒有返回值的任務。它接受一個 Runnable 函數式接口作為參數。thenRun() 同樣有異步版本 thenRunAsync()

方法簽名:

CompletableFuture<Void> thenRun(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action)
CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> voidFuture = future.thenRun(() -> {System.out.println("thenRun 在 " + Thread.currentThread().getName() + " 線程中執行");System.out.println("上一個任務已完成,執行不關心結果的任務");});voidFuture.get(); // 等待任務完成}
}

3.4 異步轉換:thenCompose()

thenCompose() 方法是 CompletableFuture 中非常重要的一個方法,它用于將上一個 CompletableFuture 的結果作為參數,創建并返回一個新的 CompletableFuture。這與 thenApply() 的區別在于,thenApply() 返回的是一個包含轉換后結果的 CompletableFuture,而 thenCompose() 返回的是一個扁平化的 CompletableFuture。當你的轉換函數本身也返回一個 CompletableFuture 時,thenCompose() 可以避免出現 CompletableFuture<CompletableFuture<T>> 這種嵌套結構,從而保持鏈的扁平化。

方法簽名:

<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureChaining {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "User ID: 123";} catch (InterruptedException e) {throw new IllegalStateException(e);}});// thenApply 示例 (會產生嵌套)CompletableFuture<CompletableFuture<String>> nestedFuture = future1.thenApply(userId -> {System.out.println("thenApply 內部獲取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Alice";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenApply 結果 (嵌套): " + nestedFuture.get().get()); // 需要兩次 get()// thenCompose 示例 (扁平化)CompletableFuture<String> flatFuture = future1.thenCompose(userId -> {System.out.println("thenCompose 內部獲取到: " + userId);return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return userId + " - User Name: Bob";} catch (InterruptedException e) {throw new IllegalStateException(e);}});});System.out.println("thenCompose 結果 (扁平化): " + flatFuture.get()); // 只需一次 get()}
}

4. 組合操作:處理多個異步任務

在實際應用中,我們經常需要處理多個獨立的異步任務,并在它們全部完成或其中任意一個完成時執行后續操作。CompletableFuture 提供了強大的組合方法,使得這些場景的處理變得非常優雅和高效。

4.1 組合兩個結果:thenCombine()

thenCombine() 方法用于當兩個 CompletableFuture 都完成后,將它們的結果組合起來,并返回一個新的 CompletableFuture。它接受另一個 CompletionStage 和一個 BiFunction 作為參數,BiFunction 接收兩個 CompletableFuture 的結果作為輸入,并返回一個組合后的新結果。

方法簽名:

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Hello";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "World";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (s1, s2) -> {System.out.println("thenCombine 在 " + Thread.currentThread().getName() + " 線程中執行");return s1 + " " + s2;});System.out.println("組合結果: " + combinedFuture.get()); // 輸出: 組合結果: Hello World}
}

4.2 所有任務完成:allOf()

allOf() 方法用于等待所有給定的 CompletableFuture 都完成。它接受一個 CompletableFuture 數組作為參數,并返回一個 CompletableFuture<Void>。當所有輸入的 CompletableFuture 都成功完成時,返回的 CompletableFuture 才會完成。如果其中任何一個 CompletableFuture 異常完成,那么 allOf() 返回的 CompletableFuture 也會異常完成。

方法簽名:

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);// 等待所有任務完成allFutures.get(); System.out.println("所有任務都已完成!");// 可以通過各自的 future.get() 獲取結果System.out.println(future1.get());System.out.println(future2.get());System.out.println(future3.get());}
}

4.3 任意任務完成:anyOf()

anyOf() 方法用于當任何一個給定的 CompletableFuture 完成時,就完成當前的 CompletableFuture。它接受一個 CompletableFuture 數組作為參數,并返回一個 CompletableFuture<Object>。返回的 CompletableFuture 的結果將是第一個完成的 CompletableFuture 的結果。

方法簽名:

static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

示例代碼:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class CompletableFutureCombination {public static void main(String[] args) throws Exception {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(3);return "Result from Future 1";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);return "Result from Future 2";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);return "Result from Future 3";} catch (InterruptedException e) {throw new IllegalStateException(e);}});CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);System.out.println("第一個完成的任務結果: " + anyOfFuture.get()); // 輸出: 第一個完成的任務結果: Result from Future 2}
}

5. 異常處理

在異步編程中,異常處理是一個非常重要的環節。CompletableFuture 提供了多種機制來優雅地處理異步任務中可能出現的異常,避免了傳統 Future 中異常處理的繁瑣和不便。

5.1 異常處理:exceptionally()

exceptionally() 方法用于當 CompletableFuture 出現異常時,提供一個替代結果。它接受一個 Function<Throwable, ? extends T> 作為參數,當上一個 CompletableFuture 異常完成時,該函數會被調用,并接收異常作為輸入,然后返回一個替代值作為當前 CompletableFuture 的結果。如果上一個 CompletableFuture 正常完成,exceptionally() 不會執行。

方法簽名:

CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

示例代碼:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (Math.random() < 0.5) {throw new RuntimeException("模擬異常");}return "正常結果";}).exceptionally(ex -> {System.out.println("捕獲到異常: " + ex.getMessage());return "從異常中恢復的默認結果";});System.out.println("最終結果: " + future.get());}
}

5.2 統一處理:handle()

handle() 方法是一個更通用的處理方法,無論 CompletableFuture 是正常完成還是異常完成,它都會執行。它接受一個 BiFunction<? super T, Throwable, ? extends U> 作為參數,該函數接收上一個 CompletableFuture 的結果和可能發生的異常作為輸入。如果正常完成,異常參數為 null;如果異常完成,結果參數為 nullhandle() 的返回值將作為當前 CompletableFuture 的結果。

方法簽名:

<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情況CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常數據").handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕獲到異常: " + ex.getMessage());return "處理異常后的結果";} else {System.out.println("handle 正常處理結果: " + result);return result + " - 處理完成";}});System.out.println("正常情況最終結果: " + normalFuture.get());// 異常情況CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模擬異常");}).handle((result, ex) -> {if (ex != null) {System.out.println("handle 捕獲到異常: " + ex.getMessage());return "處理異常后的結果";} else {System.out.println("handle 正常處理結果: " + result);return result + " - 處理完成";}});System.out.println("異常情況最終結果: " + exceptionFuture.get());}
}

5.3 完成時回調:whenComplete()

whenComplete() 方法用于當 CompletableFuture 完成時執行一個回調,無論它是正常完成還是異常完成。它接受一個 BiConsumer<? super T, ? super Throwable> 作為參數,該函數接收結果和異常作為輸入。與 handle() 不同的是,whenComplete() 不會修改 CompletableFuture 的結果,主要用于日志記錄、資源清理或觸發后續不依賴結果的操作。如果 whenComplete() 內部拋出異常,該異常會覆蓋原始的異常(如果存在)。

方法簽名:

CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

示例代碼:

import java.util.concurrent.CompletableFuture;public class CompletableFutureExceptionHandling {public static void main(String[] args) throws Exception {// 正常情況CompletableFuture<String> normalFuture = CompletableFuture.supplyAsync(() -> "正常數據").whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕獲到異常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,結果: " + result);}});System.out.println("正常情況最終結果: " + normalFuture.get());// 異常情況CompletableFuture<String> exceptionFuture = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("模擬異常");}).whenComplete((result, ex) -> {if (ex != null) {System.out.println("whenComplete 捕獲到異常: " + ex.getMessage());} else {System.out.println("whenComplete 正常完成,結果: " + result);}});try {exceptionFuture.get(); // 原始異常會被重新拋出} catch (Exception e) {System.out.println("get() 捕獲到原始異常: " + e.getCause().getMessage());}}
}

6. 實際應用場景與最佳實踐

CompletableFuture 在 Java 后端開發中有著廣泛的應用,尤其是在需要處理大量并發請求、優化系統響應時間以及構建高吞吐量服務的場景下。合理地運用 CompletableFuture 可以顯著提升系統性能和用戶體驗。

6.1 實際應用場景

6.1.1 并行調用多個微服務

在微服務架構中,一個業務請求可能需要調用多個下游微服務來獲取數據。如果這些調用是串行的,那么總的響應時間將是所有微服務響應時間之和。通過 CompletableFuture,我們可以并行地發起對多個微服務的調用,然后使用 allOf()thenCombine() 等方法等待所有結果或組合結果,從而大大縮短響應時間。

示例場景: 用戶下單時,需要同時查詢商品庫存、用戶積分和優惠券信息。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class MicroserviceParallelCall {// 模擬查詢商品庫存的微服務public static CompletableFuture<Integer> getProductStock(String productId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模擬網絡延遲System.out.println("查詢商品 " + productId + " 庫存完成");return 100; // 假設庫存100} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模擬查詢用戶積分的微服務public static CompletableFuture<Integer> getUserPoints(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1.5); // 模擬網絡延遲System.out.println("查詢用戶 " + userId + " 積分完成");return 2000; // 假設積分2000} catch (InterruptedException e) {throw new IllegalStateException(e);}});}// 模擬查詢優惠券信息的微服務public static CompletableFuture<String> getCouponInfo(String userId) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(0.8); // 模擬網絡延遲System.out.println("查詢用戶 " + userId + " 優惠券完成");return "滿100減10"; // 假設優惠券信息} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) throws Exception {long start = System.currentTimeMillis();CompletableFuture<Integer> stockFuture = getProductStock("P001");CompletableFuture<Integer> pointsFuture = getUserPoints("U001");CompletableFuture<String> couponFuture = getCouponInfo("U001");// 等待所有異步任務完成CompletableFuture.allOf(stockFuture, pointsFuture, couponFuture).join();// 獲取結果并處理Integer stock = stockFuture.get();Integer points = pointsFuture.get();String coupon = couponFuture.get();System.out.println("\n所有信息查詢完成:");System.out.println("商品庫存: " + stock);System.out.println("用戶積分: " + points);System.out.println("優惠券信息: " + coupon);long end = System.currentTimeMillis();System.out.println("總耗時: " + (end - start) + " ms");}
}

6.1.2 異步發送通知(郵件、短信)

在用戶注冊、訂單支付成功等場景下,系統通常需要發送郵件、短信或站內信等通知。這些通知操作通常不影響主業務流程,但如果同步執行,可能會增加主業務的響應時間。使用 CompletableFuture 可以將這些通知操作異步化,提升主業務的響應速度。

示例場景: 用戶注冊成功后,異步發送歡迎郵件和短信。

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;public class AsyncNotification {public static CompletableFuture<Void> sendEmail(String email, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1); // 模擬郵件發送耗時System.out.println("郵件發送成功到: " + email + ", 內容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static CompletableFuture<Void> sendSms(String phoneNumber, String content) {return CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(0.5); // 模擬短信發送耗時System.out.println("短信發送成功到: " + phoneNumber + ", 內容: " + content);} catch (InterruptedException e) {e.printStackTrace();}});}public static void main(String[] args) {System.out.println("用戶注冊成功,開始處理通知...");CompletableFuture<Void> emailFuture = sendEmail("test@example.com", "歡迎注冊!");CompletableFuture<Void> smsFuture = sendSms("13800138000", "歡迎注冊!");// 主業務流程可以繼續,無需等待通知發送完成System.out.println("主業務流程繼續執行...");// 可以選擇等待所有通知發送完成,或者不等待CompletableFuture.allOf(emailFuture, smsFuture).join();System.out.println("所有通知任務已完成。");}
}

6.1.3 批量數據處理

當需要處理大量數據,并且每個數據的處理是獨立的時,可以使用 CompletableFuture 將數據分成小批次并行處理,從而提高整體處理效率。

示例場景: 批量處理用戶數據,對每個用戶進行數據清洗和存儲。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;public class BatchDataProcessing {public static CompletableFuture<String> processUserData(String user) {return CompletableFuture.supplyAsync(() -> {try {TimeUnit.MILLISECONDS.sleep(200); // 模擬數據處理耗時System.out.println("處理用戶數據: " + user + " 完成");return user.toUpperCase(); // 模擬數據清洗} catch (InterruptedException e) {throw new IllegalStateException(e);}});}public static void main(String[] args) {List<String> users = Arrays.asList("user1", "user2", "user3", "user4", "user5");long start = System.currentTimeMillis();List<CompletableFuture<String>> futures = users.stream().map(BatchDataProcessing::processUserData).collect(Collectors.toList());// 等待所有用戶數據處理完成CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// 獲取所有處理結果List<String> processedUsers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());System.out.println("\n所有用戶數據處理完成,結果: " + processedUsers);long end = System.currentTimeMillis();System.out.println("總耗時: " + (end - start) + " ms");}
}

6.2 最佳實踐

  • 合理選擇線程池CompletableFuture 默認使用 ForkJoinPool.commonPool()。對于 CPU 密集型任務,默認線程池通常是合適的。但對于 I/O 密集型任務,建議自定義線程池,并根據實際情況調整線程數量,避免線程饑餓或資源浪費。例如,可以使用 Executors.newFixedThreadPool()ThreadPoolExecutor

    // 自定義線程池示例
    ExecutorService customExecutor = Executors.newFixedThreadPool(10);
    CompletableFuture.supplyAsync(() -> {// 耗時操作return "Result";
    }, customExecutor);
    
  • 避免過度使用 get()join():雖然 get()join() 可以獲取 CompletableFuture 的結果,但它們是阻塞的。過度使用會導致異步優勢喪失,甚至引入死鎖。應盡量使用 thenApply()thenAccept()thenCompose() 等非阻塞的回調方法來構建異步鏈。

  • 善用異常處理機制exceptionally()handle()whenComplete() 提供了靈活的異常處理方式。根據業務需求選擇合適的異常處理策略,確保異步任務的健壯性。exceptionally() 適用于從異常中恢復并提供替代結果的場景,handle() 適用于無論成功失敗都需要統一處理的場景,而 whenComplete() 適用于執行一些副作用操作(如日志記錄、資源清理)而不改變結果的場景。

  • 鏈式調用與組合的合理運用:充分利用 CompletableFuture 提供的鏈式調用和組合方法,將復雜的異步邏輯拆解成更小、更易管理的部分。這不僅使代碼更具可讀性,也更容易進行測試和維護。特別注意 thenApply()thenCompose() 的區別,避免不必要的嵌套。

  • 超時處理:對于可能長時間運行的異步任務,考慮添加超時機制,避免資源無限期占用。雖然 CompletableFuture 本身沒有直接的超時方法,但可以通過 CompletableFuture.orTimeout() (Java 9+) 或結合 CompletableFuture.runAfter() 等方法實現。

    // Java 9+ 超時處理示例
    CompletableFuture<String> futureWithTimeout = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(5); // 模擬長時間任務return "Task Completed";} catch (InterruptedException e) {throw new IllegalStateException(e);}
    }).orTimeout(2, TimeUnit.SECONDS); // 2秒后超時try {System.out.println(futureWithTimeout.get());
    } catch (Exception e) {System.out.println("任務超時或異常: " + e.getMessage());
    }
    
  • 日志記錄:在異步任務的關鍵節點添加日志,便于追蹤任務執行狀態和排查問題。可以使用 whenComplete()handle() 來記錄任務的成功或失敗。

7. 總結

CompletableFuture 是 Java 8 引入的異步編程利器,它通過提供非阻塞、可組合、靈活的異常處理機制,極大地提升了 Java 在并發編程領域的表現力。本文從 CompletableFuture 的基本概念、創建方式、鏈式操作、組合操作以及異常處理等方面進行了深入解析,并通過豐富的代碼示例展示了其在實際應用中的強大功能。

掌握 CompletableFuture 的核心 API 和最佳實踐,能夠幫助 Java 后端開發者更好地應對高并發、低延遲的挑戰,構建出高性能、高響應、易于維護的現代后端服務。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/89003.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/89003.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/89003.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

給自己網站增加一個免費的AI助手,純HTML

助手效果圖 看完這篇文章&#xff0c;你將免費擁有你自己的Ai助手&#xff0c;全程干貨&#xff0c;先到先得 獲取免費的AI大模型接口 訪問這個地址 生成key https://openrouter.ai/mistralai/mistral-small-3.2-24b-instruct:free/api 或者調用其他的免費大模型&#xff0c;這…

ASProxy64.dll導致jetbrains家的IDE都無法打開。

在Windows11中,無法打開jetbrains的IDE的軟件,經過排查,發現與ASProxy64.dll有關。 E:\idea\IntelliJ IDEA 2024.1.7\bin>idea.bat CompileCommand: exclude com/intellij/openapi/vfs/impl/FilePartNodeRoot.trieDescend bool exclude = true # # A fatal error has bee…

springboot+Vue逍遙大藥房管理系統

概述 基于springbootVue開發的逍遙大藥房管理系統。該系統功能完善&#xff0c;既包含強大的后臺管理模塊&#xff0c;又具備用戶友好的前臺展示界面。 主要內容 一、后臺管理系統功能 ??核心管理模塊??&#xff1a; 用戶管理&#xff1a;管理員與普通用戶權限分級藥品分…

探索阿里云智能媒體管理IMM:解鎖媒體處理新境界

一、引言&#xff1a;開啟智能媒體管理新時代 在數字化浪潮的席卷下&#xff0c;媒體行業正經歷著前所未有的變革。從傳統媒體到新媒體的轉型&#xff0c;從內容生產到傳播分發&#xff0c;每一個環節都在尋求更高效、更智能的解決方案。而云計算&#xff0c;作為推動這一變革…

[附源碼+數據庫+畢業論文]基于Spring+MyBatis+MySQL+Maven+jsp實現的新生報道管理系統,推薦!

摘要 隨著信息技術在管理上越來越深入而廣泛的應用&#xff0c;管理信息系統的實施在技術上已逐步成熟。本文介紹了新生報道管理系統的開發全過程。通過分析高校新生入學報到信息管理的不足&#xff0c;創建了一個計算機管理高校新生入學報到信息的方案。文章介紹了新生報道管…

給定一個整型矩陣map,求最大的矩形區域為1的數量

題目: 給定一個整型矩陣map,其中的值只有0和1兩種,求其中全是1的 所有矩形區域中,最大的矩形區域為1的數量。 例如: 1 1 1 0 其中,最大的矩形區域有3個1,所以返回3。 再如: 1 0 1 1 1 1 1 1 1 1 1 0 其中,最大的矩形區域有6個1,所以返回6。 解題思…

第8章-財務數據

get_fund # 查看股票代碼000001.XSHE在2022年9月1日的總市值 q query( valuation ).filter( valuation.code 000001.XSHE ) df get_fundamentals(q, 2022-09-01) print(df[market_cap][0]) # 獲取第一行的market_cap值 這段代碼看起來是用于查詢股票在特定日期的總…

SQL關鍵字三分鐘入門:ROW_NUMBER() —— 窗口函數為每一行編號

在進行數據分析時&#xff0c;我們常常需要為查詢結果集中的每條記錄生成一個唯一的序號或行號。例如&#xff1a; 為每位員工按照入職時間排序并編號&#xff1b;按照訂單金額對訂單進行排序&#xff0c;并給每個訂單分配一個順序編號&#xff1b;在分組數據內為每條記錄編號…

微信小程序如何實現通過郵箱驗證修改密碼功能

基于騰訊云開發&#xff08;Tencent Cloud Base&#xff09;實現小程序郵箱驗證找回密碼功能的完整邏輯說明及關鍵代碼實現。結合安全性和開發效率&#xff0c;方案采用 ??云函數 小程序前端?? 的架構&#xff0c;使用 ??Nodemailer?? 發送郵件。Nodemailer 是一個專為…

C# VB.NET中Tuple輕量級數據結構和固定長度數組

C# VB.NET取字符串中全角字符數量和半角字符數量-CSDN博客 https://blog.csdn.net/xiaoyao961/article/details/148871910 在VB.NET中&#xff0c;使用Tuple和固定長度數組在性能上有細微差異&#xff0c;以下是詳細分析&#xff1a; 性能對比測試 通過測試 100 萬次調用&am…

建筑物年代預測與空間異質性分析解決方案

建筑物年代預測與空間異質性分析解決方案 1. 問題分析與創新點設計 核心任務:預測建筑物建造年代,并分析空間異質性對預測的影響 創新點設計: 空間權重矩陣集成:構建空間鄰接矩陣量化地理鄰近效應多尺度特征提取:融合建筑物微觀特征與街區宏觀特征異質性分區建模:基于…

FOUPK3system5XOS

Foupk3systemX5OS系統19.60內測版&#xff08;X9&#xff09;2023年4月16日正式發布 1.0Foupk3systemX5OS系統19.60&#xff08;X9&#xff09;2024年10月6日發布 Foupk3systemX5OS系統19.60增強版&#xff08;X9X5&#xff09;2024年10月6日發布Foupk3systemX5OS系統19.60正…

隨機生成的亂碼域名”常由**域名生成算法(DGA)** 產生

“隨機生成的亂碼域名”常由**域名生成算法&#xff08;DGA&#xff09;** 產生&#xff0c;是網絡攻擊&#xff08;尤其是僵尸網絡、惡意軟件控制場景 &#xff09;中躲避檢測的手段&#xff0c;以下是關鍵解析&#xff1a; ### 一、本質與產生邏輯 亂碼域名是攻擊者利用 **DG…

Solidity學習 - 繼承

文章目錄 前言繼承的基本概念繼承的基本用法單繼承實現函數重寫&#xff08;overriding&#xff09; 構造函數的繼承處理多重繼承抽象合約 前言 繼承是面向對象編程中的核心概念之一&#xff0c;Solidity作為一種面向對象的智能合約語言&#xff0c;同樣支持繼承機制。通過繼承…

依賴注入(Dependency Injection, DI)的核心概念和解決的核心問題

核心概念&#xff1a; 依賴注入是一種設計模式&#xff0c;也是實現控制反轉&#xff08;Inversion of Control, IoC&#xff09; 原則的一種具體技術。其核心思想是&#xff1a; 解耦&#xff1a; 將一個類&#xff08;客戶端&#xff09;所依賴的其他類或服務&#xff08;依…

Reactor Schedulers

Reactor 是一個基于響應式編程的庫&#xff0c;它提供了豐富的調度器&#xff08;Schedulers&#xff09;機制&#xff0c;用于管理異步操作的執行環境。Schedulers 是 Reactor 中的核心組件之一&#xff0c;它們允許開發者靈活地控制操作符和訂閱操作在哪個線程上執行&#xf…

設備樹引入

一、設備樹的基本知識 1、什么是設備樹&#xff1f;為什么會有設備樹&#xff1f; 2011年&#xff0c;Linux之父Linus Torvalds發現這個問題后&#xff0c;就通過郵件向ARM-Linux開發社區發了一封郵件&#xff0c;不禁的發出了一句“This whole ARM thing is a f*cking pain i…

【數據標注師】3D標注

目錄 一、 **3D標注知識體系框架**二、 **五階能力培養體系**? **階段1&#xff1a;空間認知筑基&#xff08;2-3周&#xff09;**? **階段2&#xff1a;核心標注技能深化**? **階段3&#xff1a;復雜場景解決方案**? **階段4&#xff1a;領域深度專精? **階段5&#xff1…

華為HN8145V光貓改華為藍色公版界面,三網通用,xgpon公版光貓

咸魚只賣20多元一個&#xff0c;還是xgpon的萬兆貓&#xff0c;性價比不錯哦 除了沒有2.5G網口&#xff0c;其他還行。 改成公版光貓后&#xff0c;運營商是無法納管光貓&#xff0c;無法后臺修改光貓數據及超密。 華為 HN8145V 光貓具有以下特點&#xff1a; 性能方面 高速接…

【LeetCode 熱題 100】438. 找到字符串中所有字母異位詞——(解法二)定長滑動窗口+數組

Problem: 438. 找到字符串中所有字母異位詞 題目&#xff1a;給定兩個字符串 s 和 p&#xff0c;找到 s 中所有 p 的 異位詞 的子串&#xff0c;返回這些子串的起始索引。不考慮答案輸出的順序。 【LeetCode 熱題 100】438. 找到字符串中所有字母異位詞——&#xff08;解法一&…