Java并發編程之線程池ThreadPoolExecutor解析

線程池存在的意義

平常使用線程即new Thread()然后調用start()方法去啟動這個線程,但是在頻繁的業務情況下如果在生產環境大量的創建Thread對象是則會浪費資源,不僅增加GC回收壓力,并且還浪費了時間,創建線程是需要花時間的;

線程池的存在就是降低頻繁的創建線程,降低資源的消耗以及創建時間的浪費,并且可以同一管理。

ThreadPoolExecutor

在JDK中所有的線程池的父類就是ThreadPoolExecutor,以下是它的構造方法

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

int corePoolSize?:線程池中核心線程數,小于corePoolSize?,就會創建新線程,等于corePoolSize?,這個任務就會保存到BlockingQueue,如果調用prestartAllCoreThreads()方法就會一次性的啟動corePoolSize個數的線程。

int maximumPoolSize: 允許的最大線程數,BlockingQueue也滿了,小于maximumPoolSize時候就會再次創建新的線程

long keepAliveTime:線程空閑下來后,存活的時間,這個參數只在大于corePoolSize才有用

TimeUnit unit:存活時間的單位值

BlockingQueue<Runnable> workQueue:保存任務的阻塞隊列

ThreadFactory threadFactory:創建線程的工廠,給新建的線程賦予名字

RejectedExecutionHandler handler:飽和策略

? ? ? ? ? AbortPolicy :直接拋出異常,默認;

? ? ? ? ?CallerRunsPolicy:用調用者所在的線程來執行任務

? ? ? ? ?DiscardOldestPolicy:丟棄阻塞隊列里最老的任務,隊列里最靠前的任務

? ? ? ? ?DiscardPolicy :當前任務直接丟棄

也可以實現自己的飽和策略,實現RejectedExecutionHandler接口即可

實現基本原理

主要是依賴BlockingQueue<Runnable>隊列和HashSet<Worker>實現的,Worker繼承了Runnable以及AQS的一個內部類,所以這個類具體等待并且開啟線程的功能

在提交Runnable可執行的線程時,

當前線程數小于corePoolSize??的時候,僅僅是將Runnable添加到HashSet<Worker>當中,并且執行start()方法,調用的是runWorker()方法

當前線程數大于或等于corePoolSize??的時候,會將Runnable添加到workerQueue隊列中等待并且會添加一個null的Runnable到addWorker()方法當中。如果隊列滿了offer失敗就會執相應的reject拒絕策略。

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

在addWorker()方法當中,如果Runnable為空的話,會直接返回false,否則將創建一個Worker對象并且啟動它,在runWorker中,首先執行完后傳輸過來的Runnable對象中的run(),然后循環去workerQueue隊列使用take方法拿等待隊列中的Runnable對象,并且執行相應的run()方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

關閉線程池的方法:

shutdownNow():設置線程池的狀態,還會嘗試停止正在運行或者暫停任務的線程

shutdown()設置線程池的狀態,只會中斷所有沒有執行任務的線程

工作機制

合理配置線程池

根據任務的性質來:計算密集型(CPU),IO密集型,混合型

計算密集型:加密,大數分解,正則……., 線程數適當小一點,最大推薦:機器的Cpu核心數+1,為什么+1,防止頁缺失,(機器的Cpu核心=Runtime.getRuntime().availableProcessors();)

IO密集型:讀取文件,數據庫連接,網絡通訊, 線程數適當大一點,機器的Cpu核心數*2,

混合型:盡量拆分,IO密集型>>計算密集型,拆分意義不大,IO密集型~計算密集型

隊列的選擇上,應該使用有界,無界隊列可能會導致內存溢出

Executors預定義的線程池

FixedThreadPool:創建固定線程數量的,適用于負載較重的服務器,使用了無界隊列

SingleThreadPoolExecutor:創建單個線程,需要順序保證執行任務,不會有多個線程活動,使用了無界隊列

CachedThreadPool:會根據需要來創建新線程的,執行很多短期異步任務的程序,使用了SynchronousQueue
WorkStealingPool(JDK7以后): 基于ForkJoinPool實現

Executor框架

還有一個是定時器,待會兒再說吧

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/537159.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/537159.shtml
英文地址,請注明出處:http://en.pswp.cn/news/537159.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

面向過程的門面模式

