本章我們會對定時算法做個簡單介紹,包括常用的定時算法(最小堆、時間輪)的概述、實現方式、典型場景做個說明。
概述
系統或者項目中難免會遇到各種需要自動去執行的任務,實現這些任務的手段也多種多樣,如操作系統的crontab,spring框架的quartz,java的Timer和ScheduledThreadPool都是定時任務中的典型手段。
最小堆
概念
Timer是java中最典型的基于優先級隊列+最小堆實現的定時器,內部維護一個存放定時任務的優先級隊列,該優先級隊列使用了最小堆排序。當我們調用schedule方法的時候,一個新的任務被加入queue,堆重排,始終保持堆頂是執行時間最小(即最近馬上要執行)的。同時,內部相當于起了一個線程不斷掃描隊列,從隊列中依次獲取堆頂元素執行,任務得到調度。
下面以Timer為例,介紹優先級隊列+最小堆算法的實現原理:
案例
package com.ls.cloud.sys.alg.Timer;import java.util.Timer;
import java.util.TimerTask;class Task extends TimerTask {@Overridepublic void run() {System.out.println("running...");}
}
public class TimerDemo {public static void main(String[] args) {Timer t=new Timer();//在1秒后執行,以后每2秒跑一次t.schedule(new Task(), 1000,2000);}
}
源碼分析
新加任務時,t.schedule方法會add到隊列
void add(TimerTask task) {// Grow backing store if necessaryif (size + 1 == queue.length)queue = Arrays.copyOf(queue, 2*queue.length);queue[++size] = task;fixUp(size);}
add實現了容量維護,不足時擴容,同時將新任務追加到隊列隊尾,觸發堆排序,始終保持堆頂元素最小
private void fixUp(int k) {while (k > 1) {//k指針指向當前新加入的節點,也就是隊列的末尾節點,j為其父節點int j = k >> 1;//如果新加入的執行時間比父節點晚,那不需要動if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)break;//如果大于其父節點,父子交換TimerTask tmp = queue[j]; queue[j] = queue[k]; queue[k] = tmp;//交換后,當前指針繼續指向新加入的節點,繼續循環,知道堆重排合格k = j;}}
線程調度中的run,主要調用內部mainLoop()方法,使用while循環
private void mainLoop() {while (true) {try {TimerTask task;boolean taskFired;synchronized(queue) {// Wait for queue to become non-emptywhile (queue.isEmpty() && newTasksMayBeScheduled)queue.wait();if (queue.isEmpty())break; // Queue is empty and will forever remain; die// Queue nonempty; look at first evt and do the right thinglong currentTime, executionTime;task = queue.getMin();synchronized(task.lock) {if (task.state == TimerTask.CANCELLED) {queue.removeMin();continue; // No action required, poll queue again}currentTime = System.currentTimeMillis();executionTime = task.nextExecutionTime;//判斷是否到了執行時間if (taskFired = (executionTime<=currentTime)) {//判斷下一次執行時間,單次的執行完移除 //循環的修改下次執行時間if (task.period == 0) { // Non-repeating, removequeue.removeMin();task.state = TimerTask.EXECUTED;} else { // Repeating task, reschedule//下次時間的計算有兩種策略 //1.period是負數,那下一次的執行時間就是當前時間‐period //2.period是正數,那下一次就是該任務本次的執行時間+period //注意!這兩種策略大不相同。因為Timer是單線程的 //如果是1,那么currentTime是當前時間,就受任務執行長短影響 //如果是2,那么executionTime是絕對時間戳,與任務長短無關queue.rescheduleMin(task.period<0 ? currentTime - task.period: executionTime + task.period);}}}//不到執行時間,等待if (!taskFired) // Task hasn't yet fired; waitqueue.wait(executionTime - currentTime);}//到達執行時間,run!if (taskFired) // Task fired; run it, holding no lockstask.run();} catch(InterruptedException e) {}}}
}
應用
- Timer是單線程,一旦一個失敗或出現異常,將打斷全部任務隊列,線程池不會
- Timer在jdk1.3+,而線程池需要jdk1.5+
時間輪
概念
時間輪是一種更為常見的定時調度算法,各種操作系統的定時任務調度,linux crontab,基于java的通信框架Netty等。其靈感來源于我們生活中的時鐘。 輪盤實際上是一個頭尾相接的環狀數組,數組的個數即是插槽數,每個插槽中可以放置任務。 以1天為例,將任務的執行時間%12,根據得到的數值,放置在時間輪上,小時指針沿著輪盤掃描,掃到的點取出任務執行:
實現
package com.ls.cloud.sys.alg.Timer;public class RoundTask {//延遲多少秒后執行int delay;//加入的序列號,只是標記一下加入的順序int index;public RoundTask(int index, int delay) {this.index = index;this.delay = delay;}void run() {System.out.println("task " + index + " start , delay = "+delay);}@Overridepublic String toString() {return String.valueOf(delay);}
}
package com.ls.cloud.sys.alg.Timer;import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;public class RoundDemo {//小輪槽數int size1=10;//大輪槽數int size2=5;//小輪,數組,每個元素是一個鏈表LinkedList<RoundTask>[] t1 = new LinkedList[size1];//大輪LinkedList<RoundTask>[] t2 = new LinkedList[size2];//小輪計數器,指針跳動的格數,每秒加1final AtomicInteger flag1=new AtomicInteger(0);//大輪計數器,指針跳動個格數,即每10s加1final AtomicInteger flag2=new AtomicInteger(0);//調度器,拖動指針跳動ScheduledExecutorService service = Executors.newScheduledThreadPool(2);public RoundDemo(){//初始化時間輪for (int i = 0; i < size1; i++) {t1[i]=new LinkedList<>();}for (int i = 0; i < size2; i++) {t2[i]=new LinkedList<>();}}//打印時間輪的結構,數組+鏈表void print(){System.out.println("t1:");for (int i = 0; i < t1.length; i++) {System.out.println(t1[i]);}System.out.println("t2:");for (int i = 0; i < t2.length; i++) {System.out.println(t2[i]);}}//添加任務到時間輪void add(RoundTask task){int delay = task.delay;if (delay < size1){//10以內的,在小輪,槽取余數t1[delay%size1].addLast(task);}else {//超過小輪的放入大輪,槽除以小輪的長度t2[delay/size1].addLast(task);}}void startT1(){//每秒執行一次,推動時間輪旋轉,取到任務立馬執行service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {int point = flag1.getAndIncrement()%size1;System.out.println("t1 -----> slot "+point);LinkedList<RoundTask> list = t1[point];if (!list.isEmpty()){//如果當前槽內有任務,取出來,依次執行,執行完移除while (list.size() != 0){list.getFirst().run();list.removeFirst();}}}},0,1, TimeUnit.SECONDS);}void startT2(){//每10秒執行一次,推動時間輪旋轉,取到任務下方到t1service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {int point = flag2.getAndIncrement()%size2;System.out.println("t2 =====> slot "+point);LinkedList<RoundTask> list = t2[point];if (!list.isEmpty()){//如果當前槽內有任務,取出,放到定義的小輪while (list.size() != 0){RoundTask task = list.getFirst();//放入小輪哪個槽呢?小輪的槽按10取余數t1[task.delay % size1].addLast(task);//從大輪中移除list.removeFirst();}}}},0,10, TimeUnit.SECONDS);}public static void main(String[] args) {RoundDemo roundDemo = new RoundDemo();//生成100個任務,每個任務的延遲時間隨機for (int i = 0; i < 100; i++) {roundDemo.add(new RoundTask(i,new Random().nextInt(50)));}//打印,查看時間輪任務布局roundDemo.print();//啟動大輪roundDemo.startT2();//小輪啟動roundDemo.startT1();}}
結果
t1 -----> slot 1
task 8 start , delay = 11
task 45 start , delay = 11
task 87 start , delay = 11
t1 -----> slot 2
task 40 start , delay = 12
task 89 start , delay = 12
t1 -----> slot 3
task 25 start , delay = 13
t1 -----> slot 4
task 69 start , delay = 14
t1 -----> slot 5
- 輸出結果嚴格按delay順序執行,而不管index是何時被提交的
- t1為小輪,10個槽,每個1s,10s一輪回
- t2為大輪,5個槽,每個10s,50s一輪回
- t1循環到每個槽時,打印槽內的任務數據,如 t1–>slot9 , 打印了3個9s執行的數據
- t2循環到每個槽時,將槽內的任務delay時間取余10后,放入對應的t1槽中,如 t2==>slot1
- 那么t1旋轉對應的圈數后,可以取到t2下放過來的任務并執行,如10,11…