線程池優化之充分利用線程池資源

一、前言

  最近做了電子發票的需求,分省開票接口和發票下載接口都有一定的延遲。為了完成開票后自動將發票插入用戶微信卡包,目前的解決方案是利用線程池,將開票后插入卡包的任務(輪詢分省發票接口,直到獲取到發票相關信息或者輪詢次數用完,如果獲取到發票信息,執行發票插入微信卡包,結束任務)放入線程池異步執行。仔細想一想,這種實現方案存在一個問題,線程池沒有充分的利用。為什么沒有充分的利用?下面詳細的分析。

二、異步線程池和異步任務包裝

  AsyncConfigurerSupport可以幫我們指定異步任務(注有@Async注解)對應的線程池。

@Configuration
public class MyAsyncConfigurer extends AsyncConfigurerSupport {private static Logger LOGGER = LoggerFactory.getLogger(MyAsyncConfigurer.class);@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(2);taskExecutor.setMaxPoolSize(4);taskExecutor.setQueueCapacity(10);taskExecutor.setRejectedExecutionHandler((runnable, executor) -> LOGGER.error("異步線程池拒絕任務..." + runnable));taskExecutor.setThreadFactory(new MyAsyncThreadFactory());taskExecutor.initialize();return taskExecutor;}static class MyAsyncThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;MyAsyncThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "myasync-pool-" +poolNumber.getAndIncrement() +"-thread-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}
}

  異步任務包裝,除了異步,還加入了retry功能,實現指定次數的接口輪詢。

@Component
public class AsyncWrapped {protected static Logger LOGGER = LoggerFactory.getLogger(AsyncWrapped.class);@Asyncpublic void asyncProcess(Runnable runnable, Callback callback, Retry retry) {try {if (retry == null) {retry = new Retry(1);}retry.execute(ctx -> {runnable.run();return null;}, ctx -> {if (callback != null) {callback.call();}return null;});} catch (Exception e) {LOGGER.error("異步調用異常...", e);}}
}

  業務代碼大致邏輯如下。

asyncWrapped.asyncProcess(() -> {//調用分省接口獲取發票信息//如果發票信息異常,拋出異常(進入下次重試)//否則,插入用戶微信卡包}, () -> {//輪詢次數用盡,用戶插入卡包失敗
    }, new Retry(2, 1000)
);

  這里說一下為什么線程池沒有充分的利用。異步任務中包含輪詢操作,輪詢有一定的時間間隔,導致在這段時間間隔內,線程一直處于被閑置的狀態。所以為了能更好的利用線程池資源,我們得想辦法解決時間間隔的問題。假如有個延遲隊列,隊列里放著我們的異步任務(不包含重試機制),然后延遲(輪詢的時間間隔)一定時間之后,將任務放入線程池中執行,任務執行完畢之后根據是否需要再次執行決定是否再次放入到延遲隊列去,這樣每個線程池中的線程都不會閑著,達到了充分利用的目的。

三、定時任務線程池和實現輪詢機制

  @EnableScheduling 幫助開啟@Scheduled注解解析。注冊一個名字是ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME的定時任務線程池。

@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {@Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledExecutorService scheduledAnnotationProcessor() {return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());}private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-schedule-";}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon()) {t.setDaemon(false);}if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}}
}

?  實現輪詢任務,實現接口SchedulingConfigurer,獲取ScheduledTaskRegistrar 并指定定時任務線程池。

@Override
public void configureTasks(ScheduledTaskRegistrar registrar) {this.registrar = registrar;this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));scheduledTaskRegistrarHelper = new ScheduledTaskRegistrarHelper();
}

  scheduledFutures提交定時任務時返回結果集,periodTasks 定時任務結果集。

private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, TimingTask> periodTasks = new ConcurrentHashMap<>();

  定時任務包裝類,包含任務的執行次數(重試次數)、重試間隔、具體任務、重試次數用盡之后的回調等,以及自動結束定時任務、重試計數重置功能。

