ThreadPoolExecutor 是 Java 標準庫中用于創建和管理線程池的核心類之一。它實現了 ExecutorService 接口,提供了豐富的線程池管理功能。下面將通過源碼解析來深入了解 ThreadPoolExecutor 類的工作原理和各個重要部分。
可以在 Java 源代碼中找到 ThreadPoolExecutor 類的實現,位于 java.util.concurrent 包中。
以下是 ThreadPoolExecutor 類的一些關鍵概念和部分:
1、構造函數:
ThreadPoolExecutor 類提供了幾個不同的構造函數,允許傳遞核心線程數、最大線程數、空閑線程存活時間、任務隊列等參數。這些參數決定了線程池的基本行為。
下面是幾個常用的構造函數及其參數的解釋:
核心線程數 (corePoolSize):
核心線程數表示線程池中始終保持存活的線程數量。這些線程會一直存在,即使它們處于空閑狀態。線程池會根據任務數量自動創建新線程,直到核心線程數達到上限。最大線程數 (maximumPoolSize):
最大線程數表示線程池中最多可以同時存在的線程數量。當核心線程數已滿,且任務隊列也已滿時,線程池會創建新線程,直到最大線程數達到上限。如果達到最大線程數后仍有更多任務到達,根據飽和策略進行處理。空閑線程存活時間 (keepAliveTime):
空閑線程存活時間表示當線程池中的線程數超過核心線程數,并且這些線程處于空閑狀態時,它們會被保留的時間。超過此時間后,多余的空閑線程將被終止,從而節省系統資源。時間單位 (unit):
時間單位用于指定空閑線程存活時間的單位,可以是秒、毫秒等。任務隊列 (workQueue):
任務隊列用于存儲等待執行的任務。線程池可以使用不同類型的隊列,如 BlockingQueue 的各種實現。任務隊列決定了等待執行的任務數量。線程工廠 (threadFactory):
線程工廠用于創建新線程,默認使用 Executors.defaultThreadFactory()。可以自定義線程工廠來創建線程,從而指定線程的名稱、優先級等屬性。飽和策略 (handler):
當線程池和任務隊列都已滿,新的任務到達時,飽和策略定義了如何處理這些任務。線程池提供了幾種內置的飽和策略,如 AbortPolicy、CallerRunsPolicy、DiscardPolicy 以及 DiscardOldestPolicy。
2、線程池狀態:
ThreadPoolExecutor 的內部維護了幾種狀態,包括 RUNNING、SHUTDOWN、STOP、TIDYING 和 TERMINATED。線程池在不同的狀態下會有不同的行為,例如當調用 shutdown 方法時,線程池會從 RUNNING 狀態轉變為 SHUTDOWN 狀態,不再接受新的任務。
RUNNING(運行中):
在 RUNNING 狀態下,線程池處于正常運行狀態,可以接受新任務并執行已提交的任務。在這個狀態下,核心線程數和非核心線程數都可以創建和執行任務。SHUTDOWN(關閉中):
當調用線程池的 shutdown 方法時,線程池會進入 SHUTDOWN 狀態。在這個狀態下,線程池不再接受新任務,但會繼續執行已提交的任務,包括等待隊列中的任務。STOP(立即停止):
當調用線程池的 shutdownNow 方法時,線程池會進入 STOP 狀態。在這個狀態下,線程池會嘗試中斷所有正在執行的線程,并清空任務隊列。TIDYING(整理中):
當線程池狀態從 SHUTDOWN 轉變為 TIDYING,表示線程池已經停止接受新任務,正在執行中的任務也已經完成,處于整理和清理狀態。在這個狀態下,線程池會執行一些清理操作,例如中斷空閑線程。TERMINATED(已終止):
當線程池狀態從 TIDYING 轉變為 TERMINATED,表示線程池已經徹底終止,所有任務都已執行完畢,并且線程池中的所有線程都已銷毀。在這個狀態下,線程池不再執行任何操作。
3、任務隊列:
任務隊列用于存儲等待執行的任務。ThreadPoolExecutor 允許使用不同類型的隊列,如 BlockingQueue 的各種實現,包括 LinkedBlockingQueue、ArrayBlockingQueue 等。這些隊列控制了等待執行的任務數量。
以下是幾種常見的任務隊列類型及其特點:
LinkedBlockingQueue(鏈式阻塞隊列):
LinkedBlockingQueue 是一個基于鏈表的無界阻塞隊列,它可以存儲無限數量的任務。在核心線程數未滿的情況下,新的任務會直接創建新線程來執行。當核心線程數已滿時,任務會被放入隊列中等待執行。ArrayBlockingQueue(數組阻塞隊列):
ArrayBlockingQueue 是一個基于數組的有界阻塞隊列,需要指定隊列的容量。在核心線程數未滿的情況下,新的任務會直接創建新線程來執行。當核心線程數已滿時,任務會被放入隊列中等待執行。PriorityBlockingQueue(優先級阻塞隊列):
PriorityBlockingQueue 是一個無界阻塞隊列,它會根據任務的優先級來進行調度。具有較高優先級的任務會被優先執行。DelayedWorkQueue(延遲工作隊列):
DelayedWorkQueue 是一個用于調度延遲任務的隊列,其中的任務可以設置延遲執行時間。適用于需要按照一定延遲執行任務的場景。SynchronousQueue(同步隊列):
SynchronousQueue 是一個沒有實際存儲能力的隊列,每個插入操作必須等待一個相應的刪除操作,反之亦然。適用于需要實現一對一的任務交付機制。
用了無界隊列那非核心線程就不會創建了?
是的,當線程池使用無界隊列(如 LinkedBlockingQueue 或 PriorityBlockingQueue)時,非核心線程的創建將受到影響。在無界隊列中,任務可以無限制地排隊等待執行,因此不會觸發創建額外的非核心線程。
對于無界隊列,以下是一些關鍵點需要注意:
核心線程數 (corePoolSize): 核心線程數仍然會影響線程池的初始線程創建,當任務提交到線程池時,如果當前活動線程數小于核心線程數,新任務會創建一個核心線程來執行。這適用于線程池的初始階段或在任務數量較少的情況下。最大線程數 (maximumPoolSize): 如果任務的到達速率超過了核心線程的執行速度,并且任務隊列已滿,非核心線程將不會創建,因為無界隊列可以容納所有任務。任務隊列: 無界隊列會持續地存儲等待執行的任務,直到系統資源用盡。這可能導致內存占用逐漸增加,因此在選擇無界隊列時需要注意系統資源的管理。
無界隊列適用于一些特定場景,如任務量波動大,任務執行時間差異較大,或者希望盡量保留任務而不丟失的情況。然而,需要注意的是,如果任務數量持續增加,無界隊列可能會導致內存消耗過大,因此在選擇隊列類型時需要綜合考慮線程池的整體性能和資源利用。
阻塞隊列?
前面提到的 LinkedBlockingQueue、PriorityBlockingQueue 和 ArrayBlockingQueue 都是阻塞隊列,這里我將更詳細地解釋一下阻塞隊列的概念以及它們的作用。
阻塞隊列是一種特殊類型的隊列,具有以下特點:
阻塞特性: 當向隊列添加元素或從隊列中取出元素時,如果隊列已滿或為空,阻塞隊列會自動阻塞線程,直到隊列變為非滿或非空為止。線程安全: 阻塞隊列是線程安全的,多個線程可以并發地進行入隊和出隊操作,而不需要額外的同步措施。
在 ThreadPoolExecutor 中,任務隊列是一個關鍵組件,它決定了線程池中等待執行的任務數量、調度策略以及如何處理任務。不同類型的阻塞隊列在不同的情況下有不同的用途和特點,可以根據實際需求進行選擇。
具體來說:
LinkedBlockingQueue 是一個基于鏈表的無界阻塞隊列。當任務數量超過核心線程數時,新的任務會被放入隊列中等待執行。如果隊列已滿,新任務會阻塞等待直到有空間。
ArrayBlockingQueue 是一個基于數組的有界阻塞隊列。它需要指定隊列的容量。當隊列已滿時,新的任務會阻塞等待直到有空間。
PriorityBlockingQueue 是一個無界阻塞隊列,根據元素的優先級來進行調度。優先級高的元素會被先出隊執行。
請注意,阻塞隊列適用于不同的場景和需求。根據應用特性和性能要求,選擇適合的阻塞隊列類型是很重要的。
LinkedBlockingQueue無界隊列?
LinkedBlockingQueue 是一個基于鏈表的可選界限阻塞隊列,而不是無界隊列。這意味著它可以選擇性地指定隊列的容量,當容量未指定時,隊列會默認為無界。
所以,當使用 LinkedBlockingQueue 作為任務隊列時,如果沒有指定容量(或者容量為 Integer.MAX_VALUE),隊列會被認為是無界的,新任務總是可以放入隊列中,而不會因為隊列已滿而阻塞。
如果指定了容量,當任務數量超過容量時,新的任務會被放入隊列中等待執行。當隊列已滿時,新任務會阻塞等待直到有空間,這是典型的阻塞隊列行為。
4、線程工廠:
ThreadPoolExecutor 允許通過提供線程工廠來自定義線程的創建過程,包括線程的名稱、優先級、是否守護線程等屬性。線程工廠負責創建新的線程實例,然后線程池會使用這些線程來執行任務。
ThreadPoolExecutor 的構造函數中有一個參數 threadFactory,可以傳遞一個實現了 ThreadFactory 接口的對象來指定線程工廠。ThreadFactory 接口只有一個方法 newThread,用于創建新的線程。以下是一個簡單的示例:
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;public class CustomThreadFactoryExample {public static void main(String[] args) {ThreadFactory threadFactory = new CustomThreadFactory("MyThreadGroup");ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), threadFactory);// ... 添加任務到線程池并執行 ...executor.shutdown();}
}class CustomThreadFactory implements ThreadFactory {private final ThreadGroup threadGroup;private final AtomicInteger threadNumber = new AtomicInteger(1);public CustomThreadFactory(String threadGroupName) {SecurityManager s = System.getSecurityManager();threadGroup = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();}public Thread newThread(Runnable r) {Thread thread = new Thread(threadGroup, r,"MyThread-" + threadNumber.getAndIncrement(),0);if (thread.isDaemon())thread.setDaemon(false);if (thread.getPriority() != Thread.NORM_PRIORITY)thread.setPriority(Thread.NORM_PRIORITY);return thread;}
}
在上面的示例中,創建了一個自定義的線程工廠 CustomThreadFactory 實現了 ThreadFactory 接口,用于創建具有自定義屬性的線程。在這個例子中,指定了線程組、線程名稱前綴等屬性。
通過使用線程工廠,可以為線程池中的每個線程設置特定的屬性,從而更好地管理和控制線程的行為。這對于在多線程應用程序中調試和監視線程非常有幫助。
5、飽和策略:
當線程池和任務隊列都已滿,新的任務到達時,飽和策略定義了如何處理這些任務。ThreadPoolExecutor 提供了幾種內置的飽和策略,如 AbortPolicy(拋出異常)、CallerRunsPolicy(由調用者線程執行任務)、DiscardPolicy(丟棄任務)以及 DiscardOldestPolicy(丟棄最舊的任務)。
下面對每種策略進行詳細解釋:
AbortPolicy(拋出異常):
這是默認的飽和策略。當線程池和隊列都已滿時,新任務會導致 RejectedExecutionException 異常被拋出,提示線程池已經飽和。這是一種保守的策略,防止任務丟失。CallerRunsPolicy(由調用者線程執行任務):
當線程池和隊列都已滿時,新任務會被調用者線程(提交任務的線程)直接執行,而不會交給線程池中的線程來執行。這種策略可能會導致調用者線程阻塞,因為它們會等待任務執行完畢。DiscardPolicy(丟棄任務):
當線程池和隊列都已滿時,新任務會被直接丟棄,不會進行任何處理。這可能會導致任務丟失,慎用此策略。DiscardOldestPolicy(丟棄最舊的任務):
當線程池和隊列都已滿時,會嘗試將最早的任務從隊列中移除,然后添加新任務。這可能會導致一些舊任務被丟棄,以便為新任務騰出空間。
這些飽和策略提供了不同的處理方式,可以根據應用需求來選擇合適的策略。通常情況下,AbortPolicy 是默認的且安全的選擇,因為它會在資源不足時拋出異常,提示應該考慮調整線程池大小或處理任務隊列。其他策略可能會導致任務丟失或阻塞,需要根據具體情況謹慎選擇。
6、線程池的執行過程:
當任務提交給 ThreadPoolExecutor 后,線程池會根據當前狀態、核心線程數、任務隊列狀態等決定任務的處理方式。如果核心線程數未滿,會創建新線程來執行任務;如果核心線程數已滿,會嘗試將任務放入隊列,如果隊列也已滿,會根據飽和策略來處理任務。
描述的流程如下:
核心線程數未滿:
如果線程池的當前活動線程數小于核心線程數,線程池會創建一個新的核心線程來立即執行提交的任務。核心線程數已滿:
如果線程池的當前活動線程數達到核心線程數,新任務會被放入任務隊列等待執行。隊列未滿:
如果任務隊列未滿,新任務會被放入隊列中等待執行。隊列已滿:
如果任務隊列已滿,根據選擇的飽和策略來處理任務。可能的策略包括:拋出異常(AbortPolicy):如果飽和策略為 AbortPolicy,則新任務會被拒絕,并拋出 RejectedExecutionException 異常。由調用者線程執行(CallerRunsPolicy):如果飽和策略為 CallerRunsPolicy,則提交任務的線程(調用者線程)會執行該任務,而不會交給線程池中的線程執行。丟棄任務(DiscardPolicy):如果飽和策略為 DiscardPolicy,則新任務會被直接丟棄,不會進行任何處理。丟棄最舊的任務(DiscardOldestPolicy):如果飽和策略為 DiscardOldestPolicy,則嘗試從隊列中移除最舊的任務,以便為新任務騰出空間。
線程池根據這些步驟來動態調整線程的創建和任務的處理,以適應不同的并發情況和資源限制。這種靈活的處理方式使得線程池能夠在不同的負載下保持高效的任務處理能力。
7、線程池的終止:
調用 shutdown 方法會觸發線程池的終止過程。線程池會拒絕新的任務,等待已提交但未執行的任務完成,然后關閉線程池中的線程。
調用 shutdown 方法是線程池的一種優雅關閉方式。下面將更詳細地解釋 shutdown 方法的作用和線程池的終止過程:
調用 shutdown 方法:
當調用線程池的 shutdown 方法時,線程池會開始終止的過程。在此過程中,線程池將不再接受新的任務,但會繼續執行已提交但尚未執行的任務,同時等待隊列中的任務也會被繼續執行。任務執行和隊列處理:
在終止過程中,線程池會讓已經創建的核心線程和非核心線程繼續處理已提交的任務。同時,線程池也會嘗試從任務隊列中獲取任務來執行。如果隊列中還有等待執行的任務,線程池會繼續分配線程來執行這些任務。拒絕新任務:
在調用 shutdown 方法后,線程池會拒絕接受新的任務。任何嘗試提交新任務的操作都會被拒絕,并且會拋出 RejectedExecutionException 異常。等待任務完成:
在終止過程中,線程池會等待隊列中的任務和正在執行的任務都完成。這意味著線程池不會立即關閉,而是會等待任務全部執行完畢。關閉線程池中的線程:
一旦所有任務都執行完畢,線程池會關閉其中的線程。如果線程池中存在非核心線程,它們在任務執行完畢后會根據 keepAliveTime 和空閑時間來判斷是否終止。終止狀態:
當線程池中的所有線程都已關閉時,線程池會達到 TERMINATED 狀態,表示線程池已經完全終止。
總之,調用 shutdown 方法后,線程池會等待已提交但未執行的任務完成,并且關閉線程池中的線程,最終達到終止狀態。這種方式可以避免任務丟失,并且允許線程池逐步優雅地停止,釋放資源。