手擼XXL-JOB(二)——定時任務管理

在上一節中,我們介紹了SpringBoot中關于定時任務的執行方式,以及ScheduledExecutorService接口提供的定時任務執行方法。假設我們現在要寫類似XXL-JOB這樣的任務調度平臺,那么,對于任務的管理,是尤為重要的。接下來我們將一步一步,實現一個任務調度管理類。

YangJobManager類基礎實現

假設我們現在的任務管理類,名為YangJobManager類。對于定時任務的執行,我們最終會調用到ScheduledExecutorService的相關方法,因此,我們的YangJobManager類,需要有ScheduledExecutorService屬性,其次,我們希望能對要執行的定時線程任務,其命名進行修改,因此,我們需要有一個線程工廠的屬性。基于上述兩點,我們對YangJobManager類進行實現:

package com.yang.job;import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;private ThreadFactory threadFactory;public YangJobManager(ScheduledExecutorService scheduledExecutorService, ThreadFactory threadFactory) {this.scheduledExecutorService = scheduledExecutorService;this.threadFactory = threadFactory;}public void schedule(Runnable runnable, Long delay) {Thread thread = threadFactory.newThread(runnable);scheduledExecutorService.schedule(thread, delay, TimeUnit.SECONDS);}public void scheduleWithFixedDelay(Runnable runnable, Long delay, Long period) {Thread thread = threadFactory.newThread(runnable);scheduledExecutorService.scheduleWithFixedDelay(thread, delay, period, TimeUnit.SECONDS);}public void scheduleWithFixedRate(Runnable runnable, Long delay, Long period) {Thread thread = threadFactory.newThread(runnable);scheduledExecutorService.scheduleAtFixedRate(thread, delay, period, TimeUnit.SECONDS);}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}
}

然后,我們實現YangJobThreadFactory,完成對線程的命名

public class YangJobThreadFactory implements ThreadFactory {private String poolName;private String threadPrefixName;private static AtomicInteger poolNumber = new AtomicInteger(1);private AtomicInteger threadNumber = new AtomicInteger(1);public YangJobThreadFactory(String poolName) {this.poolName = poolName;this.threadPrefixName = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";}public String getPoolName() {return this.poolName;}@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName(this.threadPrefixName + threadNumber.getAndIncrement());return thread;}}

然后我們添加測試方法:

 public static void main(String[] args) {ThreadFactory threadFactory = new YangJobThreadFactory("yang");ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService, threadFactory);yangJobManager.schedule(() -> {System.out.println(Thread.currentThread().getName() + "schedule定時任務開始執行:" + new Date());}, 1L);yangJobManager.scheduleWithFixedDelay(() -> {System.out.println(Thread.currentThread().getName() + "withFixedDelay定時任務開始執行:" + new Date());}, 0L, 1L);yangJobManager.scheduleWithFixedRate(() -> {System.out.println(Thread.currentThread().getName() + "withFixedRate定時任務開始執行:" + new Date());}, 0L, 1L);try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}yangJobManager.shutdown();}

執行結果如下:
image.png

提供統一的schedule方法

雖然我們能順利將任務提交給YangJobManager執行,當感覺還不夠收斂,因為我們創建了三個方法:schedule,scheduleWithFixedDelay, shceduleWithFixedRate,每個方法執行邏輯都差不多,最后都是調用scheduledExecutorService的相關方法,我們可以將這些方法都收斂到一個入口——schedule,然后在入參中添加一個參數,表示要執行的策略,根據入參的參數,選擇對應的方法執行。
首先,我們添加一個執行策略枚舉:

package com.yang.job.enums;public enum JobExecuteStrategyEnum {IMMEDIATE_EXECUTE("immediate", "立即執行"),ONCE("once", "執行一次"),WITH_FIXED_DELAY("withFixedDelay", "任務執行完畢后間隔執行"),WITH_FIXED_RATE("withFixedRate", "任務執行開始后間隔執行");private String name;private String description;JobExecuteStrategyEnum(String name, String description) {this.name = name;this.description = description;}public String getName() {return this.name;}public static JobExecuteStrategyEnum getJobExecuteStrategyByName(String name) {if (name == null) {return null;}for (JobExecuteStrategyEnum value : values()) {if (name.equals(value.getName())) {return value;}}return null;}public static boolean isLegal(String name) {JobExecuteStrategyEnum jobExecuteStrategyByName = getJobExecuteStrategyByName(name);return jobExecuteStrategyByName != null;}public String getDescription() {return description;}
}

