?傳送門
分布式定時任務系列1:XXL-job安裝
分布式定時任務系列2:XXL-job使用
分布式定時任務系列3:任務執行引擎設計
分布式定時任務系列4:任務執行引擎設計續
分布式定時任務系列5:XXL-job中blockingQueue的應用
分布式定時任務系列6:XXL-job觸發日志過大引發的CPU告警
分布式定時任務系列7:XXL-job源碼分析之任務觸發
分布式定時任務系列8:XXL-job源碼分析之遠程調用
?分布式定時任務系列9:XXL-job路由策略
分布式定時任務系列10:XXL-job源碼分析之路由策略
番外篇:從XXL-job路由策略的“服務容錯“說起
分布式定時任務系列12:XXL-job的任務觸發為什么是死循環?
Java并發編程實戰1:java中的阻塞隊列
定時任務都是通過"死循環"觸發?
在前面一共分析過XXL-job的定時任務觸發原理、JDK-Timer執行定時任務的原理:
- 底層源碼都是通過while(true)這種死循環寫法來遍歷任務的
- 一個是分布式調度框架,更為重量級、一個是JDK提供的偏輕量級的調度工具
盡管2者定位不同,但是都選擇了死循環這種方式來實現任務調度,說明while(true)一定是可行的、通用的設計方式。但是要問任務調度都是采用這種方式,那答案肯定是否定的。一來只分析了XXL-job與Timer的源碼導致樣本太少,二來接下來要分析的另一個任務調度工具,也的確不是通過死循環這種方式:它就是JDK提供的ScheduledExecutorService!
ScheduledExecutorService源碼解析
ScheduledExecutorService是什么
ScheduledExecutorService?是Java并發框架中用于定時任務調度的核心接口,基于線程池實現,支持延遲任務執行和周期性任務調度,相比傳統?Timer?類更高效、更靈活。 ?
核心功能
提供
schedule
、scheduleAtFixedRate
、scheduleWithFixedDelay
等方法,可安排任務在指定延遲后執行一次,或按固定頻率周期執行。支持任務并發執行,適用于需要定時輪詢、消息推送等場景。 ?關鍵特性
- ?線程池管理?:默認使用?ScheduledThreadPoolExecutor?,可設置核心線程數(如
Executors.newScheduledThreadPool(1)
),支持多任務并發執行 ?- ?任務調度策略?:
schedule
:延遲后執行一次任務scheduleAtFixedRate
:初始延遲后按固定頻率重復執行(如每5秒執行一次)scheduleWithFixedDelay
:首次執行后等待固定延遲再執行(如首次執行后每10秒執行一次) ?- ?取消任務?:可通過返回的
Future
對象取消未執行的任務 ?適用場景
- 定時輪詢數據庫或第三方接口(如每5分鐘檢查數據更新)
- 定時發送消息或推送通知(如每日定時郵件發送)
- 周期性任務調度(如每2小時重啟服務) ?
ScheduledExecutorService的使用
ScheduledExecutorService類自jdk1.5才引入,作者是大名鼎鼎的Doug Lea(Java并發包juc的作者)。既然它開始工作的晚,自然離"退休"還早,現在一般推薦使用!
說到Doug Lea自然要推薦他與Joshua Bloch等合著的《Java并發編程實戰》,見推薦書單。
看一下源碼中給出的一個例子:
import static java.util.concurrent.TimeUnit .*;class BeeperControl {// 創建執行器實例,初始化1個執行線程private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public void beepForAnHour() {final Runnable beeper = new Runnable() {public void run() {System.out.println("beep");}};// 固定頻率周期性執行任務:每10s執行一次任務,發出"嘩"聲!final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);// 延遲 delay 后執行一次任務:一次性任務,3600s(1小時)后"取消"任務
scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, SECONDS);}
}
可以寫一個測試跑一下上面的例子,然后再用Timer改寫上面的例子,分別執行會發現效果是一致的:
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;/*** @author * @date 2025/7/16*/
public class ScheduleTest {private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public static void main(String[] args) {ScheduledExecutorServiceBeepForAnHour();timerBeepForAnHour();}/*** 使用ScheduledExecutorService實現定時任務*/public static void ScheduledExecutorServiceBeepForAnHour() {final Runnable beeper = new Runnable() {public void run() {System.out.println(new Date() + "ScheduledExecutorService for beep");}};final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, TimeUnit.SECONDS);}/*** 使用Timer實現實時任務*/public static void timerBeepForAnHour() {// 創建Timer執行器實例Timer timer = new Timer();// 執行任務:,發出"嘩"聲!TimerTask beeper = new TimerTask() {@Overridepublic void run() {System.out.println(new Date() + "Timer for beep");}};// 固定頻率周期性執行任務:每10s執行一次任務,發出"嘩"聲!timer.scheduleAtFixedRate(beeper, 10, 10 * 1000);// 延遲 delay 后執行一次任務:一次性任務,3600s(1小時)后"取消"任務timer.schedule(new TimerTask() {@Overridepublic void run() {beeper.cancel();}}, 60 * 60 * 1000);}
}
上面例子要注意的是,timer的API里面的時間是毫秒,而ScheduledExecutorService是可以通過TimeUnit指定時間單位的。?
僅通過這個例子就簡單的認為功能兩者一樣,那肯定缺乏說服力的。并且如果兩個調度器一樣,也有重復造輪子的嫌疑,所以再接著改寫一下上面的例子:
- 增加一個執行任務,執行器里面維持2個任務
- 在任務里面將執行線程信息打印出來
- 但同時將ScheduledExecutorService的執行線程數從1調整為2
public class ScheduleTest {// 創建ScheduledExecutorService 執行器private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);public static void main(String[] args) {scheduledExecutorServiceBeepForAnHour();timerBeepForAnHour();}/*** 使用ScheduledExecutorService實現定時任務*/public static void scheduledExecutorServiceBeepForAnHour() {// 任務1final Runnable beeper = new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep1");}};// 任務2Runnable beeper2 = () -> {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep2");};// 添加第1個任務final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);// 添加第2個任務scheduler.scheduleAtFixedRate(beeper2, 10, 10, TimeUnit.SECONDS);scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, TimeUnit.SECONDS);}/*** 使用Timer實現實時任務*/public static void timerBeepForAnHour() {// 創建Timer執行器實例Timer timer = new Timer();// 執行任務1:發出"嘩"聲!TimerTask beeper = new TimerTask() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep1");}};執行任務2:TimerTask beeper2 = new TimerTask() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep2");}};// 固定頻率周期性執行任務:每10s執行一次任務,發出"嘩"聲!timer.scheduleAtFixedRate(beeper, 10, 10 * 1000);timer.scheduleAtFixedRate(beeper2, 10, 10 * 1000);// 延遲 delay 后執行一次任務:一次性任務,3600s(1小時)后"取消"任務timer.schedule(new TimerTask() {@Overridepublic void run() {beeper.cancel();}}, 60 * 60 * 1000);}
}
運行程序,打印結果:
Timer-0,Mon Jul 21 11:43:46 CST 2025, Timer for beep1
Timer-0,Mon Jul 21 11:43:46 CST 2025, Timer for beep2
pool-1-thread-2,Mon Jul 21 11:43:56 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-1,Mon Jul 21 11:43:56 CST 2025, ScheduledExecutorService for beep1
Timer-0,Mon Jul 21 11:43:56 CST 2025, Timer for beep2
Timer-0,Mon Jul 21 11:43:56 CST 2025, Timer for beep1
pool-1-thread-1,Mon Jul 21 11:44:06 CST 2025, ScheduledExecutorService for beep1
Timer-0,Mon Jul 21 11:44:06 CST 2025, Timer for beep1
pool-1-thread-2,Mon Jul 21 11:44:06 CST 2025, ScheduledExecutorService for beep2
Timer-0,Mon Jul 21 11:44:06 CST 2025, Timer for beep2
pool-1-thread-2,Mon Jul 21 11:44:16 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-1,Mon Jul 21 11:44:16 CST 2025, ScheduledExecutorService for beep1
上面截取了一部分打印結果,會發現對于ScheduledExecutorService來說,隨著執行線程的增加,任務是可能會由不同的線程執行。但是Timer來說,它始終只會有一個線程來執行任務。由此我們推斷兩個類在執行執行上是不同的,再改寫一下上面的例子:
- 將任務阻塞50s,大于執行的同期10s
public class ScheduleTest {// 創建執行private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);public static void main(String[] args) {scheduledExecutorServiceBeepForAnHour();timerBeepForAnHour();}/*** 使用ScheduledExecutorService實現定時任務*/public static void scheduledExecutorServiceBeepForAnHour() {final Runnable beeper = new Runnable() {@SneakyThrowspublic void run() {// 阻塞任務1,50sTimeUnit.SECONDS.sleep(50);System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep1");}};Runnable beeper2 = () -> {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep2");};// 添加第1個任務final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);// 添加第2個任務scheduler.scheduleAtFixedRate(beeper2, 10, 10, TimeUnit.SECONDS);scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, TimeUnit.SECONDS);}/*** 使用Timer實現實時任務*/public static void timerBeepForAnHour() {// 創建Timer執行器實例Timer timer = new Timer();// 執行任務:,發出"嘩"聲!TimerTask beeper = new TimerTask() {@SneakyThrows@Overridepublic void run() {// 阻塞任務1,50sTimeUnit.SECONDS.sleep(50);System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep1");}};TimerTask beeper2 = new TimerTask() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep2");}};// 固定頻率周期性執行任務:每10s執行一次任務,發出"嘩"聲!timer.scheduleAtFixedRate(beeper, 10, 10 * 1000);timer.scheduleAtFixedRate(beeper2, 10, 10 * 1000);// 延遲 delay 后執行一次任務:一次性任務,3600s(1小時)后"取消"任務timer.schedule(new TimerTask() {@Overridepublic void run() {beeper.cancel();}}, 60 * 60 * 1000);}
}
?運行程序,打印結果:
pool-1-thread-2,Mon Jul 21 12:20:02 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:12 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:22 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:32 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:42 CST 2025, ScheduledExecutorService for beep2
Timer-0,Mon Jul 21 12:20:42 CST 2025, Timer for beep1
Timer-0,Mon Jul 21 12:20:42 CST 2025, Timer for beep2
Timer-0,Mon Jul 21 12:20:42 CST 2025, Timer for beep2
pool-1-thread-2,Mon Jul 21 12:20:52 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-1,Mon Jul 21 12:20:52 CST 2025, ScheduledExecutorService for beep1
從上面的輸出觀察到的現象:
- 對于Timer來說,是單線程執行的,一旦執行線程被阻塞,所有任務都會被阻塞至于阻塞解除才會被重新執行(是否補償取決于不同的API)?
- 對于ScheduledExecutorService來說,是多線程執行的,單個線程的阻塞不會造成其它任務的執行,理論上執行效率更高
ScheduledExecutorService與Timer對比
特性 | Timer | ScheduledExecutorService |
---|---|---|
引入版本 | Java 1.3(早期) | Java 5(java.util.concurrent 包) |
所屬包 | java.util.Timer | java.util.concurrent.ScheduledExecutorService |
核心功能 | 單線程定時任務調度 | 多線程任務調度(支持線程池) |
?從構造方法看類結構
?為了體現ScheduledExecutorService執行器底層用的線程池,我們大費周章的寫了好幾個例子來驗證推斷,其實通過它的構造方法也看出來:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 創建調度線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}// 創建調度線程池,這里會super通過調用線程池創建
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}// 標準線程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
由于源碼里面涉及到線程池代碼比較,這里不就完整的貼出來了,只展示相關重要的幾個類:
首先創建線程池:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
?Executors介紹
Executors
?的核心作用是封裝線程池的創建邏輯,通過不同的靜態方法提供以下幾種線程池類型:
- 固定大小的線程池(Fixed Thread Pool)
- 可緩存的線程池(Cached Thread Pool)
- 單線程的線程池(Single Thread Pool)
- 周期性任務調度線程池(Scheduled Thread Pool)
這些線程池分別適用于不同的場景,比如任務數量固定、任務數量不確定、需要單線程執行、或需要定時執行任務等。
?Executors 提供的常用靜態方法
方法 | 說明 | 適用場景 |
---|---|---|
newFixedThreadPool(int nThreads) | 創建一個固定大小的線程池,線程數始終為 nThreads | 任務數量固定、資源有限的場景 |
newCachedThreadPool() | 創建一個可緩存的線程池,線程數可根據任務動態調整 | 任務數量不確定、需要快速響應的場景 |
newSingleThreadExecutor() | 創建一個單線程的線程池,保證任務按順序執行 | 需要串行執行任務的場景 |
newScheduledThreadPool(int corePoolSize) | 創建一個支持定時任務調度的線程池 | 需要周期性或延遲執行任務的場景 |
newWorkStealingPool(int parallelism) | 創建一個基于工作竊取算法的線程池(Java 8+) | 多核 CPU 下并行任務處理 |
而ScheduledThreadPoolExecutor與ScheduledExecutorService是接口->實現關系,而能創建線程池的原因在于ScheduledThreadPoolExecutor與ThreadPoolExecutor的繼承關系:
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {// 具體方法略,重點明確類關系...
}
?類圖
直接上類圖看看:
?
至此得到了ScheduledExecutorService類的完整類圖,其中:
- 通過
Executors獲取線程池
ScheduledThreadPoolExecutor - ScheduledThreadPoolExecutor有兩個內部類:DelayedWorkQueue、ScheduledFutureTask
- ScheduledFutureTask封裝了FutureTask,可以獲取任務執行結果、任務取消、設置任務周期
- DelayedWorkQueue封裝了延遲隊列,提供了任務管理的方法
定時器-任務線程啟動
通過線程池之后,任務的啟動就不用定時器自動管理了,而是委托給線程池來管理了,這里就不展示開,有興趣可以看看之前關于線程池源碼的分析《深入解析Java線程池-CSDN博客》,同理定時任務執行也不用定時器來觸發了
定時任務執行
談到定時任務的觸發,就又回到我們開篇的疑問了,是不是所有的調度器都是通過類似while(true)這種"死循環"策略?答案是"否定"的,至少從ScheduledExecutorService代碼來看不是這樣的,它的線程、任務的管理都是通過各種池來實現的,這樣非常方便。但是線程執行任務完成之后,對于周期性的任務,還是需要擴展任務、隊列來完成,比如下面添加任務的代碼:
// 添加周期性任務
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 封裝任務,里面存儲了任務周期period字段RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {/** Sequence number to break ties FIFO */private final long sequenceNumber;/** The time the task is enabled to execute in nanoTime units */private long time;// 任務周期private final long period;
}
再繼續看延遲執行方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {// 重點是這個,添加任務到隊列里面super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 添加任務成功,調用此方法:用于確保線程池中至少有一個工作線程在運行ensurePrestart();}}
?而任務執行完成后,如果是同期性任務則會重新設置執行時間來達到周期性執行效果:
public void run() {// 獲取是否周期性任務標志boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 如果不是周期性任務,則直接執行else if (!periodic)ScheduledFutureTask.super.run();// 如果是周期性任務,則先重置執行狀態&設置下次執行時間再執行任務else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}
加引號的"列循環"
談到定時任務的觸發,就又回到我們開篇的疑問了,是不是所有的調度器都是通過類似while(true)這種"死循環"策略?答案是"否定"的
在前面回答這個問題時,"否定"加了引號的。嚴格意義來說,線程池底層的代碼其實也是一種死循環,它通過自旋的方式來獲取任務:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?// 沒有條件的for循環,會一直持有CPU,這種方式稱為自旋for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
?由于里面涉及大量的JAVA并發編程,就不展開了,可以自行看看源碼及相關書籍。
推薦書單
《Java并發編程實戰》:力薦!五星!
個人膚淺的認為是看過寫的最好的一本Java并發編碼方面的書,初看驚為天人,值的讀好幾遍!不過缺點就是剛看不容易懂也容易忘,隔段時間又要重頭看起。而且理論偏多代碼例子較少,有一定的閱讀門檻