一、前言
最近做了電子發票的需求,分省開票接口和發票下載接口都有一定的延遲。為了完成開票后自動將發票插入用戶微信卡包,目前的解決方案是利用線程池,將開票后插入卡包的任務(輪詢分省發票接口,直到獲取到發票相關信息或者輪詢次數用完,如果獲取到發票信息,執行發票插入微信卡包,結束任務)放入線程池異步執行。仔細想一想,這種實現方案存在一個問題,線程池沒有充分的利用。為什么沒有充分的利用?下面詳細的分析。
二、異步線程池和異步任務包裝
AsyncConfigurerSupport可以幫我們指定異步任務(注有@Async注解)對應的線程池。
@Configuration public class MyAsyncConfigurer extends AsyncConfigurerSupport {private static Logger LOGGER = LoggerFactory.getLogger(MyAsyncConfigurer.class);@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(2);taskExecutor.setMaxPoolSize(4);taskExecutor.setQueueCapacity(10);taskExecutor.setRejectedExecutionHandler((runnable, executor) -> LOGGER.error("異步線程池拒絕任務..." + runnable));taskExecutor.setThreadFactory(new MyAsyncThreadFactory());taskExecutor.initialize();return taskExecutor;}static class MyAsyncThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;MyAsyncThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "myasync-pool-" +poolNumber.getAndIncrement() +"-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}} }
異步任務包裝,除了異步,還加入了retry功能,實現指定次數的接口輪詢。
@Component public class AsyncWrapped {protected static Logger LOGGER = LoggerFactory.getLogger(AsyncWrapped.class);@Asyncpublic void asyncProcess(Runnable runnable, Callback callback, Retry retry) {try {if (retry == null) {retry = new Retry(1);}retry.execute(ctx -> {runnable.run();return null;}, ctx -> {if (callback != null) {callback.call();}return null;});} catch (Exception e) {LOGGER.error("異步調用異常...", e);}} }
業務代碼大致邏輯如下。
asyncWrapped.asyncProcess(() -> {//調用分省接口獲取發票信息//如果發票信息異常,拋出異常(進入下次重試)//否則,插入用戶微信卡包}, () -> {//輪詢次數用盡,用戶插入卡包失敗 }, new Retry(2, 1000) );
這里說一下為什么線程池沒有充分的利用。異步任務中包含輪詢操作,輪詢有一定的時間間隔,導致在這段時間間隔內,線程一直處于被閑置的狀態。所以為了能更好的利用線程池資源,我們得想辦法解決時間間隔的問題。假如有個延遲隊列,隊列里放著我們的異步任務(不包含重試機制),然后延遲(輪詢的時間間隔)一定時間之后,將任務放入線程池中執行,任務執行完畢之后根據是否需要再次執行決定是否再次放入到延遲隊列去,這樣每個線程池中的線程都不會閑著,達到了充分利用的目的。
三、定時任務線程池和實現輪詢機制
@EnableScheduling 幫助開啟@Scheduled注解解析。注冊一個名字是ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME的定時任務線程池。
@Configuration @EnableScheduling @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class TaskConfiguration {@Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledExecutorService scheduledAnnotationProcessor() {return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());}private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-schedule-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon()) {t.setDaemon(false);}if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}} }
? 實現輪詢任務,實現接口SchedulingConfigurer,獲取ScheduledTaskRegistrar 并指定定時任務線程池。
@Override public void configureTasks(ScheduledTaskRegistrar registrar) {this.registrar = registrar;this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));scheduledTaskRegistrarHelper = new ScheduledTaskRegistrarHelper(); }
scheduledFutures提交定時任務時返回結果集,periodTasks 定時任務結果集。
private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, TimingTask> periodTasks = new ConcurrentHashMap<>();
定時任務包裝類,包含任務的執行次數(重試次數)、重試間隔、具體任務、重試次數用盡之后的回調等,以及自動結束定時任務、重試計數重置功能。
private static class TimingTask {//重試次數private Integer retry;//任務標識private String taskId;//重試間隔private Long period;//具體任務private ScheduledRunnable task;//結束回調private ScheduledCallback callback;//重試計數private AtomicInteger count = new AtomicInteger(0);//父線程MDCprivate Map<String, String> curContext;public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {this.retry = retry;this.taskId = taskId;this.period = period;this.task = task;this.callback = callback;this.curContext = MDC.getCopyOfContextMap();}public Long getPeriod() {return period;}public void setPeriod(Long period) {this.period = period;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId = taskId;}public Integer getRetry() {return retry;}public void setRetry(Integer retry) {this.retry = retry;}public AtomicInteger getCount() {return count;}public boolean reset() {for (int cnt = this.count.intValue(); cnt < this.retry; cnt = this.count.intValue()) {if (this.count.compareAndSet(cnt, 0)) {return true;}}return false;}public void process() {Map<String, String> preContext = MDC.getCopyOfContextMap();try {if (this.curContext == null) {MDC.clear();} else {// 將父線程的MDC內容傳給子線程MDC.setContextMap(this.curContext);}this.task.run();exitTask(false);} catch (Exception e) {LOGGER.error("定時任務異常..." + this, e);if (count.incrementAndGet() >= this.retry) {exitTask(true);}} finally {if (preContext == null) {MDC.clear();} else {MDC.setContextMap(preContext);}}}//定時任務退出private void exitTask(boolean execCallback) {scheduledFutures.get(this.taskId).cancel(false);scheduledFutures.remove(this.getTaskId());periodTasks.remove(this.getTaskId());LOGGER.info("結束定時任務: " + this);if (execCallback && callback != null) {callback.call();}}@Overridepublic String toString() {return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, TimingTask.class);} }
注意上面定時任務是如何退出的,是在某一次任務執行成功之后(沒有異常拋出)或者定時任務執行次數用盡才退出的。直接調用ScheduledFuture的cancel方法可以退出定時任務。還有就是定時任務中的日志需要父線程中的日志變量,所以需要對MDC進行一下處理。
@Scope("prototype") @Bean public AspectTimingTask aspectTimingTask() {return new AspectTimingTask(); }@Aspect @Component public static class ScheduledAspect {@Around("target(AspectTimingTask)")public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) {MethodInvocationProceedingJoinPoint methodJoinPoint = (MethodInvocationProceedingJoinPoint) proceedingJoinPoint;Method method = ((MethodSignature) methodJoinPoint.getSignature()).getMethod();if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) {LOGGER.info("電子發票定時任務日志同步...");//其他處理 }}return proceedingJoinPoint.proceed();} }public static class AspectTimingTask implements Runnable {private TimingTask timingTask;@Override@ScheduledTaskpublic void run() {timingTask.process();}public void setTimingTask(TimingTask timingTask) {this.timingTask = timingTask;} }
AspectTimingTask 是對TimingTask 的包裝類,實現了Runnable接口。主要是為了對run接口做一層切面,獲取ProceedingJoinPoint 實例(公司中的日志調用鏈系統需要這個參數)。AspectTimingTask 的bean實例的scope是prototype,這個注意下。
public static void register(Integer retry, Long period, String taskId, ScheduledRunnable task, ScheduledCallback callback) {scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback); }private class ScheduledTaskRegistrarHelper {public void register(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {//是否可以重置定時任務TimingTask preTask = periodTasks.get(taskId);if (null != preTask&& preTask.reset()&& existTask(taskId)) {return;}TimingTask curTask = new TimingTask(retry, taskId, period, task, callback);AspectTimingTask aspectTimingTask = applicationContext.getBean(AspectTimingTask.class);aspectTimingTask.setTimingTask(curTask);ScheduledFuture<?> scheduledFuture = registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period);scheduledFutures.put(taskId, scheduledFuture);periodTasks.put(taskId, curTask);LOGGER.info("注冊定時任務: " + curTask);}private boolean existTask(String taskId) {return scheduledFutures.containsKey(taskId) && periodTasks.containsKey(taskId);} }
如果taskId的定時任務已經存在則重置定時任務,否則注冊新的定時任務。AspectTimingTask 實例通過ApplicationContext獲取,每次獲取都是一個新的實例。
由 異步輪詢任務 優化成 定時任務,充分利用了線程池。修改之后的業務代碼如下。
ScheduledTaskRegistrarHelper.register(10, 5*1000L, "taskId", () -> {//調用分省接口獲取發票信息//如果發票信息異常,拋出異常(進入下次重試)//否則,插入用戶微信卡包 }() -> {//輪詢次數用盡,用戶插入卡包失敗 } );
針對電子發票插入微信卡包定時任務,重試執行次數10次,每隔5秒執行一次。任務完成之后結束定時任務,執行次數用盡之后觸發插入卡包失敗動作。
四、參考
? ? ?Spring異步調用原理及SpringAop攔截器鏈原理
? ? ?Springboot定時任務原理及如何動態創建定時任務