然后添加YangJobManager的schedule方法的入參類:

package com.yang.job.request;import com.yang.job.enums.JobExecuteStrategyEnum;
import lombok.Data;import java.io.Serializable;@Data
public class YangJobSubmitParam implements Serializable {private Runnable runnable;private Integer initialDelay;private Integer period;private JobExecuteStrategyEnum jobExecuteStrategy;
}

最后,修改YangJobManager類,將執行定時任務收斂到schedule方法,進入該方法,首先根據入參判斷執行策略,如果是immediate,那么直接對入參的runnable調用run方法執行接口,其他的策略則分別對應scheduledExecutorService的schedule、scheduledWithFixedDelay、scheduledWithFixedRate方法,此外,這里對屬性也進行修改,去除ThreadFactory屬性。

package com.yang.job;import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.request.YangJobSubmitParam;import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;public YangJobManager(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;}public void schedule(YangJobSubmitParam yangJobSubmitParam) {JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();if (jobExecuteStrategy == null) {throw new RuntimeException("缺少執行策略=========");}Runnable runnable = yangJobSubmitParam.getRunnable();Integer initialDelay = yangJobSubmitParam.getInitialDelay();Integer period = yangJobSubmitParam.getPeriod();switch (jobExecuteStrategy) {case IMMEDIATE_EXECUTE:runnable.run();break;case ONCE:scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);break;case WITH_FIXED_DELAY:scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);break;case WITH_FIXED_RATE:scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);break;}}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}
}

最后,我們添加測試方法:

public static void main(String[] args) {ThreadFactory threadFactory = new YangJobThreadFactory("yang");ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService);YangJobSubmitParam yangJobSubmitParam1 = new YangJobSubmitParam();yangJobSubmitParam1.setRunnable(() -> System.out.println("立即執行======" + new Date()));yangJobSubmitParam1.setJobExecuteStrategy(JobExecuteStrategyEnum.IMMEDIATE_EXECUTE);YangJobSubmitParam yangJobSubmitParam2 = new YangJobSubmitParam();yangJobSubmitParam2.setRunnable(() -> System.out.println("執行一次======" + new Date()));yangJobSubmitParam2.setInitialDelay(1);yangJobSubmitParam2.setJobExecuteStrategy(JobExecuteStrategyEnum.ONCE);YangJobSubmitParam yangJobSubmitParam3 = new YangJobSubmitParam();yangJobSubmitParam3.setRunnable(() -> System.out.println("withFixedDelay=====" + new Date()));yangJobSubmitParam3.setInitialDelay(1);yangJobSubmitParam3.setPeriod(2);yangJobSubmitParam3.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_DELAY);YangJobSubmitParam yangJobSubmitParam4 = new YangJobSubmitParam();yangJobSubmitParam4.setRunnable(() -> System.out.println("withFixedRate=====" + new Date()));yangJobSubmitParam4.setInitialDelay(1);yangJobSubmitParam4.setPeriod(2);yangJobSubmitParam4.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_RATE);yangJobManager.schedule(yangJobSubmitParam1);yangJobManager.schedule(yangJobSubmitParam2);yangJobManager.schedule(yangJobSubmitParam3);yangJobManager.schedule(yangJobSubmitParam4);try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}yangJobManager.shutdown();}

執行結果如下:
image.png

提交任務和取消任務

任務的提交對應的是schedule方法,但我們的YangJobManager類缺少了關于任務的取消邏輯。在ScheduledExecutorService的各個定時執行方法中,其返回值是一個ScheduleFuture類,我們可以通過該類的cancel方法,來將對應的線程任務進行取消。此外,對于每一個任務,我們需要有一個任務標識,所以,我們先修改YangJobSubmitParam類:

package com.yang.job.request;import com.yang.job.enums.JobExecuteStrategyEnum;
import lombok.Data;import java.io.Serializable;@Data
public class YangJobSubmitParam implements Serializable {private Integer jobId;private Runnable runnable;private Integer initialDelay;private Integer period;private JobExecuteStrategyEnum jobExecuteStrategy;
}

然后,我們修改YangJobManager類,首先將schedule方法改為submit方法,這樣更見名知義,在submit方法中,除了理解執行策略外,其他策略都會獲取返回的ScheduleFuture,然后存入對應的map,在取消的時候,我們根據jobId從map中找到對應的ScheduleFuture,并執行cancel方法,以此來取消任務。

