為了在廣泛的上下文中有用, ThreadPoolExecutor提供了一些可調整的參數。 很好,但是這也讓我們(開發人員)決定為我們的具體案例選擇正確的設置。 這是ThreadPoolExecutor的最大構造函數。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { ... }
線程池類型
就資源消耗和所導致的系統穩定性而言,上面構造器中顯示的某些參數非常明智。 根據構造函數的不同參數設置,可以區分線程池的一些基本類別。 這是Executors類提供的一些默認線程池設置。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
在“緩存的線程池”中,線程數不受限制。 這是由于Integer.MAX_VALUE的maximumPoolSize與SynchronousQueue一起引起的。 如果將任務以突發方式提交到該線程池,則可能會為每個任務創建一個線程。 在這種情況下,創建的線程在空閑60秒后會終止。 第二個示例顯示“固定線程池”,其中maximumPoolSize設置為特定的固定值。 池線程數永遠不會超過該值。 如果任務突發,并且所有線程都忙,那么它們將在工作隊列(此處為LinkedBlockingQueue )中排隊。 此固定線程池中的線程永不消亡。 無限制池的缺點很明顯:兩種設置都可能導致JVM內存故障(如果幸運的話,會出現OutOfMemoryErrors )。
讓我們看一下一些有限的線程池設置:
ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 50, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());ThreadPoolExecutor pool = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
第一個代碼段創建了一個受緩沖的線程池,其線程數限制為50。如果任務突發,并且所有線程都處于繁忙狀態,則現在通過發出a來拒絕對ThreadPoolExecutor.execute()方法的調用。 RejectedExecutionException 。 通常這不是我通常想要的,因此我通過將rejected-execution-handler設置為CallerRunsPolicy來更改飽和策略。 此策略將工作推回給調用者。 也就是說,發出任務以異步執行的客戶端線程現在將同步運行任務。 您可以通過實現自己的RejectedExecutionHandler來開發自己的飽和度策略。 第二個片段創建一個具有50個線程的固定線程池和一個工作隊列,該工作隊列的值限制為100000個任務。 如果工作隊列已滿,則飽和策略會將工作推回客戶端。 高速緩存的池按需創建線程,如果線程空閑60秒,則終止線程。 固定池使線程保持活動狀態。
線程池邊界
如上所示,有兩種定義線程池的基本方法:有界和無界線程池。 無限制的線程池(如Executors類的默認線程池)可以正常工作,只要您不突發地提交太多任務即可。 如果發生這種情況,無邊界線程池可能會損害您的系統穩定性。 高速緩存的線程池創建了太多線程,或者固定線程池中有太多任務排隊。 這封信較難實現,但仍有可能。 對于生產用途,最好將邊界設置為一些有意義的值,例如最后兩個線程池設置中的值。 因為定義那些“有意義的界限”可能很棘手,所以我開發了一個小程序對我有用。
/*** A class that calculates the optimal thread pool boundaries. It takes the desired target utilization and the desired* work queue memory consumption as input and retuns thread count and work queue capacity.* * @author Niklas Schlimm* */
public abstract class PoolSizeCalculator {/*** The sample queue size to calculate the size of a single {@link Runnable} element.*/private final int SAMPLE_QUEUE_SIZE = 1000;/*** Accuracy of test run. It must finish within 20ms of the testTime otherwise we retry the test. This could be* configurable.*/private final int EPSYLON = 20;/*** Control variable for the CPU time investigation.*/private volatile boolean expired;/*** Time (millis) of the test run in the CPU time calculation.*/private final long testtime = 3000;/*** Calculates the boundaries of a thread pool for a given {@link Runnable}.* * @param targetUtilization* the desired utilization of the CPUs (0 <= targetUtilization <= 1)* @param targetQueueSizeBytes* the desired maximum work queue size of the thread pool (bytes)*/protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {calculateOptimalCapacity(targetQueueSizeBytes);Runnable task = creatTask();start(task);start(task); // warm up phaselong cputime = getCurrentThreadCPUTime();start(task); // test intervallcputime = getCurrentThreadCPUTime() - cputime;long waittime = (testtime * 1000000) - cputime;calculateOptimalThreadCount(cputime, waittime, targetUtilization);}private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {long mem = calculateMemoryUsage();BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem+ " bytes in a queue");System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);}/*** Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice' (chapter 8.2)* * @param cpu* cpu time consumed by considered task* @param wait* wait time of considered task* @param targetUtilization* target utilization of the system*/private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {BigDecimal waitTime = new BigDecimal(wait);BigDecimal computeTime = new BigDecimal(cpu);BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime().availableProcessors());BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization).multiply(new BigDecimal(1).add(waitTime.divide(computeTime, RoundingMode.HALF_UP)));System.out.println("Number of CPU: " + numberOfCPU);System.out.println("Target utilization: " + targetUtilization);System.out.println("Elapsed time (nanos): " + (testtime * 1000000));System.out.println("Compute time (nanos): " + cpu);System.out.println("Wait time (nanos): " + wait);System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + waitTime + " / "+ computeTime + ")");System.out.println("* Optimal thread count: " + optimalthreadcount);}/*** Runs the {@link Runnable} over a period defined in {@link #testtime}. Based on Heinz Kabbutz' ideas* (http://www.javaspecialists.eu/archive/Issue124.html).* * @param task* the runnable under investigation*/public void start(Runnable task) {long start = 0;int runs = 0;do {if (++runs > 5) {throw new IllegalStateException("Test not accurate");}expired = false;start = System.currentTimeMillis();Timer timer = new Timer();timer.schedule(new TimerTask() {public void run() {expired = true;}}, testtime);while (!expired) {task.run();}start = System.currentTimeMillis() - start;timer.cancel();} while (Math.abs(start - testtime) > EPSYLON);collectGarbage(3);}private void collectGarbage(int times) {for (int i = 0; i < times; i++) {System.gc();try {Thread.sleep(10);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}/*** Calculates the memory usage of a single element in a work queue. Based on Heinz Kabbutz' ideas* (http://www.javaspecialists.eu/archive/Issue029.html).* * @return memory usage of a single {@link Runnable} element in the thread pools work queue*/public long calculateMemoryUsage() {BlockingQueue<Runnable> queue = createWorkQueue();for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {queue.add(creatTask());}long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();queue = null;collectGarbage(15);mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();queue = createWorkQueue();for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {queue.add(creatTask());}collectGarbage(15);mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;}/*** Create your runnable task here.* * @return an instance of your runnable task under investigation*/protected abstract Runnable creatTask();/*** Return an instance of the queue used in the thread pool.* * @return queue instance*/protected abstract BlockingQueue<Runnable> createWorkQueue();/*** Calculate current cpu time. Various frameworks may be used here, depending on the operating system in use. (e.g.* http://www.hyperic.com/products/sigar). The more accurate the CPU time measurement, the more accurate the results* for thread count boundaries.* * @return current cpu time of current thread*/protected abstract long getCurrentThreadCPUTime();}
該程序將為您的工作隊列的最大容量和所需的線程數找到理想的線程池邊界。 該算法基于Brian Goetz和Heinz Kabutz博士的工作,您可以在Javadoc中找到引用。 計算固定線程池中的工作隊列所需的容量相對簡單。 您所需要的只是工作隊列的期望目標大小(以字節為單位)除以提交的任務的平均大小(以字節為單位)。 不幸的是,計算最大線程數并不是一門精確的科學。 但是,如果在程序中使用公式,則可以避免工作隊列太大和線程太多的有害極端情況。 計算理想的池大小取決于等待時間,以計算任務的時間比率。 等待時間越長,達到給定利用率所需的線程就越多。 PoolSizeCalculator需要所需的目標利用率和所需的最大工作隊列內存消耗作為輸入。 基于對對象大小和CPU時間的調查,它返回理想的設置,以獲得最大線程數和線程池中的工作隊列容量。
讓我們來看一個例子。 以下代碼片段顯示了如何在1.0(= 100%)所需利用率和100000字節最大工作隊列大小的場景下使用PoolSizeCalculator 。
public class MyPoolSizeCalculator extends PoolSizeCalculator {public static void main(String[] args) throws InterruptedException, InstantiationException, IllegalAccessException,ClassNotFoundException {MyThreadSizeCalculator calculator = new MyThreadSizeCalculator();calculator.calculateBoundaries(new BigDecimal(1.0), new BigDecimal(100000));}protected long getCurrentThreadCPUTime() {return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();}protected Runnable creatTask() {return new AsynchronousTask(0, "IO", 1000000);}protected BlockingQueue createWorkQueue() {return new LinkedBlockingQueue<>();}}
MyPoolSizeCalculator擴展了抽象PoolSizeCalculator 。 您需要實現三種模板方法: getCurrentThreadCPUTime , creatTask和createWorkQueue 。 該代碼段將標準Java管理擴展應用于CPU時間測量(第13行)。 如果JMX不夠準確,則可以考慮其他框架(例如SIGAR API )。 當任務是同構且獨立時,線程池最有效。 因此,createTask方法將創建一種類型的Runnable任務的實例(第17行)。 將研究此任務以計算等待時間與CPU時間的比率。 最后,我需要創建一個工作隊列實例來計算已提交任務的內存使用情況(第21行)。 該程序的輸出顯示了工作隊列容量和最大池大小(線程數)的理想設置。 這些是我在雙核計算機上執行I / O密集型AsynchronousTask的結果。
Target queue memory usage (bytes): 100000
createTask() produced com.schlimm.java7.nio.threadpools.AsynchronousTask which took 40 bytes in a queue
Formula: 100000 / 40
* Recommended queue capacity (bytes): 2500
Number of CPU: 2
Target utilization: 1.0
Elapsed time (nanos): 3000000000
Compute time (nanos): 906250000
Wait time (nanos): 2093750000
Formula: 2 * 1.0 * (1 + 2093750000 / 906250000)
* Optimal thread count: 6.0
“推薦的隊列容量”和“最佳線程數”是重要的值。 我的AsynchronousTask的理想設置如下:
ThreadPoolExecutor pool = new ThreadPoolExecutor(6, 6, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2500));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
使用這些設置,您的工作隊列不能增長到大于所需的100000字節。 而且,由于所需的利用率為1.0(100%),因此使池大于6個線程沒有意義(等待時間與計算時間的比率為3 –對于每個計算時間間隔l,緊隨其后的是三個等待時間間隔)。 程序的結果很大程度上取決于您處理的任務的類型。 如果任務是同質的并且計算量很大,則程序可能會建議將池大小設置為可用CPU的數量。 但是,如果任務具有等待時間,例如在I / O密集型任務中,程序將建議增加線程數以達到100%的利用率。 還要注意,某些任務在處理了一段時間后會更改其等待時間以計算時間比率,例如,如果I / O操作的文件大小增加了。 這個事實建議開發一個自調整線程池(我的后續博客之一)。 無論如何,您都應該使線程池的大小可配置,以便可以在運行時進行調整。
好吧,目前就強大的線程池而言。 希望您喜歡它。 如果最大池大小的公式不是100%準確,也不要怪我。 正如我所說,這不是一門精確的科學,它是關于理想池大小的想法。
JCG合作伙伴的 參考資料: “線程故事:關于健壯的線程池” ? 尼克拉斯。
翻譯自: https://www.javacodegeeks.com/2012/03/threading-stories-about-robust-thread.html