private static class TimingTask {//重試次數private Integer retry;//任務標識private String taskId;//重試間隔private Long period;//具體任務private ScheduledRunnable task;//結束回調private ScheduledCallback callback;//重試計數private AtomicInteger count = new AtomicInteger(0);//父線程MDCprivate Map<String, String> curContext;public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {this.retry = retry;this.taskId = taskId;this.period = period;this.task = task;this.callback = callback;this.curContext = MDC.getCopyOfContextMap();}public Long getPeriod() {return period;}public void setPeriod(Long period) {this.period = period;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId = taskId;}public Integer getRetry() {return retry;}public void setRetry(Integer retry) {this.retry = retry;}public AtomicInteger getCount() {return count;}public boolean reset() {for (int cnt = this.count.intValue(); cnt < this.retry; cnt = this.count.intValue()) {if (this.count.compareAndSet(cnt, 0)) {return true;}}return false;}public void process() {Map<String, String> preContext = MDC.getCopyOfContextMap();try {if (this.curContext == null) {MDC.clear();} else {// 將父線程的MDC內容傳給子線程MDC.setContextMap(this.curContext);}this.task.run();exitTask(false);} catch (Exception e) {LOGGER.error("定時任務異常..." + this, e);if (count.incrementAndGet() >= this.retry) {exitTask(true);}} finally {if (preContext == null) {MDC.clear();} else {MDC.setContextMap(preContext);}}}//定時任務退出private void exitTask(boolean execCallback) {scheduledFutures.get(this.taskId).cancel(false);scheduledFutures.remove(this.getTaskId());periodTasks.remove(this.getTaskId());LOGGER.info("結束定時任務: " + this);if (execCallback && callback != null) {callback.call();}}@Overridepublic String toString() {return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, TimingTask.class);}
}

  注意上面定時任務是如何退出的,是在某一次任務執行成功之后(沒有異常拋出)或者定時任務執行次數用盡才退出的。直接調用ScheduledFuture的cancel方法可以退出定時任務。還有就是定時任務中的日志需要父線程中的日志變量,所以需要對MDC進行一下處理。

@Scope("prototype")
@Bean
public AspectTimingTask aspectTimingTask() {return new AspectTimingTask();
}@Aspect
@Component
public static class ScheduledAspect {@Around("target(AspectTimingTask)")public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) {MethodInvocationProceedingJoinPoint methodJoinPoint = (MethodInvocationProceedingJoinPoint) proceedingJoinPoint;Method method = ((MethodSignature) methodJoinPoint.getSignature()).getMethod();if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) {LOGGER.info("電子發票定時任務日志同步...");//其他處理
            }}return proceedingJoinPoint.proceed();}
}public static class AspectTimingTask implements Runnable {private TimingTask timingTask;@Override@ScheduledTaskpublic void run() {timingTask.process();}public void setTimingTask(TimingTask timingTask) {this.timingTask = timingTask;}
}

  AspectTimingTask 是對TimingTask 的包裝類,實現了Runnable接口。主要是為了對run接口做一層切面,獲取ProceedingJoinPoint 實例(公司中的日志調用鏈系統需要這個參數)。AspectTimingTask 的bean實例的scope是prototype,這個注意下。