package com.yang.job;import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.request.YangJobSubmitParam;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;private Map<String, ScheduledFuture> jobId2ScheduleFutureMap = new ConcurrentHashMap<>();public YangJobManager(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;}public void submitJob(YangJobSubmitParam yangJobSubmitParam) {Integer jobId = yangJobSubmitParam.getJobId();if (jobId == null) {throw new RuntimeException("缺少任務標識=========");}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture != null && !scheduledFuture.isCancelled()) {// jobId存在對應的任務return;}JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();if (jobExecuteStrategy == null) {throw new RuntimeException("缺少執行策略=========");}if (jobExecuteStrategy == JobExecuteStrategyEnum.IMMEDIATE_EXECUTE) {yangJobSubmitParam.getRunnable().run();return;}scheduledFuture = scheduleJob(yangJobSubmitParam);jobId2ScheduleFutureMap.put(jobId.toString(), scheduledFuture);}public void cancelJob(Integer jobId) {if (jobId == null) {return;}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture == null) {return;}if (!scheduledFuture.isCancelled()) {scheduledFuture.cancel(true);}jobId2ScheduleFutureMap.remove(jobId.toString());}private ScheduledFuture scheduleJob(YangJobSubmitParam yangJobSubmitParam) {Runnable runnable = yangJobSubmitParam.getRunnable();Integer initialDelay = yangJobSubmitParam.getInitialDelay();Integer period = yangJobSubmitParam.getPeriod();JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();switch (jobExecuteStrategy) {case ONCE:return scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);case WITH_FIXED_DELAY:return scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);case WITH_FIXED_RATE:return scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);}throw new RuntimeException("執行策略有誤===========");}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}
}

最后,我們添加對應的測試方法:

 public static void main(String[] args) {ThreadFactory threadFactory = new YangJobThreadFactory("yang");ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(4, threadFactory);YangJobManager yangJobManager = new YangJobManager(scheduledExecutorService);YangJobSubmitParam yangJobSubmitParam = new YangJobSubmitParam();yangJobSubmitParam.setJobId(1);yangJobSubmitParam.setRunnable(() -> System.out.println("執行任務=====" + new Date()));yangJobSubmitParam.setInitialDelay(0);yangJobSubmitParam.setPeriod(2);yangJobSubmitParam.setJobExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_RATE);yangJobManager.submitJob(yangJobSubmitParam);try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("取消任務==========");yangJobManager.cancelJob(1);try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}yangJobManager.shutdown();}

在該方法中,我們提交任務,該任務間隔時間為2秒,10秒過后,取消任務,取消任務過后,再睡眠10秒,在后面10秒鐘,不會執行任務(或執行一次,因為在cancel之前剛好有任務沒執行完),執行結果如下:
image.png

YangJobManager建造者

對于YangJobManager,目前我們所擁有的屬性、方法都比較簡單,但是如果后續這個類進一步擴展,構造該類可能會變得很麻煩,因此,我們添加一個YangJobBuilder建造者類,用于構造YangJobManager,此外,我們將YangJobManager的構造方法設置為private,從而將構造YangJobManager的職責,徹底收斂到YangJobManagerBuilder類中,我們修改YangJobManager類如下:

