目錄
1.概述
2.ThreadPoolExector
2.1.參數
2.2.新任務提交流程
2.3.拒絕策略
2.4.代碼示例
1.概述
線程池的核心:
線程池的實現原理是個標準的生產消費者模型,調用方不停向線程池中寫數據,線程池中的線程組不停從隊列中取任務。
實現線程池需要考慮的幾個核心因素:
-
隊列的長度
-
隊列滿后,后面來的線程如何處理
隊列的長度:
用來存線程這個隊列的長度太小了可能會不夠用,要是沒有限制又可能導致機器的物理內存耗盡,所以最好的方式就是給這個隊列一個初始化的長度,然后允許這個隊列動態擴容。
隊列滿后,后面來的任務如何處理:
隊列滿了之后新來的任務如何處理?也就是拒絕策略,關于這個拒絕策略,是直接拒絕丟棄掉?還是把隊列中的老任務丟棄掉給它讓位置?還是說不走線程池,直接新開一條線程來執行?
繼承體系:
可以看到頂級父接口提供了規范標準,真正干活兒的實現類只有ThreadPoolExcutor和ScheduleThreadPoolExecutor。
本文主要以ThreadPoolExcutor為切入聊一下線程池的核心概念,由于ScheduleThreadPoolExecutor主要是用來做延遲任務和周期任務的,以它為切入來聊線程池的核心概念并不是那么合適,后面會有文章專門聊一聊JDK基于線程池打造的一整套延遲任務、周期任務、異步任務等,這些任務調度體系。
2.ThreadPoolExector
2.1.參數
public class ThreadPoolExecutor extends AbstractExecutorService{private final AtomicInteger ctl;//狀態變量private final BlockingQueue<Runnable> workQueue;//任務隊列private final ReentrantLock mainLock;//用于保證線程池中各變量之間的互斥private final HashSet<ThreadPoolExecutor.Worker> workers;//線程組
}
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{final Thread thread;//被封裝的線程Runnable firstTask;//worker收到的第一個任務volatile long completedTasks;//worker執行完畢的任務數
}
線程池的核心參數為
-
corePoolSize:在線程池中始終維護的線程個數.
-
maxPoolSize:在corePooSize已滿、隊列也滿的情況下,擴 充線程至此值。
-
keepAliveTime/TimeUnit:maxPoolSize 中的空閑線程,銷 毀所需要的時間,總線程數收縮回corePoolSize。
-
blockingQueue:線程池所用的隊列類型。
-
threadFactory:線程創建工廠,可以自定義,也有一個默 認的。
-
RejectedExecutionHandler:corePoolSize 已滿,隊列已 滿,maxPoolSize 已滿,最后的拒絕策略。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
2.2.新任務提交流程
入口在ThreadPoolExector.execute(Runnable command)
public void execute(Runnable command) {if (command == null)throw new NullPointerException();
?int c = ctl.get();//如果當前線程組中的線程數量小于核心線程數,直接開一個新線程來執行該任務if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//如果當前線程組中的線程數量大于等于核心線程數,將該任務放入隊列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//放入隊列失敗,再嘗試新開一個線程來執行該任務else if (!addWorker(command, false))//這時候再失敗意味著線程組數量已經大于maxPoolSize且任務隊列已滿,直接執行拒絕策略reject(command);}
2.3.拒絕策略
ThreadPoolExector一共提供了四種拒絕策略:
-
AbortPolicy,默認拒絕策略,直接拋出異常。
-
CallerRunsPolicy,讓任務在調用者的線程中執行,線程池不對任務做處理。
-
DiscardPolicy,線程池直接把任務丟棄掉,就當什么都沒有發生。
-
DiscardOldestPolicy,把隊列中最老的任務刪掉,將新任務放入隊列。
2.4.代碼示例
在使用線程池的時候并不需要我們手動去創建,JDK中有工具類來幫我們創建各種線程池,這個工具類只是包了一層,其底層還是創建的我們上面聊的這些線程池的實現類,以下是代碼示例:
import java.util.concurrent.*;public class ThreadPoolExamples {public static void main(String[] args) throws InterruptedException {// 固定大小的線程池示例fixedThreadPoolExample();// 單線程線程池示例singleThreadExecutorExample();// 緩存線程池示例cachedThreadPoolExample();// 定時線程池示例scheduledThreadPoolExample();}/*** 創建一個固定大小的線程池,該線程池中的線程數量固定,不會因為任務的增加而增加新的線程。*/private static void fixedThreadPoolExample() {ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5); // 創建一個包含5個線程的線程池for (int i = 0; i < 10; i++) {final int taskId = i;fixedThreadPool.execute(() -> {System.out.println("FixedThreadPool: Task ID " + taskId + " is running by " + Thread.currentThread().getName());try {Thread.sleep(1000); // 模擬耗時操作} catch (InterruptedException e) {e.printStackTrace();}System.out.println("FixedThreadPool: Task ID " + taskId + " finished by " + Thread.currentThread().getName());});}fixedThreadPool.shutdown(); // 關閉線程池try {fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {e.printStackTrace();}}/*** 創建一個單線程線程池,所有的任務都將在同一個線程中依次執行。*/private static void singleThreadExecutorExample() {ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); // 創建一個單線程的線程池for (int i = 0; i < 10; i++) {final int taskId = i;singleThreadExecutor.execute(() -> {System.out.println("SingleThreadExecutor: Task ID " + taskId + " is running by " + Thread.currentThread().getName());try {Thread.sleep(1000); // 模擬耗時操作} catch (InterruptedException e) {e.printStackTrace();}System.out.println("SingleThreadExecutor: Task ID " + taskId + " finished by " + Thread.currentThread().getName());});}singleThreadExecutor.shutdown(); // 關閉線程池try {singleThreadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {e.printStackTrace();}}/*** 創建一個緩存線程池,該線程池會根據需要創建新線程,但會在線程閑置后回收線程。*/private static void cachedThreadPoolExample() {ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); // 創建一個緩存線程池for (int i = 0; i < 10; i++) {final int taskId = i;cachedThreadPool.execute(() -> {System.out.println("CachedThreadPool: Task ID " + taskId + " is running by " + Thread.currentThread().getName());try {Thread.sleep(1000); // 模擬耗時操作} catch (InterruptedException e) {e.printStackTrace();}System.out.println("CachedThreadPool: Task ID " + taskId + " finished by " + Thread.currentThread().getName());});}cachedThreadPool.shutdown(); // 關閉線程池try {cachedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {e.printStackTrace();}}/*** 創建一個定時線程池,可以安排任務在指定時間執行,或定期執行任務。*/private static void scheduledThreadPoolExample() {ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); // 創建一個包含5個線程的定時線程池Runnable task = () -> System.out.println("ScheduledThreadPool: Task executed at: " + System.currentTimeMillis());// 安排在1秒后執行一次,然后每隔2秒重復執行scheduledThreadPool.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);try {Thread.sleep(10000); // 主線程休眠10秒,以便觀察任務執行情況} catch (InterruptedException e) {e.printStackTrace();}scheduledThreadPool.shutdown(); // 關閉線程池}
}