{*******************************************************}{ }{ 業務邏輯一 }{ }{ 版權所有 (C) 2008 陳…

Java并發編程之線程定時器ScheduledThreadPoolExecutor解析

定時器 就是需要周期性的執行任務&#xff0c;也叫調度任務&#xff0c;在JDK中有個類Timer是支持周期性執行&#xff0c;但是這個類不建議使用了。 ScheduledThreadPoolExecutor 繼承自ThreadPoolExecutor線程池&#xff0c;在Executors默認創建了兩種&#xff1a; newSin…

python xml轉換鍵值對_Python 提取dict轉換為xml/json/table并輸出

#!/usr/bin/python#-*- coding:gbk -*-#設置源文件輸出格式import sysimport getoptimport jsonimport createDictimport myConToXMLimport myConToTabledef getRsDataToDict():#獲取控制臺中輸入的參數&#xff0c;并根據參數找到源文件獲取源數據csDict{}try:#通過getopt獲取…

應用開發框架之——根據數據表中的存儲的方法名稱來調用方法

功用一&#xff1a;在框架里面根據存儲在數據表中的方法名來動態調用執行方法。 unit Unit1; interface uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, StdCtrls; type TForm1 class(TForm) Button1: TButton; procedu…

Spring IOC容器組件注入的幾種方式

整理一下之前Spring的學習筆記&#xff0c;大致有一下幾種Spring注入到容器中的方法: 1&#xff09;、配置在xml的方式。 2&#xff09;、開啟包掃描ComponentScan使用Component&#xff0c;Service&#xff0c;Controller&#xff0c;Repository&#xff08;其實后三個都繼承…

我們是如何拿下Google和Facebook Offer的?

http://posts.careerengine.us/p/57c3a1c1a09633ee7e57803c 大家好&#xff0c;我是小高&#xff0c;CMU CS Master&#xff0c;來Offer第一期學員&#xff0c;2014年初在孫老師的帶領下我在幾個月的時間內進入了Yahoo&#xff0c;并工作了近2年。2016年初&#xff0c;Yahoo工作…

Spring中BeanFactory和FactoryBean的區別

先介紹一下Spring的IOC容器到底是個什么東西&#xff0c;都說是一個控制反轉的容器&#xff0c;將對象的控制權交給IOC容器&#xff0c;其實在看了源代碼之后&#xff0c;就會發現IOC容器只是一個存儲單例的一個ConcurrentHashMap<String, BeanDefinition> BeanDefiniti…

python中數字和字符串可以直接相加_用c語言或者python將文件中特定字符串后面的數字相加...

匿名用戶1級2014-08-31 回答代碼應該不難吧。既然用爬蟲爬下來了&#xff0c;為什么爬取數據的時候沒做處理呢。之前用過Scrapy爬蟲框架&#xff0c;挺好用的&#xff0c;你可研究下。代碼&#xff1a;#!codingutf-8import osimport reimport random# 獲取當前目錄文件列表def …

Spring中Aware的用法以及實現

Aware 在Spring當中有一些內置的對象是未開放給我們使用的&#xff0c;例如Spring的上下文ApplicationContext、環境屬性Environment&#xff0c;BeanFactory等等其他的一些內置對象&#xff0c;而在我們可以通過實現對應的Aware接口去拿到我們想要的一些屬性&#xff0c;一般…

c#字符型轉化為asc_C#字符串和ASCII碼的轉換

//字符轉ASCII碼&#xff1a;public static int Asc(string character){if (character.Length 1){System.Text.ASCIIEncoding asciiEncoding new System.Text.ASCIIEncoding();int intAsciiCode (int)asciiEncoding.GetBytes(character)[0];return (intAsciiCode);}else{thr…

topcoder srm 625 div1

problem1 link 假設第$i$種出現的次數為$n_{i}$&#xff0c;總個數為$m$&#xff0c;那么排列數為$T\frac{m!}{\prod_{i1}^{26}(n_{i}!)}$ 然后計算回文的個數&#xff0c;只需要考慮前一半&#xff0c;得到個數為$R$&#xff0c;那么答案為$\frac{R}{T}$. 為了防止數字太大導致…

Spring的組件賦值以及環境屬性@PropertySource

PropertySource 將指定類路徑下的.properties一些配置加載到Spring當中&#xff0c; 有個跟這個差不多的注解PropertySources Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented public interface PropertySources {PropertySource[] value();} 使用…

python語音識別框架_橫評:五款免費開源的語音識別工具

編者按&#xff1a;本文原作者 Cindi Thompson&#xff0c;美國德克薩斯大學奧斯汀分校(University of Texas at Austin)計算機科學博士&#xff0c;數據科學咨詢公司硅谷數據科學(Silicon Valley Data Science&#xff0c;SVDS)首席科學家&#xff0c;在機器學習、自然語言處理…

csharp read excel file get sheetName list

1 /// <summary>2 /// 3 /// 塗聚文4 /// 201208035 /// Geovin Du6 ///找到EXCEL的工作表名稱 要考慮打開的文件的進程問題7 /// </summary>8 /// <param name"filename">…

Spring Bean的生命周期以及IOC源碼解析

IOC源碼這一塊太多只能講個大概吧&#xff0c;建議還是去買本Spring IOC源碼解析的書來看比較好&#xff0c;我也是自己看源代碼以及視頻整理的筆記 Bean的生命周期大概可以分為四個階段&#xff0c;具體的等會再說&#xff0c;先看看IOC的源碼吧 1、bean的創建 2、bean的屬…

python3繪圖_python3繪圖示例2(基于matplotlib:柱狀圖、分布圖、三角圖等)

#!/usr/bin/env python# -*- coding:utf-8 -*-from matplotlib import pyplot as pltimport numpy as npimport pylabimport os,sys,time,math,random# 圖1-給已有的圖加上刻度filer‘D:\jmeter\jmeter3.2\data\Oracle數據庫基礎.png‘arrnp.array(file.getdata()).reshape(fil…

bzoj4152-[AMPPZ2014]The_Captain

Description 給定平面上的n個點&#xff0c;定義(x1,y1)到(x2,y2)的費用為min(|x1-x2|,|y1-y2|)&#xff0c;求從1號點走到n號點的最小費用。 Input 第一行包含一個正整數n(2<n<200000)&#xff0c;表示點數。 接下來n行&#xff0c;每行包含兩個整數x[i],yi&#xff0c;…

python日志統計_python試用-日志統計

最近兩天嘗試用python代替bash寫Linux Shell腳本來統計日志。發現python寫起來比bash更簡單和容易閱讀&#xff0c;發現不少驚喜。所以寫了一個粗糙的腳本來統計日志。目標1、通過簡單命令和腳本統計事件發生數2、日志限定文本類型假定環境日志文件&#xff1a;1.logtest:aaa,1…

Spring AOP兩種使用方式以及如何使用解析

AOP是一種面向切面編程思想&#xff0c;也是面向對象設計&#xff08;OOP&#xff09;的一種延伸。 在Spring實現AOP有兩種實現方式&#xff0c;一種是采用JDK動態代理實現&#xff0c;另外一種就是采用CGLIB代理實現&#xff0c;Spring是如何實現的在上篇已經講到了Spring Be…