在此博客文章中,我們將描述該框架的功能,靈活性和簡單性,以展示一個簡單的用例。
基礎
執行程序框架引入了一個接口來管理任務執行: 執行程序。 Executor是用于提交任務的接口,表示為Runnable實例。 此接口還將任務提交與任務執行隔離開來 :具有不同執行策略的執行者都發布相同的提交接口:如果您更改執行策略,則提交邏輯將不受更改的影響。
如果您想提交一個Runnable實例來執行,它很簡單:
Executor exec = …;
exec.execute(runnable);
線程池
如上一節所述,執行器合同未指定執行器如何執行可運行對象:這取決于您所使用的執行器的特定類型。 該框架提供了一些不同類型的執行器,每種執行器都有針對不同用例量身定制的特定執行策略。
您將要處理的最常見的執行程序類型是線程池執行程序 。,它們是ThreadPoolExecutor類(及其子類)的實例。 線程池執行程序管理一個線程池 (即將要執行任務的工作線程池)和一個工作隊列 。
您肯定已經在其他技術中看到池的概念。 使用池的主要優點是減少了資源創建的開銷,重用了使用后釋放的結構(在這種情況下為線程)。 使用池的另一個隱式優勢是可以調整資源使用量 :可以調整線程池大小以實現所需的負載,而不會損害系統資源。
該框架為線程池提供了一個工廠類,稱為Executors 。 使用該工廠,您將能夠創建具有不同特征的線程池。 通常,底層實現通常是相同的( ThreadPoolExecutor ),但是工廠類可幫助您快速配置線程池,而無需使用更復雜的構造函數。 出廠方法是:
- newFixedThreadPool :此方法返回最大大小固定的線程池。 它將根據需要創建新線程,直到最大配置大小。 當線程數達到最大值時,線程池將保持大小不變??。
- newCachedThreadPool :此方法返回無限制的線程池,即沒有最大大小的線程池。 但是,當負載減少時,這種線程池將拆除未使用的線程。
- newSingleThreadedExecutor :此方法返回一個執行程序,該執行程序保證將在單個線程中執行任務。
- newScheduledThreadPool :此方法返回固定大小的線程池,該線程池支持延遲和定時任務執行。
這僅僅是個開始。 執行器還提供了本教程中未涵蓋的其他功能,我強烈建議您學習以下內容:
- 生命周期管理方法,由ExecutorService接口聲明(例如shutdown ()和awaitTermination ())。
- 完成服務可輪詢任務狀態并檢索其返回值(如果適用)。
該ExecutorService的接口就顯得尤為重要,因為它提供了一種方法來關閉一個線程池,這是一件好事,你幾乎肯定希望能夠干凈利落做。 幸運的是, ExecutorService接口非常簡單且易于解釋,我建議您徹底研究其JavaDoc。
基本上,您會向ExecutorService發送shutdown ()消息,此后它將不接受新提交的任務,但將繼續處理已排隊的作業。 您可以使用isTerminated ()來收集執行程序服務的終止狀態,也可以使用awaitTermination (…)方法等待終止。 不過, awaitTermination方法不會永遠等待:您必須將最大等待超時作為參數傳遞。
警告 :錯誤和混亂的根源是理解為什么JVM進程永不退出的原因。 如果不關閉執行程序服務,從而破壞基礎線程,則JVM將永遠不會退出: JVM在其最后一個非守護線程退出時退出。
配置ThreadPoolExecutor
如果決定手動創建ThreadPoolExecutor而不是使用Executors工廠類,則需要使用其構造函數之一來創建和配置ThreadPoolExecutor 。 此類的最廣泛的構造方法是:
public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);
如您所見,您可以配置:
- 核心池大小(線程池將嘗試使用的大小)。
- 最大池大小。
- 保持活動時間,在該時間之后,空閑線程有資格被拆除。
- 工作隊列中包含等待執行的任務。
- 拒絕任務提交時應用的策略。
限制排隊的任務數
在可預測性和穩定性方面,限制正在執行的并發任務的數量,調整線程池的大小對您的應用程序及其執行環境具有巨大的好處:無限制的線程創建最終將耗盡運行時資源,結果您的應用程序可能會遇到嚴重的性能問題,甚至可能導致應用程序不穩定。
這只是解決部分問題的一種解決方案:您限制了正在執行的任務數量,但沒有限制可以提交并排隊供以后執行的作業數量。 該應用程序將在以后遇到資源短缺的問題,但是如果提交率始終超過執行率,它將最終遇到這種情況。
該問題的解決方案是:
- 向執行者提供阻塞隊列以保留等待的任務。 如果隊列已滿,提交的任務將被“拒絕”。
- 當拒絕任務提交時,將調用RejectedExecutionHandler ,這就是為什么在上一項中引用了被拒絕的動詞的原因。 您可以實施自己的拒絕策略,也可以使用框架提供的內置策略之一。
默認拒絕策略使執行程序拋出RejectedExecutionException 。 但是,其他內置策略可讓您:
- 靜默丟棄作業。
- 丟棄最舊的作業,然后嘗試重新提交最后一份。
- 在調用者的線程上執行被拒絕的任務。
什么時候以及為什么要使用這樣的線程池配置? 讓我們來看一個例子。
一個示例:并行化獨立的單線程任務
最近,有人打電話給我解決我的客戶自很久以前就在運行的一項舊工作的問題。 基本上,作業由等待一組目錄層次結構上的文件系統事件的組件組成。 每當觸發事件時,都必須處理文件。 文件處理由專有的單線程進程執行。 說實話,就其本身的性質而言,即使我可以,但如果我可以并行化它,我就不會。 事件全天的到達率很高,不需要實時處理文件,而只需要在第二天之前進行處理即可。
當前的實現是技術的混合與匹配,包括UNIX shell腳本,該腳本負責掃描巨大的目錄層次結構以檢測應用更改的位置。 實施該實現后,執行環境中的核心數量也就只有兩個。 同樣,事件的發生率也很低:如今,它們的數量級約為數百萬 ,總共要處理1到2 TB的原始數據。
如今,客戶端正在運行這些進程的服務器是十二臺核心計算機:這是并行化那些舊的單線程任務的巨大機會。 我們已經基本掌握了配方的所有成分,我們只需要決定如何構建和調整它即可。 在編寫任何代碼之前,需要進行一些思考以了解負載的性質,這些是我檢測到的約束:
- 定期要掃描大量文件:每個目錄包含一到兩百萬個文件。
- 掃描算法非常快,可以并行化。
- 處理文件至少需要1秒,甚至可能需要2或3秒的峰值。
- 處理文件時,除CPU外沒有其他瓶頸。
- CPU使用率必須是可調的,以便根據一天中的時間使用不同的負載配置文件。
因此,我需要一個線程池,該線程池的大小由調用流程時活動的負載配置文件確定。 然后,我傾向于創建根據負載策略配置的固定大小的線程池執行程序。 由于處理線程僅受CPU限制,其核心使用率為100%,并且無需等待其他資源,因此負載策略非常容易計算:只需獲取處理環境中可用的核心數量,然后使用負載按比例縮小當時處于活動狀態的因素(并檢查在峰值時刻至少使用了一個內核):
int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);
然后,我需要使用阻塞隊列來創建ThreadPoolExecutor來限制提交的任務數。 為什么? 好吧:目錄掃描算法非常快,并且會生成大量文件以非常快速地處理。 有多大? 很難預測,其可變性很高。 我不會讓執行者的內部隊列亂七八糟地用代表我的任務的對象(包括一個非常大的文件描述符)填充。 我寧愿讓執行程序在隊列填滿時拒絕文件。
另外,我將使用ThreadPoolExecutor.CallerRunsPolicy作為拒絕策略。 為什么? 好吧,因為當隊列已滿并且池中的線程正在忙于處理文件時,我將擁有正在提交執行該文件的任務的線程。 這樣,掃描將停止處理文件,并在完成當前任務后立即恢復掃描。
這是創建執行程序的代碼:
ExecutorService executorService =new ThreadPoolExecutor(maxThreads, // core thread pool sizemaxThreads, // maximum thread pool size1, // time to wait before resizing poolTimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxThreads, true),new ThreadPoolExecutor.CallerRunsPolicy());
代碼的框架如下(已大大簡化):
// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {File currentDir = dirsToProcess.pop();// listing childrenFile[] children = currentDir.listFiles();// processing childrenfor (final File currentFile : children) {// if it's a directory, defer processingif (currentFile.isDirectory()) {dirsToProcess.add(currentFile);continue;}executorService.submit(new Runnable() {@Overridepublic void run() {try {// if it's a file, process itnew ConvertTask(currentFile).perform();} catch (Exception ex) {// error management logic}}});
}// ...// wait for all of the executor threads to finish
executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the first tryexecutorService.shutdownNow();}if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the second try}
} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();
}
結論
如您所見,Java并發API非常易于使用,非常靈活并且功能強大。 幾年前,我會花更多的精力編寫這樣一個簡單的程序。 這樣,我可以在幾個小時內快速解決由遺留的單線程組件引起的可伸縮性問題。
參考: The Gray Blog中的 JCG合作伙伴 Enrico Crisostomo 使用ThreadPoolExecutor并行化獨立的單線程任務 。
- 受限連接池的阻塞隊列示例
- 更一般的等待/通知機制的CountDownLatch示例
- 任務運行器的重入鎖示例
- 限制URL連接的信號量示例
相關文章 :
- Java并發教程–線程池
- 有益的CountDownLatch和棘手的Java死鎖
- 并發優化–減少鎖粒度
- Java并發教程– CountDownLatch
- JVM如何處理鎖
- Java教程和Android教程列表
翻譯自: https://www.javacodegeeks.com/2011/12/using-threadpoolexecutor-to-parallelize.html