文章目錄
- 前言
- 一、JobTriggerPoolHelper 作用:
- 二、JobTriggerPoolHelper 源碼介紹:
- 2.1. start() 方法:
- 2.2 任務觸發:
- 2.3 XxlJobTrigger.trigger 任務執行:
- 2.4 processTrigger 任務的執行:
- 2.5 runExecutor 任務的執行:
- 總結
前言
本文介紹 xxl-job 調度中心中 JobTriggerPoolHelper 的作用,並結合源碼說明任務從被觸發到下發給執行器的完整流程。
一、JobTriggerPoolHelper 作用:
JobTriggerPoolHelper 會創建快、慢兩個線程池(ThreadPoolExecutor);當任務被觸發需要執行時,會依據該任務近期的觸發耗時選擇其中一個線程池,由它負責執行本次觸發。
二、JobTriggerPoolHelper 源碼介紹:
2.1. start() 方法:
/** Logger shared by the trigger-pool helper. */
private static final Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);

// ---------------------- trigger pool ----------------------

// Fast and slow trigger pools. Both are built the same way; they differ in the
// configured maximum thread count and in the capacity of their work queue.
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;

/**
 * Creates the fast and the slow trigger thread pools.
 *
 * <p>The fast pool takes its maximum size from {@code getTriggerPoolFastMax()} and queues up to
 * 1000 pending triggers; the slow pool takes its maximum size from {@code getTriggerPoolSlowMax()}
 * and queues up to 2000.
 * NOTE(review): the article accompanying this code states these map to the properties
 * {@code xxl.job.triggerpool.fast.max} (default 200) and {@code xxl.job.triggerpool.slow.max}
 * (default 100) - confirm against XxlJobAdminConfig.
 */
public void start() {
    fastTriggerPool = createTriggerPool(
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            1000,
            "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-");

    slowTriggerPool = createTriggerPool(
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            2000,
            "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-");
}

/**
 * Builds one trigger pool: 10 core threads, the given maximum, a 60 second keep-alive and a
 * bounded {@link LinkedBlockingQueue}. No rejection handler is supplied, so the JDK default
 * policy applies once both the queue and the pool are full.
 *
 * @param maxPoolSize      maximum number of threads in the pool
 * @param queueCapacity    capacity of the bounded work queue
 * @param threadNamePrefix prefix of every worker thread name; the runnable's hash code is appended
 * @return the newly created pool
 */
private static ThreadPoolExecutor createTriggerPool(int maxPoolSize, int queueCapacity, final String threadNamePrefix) {
    return new ThreadPoolExecutor(
            10,
            maxPoolSize,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(queueCapacity),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, threadNamePrefix + r.hashCode());
                }
            });
}
/**
 * Shuts down both trigger pools when the application stops, releasing their threads.
 *
 * <p>{@code shutdownNow()} is used, so queued triggers are not executed and running ones are
 * interrupted. A pool that was never created (i.e. {@code start()} did not run) is skipped
 * instead of causing a {@link NullPointerException}.
 */
public void stop() {
    if (fastTriggerPool != null) {
        fastTriggerPool.shutdownNow();
    }
    if (slowTriggerPool != null) {
        slowTriggerPool.shutdownNow();
    }
    logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
2.2 任務觸發:
/**
* add trigger 執行任務
*/
public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread pool// 快慢線程池 的選擇 1分鐘 內同一個任務(jobId 相同),超過10次 執行的時間超過 500ms 就放入到慢線程池中執行ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}// trigger 線程池 任務的執行triggerPool_.execute(new Runnable() {@Overridepublic void run() {// 記錄開始時間,方便統計 本次job 執行的耗時 long start = System.currentTimeMillis();try {// do trigger 執行任務XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-maplong minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {// 如果不是同一分鐘,則重新開始計數minTim = minTim_now;jobTimeoutCountMap.clear();}// incr timeout-count-maplong cost = System.currentTimeMillis()-start;if (cost > 500) { // ob-timeout threshold 500ms// 大于 500ms 任務超時 進行計數AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}}}}});
}// ---------------------- helper ----------------------private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();public static void toStart() {helper.start();
}
/**
 * Stops the shared helper instance by delegating to its {@code stop()} method.
 */
