線程池存在的意義
平常使用線程即new Thread()然后調用start()方法去啟動這個線程,但是在頻繁的業務情況下如果在生產環境大量的創建Thread對象是則會浪費資源,不僅增加GC回收壓力,并且還浪費了時間,創建線程是需要花時間的;
線程池的存在就是降低頻繁的創建線程,降低資源的消耗以及創建時間的浪費,并且可以同一管理。
ThreadPoolExecutor
在JDK中所有的線程池的父類就是ThreadPoolExecutor,以下是它的構造方法
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
int corePoolSize?:線程池中核心線程數,小于corePoolSize?,就會創建新線程,等于corePoolSize?,這個任務就會保存到BlockingQueue,如果調用prestartAllCoreThreads()方法就會一次性的啟動corePoolSize個數的線程。
int maximumPoolSize: 允許的最大線程數,BlockingQueue也滿了,小于maximumPoolSize時候就會再次創建新的線程
long keepAliveTime:線程空閑下來后,存活的時間,這個參數只在大于corePoolSize才有用
TimeUnit unit:存活時間的單位值
BlockingQueue<Runnable> workQueue:保存任務的阻塞隊列
ThreadFactory threadFactory:創建線程的工廠,給新建的線程賦予名字
RejectedExecutionHandler handler:飽和策略
? ? ? ? ? AbortPolicy :直接拋出異常,默認;
? ? ? ? ?CallerRunsPolicy:用調用者所在的線程來執行任務
? ? ? ? ?DiscardOldestPolicy:丟棄阻塞隊列里最老的任務,隊列里最靠前的任務
? ? ? ? ?DiscardPolicy :當前任務直接丟棄
也可以實現自己的飽和策略,實現RejectedExecutionHandler接口即可
實現基本原理
主要是依賴BlockingQueue<Runnable>隊列和HashSet<Worker>實現的,Worker繼承了Runnable以及AQS的一個內部類,所以這個類具體等待并且開啟線程的功能
在提交Runnable可執行的線程時,
當前線程數小于corePoolSize??的時候,僅僅是將Runnable添加到HashSet<Worker>當中,并且執行start()方法,調用的是runWorker()方法
當前線程數大于或等于corePoolSize??的時候,會將Runnable添加到workerQueue隊列中等待并且會添加一個null的Runnable到addWorker()方法當中。如果隊列滿了offer失敗就會執相應的reject拒絕策略。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
在addWorker()方法當中,如果Runnable為空的話,會直接返回false,否則將創建一個Worker對象并且啟動它,在runWorker中,首先執行完后傳輸過來的Runnable對象中的run(),然后循環去workerQueue隊列使用take方法拿等待隊列中的Runnable對象,并且執行相應的run()方法。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
關閉線程池的方法:
shutdownNow():設置線程池的狀態,還會嘗試停止正在運行或者暫停任務的線程
shutdown()設置線程池的狀態,只會中斷所有沒有執行任務的線程
工作機制
合理配置線程池
根據任務的性質來:計算密集型(CPU),IO密集型,混合型
計算密集型:加密,大數分解,正則……., 線程數適當小一點,最大推薦:機器的Cpu核心數+1,為什么+1,防止頁缺失,(機器的Cpu核心=Runtime.getRuntime().availableProcessors();)
IO密集型:讀取文件,數據庫連接,網絡通訊, 線程數適當大一點,機器的Cpu核心數*2,
混合型:盡量拆分,IO密集型>>計算密集型,拆分意義不大,IO密集型~計算密集型
隊列的選擇上,應該使用有界,無界隊列可能會導致內存溢出
Executors預定義的線程池
FixedThreadPool:創建固定線程數量的,適用于負載較重的服務器,使用了無界隊列
SingleThreadPoolExecutor:創建單個線程,需要順序保證執行任務,不會有多個線程活動,使用了無界隊列
CachedThreadPool:會根據需要來創建新線程的,執行很多短期異步任務的程序,使用了SynchronousQueue
WorkStealingPool(JDK7以后): 基于ForkJoinPool實現
Executor框架
還有一個是定時器,待會兒再說吧