import com.lancoo.common.utils.Threads;
import com.lancoo.common.utils.spring.SpringUtils;import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** 異步任務管理器* * @author lancoo*/
public class AsyncManager
{/*** 操作延遲10毫秒*/private final int OPERATE_DELAY_TIME = 10;/*** 異步操作任務調度線程池*/private ScheduledExecutorService executor = SpringUtils.getBean("scheduledExecutorService");/*** 單例模式*/private AsyncManager(){}private static AsyncManager me = new AsyncManager();public static AsyncManager me(){return me;}/*** 執行任務* * @param task 任務*/public void execute(TimerTask task){executor.schedule(task, OPERATE_DELAY_TIME, TimeUnit.MILLISECONDS);}/*** 停止任務線程池*/public void shutdown(){Threads.shutdownAndAwaitTermination(executor);}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.*;/*** 線程相關工具類.* * @author lancoo*/
public class Threads
{private static final Logger logger = LoggerFactory.getLogger(Threads.class);/*** sleep等待,單位為毫秒*/public static void sleep(long milliseconds){try{Thread.sleep(milliseconds);}catch (InterruptedException e){return;}}/*** 停止線程池* 先使用shutdown, 停止接收新任務并嘗試完成所有已存在任務.* 如果超時, 則調用shutdownNow, 取消在workQueue中Pending的任務,并中斷所有阻塞函數.* 如果仍然超時,則強制退出.* 另對在shutdown時線程本身被調用中斷做了處理.*/public static void shutdownAndAwaitTermination(ExecutorService pool){if (pool != null && !pool.isShutdown()){pool.shutdown();try{if (!pool.awaitTermination(120, TimeUnit.SECONDS)){pool.shutdownNow();if (!pool.awaitTermination(120, TimeUnit.SECONDS)){logger.info("Pool did not terminate");}}}catch (InterruptedException ie){pool.shutdownNow();Thread.currentThread().interrupt();}}}/*** 打印線程異常信息*/public static void printException(Runnable r, Throwable t){if (t == null && r instanceof Future<?>){try{Future<?> future = (Future<?>) r;if (future.isDone()){future.get();}}catch (CancellationException ce){t = ce;}catch (ExecutionException ee){t = ee.getCause();}catch (InterruptedException ie){Thread.currentThread().interrupt();}}if (t != null){logger.error(t.getMessage(), t);}}
}
import com.lancoo.common.utils.Threads;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;/*** 線程池配置***/
@Configuration
public class ThreadPoolConfig
{// 核心線程池大小private int corePoolSize = 50;// 最大可創建的線程數private int maxPoolSize = 200;// 隊列最大長度private int queueCapacity = 1000;// 線程池維護線程所允許的空閑時間private int keepAliveSeconds = 300;@Bean(name = "threadPoolTaskExecutor")public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setMaxPoolSize(maxPoolSize);executor.setCorePoolSize(corePoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);// 線程池對拒絕任務(無線程可用)的處理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}/*** 執行周期性或定時任務*/@Bean(name = "scheduledExecutorService")protected ScheduledExecutorService scheduledExecutorService(){return new ScheduledThreadPoolExecutor(corePoolSize,new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),new ThreadPoolExecutor.CallerRunsPolicy()){@Overrideprotected void afterExecute(Runnable r, Throwable t){super.afterExecute(r, t);Threads.printException(r, t);}};}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;/*** 確保應用退出時能關閉后臺線程** @author lancoo*/
@Component
public class ShutdownManager
{private static final Logger logger = LoggerFactory.getLogger("sys-user");@PreDestroypublic void destroy(){shutdownAsyncManager();}/*** 停止異步執行任務*/private void shutdownAsyncManager(){try{logger.info("====關閉后臺任務任務線程池====");AsyncManager.me().shutdown();}catch (Exception e){logger.error(e.getMessage(), e);}}
}
調用異步時直接使用
或者直接在方法上使用異步注解@Async