使用ThreadPoolExecutor并行化獨立的單線程任務

Java SE 5.0中引入的任務執行框架是簡化多線程應用程序的設計和開發的巨大飛躍。 該框架提供了用于管理任務概念,管理線程生命周期及其執行策略的工具。

在此博客文章中,我們將描述該框架的功能,靈活性和簡單性,以展示一個簡單的用例。

基礎

執行程序框架引入了一個接口來管理任務執行: 執行程序。 Executor是用于提交任務的接口,表示為Runnable實例。 此接口還將任務提交與任務執行隔離開來 :具有不同執行策略的執行者都發布相同的提交接口:如果您更改執行策略,則提交邏輯將不受更改的影響。

如果您想提交一個Runnable實例來執行,它很簡單:

Executor exec = …;
exec.execute(runnable);

線程池

如上一節所述,執行器合同未指定執行器如何執行可運行對象:這取決于您所使用的執行器的特定類型。 該框架提供了一些不同類型的執行器,每種執行器都有針對不同用例量身定制的特定執行策略。

您將要處理的最常見的執行程序類型是線程池執行程序 。,它們是ThreadPoolExecutor類(及其子類)的實例。 線程池執行程序管理一個線程池 (即將要執行任務的工作線程池)和一個工作隊列

您肯定已經在其他技術中看到池的概念。 使用池的主要優點是減少了資源創建的開銷,重用了使用后釋放的結構(在這種情況下為線程)。 使用池的另一個隱式優勢是可以調整資源使用量 :可以調整線程池大小以實現所需的負載,而不會損害系統資源。

該框架為線程池提供了一個工廠類,稱為Executors 。 使用該工廠,您將能夠創建具有不同特征的線程池。 通常,底層實現通常是相同的( ThreadPoolExecutor ),但是工廠類可幫助您快速配置線程池,而無需使用更復雜的構造函數。 出廠方法是:

  • newFixedThreadPool :此方法返回最大大小固定的線程池。 它將根據需要創建新線程,直到最大配置大小。 當線程數達到最大值時,線程池將保持大小不變??。
  • newCachedThreadPool :此方法返回無限制的線程池,即沒有最大大小的線程池。 但是,當負載減少時,這種線程池將拆除未使用的線程。
  • newSingleThreadedExecutor :此方法返回一個執行程序,該執行程序保證將在單個線程中執行任務。
  • newScheduledThreadPool :此方法返回固定大小的線程池,該線程池支持延遲和定時任務執行。

這僅僅是個開始。 執行器還提供了本教程中未涵蓋的其他功能,我強烈建議您學習以下內容:

  • 生命周期管理方法,由ExecutorService接口聲明(例如shutdown ()和awaitTermination ())。
  • 完成服務可輪詢任務狀態并檢索其返回值(如果適用)。

該ExecutorService的接口就顯得尤為重要,因為它提供了一種方法來關閉一個線程池,這是一件好事,你幾乎肯定希望能夠干凈利落做。 幸運的是, ExecutorService接口非常簡單且易于解釋,我建議您徹底研究其JavaDoc。

基本上,您會向ExecutorService發送shutdown ()消息,此后它將不接受新提交的任務,但將繼續處理已排隊的作業。 您可以使用isTerminated ()來收集執行程序服務的終止狀態,也可以使用awaitTermination (…)方法等待終止。 不過, awaitTermination方法不會永遠等待:您必須將最大等待超時作為參數傳遞。

警告 :錯誤和混亂的根源是理解為什么JVM進程永不退出的原因。 如果不關閉執行程序服務,從而破壞基礎線程,則JVM將永遠不會退出: JVM在其最后一個非守護線程退出時退出。

配置ThreadPoolExecutor

如果決定手動創建ThreadPoolExecutor而不是使用Executors工廠類,則需要使用其構造函數之一來創建和配置ThreadPoolExecutor 。 此類的最廣泛的構造方法是:

public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);

如您所見,您可以配置:

  • 核心池大小(線程池將嘗試使用的大小)。
  • 最大池大小。
  • 保持活動時間,在該時間之后,空閑線程有資格被拆除。
  • 工作隊列中包含等待執行的任務。
  • 拒絕任務提交時應用的策略。

限制排隊的任務數

在可預測性和穩定性方面,限制正在執行的并發任務的數量,調整線程池的大小對您的應用程序及其執行環境具有巨大的好處:無限制的線程創建最終將耗盡運行時資源,結果您的應用程序可能會遇到嚴重的性能問題,甚至可能導致應用程序不穩定。

這只是解決部分問題的一種解決方案:您限制了正在執行的任務數量,但沒有限制可以提交并排隊供以后執行的作業數量。 該應用程序將在以后遇到資源短缺的問題,但是如果提交率始終超過執行率,它將最終遇到這種情況。

