定時器
就是需要周期性的執行任務,也叫調度任務,在JDK中有個類Timer是支持周期性執行,但是這個類不建議使用了。
ScheduledThreadPoolExecutor
繼承自ThreadPoolExecutor線程池,在Executors默認創建了兩種:
newSingleThreadScheduledExecutor:只包含一個線程,只需要單個線程執行周期任務,保證順序的執行各個任務。
newScheduledThreadPool: 可以包含多個線程的,線程執行周期任務,適度控制后臺線程數量的時候。
方法:
schedule:只執行一次,任務還可以延時執行
scheduleAtFixedRate:提交固定時間間隔的任務
scheduleWithFixedDelay:提交固定延時間隔執行的任務
兩者的區別:間隔的時間定義不一樣
建議在提交給ScheduledThreadPoolExecutor的任務要住catch異常。否則不能周期性執行任務。
基本原理
在之前將BlockingQueue<T>的時候有個叫DelayQueue<E extends Delayed>堵塞隊列,這個就是實現延遲執行,在ScheduledThreadPoolExecutor實現時間間隔執行的原理與DelayQueue原理差不多
在ScheduledThreadPoolExecutor中有個靜態類DelayedWorkQueue,該類也是一個延時隊列。
構造方法是調用了父類構造,將隊列換成了延時隊列DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize,RejectedExecutionHandler handler) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), handler);}
在ThreadPoolExecutor談到了獲取隊列中Runnable對象即執行take方法,看一下DelayedWorkQueue的take()方法,在延時是調用了Condition的awaitNanos()方法進行延時執行,
private final Condition available = lock.newCondition(); public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first);first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}
周期性執行是在執行完后會再次將當前任務放入到線程池中,再次等待延時執行。
//在調用scheduleAtFixedRate()方法是會調用delayedExecute方法將當前Runnable對象添加到隊列當中
//等待執行
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart();}}
//在ScheduledFutureTask中,當線程池調用好了Runnable對象的run方法的時候,會調用reExecutePeriodic()方法將任務再次放入到線程池中,所以如果在執行報錯了,那么就不會放入到線程池中,/*** Overrides FutureTask version so as to reset/requeue if periodic.*/public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}
CompletionService
這個類了解一下就好了,在使用Future并發執行的時候一般都是將多個Future對象用數組或集合保存起來,然后在循環數組或集合調用get方法獲取結果集,但是如果使用了CompletionService會將率先執行的結果集獲取到,就是利用了堵塞隊列原理實現的這種效果
用法:
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);CompletionService<String> completionService = new ExecutorCompletionService<String>(newFixedThreadPool);completionService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {return null;}});completionService.take().get();