傳送門
分布式定時任務系列1:XXL-job安裝
分布式定時任務系列2:XXL-job使用
分布式定時任務系列3:任務執行引擎設計
分布式定時任務系列4:任務執行引擎設計續
分布式定時任務系列5:XXL-job中blockingQueue的應用
分布式定時任務系列6:XXL-job觸發日志過大引發的CPU告警
分布式定時任務系列7:XXL-job源碼分析之任務觸發
分布式定時任務系列8:XXL-job源碼分析之遠程調用
?分布式定時任務系列9:XXL-job路由策略
分布式定時任務系列10:XXL-job源碼分析之路由策略
番外篇:從XXL-job路由策略的“服務容錯“說起
Java并發編程實戰1:java中的阻塞隊列
第一個問題:XXL-job是如何做到定時觸發的?
不知道大家在看XXL-job源碼的過程中,有沒有像我一樣產生過一個疑惑:那就是XXL-job到底是怎樣做到讓任務按時觸發的呢,或者說讓任務定時定點如此"聽話"?
比如說一個郵件提醒功能,每天晚上20:00點給相關的值班人員發郵件,它到時間(晚上20:00)一定會觸發嗎,會不會漏?會不會判斷不準,超過時間21:00才觸發?
觸發整體時序圖
在分布式定時任務系列7:XXL-job源碼分析之任務觸發節中,從整體上梳理過任務觸發的調用邏輯,通過一個時序圖來展現:
上面圖中圈起來的地方:
- 立即創建一個線程Thread并啟動
- 不停掃描任務表xxl_job_info
- 根據配置判斷是否觸發任務
對應的代碼如下,我把核心的地方貼出來了:
{// 立即創建一個線程scheduleThread = new Thread(new Runnable() {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>> init xxl-job admin scheduler success.");// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;// 不停掃描任務表xxl_job_info,相當于線程的自旋while (!scheduleThreadToStop) {// Scan Joblong start = System.currentTimeMillis();Connection conn = null;Boolean connAutoCommit = null;PreparedStatement preparedStatement = null;boolean preReadSuc = true;try {// JDBC操作數據庫conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();connAutoCommit = conn.getAutoCommit();conn.setAutoCommit(false);// 加上db悲觀鎖,防止并發執行preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );preparedStatement.execute();// tx start// 1、pre readlong nowTime = System.currentTimeMillis();// 查詢所有任務列表,一次最多6000個List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);if (scheduleList!=null && scheduleList.size()>0) {// 2、push time-ringfor (XxlJobInfo jobInfo: scheduleList) {// time-ring jumpif (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {// 2.1、trigger-expire > 5s:pass && make next-trigger-timelogger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());// 1、錯過觸發時間,根據策略決定,是否立即補嘗執行一次MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {// FIRE_ONCE_NOW 》 triggerJobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );}// 2、更新下次執行相關時間參數refreshNextValidTime(jobInfo, new Date());} else if (nowTime > jobInfo.getTriggerNextTime()) {// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time// 1、觸發任務JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );// 2、更新下次執行相關時間參數refreshNextValidTime(jobInfo, new Date());// next-trigger-time in 5s, pre-read againif (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}} else {// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time// 1、make ring secondint ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);// 2、push time ringpushTimeRing(ringSecond, jobInfo.getId());// 3、fresh nextrefreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));}}// 3、更新db表中,下次執行相關時間參數for (XxlJobInfo jobInfo: scheduleList) {XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);}} else {preReadSuc = false;}// tx stop} catch (Exception e) {if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}} finally {// commitif (conn != null) {try {conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}// close PreparedStatementif (null != preparedStatement) {try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}long cost = System.currentTimeMillis()-start;// Wait seconds, align secondif (cost < 1000) { // scan-overtime, not waittry {// pre-read period: success > scan each second; fail > skip this period;TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);} catch (InterruptedException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}}}logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");}});// 線程啟動scheduleThread.setDaemon(true);scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");scheduleThread.start();}
這個JobScheduleHelper邏輯有多個分支,但是里面核心是:立即創建一個線程(這里是單線程),在run()方法中通過一個while循環來觸發任務。但是這個while關鍵的一點是:如果線程沒有停止,就會一直執行下去:
// 不停掃描任務表xxl_job_info,相當于線程的自旋while (!scheduleThreadToStop) {// 執行任務策略并觸好,具體見上,這里略過...
}
我們都知道Java里面的線程有可能阻塞,或者由于CPU切換得不到時間片分配導致任務暫停,這些情況下任務不就觸發不了么?所以XXL-job在run()方法里面寫了類似死循環來盡量避免任務延遲觸發,這一點其實在其它定時任務設計中(或框架)也是一種通用思路,所以這次會通過JDK自帶的Timer工具類源碼來印證、對比、擴展!
JDK-Timer類源碼解析
JDK-Timer什么是
??JDK Timer??是Java開發工具包(JDK)中提供的一個定時器工具,用于在后臺線程中安排將來執行的任務。它可以安排任務為一次性執行,或者以固定間隔重復執行。
Timer類自jdk1.3就引入,作者是大名鼎鼎的Josh Bloch(是google的首席JAVA架構師)。既然它開始工作的早,那么"退休"自然也早,現在一般不再推薦使用:
優點?:
- 簡單易用,適合簡單的定時任務需求。
- 線程安全,多個線程可以共享一個Timer對象而無需外部同步。
?缺點?:
- 單一Timer對象共享一個線程,如果任務執行時間較長,會影響后續任務的執行。
- 不適合高并發或高性能要求的場景。
?替代方案?:
- ??ScheduledExecutorService??:提供了更強大的功能,包括線程池支持、更靈活的任務調度等。例如,可以使用
scheduleAtFixedRate
和scheduleWithFixedDelay
方法來安排任務的固定頻率執行。- ??Quartz Scheduler??:一個更強大的調度框架,支持復雜的調度需求、持久化存儲、集群等功能,適合企業級應用。
但是通過它可以看看任務調度的原理,以及學習一下大師的設計!
JDK-Timer的使用
Timer的API方法并不多,主要有以下幾個:
變量和類型 | 方法 | 描述 |
---|---|---|
void | cancel() | 終止此計時器,丟棄當前計劃的任何任務。 |
int | purge() | 從此計時器的任務隊列中刪除所有已取消的任務。 |
void | schedule?(TimerTask?task, long?delay) | 在指定的延遲后安排指定的任務執行。 |
void | schedule?(TimerTask?task, long?delay, long?period) | 在指定?的延遲之后開始,為重復的?固定延遲執行安排指定的任務。 |
void | schedule?(TimerTask?task,?Date?time) | 計劃在指定時間執行指定的任務。 |
void | schedule?(TimerTask?task,?Date?firstTime, long?period) | 從指定時間開始,為重復的?固定延遲執行安排指定的任務。 |
void | scheduleAtFixedRate?(TimerTask?task, long?delay, long?period) | 在指定的延遲之后開始,為重復的?固定速率執行安排指定的任務。 |
void | scheduleAtFixedRate?(TimerTask?task,?Date?firstTime, long?period) | 從指定時間開始,為重復的?固定速率執行安排指定的任務。 |
下面通過幾個測試方法來感受一下:
public class TimerTest {public static void main(String[] args) {// 通過new實例化一個timer實例Timer timer = new Timer();// 只觸發一次的任務:延遲1s執行(如果為0,則表示立即執行)timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("========name1:" + Thread.currentThread().getName() + ", time:" + new Date());}}, 1000L);// 固定頻率執行的任務:延遲1s執行,每2秒執行一次TimerTask timerTask2 = new TimerTask() {@Overridepublic void run() {System.out.println("========name2:" + Thread.currentThread().getName() + ", time:" + new Date());}};timer.schedule(timerTask2, 1000L, 2000L);// 固定頻率執行的任務:延遲1s執行,每3秒執行一次;并指定首次執行的時間Date fixedDate = getFixedDate();timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("========name3:" + Thread.currentThread().getName() + ", time:" + new Date());}}, fixedDate, 3000L);}public static Date getFixedDate() {String dateString = "2025-06-08 19:19:59";SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");try {Date specifiedDate = sdf.parse(dateString);System.out.println("Specified Date: " + specifiedDate);return specifiedDate;} catch (Exception e) {System.out.println("Error parsing date: " + e.getMessage());}return null;}}
運行輸出結果:
Specified Date: Sun Jun 08 21:53:59 CST 2025
// name1輸出:在下面日志中可以看出只觸發了一次,屬于一次性任務
========name1:Timer-0, time:Sun Jun 08 21:53:48 CST 2025
// name2輸出:在下面日志中可以每隔2s觸發一次,屬于固定頻率任務
========name2:Timer-0, time:Sun Jun 08 21:53:48 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:53:50 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:53:52 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:53:54 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:53:56 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:53:58 CST 2025
// name3輸出:表示在指定的時間節點才觸發了任務,,屬于固定頻率任務
========name3:Timer-0, time:Sun Jun 08 21:53:59 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:54:00 CST 2025
========name3:Timer-0, time:Sun Jun 08 21:54:02 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:54:02 CST 2025
========name2:Timer-0, time:Sun Jun 08 21:54:04 CST 2025
========name3:Timer-0, time:Sun Jun 08 21:54:05 CST 2025
除去上述幾個API之外,還有scheduleAtFixedRate相關的幾個方法,也是設置固定頻率執行的操作,區別是:
- scheduleAtFixedRate更注重任務執行頻率,如果任務由于其它原因被阻塞,當恢復時會盡力去補償執行遺漏的
- schedule這種更注重任務執行時間,如果任務由于其它原因被阻塞,當恢復時不會去補償執行遺漏的
可以通過如下例子來感受:
public static void main(String[] args) {// 通過new實例化一個timer實例Timer timer = new Timer();// 只觸發一次的任務:延遲1s執行(如果為0,則表示立即執行)timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("========name1:" + Thread.currentThread().getName() + ", time:" + new Date());try {// 模擬任務阻塞,線程睡眠6sThread.sleep(6000, TimeUnit.SECONDS.ordinal());} catch (InterruptedException e) {throw new RuntimeException(e);}}}, 1000L);timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {System.out.println("========name4:" + Thread.currentThread().getName() + ", time:" + new Date());}}, 1000L, 2000L);}// name1輸出:在下面日志中可以看出只觸發了一次,屬于一次性任務
========name1:Timer-0, time:Sun Jun 08 22:31:18 CST 2025
// name4輸出:在下面幾個name4的連續輸出中可以看到是同一時刻執行的,這就是由于任務1中線程阻塞了6s所以被補償執行了
========name4:Timer-0, time:Sun Jun 08 22:31:24 CST 2025
========name4:Timer-0, time:Sun Jun 08 22:31:24 CST 2025
========name4:Timer-0, time:Sun Jun 08 22:31:24 CST 2025
========name4:Timer-0, time:Sun Jun 08 22:31:24 CST 2025
// name4輸出:后面都是每隔2s的正常輸出了
========name4:Timer-0, time:Sun Jun 08 22:31:26 CST 2025
========name4:Timer-0, time:Sun Jun 08 22:31:28 CST 2025
========name4:Timer-0, time:Sun Jun 08 22:31:30 CST 2025
JDK-Timer的源碼
從構造方法看類結構
先看下Timer的創建:
// 通過new實例化一個timer實例Timer timer = new Timer();
跟進Timer的類的源碼看看這個構造方法:
/*** Creates a new timer. The associated thread does <i>not</i>* {@linkplain Thread#setDaemon run as a daemon}.*/public Timer() {this("Timer-" + serialNumber());}/*** Creates a new timer whose associated thread has the specified name.* The associated thread does <i>not</i>* {@linkplain Thread#setDaemon run as a daemon}.** @param name the name of the associated thread* @throws NullPointerException if {@code name} is null* @since 1.5*/public Timer(String name) {thread.setName(name);thread.start();}
首先Timer()無參構造方法調用會繼續調用Timer(String name)這個有參構造,傳遞一個線程的名稱:名稱為"Timer-"+序列號:
/*** This ID is used to generate thread names.*/private final static AtomicInteger nextSerialNumber = new AtomicInteger(0);// 上面的原子類注釋說明了,nextSerialNumber是生成一個線程名稱private static int serialNumber() {return nextSerialNumber.getAndIncrement();}
那么這個線程到底是什么呢?
TimerThread-定時器線程
我們在前面說過Timer是單線程的,其實就是在這里綁定的,繼續看下thread定義:
/*** The timer task queue. This data structure is shared with the timer* thread. The timer produces tasks, via its various schedule calls,* and the timer thread consumes, executing timer tasks as appropriate,* and removing them from the queue when they're obsolete.*/private final TaskQueue queue = new TaskQueue();/*** The timer thread.*/private final TimerThread thread = new TimerThread(queue);
?這里創建了一個TimerThread對象得到一個實例線程:
class TimerThread extends Thread {/*** This flag is set to false by the reaper to inform us that there* are no more live references to our Timer object. Once this flag* is true and there are no more tasks in our queue, there is no* work left for us to do, so we terminate gracefully. Note that* this field is protected by queue's monitor!*/boolean newTasksMayBeScheduled = true;/*** Our Timer's queue. We store this reference in preference to* a reference to the Timer so the reference graph remains acyclic.* Otherwise, the Timer would never be garbage-collected and this* thread would never go away.*/private TaskQueue queue;TimerThread(TaskQueue queue) {this.queue = queue;}public void run() {try {mainLoop();} finally {// Someone killed this Thread, behave as if Timer cancelledsynchronized(queue) {newTasksMayBeScheduled = false;queue.clear(); // Eliminate obsolete references}}}/*** The main timer loop. (See class comment.)*/private void mainLoop() {// 實現暫時略,后面會詳細討論}
}
這里創建了一個TimerThread對象得到一個實例線程:
在學習Java并發編程的時候,肯定學習過線程創建的2種方式:繼承Thread或者實現Runnable接口,這里就是典型的繼續Thread類!
TaskQueue-定時器任務隊列
除此以外在創建Timer實例的時候還實例化了一個隊列:
// 定時器任務隊列。這一數據結構與定時器線程共享。
//定時器通過不同的調度調用生成任務,定時器線程則負責消費這些任務,在適當時機執行它們,并在任務過期后從隊列中移除。
private final TaskQueue queue = new TaskQueue();
?從上面可以得到它的主要作用:
- 數據結構共享:隊列由定時器和定時器線程共同操作。
- 生產者-消費者模型:定時器(生產者)創建任務,定時器線程(消費者)處理任務。
- 任務生命周期:執行后或過期時,任務會被移出隊列。
繼續跟進定時器任務隊列的類定義:
class TaskQueue {// 任務數組,是一個最少堆結構private TimerTask[] queue = new TimerTask[128];
}
里面定義了一個TimerTask數組,定義了定時任務:
TimerTask-定時任務
// 定義了一個抽象類,實現了Runnable接口
public abstract class TimerTask implements Runnable {// 定義了一個抽象方法,定時任務要執行業務方法就是通過實現這個方法public abstract void run();// 其余參數先略過...
}
?至此Timer的相關類已經都出現了,可以據此得到它的類圖:
類圖
至此得到了Timer類的完整類圖,其中:
- Timer是定時器主類,持有一個執行線程TimerThread和一個任務隊列TaskQueue
- ?TimerThread類負責執行定時任務,引用了任務隊列TaskQueue
- TaskQueue類為任務隊列,里面持有一個TimerTask數組(也是最小堆結構)
- TimerTask為業務要實現的定時任務接口(抽象類),它實現Runable接口,所以自然要求實現對應的run()方法
接下來就仔細分析一下,定時任務的整個管理、執行流程!
定時器-任務線程啟動????????
再回到剛才的構造方法來:
public Timer(String name) {thread.setName(name);// 線程啟動thread.start();}
?只要創建了一個Timer實例(new Timer()),就會立即啟動執行線程(thread.start)
定時任務執行
線程啟動之后,會立即執行線程的run方法:
public void run() {try {// 主循環,所有的定時任務都是在這里觸發執行的mainLoop();} finally {// Someone killed this Thread, behave as if Timer cancelledsynchronized(queue) {newTasksMayBeScheduled = false;queue.clear(); // Eliminate obsolete references}}}
? 從方法命名上就可以看出,這個mainLoop是一個循環方法,繼續跟進它:
private void mainLoop() {// 死循環while (true) {try {// 當前需要觸發的定時任務TimerTask task;// 觸發標志位boolean taskFired;// 直接同步任務隊列,防止并發synchronized(queue) {// Wait for queue to become non-empty// 如果隊列為空且Timer未取消,則進行等待;關于Timer取消造成newTasksMayBeScheduled為false的,放到后面討論while (queue.isEmpty() && newTasksMayBeScheduled)queue.wait();// 如果隊列為空且Timer取消(主動或被動),則跳出循環:注意只有這種情況才會真正結束死循環,其實就是newTasksMayBeScheduled為false了if (queue.isEmpty())break; // Queue is empty and will forever remain; die// Queue nonempty; look at first evt and do the right thinglong currentTime, executionTime;// 獲取第一個任務task = queue.getMin();// 同步任務,也是防止并發synchronized(task.lock) {// 如果任務取消,雖從任務隊列中刪除任務if (task.state == TimerTask.CANCELLED) {queue.removeMin();continue; // No action required, poll queue again}currentTime = System.currentTimeMillis();executionTime = task.nextExecutionTime;// 判斷任務是否需要觸發:觸發條件是執行時間在當前時間之前,即executionTime<=currentTimeif (taskFired = (executionTime<=currentTime)) {// 如果是一次性任務if (task.period == 0) { // Non-repeating, remove// 則從任務隊列中刪除任務queue.removeMin();// 修改任務的狀態為"已執行"task.state = TimerTask.EXECUTED;// 如果是周期性任務} else { // Repeating task, reschedule// 則重新計算下一次觸發時間queue.rescheduleMin(task.period<0 ? currentTime - task.period: executionTime + task.period);}}}// 如果任務未觸發,則繼續等待時間:executionTime - currentTimeif (!taskFired) // Task hasn't yet fired; waitqueue.wait(executionTime - currentTime);}// 如果任務觸發,則調用run()方法執行任務if (taskFired) // Task fired; run it, holding no lockstask.run();} catch(InterruptedException e) {}}}
?從第一行代碼就看到了我們今天引出的問題,為什么定時任務的觸發是死循環?看來不止是XXL-job這樣實現的,JDK-Timer也是這樣實現的:
while (true) {
}
關于這個問題網上AI給出的一種解釋:
任務調度原理采用死循環設計的原因?在于其能夠有效地管理和執行多任務,提高系統的整體效率和響應速度。在任務調度中,每個任務通常設計為一個死循環,這是因為死循環結構能夠確保任務持續運行并等待事件或條件的發生,從而實現對任務的持續監控和執行。
死循環在任務調度中的作用
- ?持續運行?:死循環確保任務能夠持續運行,不斷檢查和處理事件或條件。這對于實時響應和任務管理至關重要?。
- ?事件驅動?:死循環結構使得任務能夠響應各種事件或信號,如定時器中斷、外部事件等。當特定事件發生時,任務可以執行相應的操作?。
- ?資源分配?:通過維護就緒隊列和數據結構,調度程序能夠動態選擇待執行任務,實現處理器資源的有效分配和管理?
定時任務添加
關于定時任務執行主流程源碼的討論先點到為止。回過頭來再看看定時器添加任務的例子:
// 只觸發一次的任務:延遲1s執行(如果為0,則表示立即執行)timer.schedule(new TimerTask() {@Overridepublic void run() {System.out.println("========name1:" + Thread.currentThread().getName() + ", time:" + new Date());}}, 1000L);
繼續跟進schedule方法:
public void schedule(TimerTask task, long delay) {// 延遲時間要>0,意思就是任務只能當前時間之后,這點很容易理解if (delay < 0)throw new IllegalArgumentException("Negative delay.");// 從這里就可以看出delay>0原因:因為執行時間直接取的是 當前時間+延遲時間sched(task, System.currentTimeMillis()+delay, 0);}private void sched(TimerTask task, long time, long period) {// 任務執行時間if (time < 0)throw new IllegalArgumentException("Illegal execution time.");// Constrain value of period sufficiently to prevent numeric// overflow while still being effectively infinitely large.if (Math.abs(period) > (Long.MAX_VALUE >> 1))period >>= 1;// 同步隊列synchronized(queue) {// 如果定時器被取消,直接拋出異常if (!thread.newTasksMayBeScheduled)throw new IllegalStateException("Timer already cancelled.");// 同步任務synchronized(task.lock) {if (task.state != TimerTask.VIRGIN)throw new IllegalStateException("Task already scheduled or cancelled");// 更新任務信息:設置下次執行時間、周期、任務狀態為"SCHEDULED",在這個狀態的任務才能被執行,這個可以在上面執行器中代碼可以印證!task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}// 添加到任務隊列中queue.add(task);// 如果任務恰好是隊列第一個元素,則直接"通知"任務隊列執行任務if (queue.getMin() == task)// 這個代碼還有另外一個作用,就是喚醒被阻塞的隊列,因為在執行的代碼中如果隊列為空會進行等待的。 queue.notify();}}
任務執行流程圖
結合上面的任務執行/添加源碼分析,在不考慮主動取消定時器的情況,可以大致得出下面的任務執行流程圖:
生產者-消費者模型
至此梳理了任務執行的流程,留意一下可以看到里面用到了wait(),notify()方法用來做線程的阻塞與喚醒,這其實是典型的生產者-消費者模型:
生產者
- 業務類通過調用Timer的schedule方法添加任務TimerTask,其實就是生產者,比如上面的TimerTest
消費者
- 這里的執行線程TimerThread其實就是消費者,它通過while循環不停的從任務隊列中取出定時任務
但是這里要明確一下,執行線程既是消費者也是生產者!
可以從mainLoop()的方法可以體現:
if (taskFired = (executionTime<=currentTime)) {// 如果是一次性任務if (task.period == 0) { // Non-repeating, remove// 則從任務隊列中刪除任務queue.removeMin();// 修改任務的狀態為"已執行"task.state = TimerTask.EXECUTED;// 如果是周期性任務} else { // Repeating task, reschedule// 則重新計算下一次觸發時間queue.rescheduleMin(task.period<0 ? currentTime - task.period: executionTime + task.period);}}
如果任務的周期字段是period,如果是0表示一次性任務;如果period>0,則表示周期性任務:在任務觸發時同時計算下一次觸發時間就是queue.rescheduleMin方法:
void rescheduleMin(long newTime) {// 更新下次執行時間:當前時間+周期queue[1].nextExecutionTime = newTime;// 調整堆順序,保持最小堆特性fixDown(1);}
通過這個方法其實"相當"于又往隊列了放了一個元素:就是同一個任務還在隊列中,只是下次執行時間nextExecutionTime被更新了,這樣在任務觸發之后可以繼續被執行!
關于復用wait和notify實現線程間的協作,這里不展開。只是列出一下相關的API:
void | notify() | 喚醒正在此對象監視器上等待的單個線程。 |
---|---|---|
void | notifyAll() | 喚醒等待此對象監視器的所有線程。 |
void | wait() | 導致當前線程等待它被喚醒,通常是?通知或?中斷?。 |
---|---|---|
void | wait?(long?timeoutMillis) | 導致當前線程等待它被喚醒,通常是?通知或?中斷?,或者直到經過一定量的實時。 |
void | wait?(long?timeoutMillis, int?nanos) | 導致當前線程等待它被喚醒,通常是?通知或?中斷?,或者直到經過一定量的實時。 |
?最小堆
前面提到過:
TaskQueue類為任務隊列,里面持有一個TimerTask數組(也是最小堆結構)
?關于最小堆的介紹在這里列出來:
經過排序的完全二叉樹
最小堆,是一種經過排序的完全二叉樹,其中任一非終端節點的數據值均不大于其左子結點和右子結點的值。
這里在添加任務的時候,其實就進行堆排序:
void add(TimerTask task) {// Grow backing store if necessaryif (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);queue[++size] = task;fixUp(size);}private void fixUp(int k) {while (k > 1) {int j = k >> 1;if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;k = j;}}
?
附錄Java創建的2種方式
1. 繼承?
Thread
?類這是最傳統的創建線程的方式。通過創建一個新的類繼承自?
java.lang.Thread
?類,并重寫?run()
?方法來定義線程的執行體。然后,創建該類的實例并調用其?start()
?方法來啟動線程。2. 實現?
Runnable
?接口這種方式更加靈活,因為它允許你將線程的執行代碼封裝在一個實現了?
Runnable
?接口的類的實例中。這種方式更適合資源共享和便于線程的共享和管理。