該問題的解決方案是:

  • 向執行者提供阻塞隊列以保留等待的任務。 如果隊列已滿,提交的任務將被“拒絕”。
  • 拒絕任務提交時,將調用RejectedExecutionHandler ,這就是為什么在上一項中引用了被拒絕的動詞的原因。 您可以實施自己的拒絕策略,也可以使用框架提供的內置策略之一。

默認拒絕策略使執行程序拋出RejectedExecutionException 。 但是,其他內置策略可讓您:

  • 靜默丟棄作業。
  • 丟棄最舊的作業,然后嘗試重新提交最后一份。
  • 在調用者的線程上執行被拒絕的任務。

什么時候以及為什么要使用這樣的線程池配置? 讓我們來看一個例子。

一個示例:并行化獨立的單線程任務

最近,有人打電話給我解決我的客戶自很久以前就在運行的一項舊工作的問題。 基本上,作業由等待一組目錄層次結構上的文件系統事件的組件組成。 每當觸發事件時,都必須處理文件。 文件處理由專有的單線程進程執行。 說實話,就其本身的性質而言,即使我可以,但如果我可以并行化它,我就不會。 事件全天的到達率很高,不需要實時處理文件,而只需要在第二天之前進行處理即可。

當前的實現是技術的混合與匹配,包括UNIX shell腳本,該腳本負責掃描巨大的目錄層次結構以檢測應用更改的位置。 實施該實現后,執行環境中的核心數量也就只有兩個。 同樣,事件的發生率也很低:如今,它們的數量級約為數百萬 ,總共要處理1到2 TB的原始數據。

如今,客戶端正在運行這些進程的服務器是十二臺核心計算機:這是并行化那些舊的單線程任務的巨大機會。 我們已經基本掌握了配方的所有成分,我們只需要決定如何構建和調整它即可。 在編寫任何代碼之前,需要進行一些思考以了解負載的性質,這些是我檢測到的約束:

  • 定期要掃描大量文件:每個目錄包含一到兩百萬個文件。
  • 掃描算法非常快,可以并行化。
  • 處理文件至少需要1秒,甚至可能需要2或3秒的峰值。
  • 處理文件時,除CPU外沒有其他瓶頸。
  • CPU使用率必須是可調的,以便根據一天中的時間使用不同的負載配置文件。

因此,我需要一個線程池,該線程池的大小由調用流程時活動的負載配置文件確定。 然后,我傾向于創建根據負載策略配置的固定大小的線程池執行程序。 由于處理線程僅受CPU限制,其核心使用率為100%,并且無需等待其他資源,因此負載策略非常容易計算:只需獲取處理環境中可用的核心數量,然后使用負載按比例縮小當時處于活動狀態的因素(并檢查在峰值時刻至少使用了一個內核):

int cpus = Runtime.getRuntime().availableProcessors();
int maxThreads = cpus * scaleFactor;
maxThreads = (maxThreads > 0 ? maxThreads : 1);

然后,我需要使用阻塞隊列來創建ThreadPoolExecutor來限制提交的任務數。 為什么? 好吧:目錄掃描算法非常快,并且會生成大量文件以非常快速地處理。 有多大? 很難預測,其可變性很高。 我不會讓執行者的內部隊列亂七八糟地用代表我的任務的對象(包括一個非常大的文件描述符)填充。 我寧愿讓執行程序在隊列填滿時拒絕文件。

另外,我將使用ThreadPoolExecutor.CallerRunsPolicy作為拒絕策略。 為什么? 好吧,因為當隊列已滿并且池中的線程正在忙于處理文件時,我將擁有正在提交執行該文件的任務的線程。 這樣,掃描將停止處理文件,并在完成當前任務后立即恢復掃描。

這是創建執行程序的代碼:

ExecutorService executorService =new ThreadPoolExecutor(maxThreads, // core thread pool sizemaxThreads, // maximum thread pool size1, // time to wait before resizing poolTimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(maxThreads, true),new ThreadPoolExecutor.CallerRunsPolicy());

代碼的框架如下(已大大簡化):

// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {File currentDir = dirsToProcess.pop();// listing childrenFile[] children = currentDir.listFiles();// processing childrenfor (final File currentFile : children) {// if it's a directory, defer processingif (currentFile.isDirectory()) {dirsToProcess.add(currentFile);continue;}executorService.submit(new Runnable() {@Overridepublic void run() {try {// if it's a file, process itnew ConvertTask(currentFile).perform();} catch (Exception ex) {// error management logic}}});
}// ...// wait for all of the executor threads to finish
executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the first tryexecutorService.shutdownNow();}if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {// pool didn't terminate after the second try}
} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();
}

