簡介
ScheduledThreadPoolExecutor 是 Java 中的一個類,它屬于 java.util.concurrent 包。這個類是一個線程池,用于在給定的延遲后運行命令,或者定期地執行命令。它是 ThreadPoolExecutor 的一個子類,專門用于處理需要定時或周期性執行的任務。
ScheduledThreadPoolExecutor 的主要特點如下:
- 線程池大小固定:線程池的大小在創建時就已經設定,并且之后不會改變。
- 任務調度:它可以用來調度一次性任務,也可以用來調度重復執行的任務。
- 延遲執行:任務可以在給定的延遲之后開始執行。
- 周期性執行:任務可以周期性地執行,例如每隔固定時間執行一次。
ScheduledThreadPoolExecutor 提供的主要方法包括:
- schedule(Runnable command, long delay, TimeUnit unit): 在給定的延遲后執行一次任務。
- scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): 以固定的速率周期性地執行任務。
- scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): 在給定的初始延遲后首次執行任務,然后每次任務執行完畢之后等待指定的延遲再次執行。
源碼
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {//任務或單位為空 則拋出異常if (command == null || unit == null)throw new NullPointerException();//把要執行的任務包裝成 RunnableScheduledFutureRunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));//延時執行定時任務delayedExecute(t);return t;}//邏輯同上public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {if (callable == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));delayedExecute(t);return t;}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {//任務和單位為空則拋出異常if (command == null || unit == null)throw new NullPointerException();if (period <= 0)//執行周期時間小于等于0 則拋出異常throw new IllegalArgumentException();//將任務包裝成ScheduledFutureTaskScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));//然后包裝成RunnableScheduledFutureRunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;//延時執行任務delayedExecute(t);return t;}//同上public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}//所謂的包裝就是直接返回RunnableScheduledFuture對象protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;}//從scheduleWithFixedDelay方法的源代碼,我們可以看出在將Runnable對象封裝成ScheduledFutureTask時,設置了執行周期,//但是此時設置的執行周期與scheduleAtFixedRate方法設置的執行周期不同。//此時設置的執行周期規則為:下一次任務執行的時間是上一次任務完成的時間加上delay時長,時長單位由TimeUnit決定。//也就是說,具體的執行時間不是固定的,但是執行的周期是固定的,整體采用的是相對固定的延遲來執行定時任務public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {//傳入的Runnable對象和TimeUnit為空,則拋出空指針異常if (command == null || unit == null)throw new NullPointerException();//任務延時時長小于或者等于0,則拋出非法參數異常if (delay <= 0)throw new IllegalArgumentException();//將Runnable對象封裝成ScheduledFutureTask任務//并設置固定的執行周期來執行任務ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));//調用decorateTask方法,本質上直接返回ScheduledFutureTask任務RunnableScheduledFuture<Void> t = decorateTask(command, sft);//設置執行的任務sft.outerTask = t;//執行延時任務delayedExecute(t);return t;}private void setNextRunTime() {//距離下次執行任務的時長long p = period;//固定頻率執行,//上次執行任務的時間//加上任務的執行周期if (p > 0)time += p;//相對固定的延遲//使用的是系統當前時間//加上任務的執行周期elsetime = triggerTime(-p);}//這兩個triggerTime方法的代碼比較簡單,就是獲取下一次執行任務的具體時間。//有一點需要注意的是:delay <(Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,//如果小于Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢出的情況。private long triggerTime(long delay, TimeUnit unit) {return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));}long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}private long overflowFree(long delay) {//獲取隊列中的節點Delayed head = (Delayed) super.getQueue().peek();//獲取的節點不為空,則進行后續處理if (head != null) {//從隊列節點中獲取延遲時間long headDelay = head.getDelay(NANOSECONDS);//如果從隊列中獲取的延遲時間小于0,并且傳遞的delay//值減去從隊列節點中獲取延遲時間小于0if (headDelay < 0 && (delay - headDelay < 0))//將delay的值設置為Long.MAX_VALUE + headDelaydelay = Long.MAX_VALUE + headDelay;}//返回延遲時間return delay;}private void delayedExecute(RunnableScheduledFuture<?> task) {//如果當前線程池已經關閉//則執行線程池的拒絕策略if (isShutdown())reject(task);//線程池沒有關閉else {//將任務添加到阻塞隊列中super.getQueue().add(task);//如果當前線程池是SHUTDOWN狀態//并且當前線程池狀態下不能執行任務//并且成功從阻塞隊列中移除任務if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))//取消任務的執行,但不會中斷執行中的任務task.cancel(false);else//調用ThreadPoolExecutor類中的ensurePrestart()方法ensurePrestart();}}void reExecutePeriodic(RunnableScheduledFuture<?> task) {//線程池當前狀態下能夠執行任務if (canRunInCurrentRunState(true)) {//將任務放入隊列super.getQueue().add(task);//線程池當前狀態下不能執行任務,并且成功移除任務if (!canRunInCurrentRunState(true) && remove(task))//取消任務task.cancel(false);else//調用ThreadPoolExecutor類的ensurePrestart()方法ensurePrestart();}}//ScheduledThreadPoolExecutor類中的onShutdown方法的主要邏輯就是先判斷線程池調用shutdown方法后,是否繼續執行現有的延遲任務和定時任務,//如果不再執行,則取消任務并清空隊列;如果繼續執行,將隊列中的任務強轉為RunnableScheduledFuture對象之后,從隊列中刪除并取消任務。//大家需要好好理解這兩種處理方式。最后調用ThreadPoolExecutor類的tryTerminate方法。@Overridevoid onShutdown() {//獲取隊列BlockingQueue<Runnable> q = super.getQueue();//在線程池已經調用shutdown方法后,是否繼續執行現有延遲任務boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();//在線程池已經調用shutdown方法后,是否繼續執行現有定時任務boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();//在線程池已經調用shutdown方法后,不繼續執行現有延遲任務和定時任務if (!keepDelayed && !keepPeriodic) {//遍歷隊列中的所有任務for (Object e : q.toArray())//取消任務的執行if (e instanceof RunnableScheduledFuture<?>)((RunnableScheduledFuture<?>) e).cancel(false);//清空隊列q.clear();}//在線程池已經調用shutdown方法后,繼續執行現有延遲任務和定時任務else {//遍歷隊列中的所有任務for (Object e : q.toArray()) {//當前任務是RunnableScheduledFuture類型if (e instanceof RunnableScheduledFuture) {//將任務強轉為RunnableScheduledFuture類型RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>) e;//在線程池調用shutdown方法后不繼續的延遲任務或周期任務//則從隊列中刪除并取消任務if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||t.isCancelled()) {if (q.remove(t))t.cancel(false);}}}}//最終調用tryTerminate()方法tryTerminate();}}
示例
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; public class ScheduledThreadPoolExecutorExample { public static void main(String[] args) { // 創建一個包含單個線程的ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor executor = Executors.newScheduledThreadPool(1); // 創建一個Runnable任務 Runnable task = () -> System.out.println("Task is running: " + System.currentTimeMillis()); // 在10秒后執行該任務 executor.schedule(task, 10, TimeUnit.SECONDS); // 每隔5秒執行一次該任務 executor.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS); // 在初始延遲15秒后執行,之后每次任務執行完等待2秒再次執行 executor.scheduleWithFixedDelay(task, 15, 2, TimeUnit.SECONDS); // 注意:在實際應用中,你應該適當地關閉線程池以避免資源泄露 // executor.shutdown(); // 這將平滑地關閉線程池,等待所有任務完成 }
}
在這個例子中,我們創建了一個 ScheduledThreadPoolExecutor 實例,它包含單個線程。我們定義了三個不同的任務調度:一個在10秒后執行,一個每隔5秒執行一次,還有一個在初始延遲15秒后執行,然后每次任務執行完畢之后等待2秒再次執行。
請注意,在實際應用中,當你不再需要 ScheduledThreadPoolExecutor 時,應該調用 shutdown 或 shutdownNow 方法來關閉線程池,以避免資源泄露。同時,線程池中的任務也應該設計為能夠在適當的時候結束執行,避免無限期地占用資源。