public static void register(Integer retry, Long period, String taskId, ScheduledRunnable task, ScheduledCallback callback) {scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback);
}private class ScheduledTaskRegistrarHelper {public void register(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {//是否可以重置定時任務TimingTask preTask = periodTasks.get(taskId);if (null != preTask&& preTask.reset()&& existTask(taskId)) {return;}TimingTask curTask = new TimingTask(retry, taskId, period, task, callback);AspectTimingTask aspectTimingTask = applicationContext.getBean(AspectTimingTask.class);aspectTimingTask.setTimingTask(curTask);ScheduledFuture<?> scheduledFuture = registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period);scheduledFutures.put(taskId, scheduledFuture);periodTasks.put(taskId, curTask);LOGGER.info("注冊定時任務: " + curTask);}private boolean existTask(String taskId) {return scheduledFutures.containsKey(taskId) && periodTasks.containsKey(taskId);}
}

  如果taskId的定時任務已經存在則重置定時任務,否則注冊新的定時任務。AspectTimingTask 實例通過ApplicationContext獲取,每次獲取都是一個新的實例。

  由 異步輪詢任務 優化成 定時任務,充分利用了線程池。修改之后的業務代碼如下。

ScheduledTaskRegistrarHelper.register(10, 5*1000L, "taskId", () -> {//調用分省接口獲取發票信息//如果發票信息異常,拋出異常(進入下次重試)//否則,插入用戶微信卡包
    }() -> {//輪詢次數用盡,用戶插入卡包失敗
    }
);

  針對電子發票插入微信卡包定時任務,重試執行次數10次,每隔5秒執行一次。任務完成之后結束定時任務,執行次數用盡之后觸發插入卡包失敗動作。

四、參考  

? ? ?Spring異步調用原理及SpringAop攔截器鏈原理

? ? ?Springboot定時任務原理及如何動態創建定時任務

轉載于:https://www.cnblogs.com/hujunzheng/p/10660479.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/531197.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/531197.shtml
英文地址,請注明出處:http://en.pswp.cn/news/531197.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Spring MVC源碼——Root WebApplicationContext

Spring MVC源碼——Root WebApplicationContext 打算開始讀一些框架的源碼,先拿 Spring MVC 練練手,歡迎點擊這里訪問我的源碼注釋, SpringMVC官方文檔一開始就給出了這樣的兩段示例: WebApplicationInitializer示例: public class MyWebApplicationInitializer implements Web…

Spring MVC源碼——Servlet WebApplicationContext

上一篇筆記(Spring MVC源碼——Root WebApplicationContext)中記錄了下 Root WebApplicationContext 的初始化代碼.這一篇來看 Servlet WebApplicationContext 的初始化代碼 DispatcherServlet 是另一個需要在 web.xml 中配置的類, Servlet WebApplicationContext 就由它來創建…

Springboot源碼——應用程序上下文分析

前兩篇(Spring MVC源碼——Root WebApplicationContext 和 Spring MVC源碼——Servlet WebApplicationContext)講述了springmvc項目創建上下文的過程&#xff0c;這一篇帶大家了解一下springboot項目創建上下文的過程。 SpringApplication引導類 SpringApplication類用于啟動或…

基于zookeeper實現分布式配置中心(一)

最近在學習zookeeper&#xff0c;發現zk真的是一個優秀的中間件。在分布式環境下&#xff0c;可以高效解決數據管理問題。在學習的過程中&#xff0c;要深入zk的工作原理&#xff0c;并根據其特性做一些簡單的分布式環境下數據管理工具。本文首先對zk的工作原理和相關概念做一下…

基于zookeeper實現分布式配置中心(二)

上一篇&#xff08;基于zookeeper實現分布式配置中心&#xff08;一&#xff09;&#xff09;講述了zookeeper相關概念和工作原理。接下來根據zookeeper的特性&#xff0c;簡單實現一個分布式配置中心。 配置中心的優勢 1、各環境配置集中管理。 2、配置更改&#xff0c;實時推…

Redis分布式鎖實戰

背景 目前開發過程中&#xff0c;按照公司規范&#xff0c;需要依賴框架中的緩存組件。不得不說&#xff0c;做組件的大牛對CRUD操作的封裝&#xff0c;連接池、緩存路由、緩存安全性的管控都處理的無可挑剔。但是有一個小問題&#xff0c;該組件沒有對分布式鎖做實現&#xff…

基于RobotFramework實現自動化測試

Java robotframework seleniumlibrary 使用Robot Framework Maven Plugin&#xff08;http://robotframework.org/MavenPlugin/&#xff09;執行自動化測試chromedriver下載&#xff1a; http://chromedriver.storage.googleapis.com/index.htmlchromedriver和chrome版本對應…

Springboot國際化信息(i18n)解析

國際化信息理解 國際化信息也稱為本地化信息 。 Java 通過 java.util.Locale 類來表示本地化對象&#xff0c;它通過 “語言類型” 和 “國家/地區” 來創建一個確定的本地化對象 。舉個例子吧&#xff0c;比如在發送一個具體的請求的時候&#xff0c;在header中設置一個鍵值對…

看了就知道為什么別人C語言學習效率那么高了

談及C語言&#xff0c;我想C語言功能強大都應該知道、應用廣泛&#xff0c;一旦掌握了后&#xff0c;你就可以理直氣壯地對他人說“我是電腦高手&#xff01;”&#xff0c;而且以后若是再自學其他語言就顯得輕而易舉了。憂慮的是&#xff0c;C語言般博大精深&#xff0c;太難學…

C語言一看就能上手的干貨!你確定你不來看嗎?

本地環境設置 如果您想要設置 C 語言環境&#xff0c;您需要確保電腦上有以下兩款可用的軟件&#xff0c;文本編輯器和 C 編譯器。 文本編輯器 這將用于輸入您的程序。文本編輯器包括 Windows Notepad、OS Edit command、Brief、Epsilon、EMACS 和 vim/vi。文本編輯器的名稱…

C語言爆炸干貨,小白你還不來看看嘛!

①&#xff1a;數據類型 int(整型)&#xff0c;short int(短整型)&#xff0c;long int(長整型)&#xff0c; char(字符型)&#xff0c;float&#xff08;單精度浮點型&#xff09; double&#xff08;雙精度浮點型&#xff09; C語言編程交流群815393895 ②&#xff1a;邏…

10萬碼農五年的C語言筆記!你現在知道別人為什么這么優秀了嗎?

c語言對許多同學來說確實是一門比較難學的課程&#xff0c;不僅抽象&#xff0c;而且繁瑣&#xff0c;但這又是一門不得不學的課程。前兩節可能還有興致聽一聽&#xff0c;然而&#xff0c;再過幾節課就是一臉蒙比。憑空要想出一道題的算法和程序&#xff0c;根本無從下手。 所…

C語言從來都沒有過時,你大爺終究是你大爺

直到今天&#xff0c;有人在喊C語言過時的語言&#xff0c;還有什么值得學習的&#xff0c;現在看Python&#xff0c;PHP等語言現在都很容易用&#xff0c;誰還在學習老C語言&#xff0c;其實這是真的嗎&#xff1f;作者下載了兩種語言的源代碼作為下載器。由于空間的限制&…

C語言超級瑪麗菜單模塊源碼

C語言是面向過程的&#xff0c;而C&#xff0b;&#xff0b;是面向對象的 C和C的區別&#xff1a; C是一個結構化語言&#xff0c;它的重點在于算法和…

C語言使用函數必須知道的3點注意事項!

C語言是面向過程的&#xff0c;而C&#xff0b;&#xff0b;是面向對象的 C和C的區別&#xff1a; C是一個結構化語言&#xff0c;它的重點在于算法和數據結構。C程序的設計首要考慮的是如何通過一個過程&#xff0c;對輸入&#xff08;或環境條件&#xff09;進行運算處理得…

C語言/C++編程學習:C語言環境設置!

C語言是面向過程的&#xff0c;而C&#xff0b;&#xff0b;是面向對象的 C和C的區別&#xff1a; C是一個結構化語言&#xff0c;它的重點在于算法和數據結構。C程序的設計首要考慮的是如何通過一個過程&#xff0c;對輸入&#xff08;或環境條件&#xff09;進行運算處理得…

C語言指針原來也可以這么的通俗易懂!

C語言是面向過程的&#xff0c;而C&#xff0b;&#xff0b;是面向對象的 C和C的區別&#xff1a; C是一個結構化語言&#xff0c;它的重點在于算法和數據結構。C程序的設計首要考慮的是如何通過一個過程&#xff0c;對輸入&#xff08;或環境條件&#xff09;進行運算處理得…

C語言過時了?你在做夢?

為什么要使用C語言&#xff1f; 在過去的四十年里&#xff0c;C語言已經成為世界上最流行、最重要的一種編程語言。 C是一種融合了控制特性的現代語言&#xff0c;而我們已發現在計算機科學的理論和實踐中&#xff0c;控制特性是很重要的。其設計使得用戶可以自然地采用自頂向…

C/C++的轉義字符詳解

所有的ASCII碼都可以用“\”加數字&#xff08;一般是8進制數字&#xff09;來表示。而C中定義了一些字母前加"\"來表示常見的那些不能顯示的ASCII字符&#xff0c;如\0,\t,\n等&#xff0c;就稱為轉義字符&#xff0c;因為后面的字符&#xff0c;都不是它本來的ASCI…

C語言深入理解!助你向大佬邁進!

Dennis Ritchie 過世了&#xff0c;他發明了C語言&#xff0c;一個影響深遠并徹底改變世界的計算機語言。一門經歷40多年的到今天還長盛不衰的語言&#xff0c;今天很多語言都受到C的影響&#xff0c;C&#xff0c;Java&#xff0c;C#&#xff0c;Perl&#xff0c; PHP&#xf…