一、 CompletableFuture介紹
多線程開發一般使用Runnable,Callable,Thread,FutureTask,ThreadPoolExecutor,但也有不近如意的地方
Thread + Runnable:執行異步任務,沒有返回結果。
Thread + Callable + FutureTask:執行一步任務,有返回結果。
獲取返回結果,基于get方法獲取,線程需要掛起在WaitNode里。
獲取返回結果,基于isDone判斷任務的狀態,但是這里需要不斷輪詢。
上述的方式都是有一定的局限性的
CompletableFuture是Java 8中引入的一種實現異步編程模型的方式,它是Future的擴展,提供了更強大、更靈活的功能。CompletableFuture可以表示兩個異步任務之間的順序關系或并行關系,同時提供了一些方便的工具方法來組合這些關系。
二、 應用
首先對CompletableFuture提供的函數式編程中三個函數有一個掌握
Supplier 生產者,沒有入參,有返回結果
Consumer 消費者,有入參,但是沒有返回結果
Function<T,U> 函數,有入參,又有返回結果
2.1 功能
并行執行
allOf():當所有給定的 CompletableFuture 完成時,返回一個新的 CompletableFuture 。
anyOf():當任何一個給定的CompletablFuture完成時,返回一個新的CompletableFutur。
依賴關系
thenApply():把前面任務的執行結果,交給后面的Function。
thenCompose():用來連接兩個有依賴關系的任務,結果由第二個任務返回。
or聚合關系
applyToEither():兩個任務哪個執行的快,就使用哪一個結果,有返回值 。
acceptEither():兩個任務哪個執行的快,就消費哪一個結果,無返回值 。
runAfterEither():任意一個任務執行完成,進行下一步操作(Runnable類型任務。
and集合關系
thenCombine():合并任務,有返回值 。
thenAccepetBoth():兩個任務執行完成后,將結果交給thenAccepetBoth處理,無返回值 。
runAfterBoth():兩個任務都執行完成后,執行下一步操作(Runnable類型任務。
結果處理
whenComplete:當任務完成時,將使用結果(或 null)和此階段的異常(或 null如果沒有)執行給定操作。
exceptionally:返回一個新的CompletableFuture,當前面的CompletableFuture完成時,它也完成,當它異常完成時,給定函數的異常觸發這個CompletableFuture的完成。
2.2 supplyAsync
CompletableFuture如果不提供線程池的話,默認使用的ForkJoinPool,而ForkJoinPool內部是守護線程,如果main線程結束了,守護線程會跟著一起結束。
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) {
CompletableFuture firstTask = CompletableFuture.supplyAsync(() -> {
System.out.println(“task begin!”);
System.out.println(“task end!”);
return “result”;
});
String result1 = firstTask.join();
String result2 = null;
try {result2 = firstTask.get();
} catch (InterruptedException e) {e.printStackTrace();
} catch (ExecutionException e) {e.printStackTrace();
}System.out.println(result1 + "," + result2);
}
2.3 runAsync
當前方式既不會接收參數,也不會返回任何結果,非常基礎的任務編排方式
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) throws IOException {
CompletableFuture.runAsync(() -> {
System.out.println(“task begin!”);
System.out.println(“task end!”);
});
System.in.read();
}
2.4 thenApply,thenApplyAsync
有任務A,還有任務B。任務B需要在任務A執行完畢后再執行。而且任務B需要任務A的返回結果。
任務B自身也有返回結果。thenApply可以拼接異步任務,前置任務處理完之后,將返回結果交給后置任務,然后后置任務再執行thenApply提供了帶有Async的方法,可以指定每個任務使用的具體線程池。
thenApply:
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) {
CompletableFuture taskA = CompletableFuture.supplyAsync(() -> {
String id = UUID.randomUUID().toString();
System.out.println(“taskA:” + id);
return id;
});
CompletableFuture taskB = taskA.thenApply(result -> {
System.out.println(“taskA resule:” + result);
result = result.replace(“-”, “”);
return result;
});
System.out.println("main task deal result:" + taskB.join());
}
thenApplyAsync:
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture taskB = CompletableFuture.supplyAsync(() -> {
String id = UUID.randomUUID().toString();
System.out.println(“taskA:” + id + “,” + Thread.currentThread().getName());
return id;
}).thenApplyAsync(result -> {
System.out.println(“taskB get taskA result:” + result + “,” + Thread.currentThread().getName());
result = result.replace(“-”, “”);
return result;
},executor);
System.out.println("main thread:" + taskB.join());
}
2.5 thenAccept,thenAcceptAsync(自定義線程池)
套路和thenApply一樣,都是任務A和任務B的拼接前置任務需要有返回結果,后置任務會接收前置任務的結果,返回后置任務,沒有返回值
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) throws IOException {
CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA”);
return “abcdefg”;
}).thenAccept(result -> {
System.out.println(“taskB,get result:” + result);
});
System.in.read();
}
2.6 thenRun,thenRunAsync(自定義線程池)
套路和thenApply,thenAccept一樣,都是任務A和任務B的拼接,前置任務沒有返回結果,后置任務不接收前置任務結果,后置任務也沒有返回結果。
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) throws IOException {
CompletableFuture.runAsync(() -> {
System.out.println(“taskA begin!”);
}).thenRun(() -> {
System.out.println(“taskB begin!”);
});
System.in.read();
}
2.7 thenCombine,thenAcceptBoth,runAfterBoth
比如有任務A,任務B,任務C。任務A和任務B并行執行,等到任務A和任務B全部執行完畢后,再執行任務C。
2.7.1 thenCombine
當前方式當前方式前置任務需要有返回結果,后置任務接收前置任務的結果,有返回值
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 10;
}), (r1, r2) -> {
System.out.println(“taskC begin!!”);
return r1 + r2;
});
System.out.println("taskC result = " + future.join());
}
2.7.2 thenAcceptBoth
當前方式前置任務需要有返回結果,后置任務接收前置任務的結果,沒有返回值
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 10;
}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 10;
}), (r1, r2) -> {
System.out.println(“taskC begin!!”);
int r = r2 + r1;
System.out.println("taskC result = " + r);
});
}
2.7.3 runAfterBoth
當前方式前置任務不需要有返回結果,后置任務不會接收前置任務的結果,沒有返回值
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 10;
}).runAfterBoth(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 10;
}), () -> {
System.out.println(“taskC begin!!”);
});
}
2.8 applyToEither,acceptEither,runAfterEither
這三個方法:比如有任務A,任務B,任務C。任務A和任務B并行執行,只要任務A或者任務B執行完畢,開始執行任務C.
applyToEither:可以接收結果并且返回結果,acceptEither:可以接收結果沒有返回結果,runAfterEither:不接收結果也沒返回結果,三個方法拼接任務的方式都是一樣的,applyToEither:只演示一個其它套路一樣。
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) throws IOException {
CompletableFuture taskC = CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 78;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 66;
}), resultFirst -> {
System.out.println(“taskC begin!!”);
return resultFirst;
});
System.out.println(taskC.join());
System.in.read();
}
2.9 exceptionally,thenCompose,handle
exceptionally:
這個也是拼接任務的方式,但是只有前面業務執行時出現異常了,才會執行當前方法來處理.
只有異常出現時,CompletableFuture的編排任務沒有處理完時,才會觸發。
拿不到任務結果。
whenComplete,handle:
這兩個也是異常處理的套路,可以根據方法描述發現,他的功能方向比exceptionally要更加豐富
whenComplete:
可以拿到返回結果同時也可以拿到出現的異常信息,但是whenComplete本身是Consumer不能返回結果。無法幫你捕獲異常,但是可以拿到異常返回的結果。
handle:
可以拿到返回結果同時也可以拿到出現的異常信息,并且也可以指定返回托底數據。可以捕獲異常的,異常不會拋出去。
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) throws IOException {
CompletableFuture taskC = CompletableFuture.supplyAsync(() -> {
System.out.println(“taskA begin!!”);
return 78;
}).applyToEither(CompletableFuture.supplyAsync(() -> {
System.out.println(“taskB begin!!”);
return 66;
}), resultFirst -> {
System.out.println(“taskC begin!!”);
return resultFirst;
}).handle((r,ex) -> {
System.out.println(“handle:” + r);
System.out.println(“handle:” + ex);
return -1;
});
/.exceptionally(ex -> {
System.out.println(“exceptionally:” + ex);
return -1;
});/
/.whenComplete((r,ex) -> {
System.out.println(“whenComplete:” + r);
System.out.println(“whenComplete:” + ex);
});/
System.out.println(taskC.join());
}
2.10 allOf,anyOf
2.10.1 allOf
allOf的方式是讓內部編寫多個CompletableFuture的任務,多個任務都執行完后,才會繼續執行你后續拼接的任務。
allOf返回的CompletableFuture是Void,沒有返回結果
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) throws IOException {
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskA begin!!”);
}),
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskB begin!!”);
}),
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskC begin!!”);
})
).thenRun(() -> {
System.out.println(“taskD begin!!”);
});
}
2.10.2 anyOf:
anyOf是基于多個CompletableFuture的任務,只要有一個任務執行完畢就繼續執行后續,最先執行完的任務做作為返回結果的入參
java 體驗AI代碼助手 代碼解讀復制代碼 public static void main(String[] args) throws IOException {
CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskA begin!!”);
return “A”;
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskB begin!!”);
return “B”;
}),
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(“taskC begin!!”);
return “C”;
})
).thenAccept(r -> {
System.out.println(“taskD begin,” + r + “first end”);
});
}