異步回調
所謂異步回調,本質上就是多線程中線程的通信,如今很多業務系統中,某個業務或者功能調用多個外部接口,通常這種調用就是異步的調用。如何得到這些異步調用的結果自然也就很重要了。
Callable、Future、FutureTask
public class test implements Callable<Boolean>{public static void main(String[] args) {test a=new test();FutureTask futureTask=new FutureTask<>(a);new Thread(futureTask).start();Object su=null;try {su=futureTask.get();}catch (Exception e){e.printStackTrace();}System.out.println(su);}@Overridepublic Boolean call() throws Exception {return null;}
}
FutureTask和Callable都是泛型類,泛型參數表示返回結果的類型。通過FutureTask獲取異步線程的執行結果,但是其調用get()方法獲取異步結果時,主線程也會被阻塞。屬于異步阻塞模式。異步阻塞模式屬于主動模式的異步調用,異步回調屬于被動模式的異步調用。Java中回調模式的標準實現類為CompletableFuture。由于此類出現時間比較晚,期間Guava和Netty等都提出了自己的異步回調模式API來使用。這里主要介紹CompletableFuture,其他的有時間后面再學習。
CompletableFuture
CompletableFuture實現Future和CompletionStage兩個接口。此類的實例作為一個異步任務,可以在自己異步執行完成之后觸發一些其他的異步任務,從而達到異步回調的效果。主要方法如下所示:
runAsync和supplyAsync創建子任務
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {return asyncRunStage(screenExecutor(executor), runnable);
}
可以看出runAsync沒有返回值,supplyAsync有返回值,此處用supplyAsync舉例:
ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{return "你好,周先生";
},executor);
System.out.println(completableFuture.get());//輸出你好,周先生
executor.shutdown();
上例中的線程池可以自己構造,如若不指定使用CompletableFuture中默認的線程池ForkJoinPool。
handle()方法統一處理異常和結果
//在執行任務的同一個線程中處理異常和結果
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(null, fn);
}
//可能不在執行任務的同一個線程中處理異常和結果
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {return uniHandleStage(asyncPool, fn);
}
//在指定線程池executor中處理異常和結果
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {return uniHandleStage(screenExecutor(executor), fn);
}
案例:
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{throw new RuntimeException("你好");
});
completableFuture.handle(new BiFunction<String,Throwable,String>(){@Overridepublic String apply(String s, Throwable throwable) {if(throwable==null){System.out.println("mei");;}else {System.out.println("出錯了");}return "ok";}
});
異步任務的串行執行
主要方法為以下幾種:thenApply()、thenAccept()、thenRun()和 thenCompose()。
thenApply()
此方法實現異步任務的串行化執行,前一個任務結果作為下一個任務的入參。
后一個任務與前一個任務在同一個線程中執行public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn);}//后一個任務與前一個任務不在同一個線程中執行public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(asyncPool, fn);}//后一個任務在指定的executor線程池中執行public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) {return uniApplyStage(screenExecutor(executor), fn);}
其中泛型參數T:上一個任務所返回結果的類型。泛型參數U:當前任務的返回類型。
案例:
ExecutorService executor= Executors.newFixedThreadPool(10);CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,周先生";},executor).thenApplyAsync(new Function<String,String>() {@Overridepublic String apply(String s) {System.out.println(Thread.currentThread().getId());//13return "你好,毛先生";}});System.out.println(completableFuture.get());//輸出你好,毛先生executor.shutdown();
thenRun()
此方法不關心任務的處理結果。只要前一個任務執行完成,就開始執行后一個串行任務。而且沒有返回值。
//后一個任務與前一個任務在同一個線程中執行public CompletableFuture<Void> thenRun(Runnable action) {return uniRunStage(null, action);}//后一個任務與前一個任務可以不在同一個線程中執行public CompletableFuture<Void> thenRunAsync(Runnable action) {return uniRunStage(asyncPool, action);}//后一個任務在executor線程池中執行public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor) {return uniRunStage(screenExecutor(executor), action);}
thenAccept()
使用此方法時一個任務可以接收(或消費)前一個任務的處理結果,但是后一個任務沒有結果輸出。
//后一個任務與前一個任務在同一個線程中執行public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(null, other, action);}//后一個任務與前一個任務不在同一個線程中執行public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action) {return biAcceptStage(asyncPool, other, action);}//后一個任務在指定的executor線程池中執行public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor) {return biAcceptStage(screenExecutor(executor), other, action);}
thenCompose()
對兩個任務進行串行的調度操作,第一個任務操作完成時,將其結果作為參數傳遞給第二個任務。
//后一個任務與前一個任務在同一個線程中執行public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(null, fn);}//后一個任務與前一個任務不在同一個線程中執行public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) {return uniComposeStage(asyncPool, fn);}//后一個任務在指定的executor線程池中執行public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) {return uniComposeStage(screenExecutor(executor), fn);}
thenCompose()方法第二個任務的返回值是一個CompletionStage異步實例。
ExecutorService executor= Executors.newFixedThreadPool(10);CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,周先生";},executor).thenComposeAsync(new Function<String,CompletableFuture<String>>(){@Overridepublic CompletableFuture<String> apply(String s) {return CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,毛先生";});}});System.out.println(completableFuture.get());//輸出你好,毛先生executor.shutdown();
異步任務的合并執行
主要實現為以下幾個方法:thenCombine()、runAfterBoth()、
thenAcceptBoth()。
thenCombine()
thenCombine()會在兩個CompletionStage任務都執行完成后,一塊來處理兩個任務的執行結果。如果要合并多個任務,可以使用allOf()。
//合并第二步任務的CompletionStage實例,返回第三步任務的CompletionStagepublic <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(null, other, fn);}//不一定在同一個線程中執行第三步任務的CompletionStage實例public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) {return biApplyStage(asyncPool, other, fn);}//第三步任務的CompletionStage實例在指定的executor線程池中執行public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor) {return biApplyStage(screenExecutor(executor), other, fn);}
其中泛型參數T:表示第一個任務所返回結果的類型。泛型參數U:表示第二個任務所返回結果的類型。泛型參數V:表示第三個任務所返回結果的類型。
案例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,周先生";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,毛先生";});CompletableFuture<String> future3 = future1.thenCombine(future2, new BiFunction<String, String, String>(){@Overridepublic String apply(String s, String s2) {return s+"-----"+s2;}});String s = future3.get();System.out.println(s);//你好,周先生-----你好,毛先生
而runAfterBoth()方法不關注每一步任務的輸入參數和輸出參數,thenAcceptBoth()中第三個任務接收第一個和第二個任務的結果,但是不返回結果。
異步任務的選擇執行
若異步任務的選擇執行不是按照某種條件進行選擇的,而按照執行速度進行選擇的:前面兩并行任務,誰的結果返回速度快,其結果將作為第三步任務的輸入。對兩個異步任務的選擇可以通過CompletionStage接口的applyToEither()、acceptEither()等方法來實現。
applyToEither()
//和其他任務進行速度比較,最快返回的結果用于執行fn回調函數public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(null, other, fn);}//和其他任務進行速度比較,最快返回的結果用于執行fn回調函數,不一定在同一個線程中執行fn回調函數public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) {return orApplyStage(asyncPool, other, fn);}//和其他任務進行速度比較,最快返回的結果用于執行fn回調函數,在指定線程池執行fn回調函數public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor) {return orApplyStage(screenExecutor(executor), other, fn);}
案例:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return "你好,周先生";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getId());//12return "你好,毛先生";});CompletableFuture<String> future3 = future1.applyToEither(future2, new Function<String, String>(){@Overridepublic String apply(String s) {return s;}});String s = future3.get();System.out.println(s);//你好,毛先生