文章目錄
- 前言
- 正文
- 一、執行線程的基本流程
- 1.1 JUC中的線程池執行線程
- 1.2 Tomcat 中線程池執行線程
- 二、被改造的阻塞隊列
- 2.1 TaskQueue的 offer(...)
- 2.2 TaskQueue的 force(...)
- 三、總結
前言
Tomcat 線程池,是依據 JUC 中的線程池 ThreadPoolExecutor
重新自定義實現的。
其執行線程的代碼邏輯,和JUC 中是相同的。主要區別在于,Tomcat中對 阻塞隊列進行了改造。
本文主要研究 Tomcat 的線程池是如何執行線程的,即線程池的工作原理。
同系列文章:Tomcat線程池原理(上篇:初始化原理)
正文
一、執行線程的基本流程
Tomcat 中執行線程的基本流程,和JUC中是一致的。
以下貼出兩種執行方法。
1.1 JUC中的線程池執行線程
在ThreadPoolExecutor
中,執行線程的方法定義如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
1.2 Tomcat 中線程池執行線程
@Override
public void execute(Runnable command) {execute(command,0,TimeUnit.MILLISECONDS);
}
重載方法定義如下:
這個方法被Deprecated
標注,源碼中注釋中描述說是,Tomcat10中會被刪除,估計是一種優化。本文不做研究。
@Deprecated
public void execute(Runnable command, long timeout, TimeUnit unit) {// 提交的任務數量+1submittedCount.incrementAndGet();try {// 執行線程任務(這個方法中的代碼和JUC中執行線程的代碼一致,差別在于,阻塞隊列使用了Tomcat自定義的隊列)executeInternal(command);} catch (RejectedExecutionException rx) {// 被拒絕后,嘗試將線程放入隊列// 如果當前隊列是Tomcat自定義的隊列TaskQueue,嘗試將任務放入隊列if (getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue) getQueue();try {// 強制將線程任務放入阻塞隊列if (!queue.force(command, timeout, unit)) {// 放入隊列失敗,提交的任務數-1submittedCount.decrementAndGet();// 拋出異常,線程池隊列已滿throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));}} catch (InterruptedException x) {// 中斷時,提交的任務數-1submittedCount.decrementAndGet();// 拋出異常throw new RejectedExecutionException(x);}} else {// 當前指定的隊列不是TaskQueue,提交的任務數-1,并拋出異常submittedCount.decrementAndGet();throw rx;}}
}
另外,executeInternal
的內容如下(和JUC中基本一致):區別在于阻塞隊列換成了TaskQueue
private void executeInternal(Runnable command) {if (command == null) {throw new NullPointerException();}int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) {return;}c = ctl.get();}// 這里的workQueue,實際已經換成了TaskQueueif (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) {reject(command);} else if (workerCountOf(recheck) == 0) {addWorker(null, false);}}else if (!addWorker(command, false)) {reject(command);}
}
二、被改造的阻塞隊列
從第一小節中,看到的東西并不多。因為,Tomcat線程池的改造重心,在于阻塞隊列,也就是 TaskQueue
。
2.1 TaskQueue的 offer(…)
@Override
public boolean offer(Runnable o) {//we can't do any checksif (parent==null) {return super.offer(o);}// 若是達到最大線程數,直接進隊列if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {return super.offer(o);}// 已提交任務數小于等于當前線程數// 對應的場景是:當前活躍線程為10個,但是只有8個任務在執行,于是,直接進隊列if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {return super.offer(o);}// 當前線程數小于最大線程數,此次拒絕進入隊列if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {return false;}//if we reached here, we need to add it to the queuereturn super.offer(o);
}
2.2 TaskQueue的 force(…)
其本質是,直接入隊列。
public boolean force(Runnable o) {if (parent == null || parent.isShutdown()) {throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));}return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
}
三、總結
根據源碼中對于TaskQueue的改造,可以觀察出,Tomcat線程池的主要特點如下:
- 當前線程數小于corePoolSize,則去創建工作線程;
- 當前線程數大于corePoolSize,但小于maximumPoolSize,則去創建工作線程;
- 當前線程數大于maximumPoolSize,則將任務放入到阻塞隊列中,當阻塞隊列滿了之后,則調用拒絕策略丟棄任務;
- 任務執行失敗時不會直接拋出錯誤,而是裝回隊列里再次嘗試執行;
- 當線程池沒有達到最大執行線程的時候,會優先開線程再使用任務隊列;
總結之后就是,Tomcat 為了更適配 IO 密集型任務,改造了阻塞隊列。與JUC相比,會先去創建線程執行任務,創建的線程數達到最大線程數時,再放入隊列等待空閑線程的出現。