package com.yang.job;import com.yang.job.enums.JobExecuteStrategyEnum;
import com.yang.job.factory.YangJobThreadFactory;
import com.yang.job.request.YangJobSubmitParam;import java.util.Map;
import java.util.concurrent.*;public class YangJobManager {private ScheduledExecutorService scheduledExecutorService;private Map<String, ScheduledFuture> jobId2ScheduleFutureMap = new ConcurrentHashMap<>();private YangJobManager(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;}public void submitJob(YangJobSubmitParam yangJobSubmitParam) {Integer jobId = yangJobSubmitParam.getJobId();if (jobId == null) {throw new RuntimeException("缺少任務標識=========");}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture != null && !scheduledFuture.isCancelled()) {// jobId存在對應的任務return;}JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();if (jobExecuteStrategy == null) {throw new RuntimeException("缺少執行策略=========");}if (jobExecuteStrategy == JobExecuteStrategyEnum.IMMEDIATE_EXECUTE) {yangJobSubmitParam.getRunnable().run();return;}scheduledFuture = scheduleJob(yangJobSubmitParam);jobId2ScheduleFutureMap.put(jobId.toString(), scheduledFuture);}public void cancelJob(Integer jobId) {if (jobId == null) {return;}ScheduledFuture scheduledFuture = jobId2ScheduleFutureMap.get(jobId.toString());if (scheduledFuture == null) {return;}if (!scheduledFuture.isCancelled()) {scheduledFuture.cancel(true);}jobId2ScheduleFutureMap.remove(jobId.toString());}private ScheduledFuture scheduleJob(YangJobSubmitParam yangJobSubmitParam) {Runnable runnable = yangJobSubmitParam.getRunnable();Integer initialDelay = yangJobSubmitParam.getInitialDelay();Integer period = yangJobSubmitParam.getPeriod();JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy();switch (jobExecuteStrategy) {case ONCE:return scheduledExecutorService.schedule(runnable, initialDelay, TimeUnit.SECONDS);case WITH_FIXED_DELAY:return scheduledExecutorService.scheduleWithFixedDelay(runnable, initialDelay, period, TimeUnit.SECONDS);case WITH_FIXED_RATE:return scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, TimeUnit.SECONDS);}throw new RuntimeException("執行策略有誤===========");}public void shutdown() {if (this.scheduledExecutorService == null) {return;}if (this.scheduledExecutorService.isShutdown()) {return;}scheduledExecutorService.shutdown();try {if (!scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS)) {scheduledExecutorService.shutdownNow();}} catch (InterruptedException e) {e.printStackTrace();}}public static class YangJobManagerBuilder {private ThreadFactory threadFactory;private ScheduledExecutorService scheduledExecutorService;public YangJobManagerBuilder() {}public YangJobManagerBuilder setThreadFactory(ThreadFactory threadFactory) {this.threadFactory = threadFactory;return this;}public YangJobManagerBuilder setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {this.scheduledExecutorService = scheduledExecutorService;return this;}public YangJobManager build() {if (this.threadFactory == null) {this.threadFactory = new YangJobThreadFactory("yang");}if (this.scheduledExecutorService == null) {this.scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(),this.threadFactory);} else {if (this.scheduledExecutorService instanceof ScheduledThreadPoolExecutor) {ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) this.scheduledExecutorService;scheduledThreadPoolExecutor.setThreadFactory(this.threadFactory);}}return new YangJobManager(this.scheduledExecutorService);}}
}

任務執行類

在之前的代碼中,我們的Runnable都是匿名函數類,但是在我們的定時任務調度平臺中,一般情況下,這個任務是會持久化到數據庫中的,我們一般不會說把這個Runnable的代碼也存到數據庫吧,一般存儲的,應該就是某個任務執行類的類路徑,和方法名,以及入參,然后在啟動項目時,從數據庫中加載這些數據,并通過反射或代理等方式,來構造這個Runnable。
首先,我們定義一個任務執行類,來規范任務的執行方法和入參格式:

// 任務執行類
package com.yang.job.execute;public interface IYangJobExecutor {void execute(YangJobExecuteRequest yangJobExecuteRequest);
}// 任務執行方法入參
package com.yang.job.execute;import lombok.Data;import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;@Data
public class YangJobExecuteRequest implements Serializable {private String jobId;private Map<String, String> params = new HashMap<>();public void addParam(String key, String value) {params.put(key, value);}public String getParam(String key) {return params.get(key);}
}

接著,我們創建這個YangJobExecutor的實現類,用于測試,在該類中,執行任務的方法很簡單,打印當前類的名字以及入參。

package com.yang.task;import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;import java.util.Date;public class TestJobExecutor implements IYangJobExecutor {@Overridepublic void execute(YangJobExecuteRequest yangJobExecuteRequest) {System.out.println(String.format("%s 任務執行類執行了,入參為:%s, 當前時間:%s",this.getClass().getName(), yangJobExecuteRequest.toString(),new Date().toString()));}
}

然后我們創建一個YangJobData,假設我們從數據庫中獲取的數據格式如下:

package com.yang.job.data;import lombok.Data;import java.io.Serializable;@Data
public class YangJobData implements Serializable {private Integer jobId;private String cron;private String executeStrategy;private String executeClassPath;private String executeParams;
}

executeStrategy表示任務的執行策略,executeClassPath表示要執行的任務類的路徑,executeParams表示執行任務方法的入參。
在XXL-JOB中,我們可以使用cron來設置定時任務的執行時間,因此我們這里,也使用cron作為定時任務的執行時間設置,為了解析cron表達式,我們添加下列依賴:

  <dependency><groupId>com.cronutils</groupId><artifactId>cron-utils</artifactId><version>9.2.0</version></dependency>

然后創建一個CronUtils工具類,用于解析cron表達式。

package com.yang.demo.infra.utils;import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;import java.time.ZonedDateTime;
import java.util.Optional;public class CronUtils {private static final CronDefinition CRON_DEFINITION = CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ);private static final CronParser CRON_PARSER = new CronParser(CRON_DEFINITION);public static ZonedDateTime nextExecutionTime(String cron, ZonedDateTime startTime) {ExecutionTime executionTime = ExecutionTime.forCron(CRON_PARSER.parse(cron));Optional<ZonedDateTime> zonedDateTime = executionTime.nextExecution(startTime);return zonedDateTime.get();}
}

