CompletableFuture異步編程
- CompletableFuture介紹
- 與傳統 Future 的對比
- 使用方法
- 1. 使用 supplyAsync(有返回值)
- 使用 runAsync(無返回值)
- 指定自定義線程池
- 處理異步結果
- 1. thenApply:轉換結果
- 2.thenAccept:消費結果
- 3.thenRun:完成后執行操作
- 組合任務
- 1. thenCompose:串聯兩個任務
- 2. thenCombine:合并兩個任務結果
- 3. allOf:等待所有任務完成
- 4. anyOf:任意一個任務完成
- 異常處理
- 1. exceptionally:捕獲異常并返回默認值
- 2. handle:無論成功/失敗都處理
- 3. whenComplete:記錄日志但不修改結果
- 完整示例:鏈式調用 + 異常處理
- 關鍵點總結
CompletableFuture介紹
1.基礎概念
CompletableFuture 是 Java 8 引入的一個類,用于表示異步計算的結果。它實現了 Future 接口,但比傳統的 Future 更強大,支持:
-
非阻塞操作:通過回調函數處理結果,無需手動調用 get() 阻塞線程。
-
鏈式編程:將多個異步任務串聯或并聯,形成復雜的執行流水線。
-
異常處理:提供統一的異常捕獲和恢復機制。
2. 核心思想
-
異步編程:將耗時的操作(如I/O、網絡請求)交給其他線程執行,主線程繼續處理其他任務。
-
函數式風格:通過 thenApply、thenAccept 等方法,以聲明式的方式組合任務。
3. 關鍵特點
-
回調驅動:任務完成后自動觸發后續操作。
-
線程池集成:支持自定義線程池,避免資源競爭。
-
結果依賴管理:輕松處理多個任務之間的依賴關系(如A任務的結果是B任務的輸入)。
與傳統 Future 的對比
特性 | Future | CompletableFuture |
---|---|---|
結果獲取 | 阻塞調用 get() | 非阻塞回調(thenAccept) |
任務組合 | 需要手動輪詢 | 鏈式調用(thenApply、thenCompose) |
異常處理 | 需在調用代碼中處理 | 內置 exceptionally、handle |
線程控制 | 依賴 ExecutorService | 支持自定義線程池 |
適用場景 | 簡單的異步任務 | 復雜的異步流水線 |
使用方法
1. 使用 supplyAsync(有返回值)
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {// 模擬耗時操作try {Thread.sleep(1000);}catch (InterruptedException e){e.printStackTrace();}return "00";});// 獲取結果(阻塞)String result = future.get();System.out.println("result:"+result);}
}
使用 runAsync(無返回值)
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("sleep 1m");});//等待任務完成completableFuture.get();}
}
指定自定義線程池
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newFixedThreadPool(2);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "Custom Thread Pool";}, executor);String s = future.get();System.out.println(s);}
}
處理異步結果
1. thenApply:轉換結果
thenApply 方法用于在 CompletableFuture 完成時應用一個函數,并返回計算的結果。它返回一個新的 CompletableFuture,該 CompletableFuture 的類型由函數返回值的類型決定。
語法:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").thenApply(s -> s + " World");String s = future.get();System.out.println(s);}
輸出
2.thenAccept:消費結果
thenAccept 方法用于在 CompletableFuture 完成時執行一個消費者(Consumer)操作,但不返回任何值(即它的返回類型是 void)。這通常用于執行一些副作用,比如打印日志、更新UI等,而不關心計算的結果。
語法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
示例:
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello").thenAccept(s -> System.out.println("Result: " + s));future.get();}
}
輸出
3.thenRun:完成后執行操作
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "Hello").thenRun(() -> System.out.println("Task finished"));future.get();}
}
輸出
組合任務
1. thenCompose:串聯兩個任務
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() ->{String a = "Hello";System.out.println(Thread.currentThread().getName() + "-a:" + a);return a;}).thenCompose(s -> CompletableFuture.supplyAsync(() -> {String r = " World";System.out.println(Thread.currentThread().getName() + "-r:" + r);return s + r;}));String s = future.get();// "Hello World"System.out.println(Thread.currentThread().getName() + "s:"+s);}
}
輸出:
2. thenCombine:合并兩個任務結果
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {String h = "Hello";System.out.println(Thread.currentThread().getName() + " h:" + h);return h;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() ->{String w = " World";System.out.println(Thread.currentThread().getName() + " w:" + w);return w;});CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);String s = combined.get();System.out.println(Thread.currentThread().getName() + " s:" + s);}
}
輸出:
3. allOf:等待所有任務完成
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Task1");CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2");CompletableFuture<Void> all = CompletableFuture.allOf(task1, task2);all.thenRun(() -> System.out.println("All tasks completed"));}
}
輸出
4. anyOf:任意一個任務完成
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);}catch (InterruptedException e){e.printStackTrace();}return "Task1";});CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Task2");CompletableFuture<Object> any = CompletableFuture.anyOf(task1, task2);any.thenAccept(result -> System.out.println("First result: " + result)); // 輸出 "Task2"}
}
輸出
異常處理
1. exceptionally:捕獲異常并返回默認值
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (true) throw new RuntimeException("Error!");return "Success";}).exceptionally(ex -> "Fallback Value");String s = future.get();// 返回 "Fallback Value"System.out.println(s);}
}
2. handle:無論成功/失敗都處理
public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {if (new Random().nextBoolean()) throw new RuntimeException("Error!");return "Success";}).handle((result, ex) -> {if (ex != null) return "Fallback";return result;});}
3. whenComplete:記錄日志但不修改結果
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello").whenComplete((result, ex) -> {if (ex != null) ex.printStackTrace();else System.out.println("Result: " + result);});String s = future.get();System.out.println(s);}
}
完整示例:鏈式調用 + 異常處理
public class MyThreadTest {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.supplyAsync(() -> {// 步驟1:獲取用戶IDreturn 123;}).thenApply(userId -> {// 步驟2:根據用戶ID查詢名稱if (userId == 123) return "Alice";else throw new IllegalArgumentException("Invalid User ID");}).thenApply(userName -> {// 步驟3:轉換為大寫return userName.toUpperCase();}).exceptionally(ex -> {// 統一異常處理System.out.println("Error: " + ex.getMessage());return "DEFAULT_USER";}).thenAccept(finalResult -> {System.out.println("Final Result: " + finalResult); // 輸出 "ALICE" 或 "DEFAULT_USER"});}
}
輸出
關鍵點總結
異步執行:使用 supplyAsync/runAsync 啟動異步任務。
鏈式調用:通過 thenApply/thenAccept/thenRun 串聯操作。
組合任務:thenCompose(依賴)和 thenCombine(并行)合并結果。
異常處理:優先使用 exceptionally 或 handle 提供容錯。
線程池控制:避免使用默認線程池處理阻塞任務(如I/O)