public static void toStop() {
    helper.stop();
}

/**
 * Static entry point for triggering a job; delegates to {@code addTrigger} on the shared helper.
 *
 * @param jobId                 id of the job to trigger
 * @param triggerType           what caused the trigger
 * @param failRetryCount
 *          {@code >=0}: use this param
 *          {@code <0}: use param from job info config
 * @param executorShardingParam sharding parameter, forwarded unchanged
 * @param executorParam
 *          null: use job param
 *          not null: cover job param
 * @param addressList           executor address list, forwarded unchanged
 */
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
2.3 XxlJobTrigger.trigger 任務執行:
/*** trigger job** @param jobId* @param triggerType* @param failRetryCount* >=0: use this param* <0: use param from job info config* @param executorShardingParam* @param executorParam* null: use job param* not null: cover job param* @param addressList* null: use executor addressList* not null: cover*/
public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load data// 根據job id 從 xxl_job_info 獲取job /*** SELECT <include refid="Base_Column_List" />FROM xxl_job_info AS tWHERE t.id = #{id}**/XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);return;}// 執行器,任務參數if (executorParam != null) {jobInfo.setExecutorParam(executorParam);}// 改任務設置的 任務執行失敗次數,如果到時間正常觸發 則failRetryCount 為-1 ;// 如果是失敗的補充,則為改任務剩余補償次數// 到時間第一次觸發任務 則獲取xxl_job_info executor_fail_retry_count 設置的重試次數int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 獲取改任務對應的執行器信息/***SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.id = #{id}**/XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList// 執行器地址覆蓋,如果重新傳入了執行器地址 則進行覆蓋;否則取 XxlJobGroup 的執行器地址if (addressList!=null && addressList.trim().length()>0) {// 執行器地址類型:0=自動注冊、1=手動錄入group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding param// 執行器任務分片參數 格式如 1/2int[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}// 判斷路由策略if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {// 如果是: 分片廣播,則需要為注冊的每個執行器都 發送任務for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, 
group.getRegistryList().size());}} else {if (shardingParam == null) {shardingParam = new int[]{0, 1};}// 非分片廣播 ,則直接執行任務processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}
2.4 processTrigger 任務的執行:
/**
* @param group job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount* @param triggerType* @param index sharding index* @param total sharding index*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param// 獲取阻塞處理策略 ,如果從任務中的 阻塞處理策略 不在ExecutorBlockStrategyEnum 枚舉中,則默認為 單機率行// "SERIAL EXECUTION":單機率行; "DISCARD LATER":云棄后續調度;"COVER EARLY":獲蓋之前調度ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy// 獲取任務路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategyString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-id 記錄log 日志XxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-param 創建任務執行器的 triggerParam 對象TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address 根據任務的路由策略,從執行器地址中,選擇出一個執行器地址String address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && 
!group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executor 獲取到執行器地址,runExecutor 執行任務ReturnT<String> triggerResult = null;if (address != null) {triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger infoStringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) 
{triggerMsgSb.append("("+shardingParam+")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");// 6、save log trigger-info 更新日志信息jobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();jobLog.setTriggerCode(triggerResult.getCode());jobLog.setTriggerMsg(triggerMsgSb.toString());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
2.5 runExecutor 任務的執行:
/**
* run executor* @param triggerParam* @param address* @return*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {// 根據執行器的地址,獲取包裝執行器的接口對象ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);// 遠程rpc 調用 執行器端,進行任務的執行runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}// 返滬執行的信息StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;
}
executorBiz.run 任務的執行:
/**
 * Sends the trigger parameters to the executor by posting them to {@code addressUrl + "run"}
 * through {@code XxlJobRemotingUtil.postBody}, passing along the access token and timeout.
 *
 * @param triggerParam parameters of the job run to request
 * @return the result produced by {@code postBody}
 */
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    final String runUrl = addressUrl + "run";
    return XxlJobRemotingUtil.postBody(runUrl, accessToken, timeout, triggerParam, String.class);
}
總結
本文對 JobTriggerPoolHelper 的工作內容進行介紹。