線程池
一、什么是線程池
在開發中,為了提升效率的操作,我們需要將一些業務采用多線程的方式去執行。
比如有一個比較大的任務,可以將任務分成幾塊,分別交給幾個線程去執行,最終做一個匯總就可以了。
比如做業務操作時,需要發送短信或者是發送郵件,這種操作也可以基于異步的方式完成,這種異步的方式,其實就是再構建一個線程去執行。
但是,如果每次異步操作或者多線程操作都需要新創建一個線程,使用完畢后,線程再被銷毀,這樣的話,對系統造成一些額外的開銷。在處理過程中到底由多線程處理了多少個任務,以及每個線程的開銷無法統計和管理。
所以咱們需要一個線程池機制來管理這些內容。線程池的概念和連接池類似,都是在一個Java的集合中存儲大量的線程對象,每次需要執行異步操作或者多線程操作時,不需要重新創建線程,直接從集合中拿到線程對象直接執行方法就可以了。
JDK中就提供了線程池的類。
在線程池構建初期,可以將任務提交到線程池中。會根據一定的機制來異步執行這個任務。
- 可能任務直接被執行
- 任務可以暫時被存儲起來了。等到有空閑線程再來處理。
- 任務也可能被拒絕,無法被執行。
JDK提供的線程池中記錄了每個線程處理了多少個任務,以及整個線程池處理了多少個任務。同時還可以針對任務執行前后做一些勾子函數的實現。可以在任務執行前后做一些日志信息,這樣可以多記錄信息方便后面統計線程池執行任務時的一些內容參數等等……
二、JDK自帶的構建線程池的方式
JDK中基于Executors提供了很多種線程池
2.1 newFixedThreadPool
這個線程池的特別是線程數是固定的。
在Executors中第一個方法就是構建newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
構建時,需要給newFixedThreadPool方法提供一個nThreads的屬性,而這個屬性其實就是當前線程池中線程的個數。當前線程池的本質其實就是使用ThreadPoolExecutor。
構建好當前線程池后,線程個數已經固定好**(線程是懶加載,在構建之初,線程并沒有構建出來,而是隨著人任務的提交才會將線程在線程池中國構建出來)**。如果線程沒構建,線程會待著任務執行被創建和執行。如果線程都已經構建好了,此時任務會被放到LinkedBlockingQueue無界隊列中存放,等待線程從LinkedBlockingQueue中去take出任務,然后執行。
測試功能效果
public static void main(String[] args) throws Exception {ExecutorService threadPool = Executors.newFixedThreadPool(3);threadPool.execute(() -> {System.out.println("1號任務:" + Thread.currentThread().getName() + System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() -> {System.out.println("2號任務:" + Thread.currentThread().getName() + System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() -> {System.out.println("3號任務:" + Thread.currentThread().getName() + System.currentTimeMillis());try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});
}
2.2 newSingleThreadExecutor
這個線程池看名字就知道是單例線程池,線程池中只有一個工作線程在處理任務
如果業務涉及到順序消費,可以采用newSingleThreadExecutor
// 當前這里就是構建單例線程池的方式
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService// 在內部依然是構建了ThreadPoolExecutor,設置的線程個數為1// 當任務投遞過來后,第一個任務會被工作線程處理,后續的任務會被扔到阻塞隊列中// 投遞到阻塞隊列中任務的順序,就是工作線程處理的順序// 當前這種線程池可以用作順序處理的一些業務中(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {// 線程池的使用沒有區別,跟正常的ThreadPoolExecutor沒區別FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}// finalize是當前對象被GC干掉之前要執行的方法// 當前FinalizableDelegatedExecutorService的目的是為了在當前線程池被GC回收之前// 可以執行shutdown,shutdown方法是將當前線程池停止,并且干掉工作線程// 但是不能基于這種方式保證線程池一定會執行shutdown// finalize在執行時,是守護線程,這種線程無法保證一定可以執行完畢。// 在使用線程池時,如果線程池是基于一個業務構建的,在使用完畢之后,一定要手動執行shutdown,// 否則會造成JVM中一堆線程protected void finalize() {super.shutdown();}
}
測試單例線程池效果:
public static void main(String[] args) throws Exception {ExecutorService threadPool = Executors.newSingleThreadExecutor();threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "111");});threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "222");});threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "333");});threadPool.execute(() -> {System.out.println(Thread.currentThread().getName() + "," + "444");});
}
測試線程池使用完畢后,不執行shutdown的后果:
如果是局部變量僅限當前線程池使用的線程池,在使用完畢之后要記得執行shutdown,避免線程無法結束
如果是全局的線程池,很多業務都會到,使用完畢后不要shutdown,因為其他業務也要執行當前線程池
static ExecutorService threadPool = Executors.newFixedThreadPool(200);public static void main(String[] args) throws Exception {newThreadPool();System.gc();Thread.sleep(5000);System.out.println("線程池被回收了!!");System.in.read();
}private static void newThreadPool(){for (int i = 0; i < 200; i++) {final int a = i;threadPool.execute(() -> {System.out.println(a);});}threadPool.shutdown();for (int i = 0; i < 200; i++) {final int a = i;threadPool.execute(() -> {System.out.println(a);});}
}
2.3 newCachedThreadPool
看名字好像是一個緩存的線程池,查看一下構建的方式
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
當第一次提交任務到線程池時,會直接構建一個工作線程
這個工作線程帶執行完人后,60秒沒有任務可以執行后,會結束
如果在等待60秒期間有任務進來,他會再次拿到這個任務去執行
如果后續提升任務時,沒有線程是空閑的,那么就構建工作線程去執行。
最大的一個特點,任務只要提交到當前的newCachedThreadPool中,就必然有工作線程可以處理
代碼測試效果
public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 1; i <= 200; i++) {final int j = i;executorService.execute(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + ":" + j);});}
}
2.4 newScheduleThreadPool
首先看到名字就可以猜到當前線程池是一個定時任務的線程池,而這個線程池就是可以以一定周期去執行一個任務,或者是延遲多久執行一個任務一次
查看一下如何構建的。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
基于這個方法可以看到,構建的是ScheduledThreadPoolExecutor線程池
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor{//....
}
所以本質上還是正常線程池,只不過在原來的線程池基礎上實現了定時任務的功能
原理是基于DelayQueue實現的延遲執行。周期性執行是任務執行完畢后,再次扔回到阻塞隊列。
代碼查看使用的方式和效果
public static void main(String[] args) throws Exception {ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);// 正常執行
// pool.execute(() -> {
// System.out.println(Thread.currentThread().getName() + ":1");
// });// 延遲執行,執行當前任務延遲5s后再執行
// pool.schedule(() -> {
// System.out.println(Thread.currentThread().getName() + ":2");
// },5,TimeUnit.SECONDS);// 周期執行,當前任務第一次延遲5s執行,然后沒3s執行一次// 這個方法在計算下次執行時間時,是從任務剛剛開始時就計算。
// pool.scheduleAtFixedRate(() -> {
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println(System.currentTimeMillis() + ":3");
// },2,1,TimeUnit.SECONDS);// 周期執行,當前任務第一次延遲5s執行,然后沒3s執行一次// 這個方法在計算下次執行時間時,會等待任務結束后,再計算時間pool.scheduleWithFixedDelay(() -> {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(System.currentTimeMillis() + ":3");},2,1,TimeUnit.SECONDS);
}
至于Executors提供的newSingleThreadScheduledExecutor單例的定時任務線程池就不說了。
一個線程的線程池可以延遲或者以一定的周期執行一個任務。
2.5 newWorkStealingPool
當前JDK提供構建線程池的方式newWorkStealingPool和之前的線程池很非常大的區別
之前定長,單例,緩存,定時任務都基于ThreadPoolExecutor去實現的。
newWorkStealingPool是基于ForkJoinPool構建出來的
ThreadPoolExecutor的核心點:
在ThreadPoolExecutor中只有一個阻塞隊列存放當前任務
ForkJoinPool的核心特點:
ForkJoinPool從名字上就能看出一些東西。當有一個特別大的任務時,如果采用上述方式,這個大任務只能會某一個線程去執行。ForkJoin第一個特點是可以將一個大任務拆分成多個小任務,放到當前線程的阻塞隊列中。其他的空閑線程就可以去處理有任務的線程的阻塞隊列中的任務
來一個比較大的數組,里面存滿值,計算總和
單線程處理一個任務:
/** 非常大的數組 */
static int[] nums = new int[1_000_000_000];
// 填充值
static{for (int i = 0; i < nums.length; i++) {nums[i] = (int) ((Math.random()) * 1000);}
}
public static void main(String[] args) {// ===================單線程累加10億數據================================System.out.println("單線程計算數組總和!");long start = System.nanoTime();int sum = 0;for (int num : nums) {sum += num;}long end = System.nanoTime();System.out.println("單線程運算結果為:" + sum + ",計算時間為:" + (end - start));
}
多線程分而治之的方式處理:
/** 非常大的數組 */
static int[] nums = new int[1_000_000_000];
// 填充值
static{for (int i = 0; i < nums.length; i++) {nums[i] = (int) ((Math.random()) * 1000);}
}
public static void main(String[] args) {// ===================單線程累加10億數據================================System.out.println("單線程計算數組總和!");long start = System.nanoTime();int sum = 0;for (int num : nums) {sum += num;}long end = System.nanoTime();System.out.println("單線程運算結果為:" + sum + ",計算時間為:" + (end - start));// ===================多線程分而治之累加10億數據================================// 在使用forkJoinPool時,不推薦使用Runnable和Callable// 可以使用提供的另外兩種任務的描述方式// Runnable(沒有返回結果) -> RecursiveAction// Callable(有返回結果) -> RecursiveTaskForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();System.out.println("分而治之計算數組總和!");long forkJoinStart = System.nanoTime();ForkJoinTask<Integer> task = forkJoinPool.submit(new SumRecursiveTask(0, nums.length - 1));Integer result = task.join();long forkJoinEnd = System.nanoTime();System.out.println("分而治之運算結果為:" + result + ",計算時間為:" + (forkJoinEnd - forkJoinStart));
}private static class SumRecursiveTask extends RecursiveTask<Integer>{/** 指定一個線程處理哪個位置的數據 */private int start,end;private final int MAX_STRIDE = 100_000_000;// 200_000_000: 147964900// 100_000_000: 145942100public SumRecursiveTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {// 在這個方法中,需要設置好任務拆分的邏輯以及聚合的邏輯int sum = 0;int stride = end - start;if(stride <= MAX_STRIDE){// 可以處理任務for (int i = start; i <= end; i++) {sum += nums[i];}}else{// 將任務拆分,分而治之。int middle = (start + end) / 2;// 聲明為2個任務SumRecursiveTask left = new SumRecursiveTask(start, middle);SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);// 分別執行兩個任務left.fork();right.fork();// 等待結果,并且獲取sumsum = left.join() + right.join();}return sum;}
}
最終可以發現,這種累加的操作中,采用分而治之的方式效率提升了2倍多。
但是也不是所有任務都能拆分提升效率,首先任務得大,耗時要長。
三、ThreadPoolExecutor應用&源碼剖析
前面講到的Executors中的構建線程池的方式,大多數還是基于ThreadPoolExecutor去new出來的。
3.1 為什么要自定義線程池
首先ThreadPoolExecutor中,一共提供了7個參數,每個參數都是非常核心的屬性,在線程池去執行任務時,每個參數都有決定性的作用。
但是如果直接采用JDK提供的方式去構建,可以設置的核心參數最多就兩個,這樣就會導致對線程池的控制粒度很粗。所以在阿里規范中也推薦自己去自定義線程池。手動的去new ThreadPoolExecutor設置他的一些核心屬性。
自定義構建線程池,可以細粒度的控制線程池,去管理內存的屬性,并且針對一些參數的設置可能更好的在后期排查問題。
查看一下ThreadPoolExecutor提供的七個核心參數
public ThreadPoolExecutor(int corePoolSize, // 核心工作線程(當前任務執行結束后,不會被銷毀)int maximumPoolSize, // 最大工作線程(代表當前線程池中,一共可以有多少個工作線程)long keepAliveTime, // 非核心工作線程在阻塞隊列位置等待的時間TimeUnit unit, // 非核心工作線程在阻塞隊列位置等待時間的單位BlockingQueue<Runnable> workQueue, // 任務在沒有核心工作線程處理時,任務先扔到阻塞隊列中ThreadFactory threadFactory, // 構建線程的線程工作,可以設置thread的一些信息RejectedExecutionHandler handler) { // 當線程池無法處理投遞過來的任務時,執行當前的拒絕策略// 初始化線程池的操作
}
3.2 ThreadPoolExecutor應用
手動new一下,處理的方式還是執行execute或者submit方法。
JDK提供的幾種拒絕策略:
- AbortPolicy:當前拒絕策略會在無法處理任務時,直接拋出一個異常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString()); }
- CallerRunsPolicy:當前拒絕策略會在線程池無法處理任務時,將任務交給調用者處理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();} }
- DiscardPolicy:當前拒絕策略會在線程池無法處理任務時,直接將任務丟棄掉
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
- DiscardOldestPolicy:當前拒絕策略會在線程池無法處理任務時,將隊列中最早的任務丟棄掉,將當前任務再次嘗試交給線程池處理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);} }
- 自定義Policy:根據自己的業務,可以將任務扔到數據庫,也可以做其他操作。
private static class MyRejectedExecution implements RejectedExecutionHandler{@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("根據自己的業務情況,決定編寫的代碼!");} }
代碼構建線程池,并處理有無返回結果的任務
public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 構建線程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,5,10,TimeUnit.SECONDS,new ArrayBlockingQueue<>(5),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("test-ThreadPoolExecutor");return thread;}},new MyRejectedExecution());//2. 讓線程池處理任務,沒返回結果threadPool.execute(() -> {System.out.println("沒有返回結果的任務");});//3. 讓線程池處理有返回結果的任務Future<Object> future = threadPool.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {System.out.println("我有返回結果!");return "返回結果";}});Object result = future.get();System.out.println(result);//4. 如果是局部變量的線程池,記得用完要shutdownthreadPool.shutdown();
}private static class MyRejectedExecution implements RejectedExecutionHandler{@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("根據自己的業務情況,決定編寫的代碼!");}
}```### 3.3 ThreadPoolExecutor源碼剖析線程池的源碼內容會比較多一點,需要一點一點的去查看,內部比較多。#### 3.3.1 ThreadPoolExecutor的核心屬性核心屬性主要就是ctl,基于ctl拿到線程池的狀態以及工作線程個數在整個線程池的執行流程中,會基于ctl判斷上述兩個內容```java
// 當前是線程池的核心屬性
// 當前的ctl其實就是一個int類型的數值,內部是基于AtomicInteger套了一層,進行運算時,是原子性的。
// ctl表示著線程池中的2個核心狀態:
// 線程池的狀態:ctl的高3位,表示線程池狀態
// 工作線程的數量:ctl的低29位,表示工作線程的個數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// Integer.SIZE:在獲取Integer的bit位個數
// 聲明了一個常量:COUNT_BITS = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
00000000 00000000 00000000 00000001
00100000 00000000 00000000 00000000
00011111 11111111 11111111 11111111
// CAPACITY就是當前工作線程能記錄的工作線程的最大個數
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 線程池狀態的表示
// 當前五個狀態中,只有RUNNING狀態代表線程池沒問題,可以正常接收任務處理
// 111:代表RUNNING狀態,RUNNING可以處理任務,并且處理阻塞隊列中的任務。
private static final int RUNNING = -1 << COUNT_BITS;
// 000:代表SHUTDOWN狀態,不會接收新任務,正在處理的任務正常進行,阻塞隊列的任務也會做完。
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001:代表STOP狀態,不會接收新任務,正在處理任務的線程會被中斷,阻塞隊列的任務一個不管。
private static final int STOP = 1 << COUNT_BITS;
// 010:代表TIDYING狀態,這個狀態是否SHUTDOWN或者STOP轉換過來的,代表當前線程池馬上關閉,就是過渡狀態。
private static final int TIDYING = 2 << COUNT_BITS;
// 011:代表TERMINATED狀態,這個狀態是TIDYING狀態轉換過來的,轉換過來只需要執行一個terminated方法。
private static final int TERMINATED = 3 << COUNT_BITS;// 在使用下面這幾個方法時,需要傳遞ctl進來// 基于&運算的特點,保證只會拿到ctl高三位的值。
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 基于&運算的特點,保證只會拿到ctl低29位的值。
private static int workerCountOf(int c) { return c & CAPACITY; }
線程池狀態的特點以及轉換的方式
3.3.2 ThreadPoolExecutor的有參構造
有參構造沒啥說的,記住核心線程個數是允許為0的。
// 有參構造。無論調用哪個有參構造,最終都會執行當前的有參構造
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 健壯性校驗// 核心線程個數是允許為0個的。// 最大線程數必須大于0,最大線程數要大于等于核心線程數// 非核心線程的最大空閑時間,可以等于0if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)// 不滿足要求就拋出參數異常throw new IllegalArgumentException();// 阻塞隊列,線程工廠,拒絕策略都不允許為null,為null就扔空指針異常if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 不要關注當前內容,系統資源訪問決策,和線程池核心業務關系不大。this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();// 各種賦值,JUC包下,幾乎所有涉及到線程掛起的操作,單位都用納秒。// 有參構造的值,都賦值給成員變量。// Doug Lea的習慣就是將成員變量作為局部變量單獨操作。this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}
3.3.3 ThreadPoolExecutor的execute方法
execute方法是提交任務到線程池的核心方法,很重要
線程池的執行流程其實就是在說execute方法內部做了哪些判斷
execute源碼的分析
// 提交任務到線程池的核心方法
// command就是提交過來的任務
public void execute(Runnable command) {// 提交的任務不能為nullif (command == null)throw new NullPointerException();// 獲取核心屬性ctl,用于后面的判斷int c = ctl.get();// 如果工作線程個數,小于核心線程數。// 滿足要求,添加核心工作線程if (workerCountOf(c) < corePoolSize) {// addWorker(任務,是核心線程嗎)// addWorker返回true:代表添加工作線程成功// addWorker返回false:代表添加工作線程失敗// addWorker中會基于線程池狀態,以及工作線程個數做判斷,查看能否添加工作線程if (addWorker(command, true))// 工作線程構建出來了,任務也交給command去處理了。return;// 說明線程池狀態或者是工作線程個數發生了變化,導致添加失敗,重新獲取一次ctlc = ctl.get();}// 添加核心工作線程失敗,往這走// 判斷線程池狀態是否是RUNNING,如果是,正常基于阻塞隊列的offer方法,將任務添加到阻塞隊列if (isRunning(c) && workQueue.offer(command)) {// 如果任務添加到阻塞隊列成功,走if內部// 如果任務在扔到阻塞隊列之前,線程池狀態突然改變了。// 重新獲取ctlint recheck = ctl.get();// 如果線程池的狀態不是RUNNING,將任務從阻塞隊列移除,if (!isRunning(recheck) && remove(command))// 并且直接拒絕策略reject(command);// 在這,說明阻塞隊列有我剛剛放進去的任務// 查看一下工作線程數是不是0個// 如果工作線程為0個,需要添加一個非核心工作線程去處理阻塞隊列中的任務// 發生這種情況有兩種:// 1. 構建線程池時,核心線程數是0個。// 2. 即便有核心線程,可以設置核心線程也允許超時,設置allowCoreThreadTimeOut為true,代表核心線程也可以超時else if (workerCountOf(recheck) == 0)// 為了避免阻塞隊列中的任務饑餓,添加一個非核心工作線程去處理addWorker(null, false);}// 任務添加到阻塞隊列失敗// 構建一個非核心工作線程// 如果添加非核心工作線程成功,直接完事,告辭else if (!addWorker(command, false))// 添加失敗,執行決絕策略reject(command);
}
execute方法的完整執行流程圖
3.3.4 ThreadPoolExecutor的addWorker方法
addWorker中主要分成兩大塊去看
- 第一塊:校驗線程池的狀態以及工作線程個數
- 第二塊:添加工作線程并且啟動工作線程
校驗線程池的狀態以及工作線程個數
// 添加工作線程之校驗源碼
private boolean addWorker(Runnable firstTask, boolean core) {// 外層for循環在校驗線程池的狀態// 內層for循環是在校驗工作線程的個數// retry是給外層for循環添加一個標記,是為了方便在內層for循壞跳出外層for循環retry:for (;;) {// 獲取ctlint c = ctl.get();// 拿到ctl的高3位的值int rs = runStateOf(c);
//==========================線程池狀態判斷==================================================// 如果線程池狀態是SHUTDOWN,并且此時阻塞隊列有任務,工作線程個數為0,添加一個工作線程去處理阻塞隊列的任務// 判斷線程池的狀態是否大于等于SHUTDOWN,如果滿足,說明線程池不是RUNNINGif (rs >= SHUTDOWN &&// 如果這三個條件都滿足,就代表是要添加非核心工作線程去處理阻塞隊列任務// 如果三個條件有一個沒滿足,返回false,配合!,就代表不需要添加!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))// 不需要添加工作線程return false;for (;;) {
//==========================工作線程個數判斷================================================== // 基于ctl拿到低29位的值,代表當前工作線程個數 int wc = workerCountOf(c);// 如果工作線程個數大于最大值了,不可以添加了,返回falseif (wc >= CAPACITY ||// 基于core來判斷添加的是否是核心工作線程// 如果是核心:基于corePoolSize去判斷// 如果是非核心:基于maximumPoolSize去判斷wc >= (core ? corePoolSize : maximumPoolSize))// 代表不能添加,工作線程個數不滿足要求return false;// 針對ctl進行 + 1,采用CAS的方式if (compareAndIncrementWorkerCount(c))// CAS成功后,直接退出外層循環,代表可以執行添加工作線程操作了。break retry;// 重新獲取一次ctl的值c = ctl.get(); // 判斷重新獲取到的ctl中,表示的線程池狀態跟之前的是否有區別// 如果狀態不一樣,說明有變化,重新的去判斷線程池狀態if (runStateOf(c) != rs)// 跳出一次外層for循環continue retry;}}// 省略添加工作線程以及啟動的過程
}
添加工作線程并且啟動工作線程
private boolean addWorker(Runnable firstTask, boolean core) {// 省略校驗部分的代碼// 添加工作線程以及啟動工作線程~~~// 聲明了三個變量// 工作線程啟動了沒,默認falseboolean workerStarted = false;// 工作線程添加了沒,默認falseboolean workerAdded = false;// 工作線程,默認為nullWorker w = null;try {// 構建工作線程,并且將任務傳遞進去w = new Worker(firstTask);// 獲取了Worker中的Thread對象final Thread t = w.thread;// 判斷Thread是否不為null,在new Worker時,內部會通過給予的ThreadFactory去構建Thread交給Worker// 一般如果為null,代表ThreadFactory有問題。if (t != null) {// 加鎖,保證使用workers成員變量以及對largestPoolSize賦值時,保證線程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 再次獲取線程池狀態。int rs = runStateOf(ctl.get());// 再次判斷// 如果滿足 rs < SHUTDOWN 說明線程池是RUNNING,狀態正常,執行if代碼塊// 如果線程池狀態為SHUTDOWN,并且firstTask為null,添加非核心工作處理阻塞隊列任務if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 到這,可以添加工作線程。// 校驗ThreadFactory構建線程后,不能自己啟動線程,如果啟動了,拋出異常if (t.isAlive()) throw new IllegalThreadStateException();// private final HashSet<Worker> workers = new HashSet<Worker>();// 將new好的Worker添加到HashSet中。workers.add(w);// 獲取了HashSet的size,拿到工作線程個數int s = workers.size();// largestPoolSize在記錄最大線程個數的記錄// 如果當前工作線程個數,大于最大線程個數的記錄,就賦值if (s > largestPoolSize)largestPoolSize = s;// 添加工作線程成功workerAdded = true;}} finally {mainLock.unlock();}// 如果工作線程添加成功,if (workerAdded) {// 直接啟動Worker中的線程t.start();// 啟動工作線程成功workerStarted = true;}}} finally {// 做補償的操作,如果工作線程啟動失敗,將這個添加失敗的工作線程處理掉if (!workerStarted)addWorkerFailed(w);}// 返回工作線程是否啟動成功return workerStarted;
}
// 工作線程啟動失敗,需要不的步長操作
private void addWorkerFailed(Worker w) {// 因為操作了workers,需要加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 如果w不為null,之前Worker已經new出來了。if (w != null)// 從HashSet中移除workers.remove(w);// 同時對ctl進行 - 1,代表去掉了一個工作線程個數decrementWorkerCount();// 因為工作線程啟動失敗,判斷一下狀態的問題,是不是可以走TIDYING狀態最終到TERMINATED狀態了。tryTerminate();} finally {// 釋放鎖mainLock.unlock();}
}
3.3.5 ThreadPoolExecutor的Worker工作線程
Worker對象主要包含了兩個內容
- 工作線程要執行任務
- 工作線程可能會被中斷,控制中斷
// Worker繼承了AQS,目的就是為了控制工作線程的中斷。
// Worker實現了Runnable,內部的Thread對象,在執行start時,必然要執行Worker中斷額一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{// =======================Worker管理任務================================ // 線程工廠構建的線程final Thread thread;// 當前Worker要執行的任務Runnable firstTask;// 記錄當前工作線程處理了多少個任務。volatile long completedTasks;// 有參構造Worker(Runnable firstTask) {// 將State設置為-1,代表當前不允許中斷線程setState(-1); // 任務賦值this.firstTask = firstTask;// 基于線程工作構建Thread,并且傳入的Runnable是Workerthis.thread = getThreadFactory().newThread(this);}// 當thread執行start方法時,調用的是Worker的run方法,public void run() {// 任務執行時,執行的是runWorker方法runWorker(this);}// =======================Worker管理中斷================================ // 當前方法是中斷工作線程時,執行的方法void interruptIfStarted() {Thread t;// 只有Worker中的state >= 0的時候,可以中斷工作線程if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {// 如果狀態正常,并且線程未中斷,這邊就中斷線程t.interrupt();} catch (SecurityException ignore) {}}}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }}
3.3.6 ThreadPoolExecutor的runWorker方法
runWorker就是讓工作線程拿到任務去執行即可。
并且在內部也處理了在工作線程正常結束和異常結束時的處理方案
// 工作線程啟動后執行的任務。
final void runWorker(Worker w) {// 拿到當前線程Thread wt = Thread.currentThread();// 從worker對象中拿到任務Runnable task = w.firstTask;// 將Worker中的firstTask置位空w.firstTask = null;// 將Worker中的state置位0,代表當前線程可以中斷的w.unlock(); // allow interrupts// 判斷工作線程是否是異常結束,默認就是異常結束boolean completedAbruptly = true;try {// 獲取任務// 直接拿到第一個任務去執行// 如果第一個任務為null,去阻塞隊列中獲取任務while (task != null || (task = getTask()) != null) {// 執行了Worker的lock方法,當前在lock時,shutdown操作不能中斷當前線程,因為當前線程正在處理任務w.lock();// 比較ctl >= STOP,如果滿足找個狀態,說明線程池已經到了STOP狀態甚至已經要涼涼了// 線程池到STOP狀態,并且當前線程還沒有中斷,確保線程是中斷的,進到if內部執行中斷方法// if(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()) {中斷線程}// 如果線程池狀態不是STOP,確保線程不是中斷的。// 如果發現線程中斷標記位是true了,再次查看線程池狀態是大于STOP了,再次中斷線程// 這里其實就是做了一個事情,如果線程池狀態 >= STOP,確保線程中斷了。if ((runStateAtLeast(ctl.get(), STOP) || ( Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) ))&& !wt.isInterrupted())wt.interrupt();try {// 勾子函數在線程池中沒有做任何的實現,如果需要在線程池執行任務前后做一些額外的處理,可以重寫勾子函數// 前置勾子函數beforeExecute(wt, task);Throwable thrown = null;try {// 執行任務。task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 前后置勾子函數afterExecute(task, thrown);}} finally {// 任務執行完,丟掉任務task = null;// 當前工作線程處理的任務數+1w.completedTasks++;// 執行unlock方法,此時shutdown方法才可以中斷當前線程w.unlock();}}// 如果while循環結束,正常走到這,說明是正常結束// 正常結束的話,在getTask中就會做一個額外的處理,將ctl - 1,代表工作線程沒一個。completedAbruptly = false;} finally {// 考慮干掉工作線程processWorkerExit(w, completedAbruptly);}
}
// 工作線程結束前,要執行當前方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果是異常結束if (completedAbruptly) // 將ctl - 1,扣掉一個工作線程decrementWorkerCount();// 操作Worker,為了線程安全,加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 當前工作線程處理的任務個數累加到線程池處理任務的個數屬性中completedTaskCount += w.completedTasks;// 將工作線程從hashSet中移除workers.remove(w);} finally {// 釋放鎖mainLock.unlock();}// 只要工作線程涼了,查看是不是線程池狀態改變了。tryTerminate();// 獲取ctlint c = ctl.get();// 判斷線程池狀態,當前線程池要么是RUNNING,要么是SHUTDOWNif (runStateLessThan(c, STOP)) {// 如果正常結束工作線程if (!completedAbruptly) {// 如果核心線程允許超時,min = 0,否則就是核心線程個數int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果min == 0,可能會出現沒有工作線程,并且阻塞隊列有任務沒有線程處理if (min == 0 && ! workQueue.isEmpty())// 至少要有一個工作線程處理阻塞隊列任務min = 1;// 如果工作線程個數 大于等于1,不怕沒線程處理,正常returnif (workerCountOf(c) >= min)return; }// 異常結束,為了避免出現問題,添加一個空任務的非核心線程來填補上剛剛異常結束的工作線程addWorker(null, false);}
}
3.3.7 ThreadPoolExecutor的getTask方法
工作線程在去阻塞隊列獲取任務前,要先查看線程池狀態
如果狀態沒問題,去阻塞隊列take或者是poll任務
第二個循環時,不但要判斷線程池狀態,還要判斷當前工作線程是否可以被干掉
// 當前方法就在阻塞隊列中獲取任務
// 前面半部分是判斷當前工作線程是否可以返回null,結束。
// 后半部分就是從阻塞隊列中拿任務
private Runnable getTask() {// timeOut默認值是false。boolean timedOut = false; // 死循環for (;;) {// 拿到ctlint c = ctl.get();// 拿到線程池的狀態int rs = runStateOf(c);// 如果線程池狀態是STOP,沒有必要處理阻塞隊列任務,直接返回null// 如果線程池狀態是SHUTDOWN,并且阻塞隊列是空的,直接返回nullif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 如果可以返回null,先扣減工作線程個數decrementWorkerCount();// 返回null,結束runWorker的while循環return null;}// 基于ctl拿到工作線程個數int wc = workerCountOf(c);// 核心線程允許超時,timed為true// 工作線程個數大于核心線程數,timed為trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if (// 如果工作線程個數,大于最大線程數。(一般情況不會滿足),把他看成false// 第二個判斷代表,只要工作線程數小于等于核心線程數,必然為false// 即便工作線程個數大于核心線程數了,此時第一次循環也不會為true,因為timedOut默認值是false// 考慮第二次循環了,因為循環內部必然有修改timeOut的位置(wc > maximumPoolSize || (timed && timedOut))&& // 要么工作線程還有,要么阻塞隊列為空,并且滿足上述條件后,工作線程才會走到if內部,結束工作線程(wc > 1 || workQueue.isEmpty())) {// 第二次循環才有可能到這。// 正常結束,工作線程 - 1,因為是CAS操作,如果失敗了,重新走for循環if (compareAndDecrementWorkerCount(c))return null;continue;}// 工作線程從阻塞隊列拿任務try {// 如果是核心線程,timed是false,如果是非核心線程,timed就是trueRunnable r = timed ?// 如果是非核心,走poll方法,拿任務,等待一會workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 如果是核心,走take方法,死等。workQueue.take();// 從阻塞隊列拿到的任務不為null,這邊就正常返回任務,去執行if (r != null)return r;// 說明當前線程沒拿到任務,將timeOut設置為true,在上面就可以返回null退出了。timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
3.3.8 ThreadPoolExecutor的關閉方法
首先查看shutdownNow方法,可以從RUNNING狀態轉變為STOP
// shutDownNow方法,shutdownNow不會處理阻塞隊列的任務,將任務全部給你返回了。
public List<Runnable> shutdownNow() {// 聲明返回結果List<Runnable> tasks;// 加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 不關注這個方法……checkShutdownAccess();// 將線程池狀態修改為STOPadvanceRunState(STOP);// 無論怎么,直接中斷工作線程。interruptWorkers();// 將阻塞隊列的任務全部扔到List集合中。tasks = drainQueue();} finally {// 釋放鎖mainLock.unlock();}tryTerminate();return tasks;
}// 將線程池狀態修改為STOP
private void advanceRunState(int STOP) {// 死循環。for (;;) {// 獲取ctl屬性的值int c = ctl.get();// 第一個判斷:如果當前線程池狀態已經大于等于STOP了,不管了,告辭。if (runStateAtLeast(c, STOP) ||// 基于CAS,將ctl從c修改為STOP狀態,不修改工作線程個數,但是狀態變為了STOP// 如果修改成功結束ctl.compareAndSet(c, ctlOf(STOP, workerCountOf(c))))break;}
}
// 無論怎么,直接中斷工作線程。
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 遍歷HashSet,拿到所有的工作線程,直接中斷。for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}
// 移除阻塞隊列,內容全部扔到List集合中
private List<Runnable> drainQueue() {BlockingQueue<Runnable> q = workQueue;ArrayList<Runnable> taskList = new ArrayList<Runnable>();// 阻塞隊列自帶的,直接清空阻塞隊列,內容扔到List集合q.drainTo(taskList);// 為了避免任務丟失,重新判斷,是否需要編輯阻塞隊列,重新扔到Listif (!q.isEmpty()) {for (Runnable r : q.toArray(new Runnable[0])) {if (q.remove(r))taskList.add(r);}}return taskList;
}// 查看當前線程池是否可以變為TERMINATED狀態
final void tryTerminate() {// 死循環。for (;;) {// 拿到ctlint c = ctl.get();// 如果是RUNNING,直接告辭。// 如果狀態已經大于等于TIDYING,馬上就要涼涼,直接告辭。// 如果狀態是SHUTDOWN,但是阻塞隊列還有任務,直接告辭。if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 如果還有工作線程if (workerCountOf(c) != 0) { // 再次中斷工作線程interruptIdleWorkers(ONLY_ONE);// 告辭,等你工作線程全完事,我這再嘗試進入到TERMINATED狀態return;}// 加鎖,為了可以執行Condition的釋放操作final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 將線程池狀態修改為TIDYING狀態,如果成功,繼續往下走if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 這個方法是空的,如果你需要在線程池關閉后做一些額外操作,這里你可以自行實現terminated();} finally {// 最終修改為TERMINATED狀態ctl.set(ctlOf(TERMINATED, 0));// 線程池提供了一個方法,主線程在提交任務到線程池后,是可以繼續做其他操作的。// 咱們也可以讓主線程提交任務后,等待線程池處理完畢,再做后續操作// 這里線程池涼涼后,要喚醒哪些調用了awaitTermination方法的線程termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}
再次shutdown方法,可以從RUNNING狀態轉變為SHUTDOWN
shutdown狀態下,不會中斷正在干活的線程,而且會處理阻塞隊列中的任務
public void shutdown() {// 加鎖。。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 不看。checkShutdownAccess();// 里面是一個死循環,將線程池狀態修改為SHUTDOWNadvanceRunState(SHUTDOWN);// 中斷空閑線程interruptIdleWorkers();// 說了,這個是為了ScheduleThreadPoolExecutor準備的,不管onShutdown(); } finally {mainLock.unlock();}// 嘗試結束線程tryTerminate();
}// 中斷空閑線程
private void interruptIdleWorkers(boolean onlyOne) {// 加鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 如果線程沒有中斷,那么就去獲取Worker的鎖,基于tryLock可知,不會中斷正在干活的線程if (!t.isInterrupted() && w.tryLock()) {try {// 會中斷空閑線程t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}
3.4 線程池的核心參數設計規則
線程池的使用難度不大,難度在于線程池的參數并不好配置。
主要難點在于任務類型無法控制,比如任務有CPU密集型,還有IO密集型,甚至還有混合型的。
因為IO咱們無法直接控制,所以很多時間按照一些書上提供的一些方法,是無法解決問題的。
《Java并發編程實踐》
想調試出一個符合當前任務情況的核心參數,最好的方式就是測試。
需要將項目部署到測試環境或者是沙箱環境中,結果各種壓測得到一個相對符合的參數。
如果每次修改項目都需要重新部署,成本太高了。
此時咱們可以實現一個動態監控以及修改線程池的方案。
因為線程池的核心參數無非就是:
- corePoolSize:核心線程數
- maximumPoolSize:最大線程數
- workQueue:工作隊列
線程池中提供了獲取核心信息的get方法,同時也提供了動態修改核心屬性的set方法。
也可以采用一些開源項目提供的方式去做監控和修改
比如hippo4j就可以對線程池進行監控,而且可以和SpringBoot整合。
Github地址:https://github.com/opengoofy/hippo4j
官方文檔:https://hippo4j.cn/docs/user_docs/intro
3.5 線程池處理任務的核心流程
基于addWorker添加工作線程的流程切入到整體處理任務的位置
四、ScheduleThreadPoolExecutor應用&源碼
4.1 ScheduleThreadPoolExecutor介紹
從名字上就可以看出,當前線程池是用于執行定時任務的線程池。
Java比較早的定時任務工具是Timer類。但是Timer問題很多,串行的,不靠譜,會影響到其他的任務執行。
其實除了Timer以及ScheduleThreadPoolExecutor之外,正常在企業中一般會采用Quartz或者是SpringBoot提供的Schedule的方式去實現定時任務的功能。
ScheduleThreadPoolExecutor支持延遲執行以及周期性執行的功能。
4.2 ScheduleThreadPoolExecutor應用
定時任務線程池的有參構造
public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory, handler);
}
發現ScheduleThreadPoolExecutor在構建時,直接調用了父類的構造方法
ScheduleThreadPoolExecutor的父類就是ThreadPoolExecutor
首先ScheduleThreadPoolExecutor最多允許設置3個參數:
- 核心線程數
- 線程工廠
- 拒絕策略
首先沒有設置阻塞隊列,以及最大線程數和空閑時間以及單位
阻塞隊列設置的是DelayedWorkQueue,其實本質就是DelayQueue,一個延遲隊列。DelayQueue是一個無界隊列。所以最大線程數以及非核心線程的空閑時間是不需要設置的。
代碼落地使用
public static void main(String[] args) {//1. 構建定時任務線程池ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(5,new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);return t;}},new ThreadPoolExecutor.AbortPolicy());//2. 應用ScheduledThreadPoolExecutor// 跟直接執行線程池的execute沒啥區別pool.execute(() -> {System.out.println("execute");});// 指定延遲時間執行System.out.println(System.currentTimeMillis());pool.schedule(() -> {System.out.println("schedule");System.out.println(System.currentTimeMillis());},2, TimeUnit.SECONDS);// 指定第一次的延遲時間,并且確認后期的周期執行時間,周期時間是在任務開始時就計算// 周期性執行就是將執行完畢的任務再次社會好延遲時間,并且重新扔到阻塞隊列// 計算的周期執行,也是在原有的時間上做累加,不關注任務的執行時長。System.out.println(System.currentTimeMillis());pool.scheduleAtFixedRate(() -> {System.out.println("scheduleAtFixedRate");System.out.println(System.currentTimeMillis());},2,3,TimeUnit.SECONDS);// // 指定第一次的延遲時間,并且確認后期的周期執行時間,周期時間是在任務結束后再計算下次的延遲時間System.out.println(System.currentTimeMillis());pool.scheduleWithFixedDelay(() -> {System.out.println("scheduleWithFixedDelay");System.out.println(System.currentTimeMillis());try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}},2,3,TimeUnit.SECONDS);}
4.3 ScheduleThreadPoolExecutor源碼剖析
4.3.1 核心屬性
后面的方法業務流程會涉及到這些屬性。
// 這里是針對任務取消時的一些業務判斷會用到的標記
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
private volatile boolean removeOnCancel = false;// 計數器,如果兩個任務的執行時間節點一模一樣,根據這個序列來判斷誰先執行
private static final AtomicLong sequencer = new AtomicLong();// 這個方法是獲取當前系統時間的毫秒值
final long now() {return System.nanoTime();
}// 內部類。核心類之一。
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 全局唯一的序列,如果兩個任務時間一直,基于當前屬性判斷private final long sequenceNumber;// 任務執行的時間,單位納秒private long time;/*** period == 0:執行一次的延遲任務* period > 0:代表是At* period < 0:代表是With*/private final long period;// 周期性執行時,需要將任務重新扔回阻塞隊列,基礎當前屬性拿到任務,方便扔回阻塞隊列RunnableScheduledFuture<V> outerTask = this;/*** 構建schedule方法的任務*/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();}/*** 構建At和With任務的有參構造*/ ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}} // 內部類。核心類之一。
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
// 這個類就是DelayQueue,不用過分關注,如果沒看過,看阻塞隊列中的優先級隊列和延遲隊列
4.3.2 schedule方法
execute方法也是調用的schedule方法,只不過傳入的延遲時間是0納秒
schedule方法就是將任務和延遲時間封裝到一起,并且將任務扔到阻塞隊列中,再去創建工作線程去take阻塞隊列。
// 延遲任務執行的方法。
// command:任務
// delay:延遲時間
// unit:延遲時間的單位
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {// 健壯性校驗。if (command == null || unit == null)throw new NullPointerException();// 將任務和延遲時間封裝到一起,最終組成ScheduledFutureTask// 要分成三個方法去看// triggerTime:計算延遲時間。最終返回的是當前系統時間 + 延遲時間 // triggerTime就是將延遲時間轉換為納秒,并且+當前系統時間,再做一些健壯性校驗// ScheduledFutureTask有參構造:將任務以及延遲時間封裝到一起,并且設置任務執行的方式// decorateTask:當前方式是讓用戶基于自身情況可以動態修改任務的一個擴展口RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));// 任務封裝好,執行delayedExecute方法,去執行任務delayedExecute(t);// 返回FutureTaskreturn t;
}// triggerTime做的事情
// 外部方法,對延遲時間做校驗,如果小于0,就直接設置為0
// 并且轉換為納秒單位
private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
// 將延遲時間+當前系統時間
// 后面的校驗是為了避免延遲時間超過Long的取值范圍
long triggerTime(long delay) {return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}// ScheduledFutureTask有參構造
ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);// time就是任務要執行的時間this.time = ns;// period,為0,代表任務是延遲執行,不是周期執行this.period = 0;// 基于AtmoicLong生成的序列this.sequenceNumber = sequencer.getAndIncrement();
}// delayedExecute 執行延遲任務的操作
private void delayedExecute(RunnableScheduledFuture<?> task) {// 查看當前線程池是否還是RUNNING狀態,如果不是RUNNING,進到ifif (isShutdown())// 不是RUNNING。// 執行拒絕策略。reject(task);else {// 線程池狀態是RUNNING// 直接讓任務扔到延遲的阻塞隊列中super.getQueue().add(task);// DCL的操作,再次查看線程池狀態// 如果線程池在添加任務到阻塞隊列后,狀態不是RUNNINGif (isShutdown() &&// task.isPeriodic():現在反回的是false,因為任務是延遲執行,不是周期執行// 默認情況,延遲隊列中的延遲任務,可以執行!canRunInCurrentRunState(task.isPeriodic()) &&// 從阻塞隊列中移除任務。remove(task))task.cancel(false);else// 線程池狀態正常,任務可以執行ensurePrestart();}
}// 線程池狀態不為RUNNING,查看任務是否可以執行
// 延遲執行:periodic==false
// 周期執行:periodic==true
// continueExistingPeriodicTasksAfterShutdown:周期執行任務,默認為false
// executeExistingDelayedTasksAfterShutdown:延遲執行任務,默認為true
boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);
}
// 當前情況,shutdownOK為true
final boolean isRunningOrShutdown(boolean shutdownOK) {int rs = runStateOf(ctl.get());// 如果狀態是RUNNING,正常可以執行,返回true// 如果狀態是SHUTDOWN,根據shutdownOK來決定return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}// 任務可以正常執行后,做的操作
void ensurePrestart() {// 拿到工作線程個數int wc = workerCountOf(ctl.get());// 如果工作線程個數小于核心線程數if (wc < corePoolSize)// 添加核心線程去處理阻塞隊列中的任務addWorker(null, true);else if (wc == 0)// 如果工作線程數為0,核心線程數也為0,這是添加一個非核心線程去處理阻塞隊列任務addWorker(null, false);
}
4.3.3 At和With方法&任務的run方法
這兩個方法在源碼層面上的第一個區別,就是在計算周期時間時,需要將這個值傳遞給period,基于正負數在區別At和With
所以查看一個方法就ok,查看At方法
// At方法,
// command:任務
// initialDelay:第一次執行的延遲時間
// period:任務的周期執行時間
// unit:上面兩個時間的單位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {// 健壯性校驗if (command == null || unit == null)throw new NullPointerException();// 周期時間不能小于等于0.if (period <= 0)throw new IllegalArgumentException();// 將任務以及第一次的延遲時間,和后續的周期時間封裝好。ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 擴展口,可以對任務做修改。RunnableScheduledFuture<Void> t = decorateTask(command, sft);// 周期性任務,需要在任務執行完畢后,重新扔會到阻塞隊列,為了方便拿任務,將任務設置到outerTask成員變量中sft.outerTask = t;// 和schedule方法一樣的方式// 如果任務剛剛扔到阻塞隊列,線程池狀態變為SHUTDOWN,默認情況,當前任務不執行delayedExecute(t);return t;
}// 延遲任務以及周期任務在執行時,都會調用當前任務的run方法。
public void run() {// periodic == false:一次性延遲任務// periodic == true:周期任務boolean periodic = isPeriodic();// 任務執行前,會再次判斷狀態,能否執行任務if (!canRunInCurrentRunState(periodic))cancel(false);// 判斷是周期執行還是一次性任務else if (!periodic)// 一次性任務,讓工作線程直接執行command的邏輯ScheduledFutureTask.super.run();// 到這個else if,說明任務是周期執行else if (ScheduledFutureTask.super.runAndReset()) {// 設置下次任務執行的時間setNextRunTime();// 將任務重新扔回線程池做處理reExecutePeriodic(outerTask);}
}
// 設置下次任務執行的時間
private void setNextRunTime() {// 拿到period值,正數:At,負數:Withlong p = period;if (p > 0)// 拿著之前的執行時間,直接追加上周期時間time += p;else// 如果走到else,代表任務是With方式,這種方式要重新計算延遲時間// 拿到當前系統時間,追加上延遲時間,time = triggerTime(-p);
}// 將任務重新扔回線程池做處理
void reExecutePeriodic(RunnableScheduledFuture<?> task) {// 如果狀態ok,可以執行if (canRunInCurrentRunState(true)) {// 將任務扔到延遲隊列super.getQueue().add(task);// DCL,判斷線程池狀態if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);else// 添加工作線程ensurePrestart();}
}