結論

如您所見,Java并發API非常易于使用,非常靈活并且功能強大。 幾年前,我會花更多的精力編寫這樣一個簡單的程序。 這樣,我可以在幾個小時內快速解決由遺留的單線程組件引起的可伸縮性問題。

參考: The Gray Blog中的 JCG合作伙伴 Enrico Crisostomo 使用ThreadPoolExecutor并行化獨立的單線程任務 。

相關片段:
  • 受限連接池的阻塞隊列示例
  • 更一般的等待/通知機制的CountDownLatch示例
  • 任務運行器的重入鎖示例
  • 限制URL連接的信號量示例

相關文章 :

  • Java并發教程–線程池
  • 有益的CountDownLatch和棘手的Java死鎖
  • 并發優化–減少鎖粒度
  • Java并發教程– CountDownLatch
  • JVM如何處理鎖
  • Java教程和Android教程列表

翻譯自: https://www.javacodegeeks.com/2011/12/using-threadpoolexecutor-to-parallelize.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/374431.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/374431.shtml
英文地址,請注明出處:http://en.pswp.cn/news/374431.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

python定義一個圓_Python-矩形和圓形

原博文 2019-11-11 12:34 ? Exercise 15.1. 定義一個叫做Circle 類&#xff0c;類的屬性是圓心 (center) 和半徑 (radius) , 其中&#xff0c;圓心 (center) 是一個 Point 類&#xff0c;而半徑 (radius) 是一個數字。 實例化一個圓心 (center) 為 (150, 100) &#xff0c;半…

C語言代碼規范(四)命名規則

一、宏定義全部字母大寫&#xff0c;單詞間下劃線間隔 #define FLASH_PAGE_SIZE 256 #define FLASH_SECTOR_SIZE (4 * 1024) #define FLASH_BLOCK_SIZE (64 * 1024) #define FLASH_SIZE (16 * 1024 * 1024) 二、const修飾的常量全部字母大寫&#xff0c;單詞間…

Forbidden You don't have permission to access / on this server PHP

Forbidden You dont have permission to access / on this server PHP 在新安裝的谷歌游覽器里&#xff0c;打不了PHP網站了&#xff0c;錯誤顯示&#xff1a; Forbidden You dont have permission to access / on this server. 原因還是配置權限問題 解決辦法&#xff1a; wa…

Spring 3.1和JPA的持久層

1.概述 本教程顯示了如何使用Hibernate作為持久性提供程序使用JPA設置Spring 。 有關使用基于Java的配置和項目的基本Maven pom設置Spring上下文的分步介紹&#xff0c;請參閱本文 。 2. Java的JPA Spring配置 要在Spring項目中使用JPA&#xff0c; 需要設置EntityManager 。…

150928錯誤認識

1. $arr array(); foreach ($re as $k>$v){  $arr[] $v[updatetime];} $arr的返回結果為&#xff1a; Array ([0] > 2014-09[1] > 2015-04[2] > 2015-09 )$arr array(); foreach ($re as $k>$v){  $arr[$k] $v[updatetime];} $arr的返回結果為&#xff…

STM32F1筆記(一)GPIO輸出

GPIO&#xff1a;General Purpose Input Output &#xff08;通用輸入/輸出&#xff09;。 GPIO最經典應用&#xff1a;LED燈。 先看電路。聲明&#xff1a;參考正點原子戰艦開發板。 與LED串聯的電阻稱為限流電阻。 限流電阻計算公式&#xff1a;R(U-LED壓降)/20ma。 U為LE…

dataframe轉化為array_【Python專欄】12 種高效 Numpy 和 Pandas 函數為你加速分析

來源&#xff1a;機器之心編譯&#xff1a;Jamin、杜偉、張倩我們都知道&#xff0c;Numpy 是 Python 環境下的擴展程序庫&#xff0c;支持大量的維度數組和矩陣運算&#xff1b;Pandas 也是 Python 環境下的數據操作和分析軟件包&#xff0c;以及強大的數據分析庫。二者在日常…

具有GlassFish和一致性的高性能JPA –第1部分

您以前聽說過連貫性嗎&#xff1f; 大概是。 它是那些著名的內存網格解決方案之一&#xff0c;該解決方案承諾了超快的數據訪問速度和對經常使用的數據的無限空間。 一些眾所周知的競爭對手是Infinispan &#xff0c; Memcached和Terracotta Ehcache 。 它們都很棒&#xff0c;…

如何在自己的代碼中實現分享視頻文件或者是圖片文件到微信 QQ微博 新浪微博等!!!...

