Spring Boot定時任務原理
在現代應用中,定時任務的調度是實現周期性操作的關鍵機制。Spring Boot 提供了強大的定時任務支持,通過注解驅動的方式,開發者可以輕松地為方法添加定時任務功能。本文將深入探討 Spring Boot 中定時任務的實現原理,重點分析 @EnableScheduling
和 ScheduledAnnotationBeanPostProcessor
的作用,以及任務如何被注冊和執行。我們還將詳細介紹底層使用的線程池調度器 ThreadPoolTaskScheduler
和 Java 內置的 ScheduledThreadPoolExecutor
,它們如何協同工作,保證定時任務的準確執行。此外,我們還將探討任務調度的線程阻塞與喚醒機制,深入剖析延遲隊列(DelayedWorkQueue
)如何有效管理任務的執行順序。通過本文的學習,你將能夠更好地理解和應用 Spring Boot 定時任務,提升應用的調度能力和性能。
1.注解驅動
Spring Boot通過@EnableScheduling
激活定時任務支持,而EnableScheduling
注解導入了SchedulingConfiguration
,這個類創建了一個名為ScheduledAnnotationBeanPostProcessor
的bean
,而這個bean
就是定時任務的關鍵
/*** {@code @Configuration} class that registers a {@link ScheduledAnnotationBeanPostProcessor}* bean capable of processing Spring's @{@link Scheduled} annotation.** <p>This configuration class is automatically imported when using the* {@link EnableScheduling @EnableScheduling} annotation. See* {@code @EnableScheduling}'s javadoc for complete usage details.** @author Chris Beams* @since 3.1* @see EnableScheduling* @see ScheduledAnnotationBeanPostProcessor*/
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}
2.對ScheduledAnnotationBeanPostProcessor
的分析
1. 類職責
- 核心作用:掃描 Spring Bean 中的
@Scheduled
注解方法,將其轉換為定時任務,并注冊到任務調度器。
2. 定時任務注冊的關鍵流程
代碼都是經過簡化的代碼,實際上我去看Spring的源碼,發現代碼都很長,但是整體意思是差不多的
Bean 初始化后掃描注解(關鍵方法:postProcessAfterInitialization
)
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {// 1. 跳過 AOP 基礎設施類if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceof ScheduledExecutorService) {// Ignore AOP infrastructure such as scoped proxies.return bean;}// 2. 檢查類是否包含 @Scheduled 注解Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);if (!nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, List.of(Scheduled.class, Schedules.class))) {// 3. 反射查找所有帶 @Scheduled 的方法Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, method -> AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class));// 4. 處理每個帶注解的方法annotatedMethods.forEach((method, scheduledAnnotations) -> scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));}return bean;
}
- 跳過無關 Bean:如 AOP 代理類、
TaskScheduler
本身。 - 反射掃描方法:通過
MethodIntrospector
查找所有帶有@Scheduled
的方法。 - 注解聚合:支持
@Schedules
多注解合并。
解析任務參數并注冊(關鍵方法:processScheduled
)
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {// 1. 創建 Runnable 任務Runnable runnable = createRunnable(bean, method);// 2. 解析時間參數(cron/fixedDelay/fixedRate)if (StringUtils.hasText(cron)) {// 處理 cron 表達式CronTask task = new CronTask(runnable, new CronTrigger(cron, timeZone));tasks.add(registrar.scheduleCronTask(task));} else if (fixedDelay > 0) {// 處理 fixedDelayFixedDelayTask task = new FixedDelayTask(runnable, fixedDelay, initialDelay);tasks.add(registrar.scheduleFixedDelayTask(task));} else if (fixedRate > 0) {// 處理 fixedRateFixedRateTask task = new FixedRateTask(runnable, fixedRate, initialDelay);tasks.add(registrar.scheduleFixedRateTask(task));}// 3. 注冊任務到 ScheduledTaskRegistrarsynchronized (scheduledTasks) {scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>()).addAll(tasks);}
}
- 任務封裝:將方法封裝為
ScheduledMethodRunnable
。 - 時間參數解析:
- 支持
cron
、fixedDelay
、fixedRate
三種模式。 - 處理
initialDelay
初始延遲。 - 使用
embeddedValueResolver
解析占位符(如${task.interval}
)。
- 支持
- 任務注冊:最終任務被添加到
ScheduledTaskRegistrar
。
啟動任務調度(關鍵方法:finishRegistration
)
private void finishRegistration() {// 1. 配置 TaskScheduler(優先級:顯式設置 > 查找 Bean > 默認單線程)if (registrar.getScheduler() == null) {TaskScheduler scheduler = resolveSchedulerBean(beanFactory, TaskScheduler.class, false);registrar.setTaskScheduler(scheduler);}// 2. 調用 SchedulingConfigurer 自定義配置(擴展點)List<SchedulingConfigurer> configurers = beanFactory.getBeansOfType(SchedulingConfigurer.class);configurers.forEach(configurer -> configurer.configureTasks(registrar));// 3. 啟動所有注冊的任務registrar.afterPropertiesSet();
}
- 調度器解析:
- 默認查找名為
taskScheduler
的 Bean。 - 若無則創建單線程調度器(
Executors.newSingleThreadScheduledExecutor()
)。
- 默認查找名為
- 擴展點:允許通過
SchedulingConfigurer
自定義任務注冊邏輯。 - 最終啟動:調用
afterPropertiesSet()
觸發任務調度。
3.ThreadPoolTaskScheduler的剖析
ThreadPoolTaskScheduler
是 Spring 對 Java ScheduledThreadPoolExecutor
的封裝,是 @Scheduled
定時任務的底層執行引擎。
- 繼承關系:繼承
ExecutorConfigurationSupport
,實現TaskScheduler
接口,整合了線程池管理與定時任務調度。 - 底層依賴:基于
ScheduledThreadPoolExecutor
,支持 周期性任務(fixedRate/fixedDelay)和 動態觸發任務(如 cron 表達式)。
線程池初始化(關鍵方法:initializeExecutor
)
同樣,這里和以后的部分也都是偽代碼
@Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {// 創建 ScheduledThreadPoolExecutorthis.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);// 配置線程池策略(如取消后立即移除任務)if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor scheduledPoolExecutor) {scheduledPoolExecutor.setRemoveOnCancelPolicy(this.removeOnCancelPolicy);// 其他策略設置...}return this.scheduledExecutor;
}
這部分是我復制源碼的,可以清晰的看到,底層就是new了ScheduledThreadPoolExecutor
protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);}
4.ScheduledThreadPoolExecutor的原理分析
核心成員:
- 任務隊列:使用
DelayedWorkQueue
(內部實現的小頂堆),按任務執行時間排序。 - 線程池:復用
ThreadPoolExecutor
的線程管理機制,支持核心線程數和最大線程數配置。
2. 定時任務調度機制
所有定時任務被封裝為 ScheduledFutureTask
對象,其核心邏輯如下:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {private long time; // 下一次執行時間(納秒)private final long period; // 周期(正數:fixedRate;負數:fixedDelay)private int heapIndex; // 在 DelayedWorkQueue 中的索引public void run() {if (isPeriodic()) {// 周期性任務:重新計算下一次執行時間,并重新加入隊列setNextRunTime();reExecutePeriodic(outerTask);} else {// 一次性任務:直接執行super.run();}}
}
- 任務提交:通過
schedule
、scheduleAtFixedRate
等方法提交任務。 - 隊列管理:任務被封裝為
ScheduledFutureTask
并加入DelayedWorkQueue
。 - 線程喚醒:工作線程 (
Worker
) 從隊列獲取任務,若任務未到執行時間,線程進入限時等待(available.awaitNanos(delay)
)。 - 任務執行:到達執行時間后,線程執行任務:
- 固定速率(fixedRate):執行完成后,根據
period
計算下一次執行時間(time += period
)。 - 固定延遲(fixedDelay):執行完成后,根據當前時間計算下一次執行時間(
time = now() + (-period)
)。
- 固定速率(fixedRate):執行完成后,根據
- 重新入隊:周期性任務執行后,重新加入隊列等待下次調度。
3.DelayedWorkQueue
的簡單剖析
DelayQueue隊列是一個延遲隊列,DelayQueue中存放的元素必須實現Delayed接口的元素,實現接口后相當于是每個元素都有個過期時間,當隊列進行take獲取元素時,先要判斷元素有沒有過期,只有過期的元素才能出隊操作,沒有過期的隊列需要等待剩余過期時間才能進行出隊操作。
DelayQueue隊列內部使用了PriorityQueue優先隊列來進行存放數據,它采用的是二叉堆進行的優先隊列,使用ReentrantLock鎖來控制線程同步,由于內部元素是采用的PriorityQueue來進行存放數據,所以Delayed接口實現了Comparable接口,用于比較來控制優先級
線程阻塞與喚醒邏輯
(1) 取任務時的阻塞(take() 方法)
當線程調用 take()
方法從隊列中獲取任務時,若隊列為空或隊頭任務未到期,線程會進入阻塞狀態:
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {available.await(); // 隊列為空時無限等待} else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0) return q.poll(); // 任務已到期,取出執行if (leader != null) {available.await(); // 其他線程已為隊頭任務等待,本線程無限等待} else {Thread thisThread = Thread.currentThread();leader = thisThread; // 標記當前線程為“領導者”try {available.awaitNanos(delay); // 限時等待到期時間} finally {if (leader == thisThread) leader = null;}}}}} finally {if (leader == null && q.peek() != null) available.signal();lock.unlock();}
}
- 關鍵邏輯:
leader
線程優化:避免多個線程同時等待同一任務到期,僅一個線程(leader)限時等待,其他線程無限等待- 限時等待:通過
available.awaitNanos(delay)
阻塞到任務到期時間。
(2) 插入新任務時的喚醒(offer() 方法)
當新任務被插入隊列時,若新任務成為隊頭(即最早到期),會觸發喚醒邏輯:
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e); // 插入任務并調整堆結構if (q.peek() == e) { // 新任務成為隊頭leader = null;available.signal(); // 喚醒等待線程}return true;} finally {lock.unlock();}
}
- 喚醒條件:
- 插入的任務成為新的隊頭(即其到期時間最早)。
- 調用
available.signal()
喚醒等待的線程(leader)
或其他線程
(3) 喚醒機制總結
- 何時喚醒:
- 超時喚醒:等待線程因任務到期而被 JVM 自動喚醒。
- 插入新任務喚醒:新任務的到期時間早于當前隊頭任務時,插入線程會觸發喚醒。
- 喚醒對象:
- 若存在
leader
線程(正在限時等待隊頭任務),優先喚醒它。 - 若無
leader
,喚醒任意一個等待線程
- 若存在