一. Jdk中的定時任務
我們平時在 Spring 項目中會使用 @Scheduled 開啟定時任務;
jdk 中其實也提供了定時任務線程池 ScheduledThreadPool,我們可以直接通過 Executors 工具類獲取;
// 創建了核心線程數為 2 的 ScheduledThreadPool 對象
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {System.out.println("正在執行任務1,線程名:" + Thread.currentThread().getName());}
}, 0, 3, TimeUnit.SECONDS);executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {System.out.println("正在執行任務2,線程名:" + Thread.currentThread().getName());}
}, 0, 3, TimeUnit.SECONDS);
這里會啟用兩個線程去執行定時任務,打印如下;
正在執行任務1,線程名:pool-1-thread-1
正在執行任務2,線程名:pool-1-thread-1
正在執行任務1,線程名:pool-1-thread-1
正在執行任務2,線程名:pool-1-thread-2
二. Spring中的定時任務
我們知道:要開啟 Spring 的定時任務,也就是要使用 @Scheduled 注解的話,需要 @EnableScheduling 啟用定時任務;
下面我們從源碼的角度來看一下 Spring 中的定時任務;
1. Spring中默認的TaskScheduler
Spring 項目中會存在一個默認的 taskScheduler 對象,它是一個 ThreadPoolTaskScheduler;
是在 TaskSchedulingAutoConfiguration 中導入的,可以看到會往 Spring 容器中注入一個 beanName 叫 “taskScheduler” 的 ThreadPoolTaskScheduler 對象;
@ConditionalOnClass(ThreadPoolTaskScheduler.class)
@AutoConfiguration(after = TaskExecutionAutoConfiguration.class)
@EnableConfigurationProperties(TaskSchedulingProperties.class)
public class TaskSchedulingAutoConfiguration {@Bean@ConditionalOnBean(name = "internalScheduledAnnotationProcessor")@ConditionalOnMissingBean({ SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class })public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) {// 通過 TaskSchedulerBuilder.build() 構建 ThreadPoolTaskScheduler 對象return builder.build();}@Bean@ConditionalOnMissingBeanpublic TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties, ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) {// 創建出 TaskSchedulerBuilderTaskSchedulerBuilder builder = new TaskSchedulerBuilder();// 設置 TaskSchedulerBuilder 的核心線程數builder = builder.poolSize(properties.getPool().getSize());Shutdown shutdown = properties.getShutdown();builder = builder.awaitTermination(shutdown.isAwaitTermination());// 設置 TaskSchedulerBuilder 的線程名前綴builder = builder.threadNamePrefix(properties.getThreadNamePrefix());builder = builder.customizers(taskSchedulerCustomizers);// 返回 TaskSchedulerBuilder 對象return builder;}
}
2. @Scheduled
我們需要知道是哪個類來解析 @Scheduled 注解的;
/*** 從注解的信息可以看出解析是在 ScheduledAnnotationBeanPostProcessor* 我們需要重點看 ScheduledAnnotationBeanPostProcessor 類** @see EnableScheduling* @see ScheduledAnnotationBeanPostProcessor* @see Schedules*/
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {long fixedDelay() default -1;long fixedRate() default -1;String cron() default "";
}
那這個 ScheduledAnnotationBeanPostProcessor 又是從哪注入到 Spring 容器的呢?其實是在 @EnableScheduling 中注入的;
3. @EnableScheduling
我們看一下 @EnableScheduling,它往 spring 容器中注入了一個配置類:SchedulingConfiguration;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)// 往 Spring 容器中注入 SchedulingConfiguration 類
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {}
我們看一下 SchedulingConfiguration 類,它往容器中注入了 ScheduledAnnotationBeanPostProcessor 對象;
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {// 往容器中注入了 ScheduledAnnotationBeanPostProcessor 對象// 并且它的 beanName 為 "internalScheduledAnnotationProcessor"// 正因為導入了 "internalScheduledAnnotationProcessor",// taskScheduler 對象才會被注入到 Spring 容器中@Bean(name = "internalScheduledAnnotationProcessor")public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}
至此,前置完成,解析 @Scheduled 注解的任務交給了 ScheduledAnnotationBeanPostProcessor,我們需要重點看 ScheduledAnnotationBeanPostProcessor 做了啥;
三. ScheduledAnnotionBeanPostProcessor
- ScheduledAnnotionBeanPostProcessor 實現了 BeanPostProcessor 接口,它的 postProcessAfterInitialization() 會解析 bean 中的 @Scheduled 注解;
- ScheduledAnnotionBeanPostProcessor 實現了 ApplicationListener 接口,它的 onApplication(ContextRefreshedEvent event) 會在 spring 刷新完 beanFactory 容器的時候調用,啟用定時任務;
我們也主要從這兩個方法入手;
1. postProcessAfterInitialization()
postProcessAfterInitialization() 如下;
// -------------------- ScheduledAnnotionBeanPostProcessor ---------------------
public Object postProcessAfterInitialization(Object bean, String beanName) {if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) {return bean;}Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);// 1. 遍歷類中的每一個方法,收集帶有 @Scheduled 注解的方法if (AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {Map<Method, Set<Scheduled>> annotatedMethods;// 構建 annotatedMethodsif (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);} else {// 2. 輪詢 annotatedMethods// 對 @Scheduled 注解的方法執行 processScheduled()annotatedMethods.forEach((method, scheduledAnnotations) ->scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));}}return bean;
}
processScheduled(scheduled, method, bean) 中,scheduled 為方法上的 @Scheduled 對象;我們看下 processScheduled() 的過程,我們只關注用的多的 cron 表達式;
// -------------------- ScheduledAnnotionBeanPostProcessor ---------------------
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {// 1. 將 bean 和 method 包裝為 runnable 對象Runnable runnable = createRunnable(bean, method);boolean processedSchedule = false;Set<ScheduledTask> tasks = new LinkedHashSet<>(4);// 2. 解析 cron 表達式String cron = scheduled.cron();if (StringUtils.hasText(cron)) {String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}processedSchedule = true;if (!Scheduled.CRON_DISABLED.equals(cron)) {TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}// 3. 往 this.registrar 中添加 CronTask 任務// this.registrar 為 ScheduledTaskRegistrar 類對象tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}}
}// ------------------------ ScheduledTaskRegistrar -------------------------
public ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {// 1. 第一次進來的時候創建 scheduledTaskscheduledTask = new ScheduledTask(task);newTask = true;}// 2. 第一次進來的時候創建 this.taskScheduler == null,走 else 邏輯if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());} else {// 3. 將 task 放入到當前的 cronTask 中addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);
}
至此,bean 中的 @Scheduled 注解的定時任務都被包裝為 cronTask 對象放入到 ScheduledTaskRegistrar 中;
2. onApplication(ContextRefreshEvent)
Spring 刷新完 beanFactory 容器的時候會調用該方法,啟用定時任務;
// -------------------- ScheduledAnnotionBeanPostProcessor ---------------------
public void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext() == this.applicationContext) {// 調用 finishRegistration()finishRegistration();}
}// -------------------- ScheduledAnnotionBeanPostProcessor ---------------------
private void finishRegistration() {if (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);}// 1. 先查找 SchedulingConfigurer,如果有的話用 SchedulingConfigurerif (this.beanFactory instanceof ListableBeanFactory) {Map<String, SchedulingConfigurer> beans =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(configurers);for (SchedulingConfigurer configurer : configurers) {configurer.configureTasks(this.registrar);}}if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {try {// 2. 尋找 TaskScheduler bean,通過 byType 的方式// 往 this.registrar 中設置定時線程池this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));} catch (NoUniqueBeanDefinitionException ex) {// 2.1 TaskScheduler bean 不唯一,通過 byName 的方式,注入 "taskScheduler"// 尋找 TaskScheduler bean,通過 byName 的方式// 一般注入的都是默認的 "taskScheduler"this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));} catch (NoSuchBeanDefinitionException ex) {try {// 3. 尋找 ScheduledExecutorService bean,通過 byType 的方式this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));} catch (NoUniqueBeanDefinitionException ex2) {// 3.1 ScheduledExecutorService bean 不唯一// 尋找 ScheduledExecutorService bean,通過 byName 的方式this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));} catch (NoSuchBeanDefinitionException ex2) {logger.info("沒有 TaskScheduler/ScheduledExecutorService bean")}}}// 4. 上述往 this.registrar 中設置了 taskScheduler 對象// 執行 this.registrar.afterPropertiesSet()this.registrar.afterPropertiesSet();
}
我們看下 this.registrar.afterPropertiesSet() 做了啥;
// ------------------------ ScheduledTaskRegistrar -------------------------
public void afterPropertiesSet() {scheduleTasks();
}// ------------------------ ScheduledTaskRegistrar -------------------------
protected void scheduleTasks() {// 1. 如果 this.taskScheduler == null// 創建單核心線程的 ThreadScheduledExecutor 作為定時線程池// 一般 this.taskScheduler 不會為 nullif (this.taskScheduler == null) {this.localExecutor = Executors.newSingleThreadScheduledExecutor();this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}// 2. this.cronTasks 不為 null// 依次遍歷 cronTask,執行 scheduleCronTask(cronTask)// 這是我們第二次進入 scheduleCronTask(cronTask),和第一次有點區別if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}
}// ------------------------ ScheduledTaskRegistrar -------------------------
public ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask(task);newTask = true;}// 1. 第二次進來,this.taskScheduler 不為 null// 執行 this.taskScheduler.schedule(),正式啟動定時任務if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());} else {addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);
}
至此,SpringScheduled 全部解析完畢;
四. 配置定時線程池
1. 通過配置文件
我們在 TaskSchedulingAutoConfiguration 中知道,其實默認的 ThreadPoolTaskScheduler 都是根據配置項 TaskSchedulingProperties 創建的,默認核心線程 coreThreads = 1;
可以進行如下配置:
spring:task:scheduling:pool:size: 5thread-name-prefix: my-schedule-
2. 自定義ThreadPoolTaskScheduler
我們也可以直接往 Spring 容器中自定義注入 ThreadPoolTaskScheduler 對象,只不過需要注意它的 beanName 必須為 taskScheduler;
@Configuration
public class ThreadPoolConfig {@Beanpublic Executor taskScheduler() {ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setPoolSize(3);taskScheduler.setThreadNamePrefix("my-schedule-task-");taskScheduler.initialize();return taskScheduler;}
}