2019獨角獸企業重金招聘Python工程師標準>>>
線程池的簡單介紹
基于多核CPU的發展,使得多線程開發日趨流行。然而線程的創建和銷毀,都涉及到系統調用,比較消耗系統資源,所以就引入了線程池技術,避免頻繁的線程創建和銷毀。
在Java用有一個Executors工具類,可以為我們創建一個線程池,其本質就是new了一個ThreadPoolExecutor對象。
建議使用較為方便的 Executors 工廠方法來創建線程池。
- Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)
- Executors.newFixedThreadPool(int)(固定大小線程池)
- Executors.newSingleThreadExecutor()(單個后臺線程)。
- Executors.newScheduledThreadPool() (支持計劃任務的線程池)
ThreadPoolExecutor工作原理介紹
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
- corePoolSize:線程池的核心線程數,說白了就是,即便是線程池里沒有任何任務,也會有corePoolSize個線程在候著等任務。
- maximumPoolSize:最大線程數,不管你提交多少任務,線程池里最多工作線程數就是maximumPoolSize。
- keepAliveTime:線程的存活時間。當線程池里的線程數大于corePoolSize時,如果等了keepAliveTime時長還沒有任務可執行,則線程退出。
- unit:這個用來指定keepAliveTime的單位,比如秒:TimeUnit.SECONDS。
- workQueue:一個阻塞隊列,提交的任務將會被放到這個隊列里。
- threadFactory:線程工廠,用來創建線程,主要是為了給線程起名字,默認工廠的線程名字:pool-1-thread-3。
- handler:拒絕策略,當線程池里線程被耗盡,且隊列也滿了的時候會調用。
線程池的執行流程圖
任務被提交到線程池,會先判斷當前線程數量是否小于corePoolSize,如果小于則創建線程來執行提交的任務,否則將任務放入workQueue隊列,如果workQueue滿了,則判斷當前線程數量是否小于maximumPoolSize,如果小于則創建線程執行任務,否則就會調用handler,以表示線程池拒絕接收任務。
線程池使用介紹
newScheduledThreadPool的使用示例
public class SchedulePoolDemo {public static void main(String[] args){ScheduledExecutorService service = Executors.newScheduledThreadPool(10);//如果前面的任務沒有完成, 調度也不會啟動service.scheduleAtFixedRate(()->{try {Thread.sleep(2000);// 每兩秒打印一次.System.out.println(System.currentTimeMillis()/1000);} catch (InterruptedException e) {e.printStackTrace();}}, 0, 2, TimeUnit.SECONDS);}
}
潛在宕機風險
使用Executors來創建要注意潛在宕機風險.其返回的線程池對象的弊端如下:
- FixedThreadPool和SingleThreadPoolPool : 允許的請求隊列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM.
- CachedThreadPool和ScheduledThreadPool : 允許的創建線程數量為 Integer.MAX_VALUE,可能會創建大量的線程,從而導致 OOM.
綜上所述, 在可能有大量請求的線程池場景中, 更推薦自定義ThreadPoolExecutor來創建線程池, 具體構造函數配置如下:
線程池大小配置
一般根據任務類型進行區分, 假設CPU為N核
- CPU密集型任務需要減少線程數量, 降低線程之間切換造成的開銷, 可配置線程池大小為N + 1.
- IO密集型任務則可以加大線程數量, 可配置線程池大小為 N * 2.
- 混合型任務則可以拆分為CPU密集型與IO密集型, 獨立配置.
自定義阻塞隊列BlockingQueue
主要存放等待執行的線程, ThreadPoolExecutor中支持自定義該隊列來實現不同的排隊隊列.
- ArrayBlockingQueue:先進先出隊列,創建時指定大小, 有界;
- LinkedBlockingQueue:使用鏈表實現的先進先出隊列,默認大小為Integer.MAX_VALUE;
- SynchronousQueue:不保存提交的任務, 數據也不會緩存到隊列中, 用于生產者和消費者互等對方, 一起離開.
- PriorityBlockingQueue: 支持優先級的隊列
回調接口
線程池提供了一些回調方法, 具體使用如下所示.
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {@Overrideprotected void beforeExecute(Thread t, Runnable r) {System.out.println("準備執行任務: " + r.toString());}@Overrideprotected void afterExecute(Runnable r, Throwable t) {System.out.println("結束任務: " + r.toString());}@Overrideprotected void terminated() {System.out.println("線程池退出");}};
可以在回調接口中, 對線程池的狀態進行監控, 例如任務執行的最長時間, 平均時間, 最短時間等等, 還有一些其他的屬性如下:
- taskCount:線程池需要執行的任務數量.
- completedTaskCount:線程池在運行過程中已完成的任務數量.小于或等于taskCount.
- largestPoolSize:線程池曾經創建過的最大線程數量.通過這個數據可以知道線程池是否滿過.如等于線程池的最大大小,則表示線程池曾經滿了.
- getPoolSize:線程池的線程數量.如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不減.
- getActiveCount:獲取活動的線程數.
自定義拒絕策略
線程池滿負荷運轉后, 因為時間空間的問題, 可能需要拒絕掉部分任務的執行.
jdk提供了RejectedExecutionHandler接口, 并內置了幾種線程拒絕策略
- AbortPolicy: 直接拒絕策略, 拋出異常.
- CallerRunsPolicy: 調用者自己執行任務策略.
- DiscardOldestPolicy: 舍棄最老的未執行任務策略. 使用方式也很簡單, 直接傳參給ThreadPool
ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>(),Executors.defaultThreadFactory(),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("reject task: " + r.toString());}});
自定義ThreadFactory
線程工廠用于創建池里的線程. 例如在工廠中都給線程setDaemon(true), 這樣程序退出的時候, 線程自動退出.或者統一指定線程優先級, 設置名稱等等.
class NamedThreadFactory implements ThreadFactory {private static final AtomicInteger threadIndex = new AtomicInteger(0);private final String baseName;private final boolean daemon;public NamedThreadFactory(String baseName) {this(baseName, true);}public NamedThreadFactory(String baseName, boolean daemon) {this.baseName = baseName;this.daemon = daemon;}public Thread newThread(Runnable runnable) {Thread thread = new Thread(runnable, this.baseName + "-" + threadIndex.getAndIncrement());thread.setDaemon(this.daemon);return thread;}
}
關閉線程池
跟直接new Thread不一樣, 局部變量的線程池, 需要手動關閉, 不然會導致線程泄漏問題.默認提供兩種方式關閉線程池.- shutdown: 等所有任務, 包括阻塞隊列中的執行完, 才會終止, 但是不會接受新任務.
- shutdownNow: 立即終止線程池, 打斷正在執行的任務, 清空隊列.
ThreadPoolExecutor源碼分析
ThreadPoolExecutor中ctl屬性介紹
ctl是ThreadPoolExecutor的一個重要屬性,它記錄著ThreadPoolExecutor的線程數量和線程狀態。
//Integer有32位,其中前三位用于記錄線程狀態,后29位用于記錄線程的數量.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//表示用于記錄線程數量的位數
private static final int COUNT_BITS = Integer.SIZE - 3;
//將1左移COUNT_BITS位再減1,表示能表示的最大線程數
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//用ctl前三位分別表示線程池的狀態
//(前三位為111)接受新任務并且處理已經進入隊列的任務
private static final int RUNNING = -1 << COUNT_BITS;
//(前三位為000)不接受新任務,但是處理已經進入隊列的任務
private static final int SHUTDOWN = 0 << COUNT_BITS;
//(前三位001)不接受新任務,不處理已經進入隊列的任務,并且中斷正在執行的任務
private static final int STOP = 1 << COUNT_BITS;
//(前三位010)所有任務執行完成,workerCount為0。線程轉到了狀態TIDYING會執行terminated()鉤子方法
private static final int TIDYING = 2 << COUNT_BITS;
//(前三位011)任務已經執行完成
private static final int TERMINATED = 3 << COUNT_BITS;
//狀態值就是只關心前三位的值,所以把后29位清0
private static int runStateOf(int c) { return c & ~CAPACITY; }//線程數量就是只關心后29位的值,所以把前3位清0
private static int workerCountOf(int c) { return c & CAPACITY; }//兩個數相或
private static int ctlOf(int rs, int wc) { return rs | wc; }
execute()方法解析
public void execute(Runnable command) {if (command == null) throw new NullPointerException();int c = ctl.get();//判斷當前活躍線程數是否小于corePoolSizeif (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))//調用addWorker創建線程執行任務return;c = ctl.get();}//如果不小于corePoolSize,則將任務添加到workQueue隊列。if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次獲取ctl的狀態//如果不在運行狀態了,那么就從隊列中移除任務if (! isRunning(recheck) && remove(command))reject(command);//如果在運行階段,但是Worker數量為0,調用addWorker方法else if (workerCountOf(recheck) == 0)addWorker(null, false);}//嘗試創建非核心線程如果創建失敗就會調用reject拒絕接受任務。else if (!addWorker(command, false))reject(command);}
//調用handler的rejectedExecution(command,this)方法。handler是RejectedExecutionHandler接口,默認實現是AbortPolicy
final void reject(Runnable command) {handler.rejectedExecution(command, this);
}
addWorker()方法解析
addWorker方法用于創建線程,并且通過core參數表示該線程是否是核心線程,如果返回true則表示創建成功,否則失敗。addWorker的代碼如下所示:
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);//得到線程池的運行狀態// rs>=SHUTDOWN為false,即線程池處于RUNNING狀態.// rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()這個條件為true,也就意味著三個條件同時滿足,即線程池狀態為SHUTDOWN且firstTask為null且隊列不為空,這種情況為處理隊列中剩余任務。上面提到過當處于SHUTDOWN狀態時,不接受新任務,但是會處理完隊列里面的任務。如果firstTask不為null,那么就屬于添加新任務;如果firstTask為null,并且隊列為空,那么就不需要再處理了。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||//如果創建的是非核心線程(core=false)時,則需要判斷當前線程數wc>=maximumPoolSize,如果返回false,創建非核心線程失敗。//如果創建的是核心線程(core=true)時,則需要判斷當前線程數wc>=corePoolSize,如果返回false,創建核心線程失敗。wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))//worker+1執行成功,那么跳出外循環break retry;c = ctl.get();if (runStateOf(c) != rs)//再次判斷當前狀態,如果新獲取的狀態和當前狀態不一致,則再次進入外循環continue retry;// else CAS failed due to workerCount change; retry inner loop}}/*
一旦跳出外循環,表示可以創建創建線程,這里具體是Worker對象,Worker實現了Runnable接口并且繼承AbstractQueueSynchronizer,內部維持一個Runnable的隊列。try塊中主要就是創建Worker對象,然后將其保存到workers中,workers是一個HashSet,表示工作線程的集合。然后如果添加成功,則開啟Worker所在的線程。如果開啟線程失敗,則調用addWorkerFailed方法,addWokerFailed用于回滾worker線程的創建。
*/boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//以firstTask作為Worker的第一個任務創建Workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();//對整個線程池加鎖try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//啟動啟動這個線程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
addWorkerFailed()方法解析
private void addWorkerFailed(Worker w) {//對整個線程成績加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//移除Worker對象if (w != null)workers.remove(w);//減小worker數量decrementWorkerCount();//檢查termination狀態tryTerminate();} finally {mainLock.unlock();}}
addWorkerFailed首先從workers集合中移除線程,然后將wokerCount減1,最后檢查終結。
tryTerminate()方法解析
tryTerminate()方法用于檢查是否有必要將線程池狀態轉移到TERMINATED。
final void tryTerminate() {for (;;) {int c = ctl.get();/*狀態判斷,如果有符合以下條件之一。則跳出循環(1)線程池處于RUNNING狀態(2)線程池狀態處于TIDYING狀態(3)線程池狀態處于SHUTDOWN狀態并且隊列不為空
如果不滿足上述的情況,那么目前狀態屬于SHUTDOWN切隊列為空,或者狀態屬于STOP,那么調用interruptIdleWorkers方法停止一個Worker線程,然后退出。*/if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}
/*
如果沒有退出循環的話,那么就首先將狀態設置成TIDYING,然后調用terminated方法,最后設置狀態為TERMINATED。terminated方法是個空實現,用于當線程池終結時處理一些事情。
*/final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}}
構造函數Worker(firstTask)解析
Worker繼承自AbstractQueuedSynchronizer并實現Runnbale接口。AbstractQueuedSynchronizer提供了一個實現阻塞鎖和其他同步工具,比如信號量、事件等依賴于等待隊列的框架。Worker的構造方法中會使用threadFactory構造線程變量并持有run方法調用了runWorker方法,將線程委托給主循環線程。
Worker(Runnable firstTask) {setState(-1);this.firstTask = firstTask;//設置該線程的this.thread = getThreadFactory().newThread(this);//創建一個線程
}//當我我們啟動一個線程時就會觸發Worker中的此方法
public void run() {runWorker(this);
}
runWorker()方法解析
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;//首次任務是創建Worker時添加的任務w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//線程調用runWoker,會while循環調用getTask方法從workerQueue里讀取任務,然后執行任務。只要getTask方法不返回null,此線程就不會退出。while (task != null || (task = getTask()) != null) {w.lock();//對Worker加鎖//如果線程池停止了,那么中斷線程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);//空實現,任務運行之前的操作Throwable thrown = null;try {task.run();//執行任務} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);//空實現,任務運行之后的操作}} finally {task = null;//任務執行完畢后,將task設置為nullw.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
getTask()方法解析
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);//必要時檢查隊列是否為空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);//判斷是否允許超時,wc>corePoolSize則是判斷當前線程數是否大于corePoolSize。boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//如果當前線程數大于corePoolSize,//則會調用workQueue的poll方法獲取任務,超時時間是keepAliveTime。//如果超過keepAliveTime時長,poll返回了null,//上邊提到的while循序就會退出,線程也就執行完了。//如果當前線程數小于corePoolSize,//則會調用workQueue的take方法阻塞當前線程,不會退出Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
參考地址:
- http://www.cnblogs.com/qingquanzi/p/8146638.html
- https://blog.csdn.net/qq_19431333/article/details/59030892
- https://www.cnblogs.com/xdecode/p/9119794.html