對于執行方法的入參,一般情況下,就是任務的id,以及一些擴展信息,這些擴展信息一般以鍵值對的形式存儲,即"key:value;key:value;"這些形式,所以這里添加一個FeaturesUtils類,用于解析這些鍵值對信息:

package com.yang.job.utils;import java.util.HashMap;
import java.util.Map;public class FeaturesUtils {private final static String KEY_KEY_SEPARATOR = ";";private final static String KEY_VALUE_SEPARATOR = ":";public static Map<String, String> convert2FeatureMap(String features) {Map<String, String> featureMap = new HashMap<>();if (features == null || features.isEmpty()) {return featureMap;}String[] keyValues = features.split(KEY_KEY_SEPARATOR);for (String keyValue : keyValues) {String[] split = keyValue.split(KEY_VALUE_SEPARATOR);String key = split[0];String value = split[1];featureMap.put(key, value);}return featureMap;}public static String convert2Features(Map<String, String> featureMap) {if (featureMap == null || featureMap.isEmpty()) {return "";}StringBuilder stringBuilder = new StringBuilder();featureMap.forEach((key, value) -> {stringBuilder.append(key).append(KEY_VALUE_SEPARATOR).append(value).append(KEY_KEY_SEPARATOR);});return stringBuilder.toString();}
}

然后我們添加測試方法,模擬從數據庫中獲取數據,并根據任務類路徑,獲取對應的runnable并提交到YangJobManager中。

  public static void main(String[] args) {YangJobData yangJobData = mockYangJobData();YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam(yangJobData);YangJobManager yangJobManager = new YangJobManager.YangJobManagerBuilder().setThreadFactory(new YangJobThreadFactory("yang")).build();yangJobManager.submitJob(yangJobSubmitParam);try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}yangJobManager.shutdown();}private static YangJobSubmitParam convert2YangJobSubmitParam(YangJobData yangJobData) {YangJobSubmitParam yangJobSubmitParam = new YangJobSubmitParam();yangJobSubmitParam.setJobId(yangJobData.getJobId());yangJobSubmitParam.setJobExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy()));ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(yangJobData.getCron(), ZonedDateTime.now());ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(yangJobData.getCron(), nextExecutionTime);long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();yangJobSubmitParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000);yangJobSubmitParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000);try {Class<?> aClass = Class.forName(yangJobData.getExecuteClassPath());if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {throw new RuntimeException("任務類必須實現IYangJobExecutor接口");}IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobData);Runnable runnable = () -> executor.execute(yangJobExecuteRequest);yangJobSubmitParam.setRunnable(runnable);} catch (InstantiationException | IllegalAccessException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}return yangJobSubmitParam;}private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobData yangJobData) {YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();yangJobExecuteRequest.setJobId(yangJobData.getJobId().toString());yangJobExecuteRequest.setParams(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));return yangJobExecuteRequest;}private static YangJobData mockYangJobData() {YangJobData yangJobData = new YangJobData();yangJobData.setJobId(1);yangJobData.setCron("0/5 * * * * ?");yangJobData.setExecuteStrategy(JobExecuteStrategyEnum.WITH_FIXED_DELAY.getName());yangJobData.setExecuteClassPath("com.yang.task.TestJobExecutor");yangJobData.setExecuteParams("jobId:1;startIndex:1;endIndex:10;");return yangJobData;}

