一、線程池
線程池(ThreadPool)是一種線程復用的機制。它維護著若干個線程,任務來了就復用這些線程去執行,任務做完線程不會銷毀,而是回到池中等待下一個任務。
為什么要用線程池?
降低資源消耗:避免頻繁創建和銷毀線程帶來的系統開銷。
提高響應速度:任務到來時可以直接復用已有線程,無需等待新線程創建。
便于統一管理:可以統一分配、調優和監控線程,控制最大并發數,防止系統資源耗盡。
支持任務排隊和拒絕策略:可以靈活處理任務高峰和異常情況。
二、線程池的核心組成及工作流程
Java線程池的核心實現類是?ThreadPoolExecutor,其主要組成如下:
- 核心線程數(corePoolSize):池中始終存活的線程數,即使它們處于空閑狀態也不會被銷毀。
- 最大線程數(maximumPoolSize):池中允許的最大線程數。
- 線程空閑時間(keepAliveTime):非核心線程空閑多久會被銷毀。
- 時間單位(unit):keepAliveTime的時間單位。
- 任務隊列(workQueue):用于保存等待執行任務的隊列。
- 線程工廠(threadFactory):用于創建新線程的工廠。
- 拒絕策略(handler):當線程池和隊列都滿了時,如何處理新任務。
線程池的工作流程
提交任務:調用execute()或submit()方法提交任務。
判斷核心線程數:如果當前線程數小于corePoolSize,創建新線程執行任務。
任務入隊:如果核心線程已滿,嘗試將任務放入隊列。
創建非核心線程:如果隊列也滿了,且線程數小于maximumPoolSize,創建非核心線程執行任務。
拒絕策略:如果線程數已達最大且隊列也滿,執行拒絕策略(如拋異常、丟棄任務等)。
線程復用:線程執行完任務后不會銷毀,而是回到池中等待下一個任務。
三、創建線程池的兩種方式:
方式一:通過?Executors?工具類(不推薦在生產環境使用)
1.?newFixedThreadPool(int nThreads)
- 特點:創建一個固定大小的線程池。
- 核心線程數 = 最大線程數。
- 使用?LinkedBlockingQueue(無界隊列),可能會導致任務堆積,有OOM(內存溢出)風險。
- 適用場景:需要控制并發線程數量的場景。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
2.?newSingleThreadExecutor()
- 特點:創建一個只有一個線程的線程池。
- 核心線程數 = 最大線程數 = 1。
- 使用?LinkedBlockingQueue(無界隊列),同樣有OOM風險。
- 適用場景:需要保證所有任務按順序執行的場景。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
3 newCachedThreadPool()
- 特點:創建一個可緩存的線程池,線程數會根據任務量動態調整。
- 核心線程數為0,最大線程數為?Integer.MAX_VALUE。
- 使用?SynchronousQueue,任務來了如果沒有空閑線程,就直接創建新線程。
- 風險:如果任務量巨大,會無限制地創建線程,可能導致OOM或系統資源耗盡。
- 適用場景:執行大量短期、異步任務的場景。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
4? newScheduledThreadPool(int corePoolSize)
- 特點:創建一個支持定時及周期性任務執行的線程池。
- 適用場景:需要執行定時任務或周期性任務的場景。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
方式二:通過?ThreadPoolExecutor?構造函數(推薦)
ThreadPoolExecutor?是 Java 并發包(java.util.concurrent)中線程池的核心實現類。它功能強大、高度可配置,是理解和使用 Java?線程池的基礎。這是最原始、最靈活,也是生產環境中推薦使用的方式。你可以完全控制線程池的所有參數。
下面是一段ThreadPoolExecutor實現的線程池構建:
import java.util.concurrent.*;public class CreateThreadPoolDemo {public static void main(String[] args) {// 定義線程池的核心參數int corePoolSize = 2;int maximumPoolSize = 5;long keepAliveTime = 60L;TimeUnit unit = TimeUnit.SECONDS;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10); // 使用有界隊列ThreadFactory threadFactory = Executors.defaultThreadFactory();RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒絕策略// 創建線程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);// 使用線程池for (int i = 0; i < 15; i++) {threadPoolExecutor.execute(() -> {System.out.println(Thread.currentThread().getName() + " is running...");});}// 關閉線程池threadPoolExecutor.shutdown();}
}
而且我們查看方式一的四種類型的線程池創建:
public static ExecutorService newFixedThreadPool(int nThreads) {// LinkedBlockingQueue 的默認長度為 Integer.MAX_VALUE,可以看作是無界的return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newSingleThreadExecutor() {// LinkedBlockingQueue 的默認長度為 Integer.MAX_VALUE,可以看作是無界的return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 同步隊列 SynchronousQueue,沒有容量,最大線程數是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}// DelayedWorkQueue(延遲阻塞隊列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
可以看到前三種類型都是對ThreadPoolExecutor的包裝,可以大膽猜測一下,第四種應該也是對ThreadPoolExecutor的包裝;但是在他的實現中沒有看到實例化的ThreadPoolExecutor,那就有些疑惑了。那它是如何實現的封裝ThreadPoolExecutor呢?看下面的類的關系:
可以看到ScheduledThreadPoolExecutor類中繼承了ThreadPoolExecutor,所以它通過?super?關鍵字調用了父類的構造函數。這說明定時任務線程池本質上也是一個?ThreadPoolExecutor,只是配置了特殊的參數。由此可見ThreadPoolExecutor類十分的重要,它是Executors工具類的基礎組成,而且阿里巴巴的《Java開發手冊》中強制要求不要使用?Executors?的這幾種方法來創建線程池,因為它們都存在資源耗盡的風險;因此ThreadPoolExecutor類進一步解析十分重要;
四、ThreadPoolExecutor源碼解析
1 構造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
這是其中的全參構造構造方法,其他的構造方法都是通過this對這個構造方法的調用,類似之前Executors工具類是對ThreadPoolExecutor的封裝。它首先對非合理參數進行了判斷,如果參數不合理直接拋出對應錯誤,否則初始化參數構建ThreadPoolExecutor實例;
2 execute方法:
execute方法是Java線程池(如ThreadPoolExecutor)中最核心的任務提交方法,其作用可以總結為:向線程池提交一個需要執行的任務(Runnable),由線程池中的線程來執行該任務。
具體流程如下:
當你調用executor.execute(task)時,線程池會按照如下流程處理:
- 判斷當前線程數是否小于核心線程數(corePoolSize)
- 如果是,直接創建新線程來執行任務。
- 如果核心線程已滿,嘗試將任務放入任務隊列(workQueue)
- 如果隊列未滿,任務會被緩存,等待線程池中的線程來取出并執行。
- 如果隊列也滿了,且線程數未達到最大線程數(maximumPoolSize)
- 會創建新的非核心線程來執行任務。
- 如果線程池已滿且隊列也滿
- 執行拒絕策略(如拋出異常、丟棄任務等)。
那我們看看上述流程在execute方法中是如何實現的:
public void execute(Runnable command) {// 1. 判空,防止提交null任務if (command == null)throw new NullPointerException();// 2. 獲取線程池當前狀態和線程數int c = ctl.get();// 3. 如果當前線程數小于核心線程數,優先創建核心線程執行任務if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 嘗試創建核心線程return; // 成功則直接返回c = ctl.get(); // 失敗則重新獲取ctl,繼續后續流程}// 4. 如果線程池處于RUNNING狀態且隊列未滿,任務入隊if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get(); // 入隊后再次獲取ctl,防止狀態變化// 4.1 如果線程池已關閉且任務還在隊列,移除任務并執行拒絕策略if (!isRunning(recheck) && remove(command))reject(command);// 4.2 如果線程池里沒有線程了,創建一個非核心線程來保證隊列任務能被執行else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 5. 如果隊列也滿了,嘗試創建非核心線程執行任務else if (!addWorker(command, false))// 6. 如果創建失敗(線程池已滿或已關閉),執行拒絕策略reject(command);
}
3 狀態控制
如上,你可能會疑惑ctl是啥 : 它是一種原子整型成員變量,主要用來狀態控制:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
先看其參數RUNNING,它是-1向左位移29位
在?Java?中,int?類型是?32 位,-1?的補碼是:
1111 1111 1111 1111 1111 1111 1111 1111
?左移?29 位后,低?29 位變成?0,高?3 位還是 1:
1110 0000 0000 0000 0000 0000 0000 0000
在看看ctlOf方法:
private static int ctlOf(int rs, int wc) { return rs | wc;
}
它是實現了一個按位或運算,為啥要這樣設計呢?這是因為可以用一個int變量(ctl)同時存儲線程池的狀態和線程數,高三位存儲狀態,低29位存儲線程數,這樣可以通過一個int變量同時管理線程池狀態和線程數;
那如何獲取狀態信息和線程數呢?
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; }
如上兩種方法,runStateOf是獲取狀態信息,wokerCountOf是獲取線程數信息:
CAPACITY是1左移29位-1得到的,其二進制信息為:0001 1111 1111 1111 1111 1111?1111 1111
其中高三位全為0,低29為全為1;可以對CAPACITY適時取反后進行按位與操作獲取高三位或者低29位信息,這樣就可以單獨獲取到該線程池的狀態和線程數量信息;
更多詳細的信息如下表:
表達式 | 二進制運算示例 | 作用 |
---|---|---|
c | 1110 0000 ... 0000 0101 | 存儲復合信息(狀態+線程數) |
CAPACITY | 0001 1111 ... 1111 1111 | 線程數掩碼(高3位為0,低29位為1) |
c & CAPACITY | 0000 0000?... 0000 0101 | 提取線程數(workerCountOf(c)的實現) |
~CAPACITY | 1110 0000 ... 0000?0000 | 運行狀態掩碼(高3位為1,低29位為0) |
c?& ~CAPACITY | 1110 0000?... 0000 0000 | 提取運行狀態(runStateOf(c)的實現) |
4 addWorker方法
好了,了解了狀態信息和線程信息獲取過程,還需要看一下addWorker方法,這個是用來創建線程的方法。它的兩個參數,一個是Runnable變量,一個是布爾類型變量;
/*** 嘗試創建一個新的Worker線程,并執行其第一個任務。* @param firstTask Worker線程的第一個任務,可以為null。* @param core 如果為true,則使用corePoolSize作為線程數上限;否則使用maximumPoolSize。* @return 如果成功創建并啟動了Worker,則返回true;否則返回false。*/
private boolean addWorker(Runnable firstTask, boolean core) {// 外層循環,用于在CAS失敗或線程池狀態改變時重試。retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// --- 狀態檢查 ---// 檢查是否可以添加新的Worker線程。// 如果線程池已關閉 (>= SHUTDOWN),則通常不允許添加新線程。// 但有一個例外:如果狀態是SHUTDOWN,且任務隊列不為空,允許添加一個沒有初始任務的Worker來處理隊列中的任務。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 內層循環,用于通過CAS原子地增加工作線程數。for (;;) {int wc = workerCountOf(c);// --- 容量檢查 ---// 檢查工作線程數是否已達到上限 (CAPACITY或core/maximumPoolSize)。if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// --- CAS操作 ---// 嘗試原子地將工作線程數+1。if (compareAndIncrementWorkerCount(c))break retry; // CAS成功,跳出所有循環,繼續執行后續的創建邏輯。// --- CAS失敗處理 ---c = ctl.get(); // CAS失敗,重新讀取ctl的值。if (runStateOf(c) != rs)continue retry; // 如果線程池狀態已改變,回到外層循環重試。// 如果狀態未變,說明是其他線程也增加了線程數導致的CAS失敗,僅重試內層循環。}}// --- 創建并啟動Worker線程 ---boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 創建一個新的Worker對象,它包裝了任務和要執行的線程。w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 獲取全局鎖,保證線程安全地添加Worker和啟動線程。try {// 在持有鎖的情況下,再次檢查線程池狀態,防止在獲取鎖的過程中線程池被關閉。int rs = runStateOf(ctl.get());// 如果線程池正在運行,或者處于SHUTDOWN狀態且允許添加空任務的Worker。if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 預檢,防止啟動一個已經存活的線程。throw new IllegalThreadStateException();// 將新的Worker添加到workers集合中。workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock(); // 確保鎖被釋放。}if (workerAdded) {t.start(); // 啟動線程。workerStarted = true;}}} finally {// --- 失敗回滾 ---// 如果線程啟動失敗(如ThreadFactory創建失敗或啟動過程中出錯)。if (!workerStarted) {addWorkerFailed(w); // 調用失敗處理方法,將之前增加的線程數-1,并從集合中移除Worker。}}return workerStarted;
}
先看for循環中的第一個判斷
? ?if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;
rs先判斷線程池是否是正常狀態并且任務和隊列不能為null,為啥判斷隊列不為null呢,因為,當核心線程數沒到最大值時,再來的任務會創建核心線程,當核心線程達到最大值時會向隊列中暫存任務,因此隊列不為null時,核心線程數達到了最大值,不會創建核心線程數;
在狀態檢查之后,進行線程容量檢查,如果小于核心線程數則進行增加線程數,在增加過程中使用的是cas操作,如果不成功則重新獲取rs判斷狀態;如果線程池狀態已改變,回到外層循環重試。如果狀態未變,說明是其他線程也增加了線程數導致的CAS失敗,僅重試內層循環。
當線程核心數增加成功后,開始增加worker線程并且啟動線程,在添加工作線程時使用ReentrantLock 對workers對象上鎖;在此過程中會再次檢查線程池狀態,在添加成功后同時更新largestPoolSize參數,這個參數記錄了線程池中最大的線程數量。別把核心線程數:corePoolSize和最大線程數:maximumPoolSize混淆了。
當添加線程成功則運行線程
如果線程啟動失敗則調用失敗處理方法,將之前增加的線程數-1,并從集合中移除Worker。
先寫這些吧......