首先在文檔第一句我先自嘲下 &#xff0c; 我是大傻逼&#xff0c; 弄了兩天微信是視頻分享&#xff0c;一直被說為啥跟系統的相冊分享的不一樣&#xff0c;尼瑪&#xff01;&#xff01;&#xff01; 這里來說正文&#xff0c;我這里不像多少太多&#xff0c;大家都是程序猿&a…

sql 數據庫中用創建好的視圖修改表數據

只要滿足下列條件&#xff0c;即可通過視圖修改基礎基表的數據&#xff1a; 1、任何修改&#xff08;包括 UPDATE、INSERT 和 DELETE 語句&#xff09;都只能引用一個基表的列。 2、視圖中被修改的列必須直接引用表列中的基礎數據。不能通過任何其他方式對這些列進行派生&#…

boost原理與sklearn源碼_機器學習sklearn系列之決策樹

一、 Sklearn庫 Scikit learn 也簡稱 sklearn, 自2007年發布以來&#xff0c;scikit-learn已經成為Python重要的機器學習庫了。支持包括分類、回歸、降維和聚類四大機器學習算法。還包含了特征提取、數據處理和模型評估三大模塊。sklearn是Scipy的擴展&#xff0c;建立在NumPy和…

STM32F1筆記(二)GPIO輸入

STM32 GPIO輸入的經典應用是按鍵。 先看電路。聲明&#xff1a;參考正點原子戰艦開發板。 在這里可以看到&#xff0c;KEY_UP按鍵是高電平有效的&#xff0c;即當按下該按鍵時&#xff0c;GPIO讀到高電平。 KEY0/1/2是低電平有效的&#xff0c;即當按下該按鍵時&#xff0c;G…

Google Authenticator:將其與您自己的Java身份驗證服務器配合使用

用于移動設備的Google Authenticator應用程序是一個非常方便的應用程序&#xff0c;它實現了TOTP算法&#xff08;在RFC 6238中指定&#xff09;。 使用Google Authenticator&#xff0c;您可以生成時間密碼&#xff0c;該密碼可用于在共享請求用戶密鑰的身份驗證服務器中授權用…

[Week2 作業] 代碼規范之爭

這四個問題均是出自 http://goodmath.scientopia.org/2011/07/14/stuff-everyone-should-do-part-2-coding-standards/ 。 我對這四個問題均持反駁的看法&#xff0c;下面是我的理由~ Q1&#xff1a;這些規范都是官僚制度下產生的浪費大家的編程時間、影響人們開發效率, 浪費時…

STM32F1筆記(三)UART/USART

UART&#xff1a;Universal Asynchronous Receiver/Transmitter&#xff08;通用異步收/發器&#xff09; USART&#xff1a;Universal Synchronous/Asynchronous Receiver/Transmitter&#xff08;通用同步/異步串行收/發器&#xff09; 從命名即可看出USART就是UART的基礎上…

python安裝界面翻譯_python環境搭建

如果想要運行python需要有解釋器和編輯器。 什么是解釋器 解釋器我們可以把它理解成翻譯官&#xff0c;它是將我們寫的python代碼翻譯成計算機能夠懂得機器語言。 然后計算機收到解釋器的命令來干活&#xff0c;最終再將結果反饋在解釋器中。 解釋器推薦使用anaconda3 什么是an…

無需重新部署Eclipse和Tomcat即可進行更改

他們說&#xff0c;由于應用程序服務器過大&#xff0c;Java的開發速度很慢–您必須重新部署應用程序才能看到所做的更改。 使用PHP&#xff0c;Python等腳本語言時&#xff0c;可以“保存并刷新”。 這個法定問題總結了這個“神話”。 是的&#xff0c;這是一個神話。 您也可以…

進階篇-用戶界面:4.Android中常用組件

1.下拉菜單 在Web開發中&#xff0c;HTML提供了下拉列表的實現&#xff0c;就是使用<select>元素實現一個下拉列表&#xff0c;在其中每個下拉列表項使用<option>表示即可。這是在Web開發中一個必不可少的交互性組件&#xff0c;而在Android中的對應實現就是Spinne…

收款單單據編號不正確

問題現象:現在在應收&#xff0c;應付的收款單錄入和付款單錄入里點擊增加的話&#xff0c;單據編號如果是出現2024呢&#xff0c;按保存的話&#xff0c;就會出現單據號重復&#xff1b;查到的最大的單據號是3034&#xff0c;在流水號里改成3038后再增回加的話還是出現2024。然…

STM32F1筆記(四)NVIC中斷優先級管理

STM32將中斷分為5個組&#xff0c;組0~4。配置代碼如下&#xff1a; NVIC_PriorityGroupConfig(NVIC_PriorityGroup_2); 在標準庫里&#xff0c;分組的定義如下&#xff1a; /** defgroup Preemption_Priority_Group * {*/#define NVIC_PriorityGroup_0 ((uint32_t…