這里對于cron的解析,其實不是特別好,這里的思路是,獲取下一次執行的時間,和下下一次執行的時間,然后以此來計算initialDelay和period,但是如果這個cron表示的是某幾天、某幾個小時,比如說星期一、星期二、星期三執行,那么我們那種解析方式是有誤的,這個可以后續再好好斟酌一下,目前先這樣解析。
執行結果如下:
image.png

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/12363.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/12363.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/12363.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

最新Linux Debian12安裝和使用ImageMagick圖像處理工具 常見圖片png、jpg格式轉webp格式

在Linux系統中&#xff0c;使用ImageMagick可以圖片格式轉換&#xff0c;其中最常用的是通過命令行工具進行。 ImageMagick是一個非常強大的圖像處理工具集&#xff0c;它包含了許多用于圖像轉換的命令。 一、安裝ImageMagick&#xff08;如果尚未安裝&#xff09;&#xff1…

在線音樂系統

文章目錄 在線音樂系統一、項目演示二、項目介紹三、部分功能截圖四、部分代碼展示五、底部獲取項目&#xff08;9.9&#xffe5;帶走&#xff09; 在線音樂系統 一、項目演示 音樂網站 二、項目介紹 基于springbootvue的前后端分離在線音樂系統 登錄角色 : 用戶、管理員 用…

外文文獻查找以及下載渠道

尋找外文文獻的渠道有很多種&#xff1a; 學術數據庫和期刊網站&#xff1a;像PubMed、IEEE Xplore、ScienceDirect等學術數據庫和期刊網站是獲取外文文獻的主要渠道之一。這些平臺通常提供了廣泛的學術資源&#xff0c;包括期刊文章、會議論文等。 學術搜索引擎&#xff1a;…

Git 的原理與使用(中)

Git 的原理與使用&#xff08;上&#xff09;中介紹了Git初識&#xff0c;Git的安裝與初始化以及工作區、暫存區、版本庫相關的概念與操作&#xff0c;本文接著上篇的內容&#xff0c;繼續深入介紹Git在的分支管理與遠程操作方面的應用。 目錄 五、分支管理 1.理解分支 2.創…

java約拍攝影小程序

獲取源碼配套資料論文等、問題解答&#xff0c;可以加華神扣扣&#xff1a;3753599439 扣扣&#xff1a;1590404240 叩叩&#xff1a;1306749621

Java窗口函數框架JDFrame

1、簡介 在上一節中已經介紹過 JDFrame&#xff0c;文章鏈接stream流太難用了看看JDFrame 沒看過的朋友可以先看看&#xff0c; 這次主要講講窗口函數相關API的使用 在各種數據庫mysql&#xff0c; hive、spark中都有非常好用的開窗函數使用&#xff0c; 但是java卻沒好用的J…

數據結構與算法學習筆記十---鏈隊列的表示和實現(C語言)

目錄 前言 1.什么是鏈隊 2.鏈隊的表示和實現 1.定義 2.初始化 3.銷毀 4.清空 5.空隊列 6.隊列長度 7.獲取隊頭 8.入隊 9.出隊 10.遍歷隊列 11.完整代碼 前言 本篇博客介紹鏈棧隊列的表示和實現。 1.什么是鏈隊 鏈隊是采用鏈式存儲結構實現的隊列。通常鏈隊使用單…

【知識拓展】大白話說清楚:IP地址、子網掩碼、網關、DNS等

前言 工作中常聽別人說的本地網絡是什么意思&#xff1f;同一網段又是什么意思&#xff1f;它倆有關系嗎&#xff1f; 在工作中內經常會遇到相關的網絡問題&#xff0c;涉及網絡通信中一些常見的詞匯&#xff0c;如IP地址、子網掩碼、網關和DNS等。具體一點&#xff1a;經常會…

申請免費的必應搜索API

申請免費的必應搜索API 文章目錄 申請免費的必應搜索API前言一、原理1.1 登錄1.2 進入1.3 獲取密鑰1.4 申請VISA信用卡1.5 創建必應自定義搜索資源 二、創建成功 前言 準備條件&#xff1a; 1、outlook郵箱 2、招商銀行全幣種VISA信用卡【建議之前就有一張招商銀行信用卡&…

【opencv】圖像拼接實驗

