多線程編程
文章目錄
- 多線程編程
- @[toc]
- 引言
- 創建多線程的方式
- 繼承Thread類
- 實現Runnable接口
- 實現Callable接口
- Callable和Runnable的區別
- Lambda表達式
- 線程的實現原理
- Future&FutureTask
- 具體使用
- submit方法
- Future到FutureTask類
- Future
- 注意事項
- 局限性
- CompletionService
- 引言
- 使用
- 使用場景
- CompletableFuture
- 引言
- 繼承結構
- 任務的異步回調
- 多個任務組合處理
- 注意點
- Future需要獲取返回值,才能獲取異常信息
- CompletableFuture的get()方法是阻塞的。
- 默認線程池的注意點
- 自定義線程池時,注意飽和策略
文章目錄
- 多線程編程
- @[toc]
- 引言
- 創建多線程的方式
- 繼承Thread類
- 實現Runnable接口
- 實現Callable接口
- Callable和Runnable的區別
- Lambda表達式
- 線程的實現原理
- Future&FutureTask
- 具體使用
- submit方法
- Future到FutureTask類
- Future
- 注意事項
- 局限性
- CompletionService
- 引言
- 使用
- 使用場景
- CompletableFuture
- 引言
- 繼承結構
- 任務的異步回調
- 多個任務組合處理
- 注意點
- Future需要獲取返回值,才能獲取異常信息
- CompletableFuture的get()方法是阻塞的。
- 默認線程池的注意點
- 自定義線程池時,注意飽和策略
引言
為什么使用多線程?
- 最直接的就是提升程序性能,使用多線程可以充分利用硬件資源,同時執行多個任務,從而提高程序的整體性能。通過并行執行任務,可以將工作負載分布到多個線程上,從而更有效地利用 CPU 資源。
- 提高響應性:可以將長時間處理的請求放在后臺另一個線程進行處理,不妨礙主線程的用戶執行其他的請求
- 實現并發編程:現在的工作中,多線程是并發編程的一種重要方式。利用好多線程機制可以大大提高系統整體的并發能力以及性能。
創建多線程的方式
從實現上來說,Java提供了三種創建線程的方式,但從原理上來看,其實只有一種方式,我們先從實現上來簡單介紹一下這三種方式
繼承Thread類
直接創建一個ThreadTest的實例,調用它的start()方法就可以創建一個線程了
class ThreadTest extends Thread{@Overridepublic void run() {System.out.println(Thread.currentThread().getName());}
}
實現Runnable接口
如果只是簡單的實現了Runnable接口,它與線程并沒有任何關系,只是相當于創建了一個線程執行的任務類而已,要想真正的創建線程,還是需要創建一個Thread對象,把RunnableTest實例作為構造方法的入參
1.2 實現Runnable接口
如果只是簡單的實現了Runnable接口,它與線程并沒有任何關系,只是相當于創建了一個線程執行的任務類而已,要想真正的創建線程,還是需要創建一個Thread對象,把RunnableTest實例作為構造方法的入參
class RunnableTest implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread().getName());}
}public class CreateThreadTest {public static void main(String[] args) {RunnableTest runnableTest = new RunnableTest();Thread thread = new Thread(runnableTest);thread.start();}
}
實現Callable接口
與Runnable很相似,它相當于也是也個任務的實現類,需要結合線程池的submit()方法才能使用,但與Runnable最本質的區別是,Callable的call()方法可以有返回值
class CallableTest implements Callable<Integer>{@Overridepublic Integer call() throws Exception {return ThreadLocalRandom.current().nextInt();}
}public class CreateThreadTest {public static void main(String[] args) {CallableTest callableTest = new CallableTest();ExecutorService executorService = Executors.newFixedThreadPool(10);Future<Integer> future = executorService.submit(callableTest);}
}
Callable和Runnable的區別
Callable的call方法可以有返回值,可以聲明拋出異常。和 Callable
配合的有一個Future
類,通過Future
可以了解任務執行情況,或者取消任務的執行,還可獲取任務執行的結果,這些功能都是Runnable
做不到的,Callable
的功能要比Runnable
強大。
@FunctionalInterface
public interface Runnable {// 沒有返回值public abstract void run();
}@FunctionalInterface
public interface Callable<V> {// 有返回值V call() throws Exception;
}
Lambda表達式
這種方式與第二種方式其實是一樣的,只是寫法比較簡潔明了
Thread thread = new Thread(() -> System.out.println(Thread.currentThread().getName()
線程的實現原理
這三種創建線程的方式,第一種是直接通過繼承Thread
進行實現,第二種是通過實現Runnable
接口,然后將類作為創建Thread
類的入參,其實也是通過實現創建Thread
類進行實現。
現在看第三種Callable
的方式到底是怎么實現的,他是通過實現Callable
接口,然后使用submit
進行執行,這里我們通過debug
這個方法,最后發現其實也是通過創建的Thread
進行實現。
**總結:**所以最后我們發現三種方式其實都是創建Thread
類進行實現
Future&FutureTask
我們一共有三種創建線程的方式,繼承Thread
和實現Runnable
接口都是沒有返回值的,所以我們不知道線程的執行狀態,不能獲取執行完成的一個結果。所以這時就需要Callable
來解決上面的問題,通過Callable
和Future
能夠獲得執行的結果。
具體使用
public class CompletableFutureTest {private static ThreadPoolExecutor executor;static {executor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.HOURS, new ArrayBlockingQueue<>(100), new ThreadFactory() {private int count = 0;@Overridepublic Thread newThread(Runnable r) {count++;System.out.printf("CustomerThread- %d :", count);return new Thread(r, "CustomerThread-" + count);}});}static class CallableTest implements Callable<String>{@Overridepublic String call() throws Exception {Thread.sleep(1000);System.out.println("線程開始運行");return "返回值";}}public static void main(String[] args) throws Exception{CallableTest test = new CallableTest();FutureTask<String> futureTask = new FutureTask<>(test);executor.submit(futureTask);System.out.println(futureTask.get());executor.shutdown();}
}
submit方法
在該方法中,我們傳入的是FutureTask
類型的,結果把參數轉成了RunnableFuture
,任務執行依然是execute()方法
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}
這里的Runnable
其實是FutureTask的父類
當我們使用Callable
類的子類作為參數時候,其實也是轉換為RunnableFuture
,然后使用excute
進行執行。
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
Future到FutureTask類
Future
其實就是定義了一組接口,Future就是對于具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。
FutureTask實現了這個接口,同時還實現了Runnalbe接口,這樣FutureTask就相當于是消費者和生產者的橋梁了,消費者可以通過FutureTask存儲任務的執行結果,跟新任務的狀態:未開始、處理中、已完成、已取消等等。而任務的生產者可以拿到FutureTask被轉型為Future接口,可以阻塞式的獲取處理結果,非阻塞式獲取任務處理狀態
**總結:**FutureTask既可以被當做Runnable放入Excutor
來執行,也可以被當做Future來獲取Callable的返回結果。
Future
注意事項
- 當 for 循環批量獲取Future的結果時容易 block,get 方法調用時應使用 timeout 限制
- 因為我們可能會有耗時的任務,后面的任務只能等前面耗時的任務完成以后才能獲取結果,所以有時會卡住
- Future 的生命周期不能后退。一旦完成了任務,它就永久停在了“已完成”的狀態,不能從頭再來
局限性
從本質上說,Future表示一個異步計算的結果。它提供了isDone()來檢測計算是否已經完成,并且在計算結束后,可以通過get()方法來獲取計算結果。在異步計算中,Future確實是個非常優秀的接口。但是,它的本身也確實存在著許多限制:
- 并發執行多任務:Future只提供了get()方法來獲取結果,并且是阻塞的。所以,除了等待你別無他法;
- 無法對多個任務進行鏈式調用:如果你希望在計算任務完成后執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
- 無法組合多個任務:如果你運行了10個任務,并期望在它們全部執行結束后執行特定動作,那么在Future中這是無能為力的;
- 沒有異常處理:Future接口中沒有關于異常處理的方法;
而這些局限性CompletionService和CompletableFuture都解決了。
CompletionService
引言
CompletionService
是一個為了解決我們并發執行多個線程的任務的時候,能夠及時獲取已經完成任務的結果而創建的一個抽象的接口類,我們一般是使用的他的實現類ExecutorCompletionService
使用
具體的使用規則還有方法的作用博客鏈接
使用場景
- 當需要批量提交異步任務的時候建議使用CompletionService。CompletionService將線程池Executor和阻塞隊列BlockingQueue的功能融合在了一起,能夠讓批量異步任務的管理更簡單。
- CompletionService能夠讓異步任務的執行結果有序化。先執行完的先進入阻塞隊列,利用這個特性,你可以輕松實現后續處理的有序性,避免無謂的等待,同時還可以快速實現諸如Forking Cluster這樣的需求。
- 線程池隔離。CompletionService支持自己創建線程池,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險。
CompletableFuture
引言
我們使用CompletionService
能夠解決多個線程并發執行時,獲取執行結果的返回值時的阻塞問題。但是假如我們并發執行的多線程任務需要遵循一定的規則,或者執行的順序時候,CompletionService
就不能滿足我們的需求了。
所以CompletableFuture
其實是對Future
進行擴展,彌補了Future
的局限性,同時CompletableFuture
實現了對任務編排的能力。
在以往,雖然通過**
CountDownLatch
**等工具類也可以實現任務的編排,但需要復雜的邏輯處理,不僅耗費精力且難以維護。
更加詳細介紹博客
繼承結構
CompletionStage
接口定義了任務編排的方法,執行某一階段,可以向下執行后續階段。異步執行的,默認線程池是ForkJoinPool.commonPool()
,但為了業務之間互不影響,且便于定位問題,強烈推薦使用自定義線程池。
任務的異步回調
多個任務組合處理
注意點
Future需要獲取返回值,才能獲取異常信息
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 5L,TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int a = 0;int b = 666;int c = b / a;return true;},executorService).thenAccept(System.out::println);//如果不加 get()方法這一行,看不到異常信息//future.get();
Future需要獲取返回值,才能獲取到異常信息。如果不加 get()/join()方法,看不到異常信息。小伙伴們使用的時候,注意一下哈,考慮是否加try…catch…或者使用exceptionally方法。
CompletableFuture的get()方法是阻塞的。
CompletableFuture的get()方法是阻塞的,如果使用它來獲取異步調用的返回值,需要添加超時時間~
csharp復制代碼//反例CompletableFuture.get();
//正例
CompletableFuture.get(5, TimeUnit.SECONDS);
默認線程池的注意點
CompletableFuture代碼中又使用了默認的線程池,處理的線程個數是電腦CPU核數-1。在大量請求過來的時候,處理邏輯復雜的話,響應會很慢。一般建議使用自定義線程池,優化線程池配置參數。
自定義線程池時,注意飽和策略
CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(3, TimeUnit.SECONDS)
。并且一般建議使用自定義線程池。
但是如果線程池拒絕策略是DiscardPolicy
或者DiscardOldestPolicy
,當線程池飽和時,會直接丟棄任務,不會拋棄異常。因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時的異步線程,做好線程池隔離哈。