文章目錄
- 線程池的實現原理
- execute(Runnable command)
- **1. 階段一:嘗試創建核心線程**
- **2. 階段二:嘗試將任務加入隊列**
- **3. 階段三:嘗試創建非核心線程或拒絕任務**
- **關鍵機制與設計思想**
線程池的實現原理
當向線程池提交一個任務之后,線程池是如何處理這個任務的呢?根據剛剛講的線程池參數的含義,我們來看一下線程池 的主要處理流程。
從圖中可以看出,當提交一個新任務到線程池時,線程池的處理流程是這樣的, 這個很關鍵,面試必問。
- 判斷核心線程池是否已滿,即線程數是否達到
corePoolSize
如果不是,則創建一個新的工作線程來執行任務。如果核心線程池里的線程已經滿了,則進入下個流程。
- 判斷工作隊列是否已經滿。
BlockingQueue
如果工作隊列沒有滿,則將新提交的任務存儲在這 個工作隊列里。如果工作隊列滿了,則進入下個流程。
- 判斷線程池是否已滿,即線程數是否達到
maxPoolSize
如果沒有,則創建一個新的工作線程 來執行任務。如果已經滿了,則交給飽和策略來處理這個任務。
再來看看 ThreadPoolExecutor執行execute()方法的圖:
按照我們上面說的, ThreadPoolExecutor執行execute方法也會分為這幾種情況。
-
如果當前運行的線程少于corePoolSize,則創建新線程來執行任務(注意,執行這一步驟需要獲取全局鎖)。
-
如果運行的線程等于或多于corePoolSize,則將任務加入BlockingQueue。
-
如果無法將任務加入BlockingQueue(隊列已滿),則創建新的線程來處理任務(注意,執行這一步驟需要獲取全局鎖)。
-
如果創建新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,并調用 RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步驟的總體設計思路,是為了在執行execute()方法時,盡可能地避免獲取全局鎖, 因為很明顯這是一個嚴重的瓶頸。
在ThreadPoolExecutor完成預熱之后 , 也就是當前運行的線程數大于等于corePoolSize,幾乎所有的execute()方法調用都是執行步驟2,而步驟2不需要獲取全局鎖。
通過流程分析,我們很直觀地了解了線程池的工作原理,接下來, 我們再通過源代碼來看看具體是如何實現的
execute(Runnable command)
//高3位表示狀態,低29位表示線程數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {//如果當前工作線程數小于核心線程數(corePoolSize),//則嘗試創建新線程作為核心線程并立即執行任務if (addWorker(command, true)) // 以核心模式創建Workerreturn;c = ctl.get(); // 若創建失敗(如線程池已關閉),重新獲取ctl值}if (isRunning(c) && workQueue.offer(command)) { // 檢查狀態并嘗試入隊int recheck = ctl.get();if (! isRunning(recheck) && remove(command))// 線程池已關閉且任務成功移除reject(command); // 拒絕任務else if (workerCountOf(recheck) == 0) //無存活線程addWorker(null, false); //創建非核心線程防止隊列任務積壓}else if (!addWorker(command, false))// 以非核心模式創建Workerreject(command); // 拋出RejectedExecutionException異常
}
1. 階段一:嘗試創建核心線程
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 以核心模式創建Workerreturn;c = ctl.get(); // 若創建失敗(如線程池已關閉),重新獲取ctl值
}
-
目標:如果當前工作線程數小于核心線程數(
corePoolSize
),則嘗試創建新線程作為核心線程并立即執行任務。 -
關鍵操作:
-
workerCountOf(c)
:解析ctl
變量(高3位表示狀態,低29位表示線程數)獲取當前工作線程數。 -
addWorker(command, true)
:嘗試以核心線程限制(corePoolSize
)創建工作者(Worker
),command
作為首任務。 -
失敗處理:若線程池已關閉(狀態非
RUNNING
)或并發沖突導致創建失敗,則進入階段二。
-
2. 階段二:嘗試將任務加入隊列
if (isRunning(c) && workQueue.offer(command)) { // 檢查狀態并嘗試入隊int recheck = ctl.get(); // 重新獲取狀態以應對并發變化if (!isRunning(recheck) && remove(command)) // 線程池已關閉且任務成功移除reject(command); // 拒絕任務else if (workerCountOf(recheck) == 0) // 無存活線程(例如核心線程超時被回收)addWorker(null, false); // 創建非核心線程(后續從隊列取任務)
}
-
目標:當核心線程已滿,嘗試將任務加入阻塞隊列(如
LinkedBlockingQueue
)。 -
關鍵操作:
-
雙重狀態檢查:
-
初始入隊前校驗線程池是否處于運行狀態(
isRunning(c)
)。 -
入隊后再次校驗(
recheck
),避免在入隊期間線程池被關閉。
-
-
處理極端情況:
-
若線程池已關閉且成功從隊列移除任務,則觸發拒絕策略。
-
若所有工作線程意外終止(例如異常退出),則新建非核心線程(參數
firstTask = null
),強制處理隊列中的積壓任務。此處的null
表示新線程不立即執行提交的command
,而是直接從隊列中獲取任務(通過Worker.run()
中的循環邏輯)。此時創建的非核心線程雖然無初始任務,但會主動消費隊列中積累的任務,確保隊列不積壓。
while (running) {Job job = null;synchronized (jobs) {if (jobs.isEmpty()) jobs.wait(); // 從隊列取任務job = jobs.removeFirst();}if (job != null) job.run(); }
-
-
3. 階段三:嘗試創建非核心線程或拒絕任務
else if (!addWorker(command, false)) // 以非核心模式創建Workerreject(command); // 隊列已滿且線程數達到maximumPoolSize,拒絕任務
-
目標:當隊列已滿時,嘗試創建非核心線程(線程數上限為
maximumPoolSize
)。 -
關鍵操作:
-
addWorker(command, false)
:以非核心模式創建工作者,直接執行當前任務。 -
失敗條件:
-
線程數已達
maximumPoolSize
。 -
線程池已關閉(非
RUNNING
狀態)。
-
-
拒絕策略:調用
RejectedExecutionHandler.rejectedExecution()
,默認拋出RejectedExecutionException
。
-
關鍵機制與設計思想
-
全局鎖(Global Lock)的應用:
addWorker()
方法內部通過鎖(如ReentrantLock
)同步線程池狀態修改操作(例如增減線程、更新ctl
),確保原子性。
-
減少鎖競爭優化:
- 在核心線程已預熱的情況下,多數任務直接通過隊列處理(階段二),避免了頻繁獲取鎖的性能損耗。
-
Worker執行邏輯(補充):
- 每個
Worker
啟動后執行runWorker()
方法,循環從隊列中獲取任務。
- 每個