實驗環境&#xff1a;anaconda、jupyter notebook 實驗用到的包&#xff1a;opencv、matplotlib、numpy 注&#xff1a;opencv在3.4.2之后sift就不是免費的了 我用的是3.4.1.15版本 實驗使用到的圖片 一、sift函數獲取特征值 讀入圖片 book cv2.imread(book.png, cv2.IMRE…

【極簡】如何估算大模型inference所需的內存量

1字節8bit 16float2字節 模型后面的xxb的單位是字節。 1b 字節≈ 0.93G&#xff0c;這個是以8bit運行&#xff0c;4bit減半&#xff0c;16bit&#xff08;float&#xff09;加倍&#xff0c;32bit&#xff08;double&#xff09;炒雞加倍。 剩下的是小頭&#xff0c;需要參數計…

蘋果macOS無法給App麥克風授權解決辦法

好久沒有在電腦上錄制課程了&#xff0c;有些東西還是錄下來記憶深刻&#xff0c;卻意外發現MAC系統升級后無法授權給第三方的App使用攝像頭和麥克風&#xff0c;而錄屏軟件是需要開啟麥克風和攝像頭才能錄制屏幕上的操作和聲音&#xff0c;官方提示在第三方APP若有使用攝像頭和…

css的4種導入方式

熟悉CSS樣式4種的引用方式&#xff0c;分別為行內式、內嵌式、鏈入式和導入式。 行內式 <標簽名 style"屬性1:屬性值1;屬性2:屬性值2;屬性3:屬性值3;">內容</ 標簽名>style是標簽的屬性&#xff0c;實際上任何HTML標簽都擁有style屬性&#xff0c;用來…

pyqt QComboBox下拉列表框控件

pyqt QComboBox下拉列表框控件 QComboBox效果代碼 QComboBox QComboBox 是 PyQt&#xff08;中的一個控件&#xff0c;它允許用戶從下拉列表中選擇一個選項。這個控件在需要用戶從預定義選項中進行選擇時非常有用。 效果 代碼 import sys from PyQt5.QtWidgets import QAppl…

vite創建的項目使用rem適配

下面以創建vue3.0 項目為例&#xff1a; npm init vitelatest “名稱” 選擇vue &#xff08;選擇你所對應的語言&#xff09; 更具提示步驟執行 cd xxx npm i npm run dev 然后再項目中使用 rem 需要安裝插件 第一步安裝插件 npm i amfe-flexible npm i postcss-pxtorem 第二…

CS144 Checkpoint 4: interoperating in the world(2024)

分析網絡路徑和性能&#xff1a; mtr命令 mtr 輸出的詳細分析&#xff1a; mtr 162.105.253.58 命令用于結合 traceroute 和 ping 的功能&#xff0c;實時監測并分析從你的計算機到目標主機&#xff08;IP 地址 162.105.253.58&#xff0c;北京大學計算中心&#xff09;之間…

Nginx配置Referer防盜鏈

系列文章目錄 文章目錄 系列文章目錄前言 前言 前些天發現了一個巨牛的人工智能學習網站&#xff0c;通俗易懂&#xff0c;風趣幽默&#xff0c;忍不住分享一下給大家。點擊跳轉到網站&#xff0c;這篇文章男女通用&#xff0c;看懂了就去分享給你的碼吧。 HTTP Referer是Hea…

PBOOTCMS|URL靜態制作教程(已解答)

0、先解壓源碼文件&#xff0c;在覆蓋靜態文件&#xff0c;全部點是。 打開程序后臺登錄地址www.xxx.com(你的域名)/admin.php/Menu/index 打開程序后臺--系統菜單--菜單新增&#xff08;清理緩存后重新登錄賬號&#xff09; &#xff08;選擇父菜單&#xff0c;菜單名稱&#…

ROS2+TurtleBot3+Cartographer+Nav2實現slam建圖和導航

0 引言 入門機器人最常見的應用就是slam建圖和導航&#xff0c;本文將詳細介紹這一流程&#xff0c; 便于初學這快速上手。 首先對需要用到的軟件包就行簡單介紹。 turtlebot3: 是一個小型的&#xff0c;基于ros的移動機器人。 學習機器人的很多示例程序都是基于turtlebot3。 …

【Java基礎】枚舉類的方法及應用

如何實現讓一個類有固定個數的對象 手動封裝構造方法&#xff08;private&#xff09; → 創建靜態對象 → final修飾靜態對象&#xff0c;使其成為常量 class Season { //枚舉類public final static Season SPRING new Season();public final static Season SUMMER new Se…