前言
Java 提供的java.util.Timer
類可以用來執行延時任務,任務可以只執行一次,也可以周期性的按照固定的速率或延時來執行。
實現一個延時任務調度器,核心有兩點:
- 如何存儲延時任務
- 如何調度執行延時任務
源碼分析
TimerTask
延時任務的核心屬性有兩個:
- 任務的邏輯(任務要干什么)
- 任務計劃執行時間
Java 把延時任務封裝成java.util.TimerTask
類,實現自 Runnable,可以被線程執行。
public abstract class TimerTask implements Runnable {int state = VIRGIN;// 默認值,表示任務還沒被安排調度執行static final int VIRGIN = 0;// 任務入隊,等待調度static final int SCHEDULED = 1;// 任務執行中static final int EXECUTED = 2;// 任務取消static final int CANCELLED = 3;long nextExecutionTime;long period = 0;
}
屬性說明:
- state 任務的狀態,例如 執行中、已取消等
- nextExecutionTime 任務下次執行的時間
- period 任務重復執行的時間周期,正值表示固定速率執行,負值表示固定延時執行,0表示非重復任務
Timer
Java 把延時任務調度器封裝成java.util.Timer
類
public class Timer {private final TaskQueue queue = new TaskQueue();private final TimerThread thread = new TimerThread(queue);
}
屬性說明:
- queue 任務優先級隊列,基于最小堆實現,隊頭永遠是最早要執行的任務
- thread 任務調度執行線程,單線程執行
提交延時任務,最終會調用sched()
,給 task 賦值任務的執行時間,狀態等信息,然后加入到隊列,如果隊頭就是自己,那么就要喚醒線程輪詢隊列調度執行。
private void sched(TimerTask task, long time, long period) {if (time < 0)throw new IllegalArgumentException("Illegal execution time.");if (Math.abs(period) > (Long.MAX_VALUE >> 1))period >>= 1;synchronized(queue) {if (!thread.newTasksMayBeScheduled)throw new IllegalStateException("Timer already cancelled.");// 搶鎖synchronized(task.lock) {if (task.state != TimerTask.VIRGIN)throw new IllegalStateException("Task already scheduled or cancelled");// 設置任務的執行時間,重復執行的時間周期,狀態改為已調度task.nextExecutionTime = time;task.period = period;task.state = TimerTask.SCHEDULED;}// 入隊queue.add(task);// 如果隊頭是自己,喚醒線程調度執行if (queue.getMin() == task)queue.notify();}
}
TimerThread
執行延時任務的線程被封裝成java.util.TimerThread
類,繼承自 Thread,內部持有任務隊列的引用。
class TimerThread extends Thread {private TaskQueue queue;
}
線程執行會調用mainLoop()
,不斷輪詢任務隊列,如果隊列是空的,線程就 wait,等待外部提交延時任務將自己喚醒。如果隊列非空,就判斷隊頭節點的執行時間是否已到,時間到了就立即執行任務,否則就 wait 指定時間。如果任務是一次性的,就把它從堆中移除;如果任務是重復執行的,就再重新添加到堆中。
private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// 隊列空就waitwhile (queue.isEmpty() && newTasksMayBeScheduled)queue.wait();if (queue.isEmpty())break; // Queue is empty and will forever remain; die// 獲取隊頭節點long currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {if (task.state == TimerTask.CANCELLED) {queue.removeMin();continue; // No action required, poll queue again}currentTime = System.currentTimeMillis();executionTime = task.nextExecutionTime;if (taskFired = (executionTime<=currentTime)) {if (task.period == 0) {// 一次性任務,刪除掉queue.removeMin();task.state = TimerTask.EXECUTED;} else {// 重復執行的任務,再提交一次queue.rescheduleMin(task.period<0 ? currentTime - task.period: executionTime + task.period);}}}if (!taskFired)// 任務執行時間還沒到,繼續waitqueue.wait(executionTime - currentTime);}if (taskFired)// 任務執行時間到了,就執行task.run();} catch(InterruptedException e) {}}
}
注意,period>0 代表任務以 **固定速率 **執行,period<0 代表任務以 **固定延時 **執行。
什么意思呢?固定速率模式下,任務的下次執行時間會以上次任務的計劃執行時間開始計算,上次任務執行的耗時盡量不影響下次任務的執行時間。固定延時模式下,任務的下次執行時間會以上次任務的實際執行時間開始計算,也就是說上次任務的執行耗時會影響下次任務的執行時間。
TaskQueue
存儲任務的隊列被封裝成java.util.TaskQueue
類,它是一個基于最小堆實現的優先隊列,堆中的元素會按照任務的計劃執行時間升序排列,隊頭永遠是最早要執行的任務,這樣獲取要執行的任務時間復雜度是O(1),但是任務的插入刪除,時間復雜度是O(logn)。
因為堆是一棵完全二叉樹,數據規模為n的情況下,二叉樹的高度是 logn。堆節點的插入和刪除,需要不斷和父節點或子節點比較并交換,交換的次數最多是樹的高度,所以時間復雜度最壞是O(logn)。
class TaskQueue {private TimerTask[] queue = new TimerTask[128];private int size = 0;
}
屬性說明:
- queue 任務隊列,最小堆結構,用數組存儲
- size 任務數
提交任務會調用add()
,一個基本的往堆中添加節點的操作。先判斷數組是否要擴容,然后添加到隊尾,最后節點上濾,調整堆結構。
void add(TimerTask task) {// 擴容if (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);// 先入到隊尾queue[++size] = task;// 上濾操作,調整堆結構fixUp(size);
}
堆節點上濾過程很簡單,不斷與父節點比較任務的執行時間,如果父節點任務晚于自己執行,就要和父節點交換位置。
/**
* 元素上濾,調整堆結構
* 不斷與父節點比較,如果父節點比自己大,就要和父節點交換
*/
private void fixUp(int k) {// 是否有父節點while (k > 1) {// j = 父節點下標int j = k >> 1;// 如果父節點的執行時間大于自己,就要交換,直到自己排到正確的位置if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;k = j;}
}
如果是一次性任務,執行前要把它從堆中移除,調用removeMin()
。先把隊尾節點賦值給隊頭節點,然后將隊頭節點下濾操作。
void removeMin() {queue[1] = queue[size];queue[size--] = null;fixDown(1);
}
堆節點下濾也很簡單,就是不斷和自己的左右子節點比較,如果子節點比自己小,就要交換,直到自己被交換到正確的位置。
private void fixDown(int k) {int j;// 是否有子節點 j=左子節點,j+1=右子節點while ((j = k << 1) <= size && j > 0) {if (j < size &&queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)j++; // j indexes smallest kidif (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)break;TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;k = j;}
}
尾巴
延時任務調度執行器的核心有兩點,分別是如何存儲任務以及如何調度執行任務。Java Timer 基于最小堆的數據結構來存儲延時任務,根節點永遠是最早執行的任務,獲取任務的時間復雜度是O(1),但是任務的插入和刪除需要O(logn)復雜度,這在需要維護大量延時任務時,會有性能問題,可以考慮采用時間輪算法。
另外,每個 Java Timer 對象都會開啟一個線程來調度執行提交的所有任務,因為是單線程執行,所以一旦有耗時的任務,隊列中的其它任務都會受到影響,這點尤其要注意。