原理篇-- 定時任務xxl-job-服務端(admin)項目啟動過程--JobTriggerPoolHelper 初始化 (3)

文章目錄

  • 前言
  • 一、JobTriggerPoolHelper 作用:
  • 二、JobTriggerPoolHelper 源碼介紹:
    • 2.1. start() 方法:
    • 2.2 任務觸發:
    • 2.3 XxlJobTrigger.trigger 任務執行:
    • 2.4 processTrigger 任務的執行:
    • 2.5 runExecutor 任務的執行:
  • 總結


前言

本文對 JobTriggerPoolHelper 的工作進行介紹;


一、JobTriggerPoolHelper 作用:

JobTriggerPoolHelper 創建兩個線程池ThreadPoolExecutor,當任務被觸發需要執行時,ThreadPoolExecutor 負責進行執行任務;

二、JobTriggerPoolHelper 源碼介紹:

2.1. start() 方法:

// 創建日志對象private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);// ---------------------- trigger pool ----------------------// fast/slow thread pool 快慢線程池創建 區別是線程池最大線程的數量不一樣
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;public void start(){// 快速線程池 的最大線程數量  xxl.job.triggerpool.fast.max  默認值 200fastTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(1000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());}});// 慢速線程池 的最大線程數量 xxl.job.triggerpool.slow.max  默認值 100slowTriggerPool = new ThreadPoolExecutor(10,XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());}});
}
// 項目停止 關閉線程池釋放資源
public void stop() {//triggerPool.shutdown();fastTriggerPool.shutdownNow();slowTriggerPool.shutdownNow();logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}

2.2 任務觸發:

/**
* add trigger  執行任務
*/
public void addTrigger(final int jobId,final TriggerTypeEnum triggerType,final int failRetryCount,final String executorShardingParam,final String executorParam,final String addressList) {// choose thread pool// 快慢線程池 的選擇 1分鐘 內同一個任務(jobId 相同),超過10次 執行的時間超過 500ms 就放入到慢線程池中執行ThreadPoolExecutor triggerPool_ = fastTriggerPool;AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 mintriggerPool_ = slowTriggerPool;}// trigger 線程池 任務的執行triggerPool_.execute(new Runnable() {@Overridepublic void run() {// 記錄開始時間,方便統計 本次job 執行的耗時 long start = System.currentTimeMillis();try {// do trigger  執行任務XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);} catch (Exception e) {logger.error(e.getMessage(), e);} finally {// check timeout-count-maplong minTim_now = System.currentTimeMillis()/60000;if (minTim != minTim_now) {// 如果不是同一分鐘,則重新開始計數minTim = minTim_now;jobTimeoutCountMap.clear();}// incr timeout-count-maplong cost = System.currentTimeMillis()-start;if (cost > 500) {       // ob-timeout threshold 500ms// 大于 500ms 任務超時 進行計數AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));if (timeoutCount != null) {timeoutCount.incrementAndGet();}}}}});
}// ---------------------- helper ----------------------private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();public static void toStart() {helper.start();
}
public static void toStop() {helper.stop();
}/**
* @param jobId
* @param triggerType
* @param failRetryCount
* 			>=0: use this param
* 			<0: use param from job info config
* @param executorShardingParam
* @param executorParam
*          null: use job param
*          not null: cover job param
*  觸發任務的執行 調用本類中的 addTrigger
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}

2.3 XxlJobTrigger.trigger 任務執行:

/*** trigger job** @param jobId* @param triggerType* @param failRetryCount* 			>=0: use this param* 			<0: use param from job info config* @param executorShardingParam* @param executorParam*          null: use job param*          not null: cover job param* @param addressList*          null: use executor addressList*          not null: cover*/
public static void trigger(int jobId,TriggerTypeEnum triggerType,int failRetryCount,String executorShardingParam,String executorParam,String addressList) {// load data// 根據job id 從 xxl_job_info 獲取job /*** SELECT <include refid="Base_Column_List" />FROM xxl_job_info AS tWHERE t.id = #{id}**/XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);if (jobInfo == null) {logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);return;}//  執行器,任務參數if (executorParam != null) {jobInfo.setExecutorParam(executorParam);}// 改任務設置的 任務執行失敗次數,如果到時間正常觸發 則failRetryCount 為-1 ;// 如果是失敗的補充,則為改任務剩余補償次數// 到時間第一次觸發任務 則獲取xxl_job_info  executor_fail_retry_count 設置的重試次數int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();// 獲取改任務對應的執行器信息/***SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.id = #{id}**/XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());// cover addressList// 執行器地址覆蓋,如果重新傳入了執行器地址 則進行覆蓋;否則取 XxlJobGroup  的執行器地址if (addressList!=null && addressList.trim().length()>0) {//  執行器地址類型:0=自動注冊、1=手動錄入group.setAddressType(1);group.setAddressList(addressList.trim());}// sharding param// 執行器任務分片參數 格式如 1/2int[] shardingParam = null;if (executorShardingParam!=null){String[] shardingArr = executorShardingParam.split("/");if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {shardingParam = new int[2];shardingParam[0] = Integer.valueOf(shardingArr[0]);shardingParam[1] = Integer.valueOf(shardingArr[1]);}}// 判斷路由策略if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()&& shardingParam==null) {// 如果是: 分片廣播,則需要為注冊的每個執行器都 發送任務for (int i = 0; i < group.getRegistryList().size(); i++) {processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());}} else {if (shardingParam == null) {shardingParam = new int[]{0, 1};}// 非分片廣播 ,則直接執行任務processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);}}

2.4 processTrigger 任務的執行:

 /**
* @param group                     job group, registry list may be empty* @param jobInfo* @param finalFailRetryCount* @param triggerType* @param index                     sharding index* @param total                     sharding index*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){// param// 獲取阻塞處理策略 ,如果從任務中的 阻塞處理策略 不在ExecutorBlockStrategyEnum 枚舉中,則默認為 單機率行// "SERIAL EXECUTION":單機率行; "DISCARD LATER":云棄后續調度;"COVER EARLY":獲蓋之前調度ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy// 獲取任務路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategyString shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;// 1、save log-id 記錄log 日志XxlJobLog jobLog = new XxlJobLog();jobLog.setJobGroup(jobInfo.getJobGroup());jobLog.setJobId(jobInfo.getId());jobLog.setTriggerTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());// 2、init trigger-param  創建任務執行器的 triggerParam 對象TriggerParam triggerParam = new TriggerParam();triggerParam.setJobId(jobInfo.getId());triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());triggerParam.setExecutorParams(jobInfo.getExecutorParam());triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());triggerParam.setLogId(jobLog.getId());triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());triggerParam.setGlueType(jobInfo.getGlueType());triggerParam.setGlueSource(jobInfo.getGlueSource());triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());triggerParam.setBroadcastIndex(index);triggerParam.setBroadcastTotal(total);// 3、init address 根據任務的路由策略,從執行器地址中,選擇出一個執行器地址String address = null;ReturnT<String> routeAddressResult = null;if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {if (index < group.getRegistryList().size()) {address = group.getRegistryList().get(index);} else {address = group.getRegistryList().get(0);}} else {routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {address = routeAddressResult.getContent();}}} else {routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));}// 4、trigger remote executor 獲取到執行器地址,runExecutor 執行任務ReturnT<String> triggerResult = null;if (address != null) {triggerResult = runExecutor(triggerParam, address);} else {triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);}// 5、collection trigger infoStringBuffer triggerMsgSb = new StringBuffer();triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":").append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());if (shardingParam != null) {triggerMsgSb.append("("+shardingParam+")");}triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>").append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");// 6、save log trigger-info 更新日志信息jobLog.setExecutorAddress(address);jobLog.setExecutorHandler(jobInfo.getExecutorHandler());jobLog.setExecutorParam(jobInfo.getExecutorParam());jobLog.setExecutorShardingParam(shardingParam);jobLog.setExecutorFailRetryCount(finalFailRetryCount);//jobLog.setTriggerTime();jobLog.setTriggerCode(triggerResult.getCode());jobLog.setTriggerMsg(triggerMsgSb.toString());XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

2.5 runExecutor 任務的執行:

/**
* run executor* @param triggerParam* @param address* @return*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){ReturnT<String> runResult = null;try {// 根據執行器的地址,獲取包裝執行器的接口對象ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);// 遠程rpc 調用 執行器端,進行任務的執行runResult = executorBiz.run(triggerParam);} catch (Exception e) {logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));}// 返滬執行的信息StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");runResultSB.append("<br>address:").append(address);runResultSB.append("<br>code:").append(runResult.getCode());runResultSB.append("<br>msg:").append(runResult.getMsg());runResult.setMsg(runResultSB.toString());return runResult;
}

executorBiz.run 任務的執行:

@Override
public ReturnT<String> run(TriggerParam triggerParam) {return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

總結

本文對 JobTriggerPoolHelper 的工作內容進行介紹。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/718560.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/718560.shtml
英文地址,請注明出處:http://en.pswp.cn/news/718560.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【JAVA重要知識 | 第三篇】深入理解并暴打AQS原理、ReentrantLock鎖

文章目錄 3.深入理解AQS、ReentrantLock3.1AQS3.1.1AQS簡介3.1.2核心結構&#xff08;1&#xff09;設計模型&#xff08;2&#xff09;組成部分&#xff08;3&#xff09;State關鍵字 3.1.3實現的兩類隊列&#xff08;1&#xff09;同步隊列①CLH②Node③主要行為 img條件隊列…

中霖教育:注冊安全工程師考是科目有哪些?

注冊安全工程師的類型是職業資格證書&#xff0c;需要滿足報名條件才能參加考試&#xff0c;考試通過就能發放證書。報名時間一般在八月份&#xff0c;考試時間在十月底左右。 考試科目&#xff1a; 《安全生產法律法規》 《安全生產管理》 《安全生產技術基礎》 《安全生…

golang實現openssl自簽名雙向認證

第一步&#xff1a;生成CA、服務端、客戶端證書 1. 生成CA根證書 生成CA證書私鑰 openssl genrsa -out ca.key 4096創建ca.conf 文件 [ req ] default_bits 4096 distinguished_name req_distinguished_name[ req_distinguished_name ] countryName …

Node.js基礎---Express路由

1. 路由的概念 1. 什么是路由 廣義上來講&#xff0c;路由就是映射關系 2. Express 中的路由 在 Express 中&#xff0c;路由指的是客戶端的請求與服務器處理函數之間的映射關系 Express 中的路由分三部分&#xff1a;請求的類型、請求的URL地址&#xff0c;處理函數。如下&am…

怎么使用curl2py自動構造爬蟲代碼并進行網絡爬蟲

目錄 一、了解curl2py 二、安裝curl2py 三、使用curl2py生成爬蟲代碼 四、實際案例&#xff1a;爬取網頁數據 五、總結與建議 在當今數據驅動的時代&#xff0c;網絡爬蟲成為了獲取數據的重要工具。對于初學者來說&#xff0c;手動編寫爬蟲代碼可能是一項挑戰。幸運的是&a…

PyTorch-神經網絡

神經網絡&#xff0c;這也是深度學習的基石&#xff0c;所謂的深度學習&#xff0c;也可以理解為很深層的神經網絡。說起這里&#xff0c;有一個小段子&#xff0c;神經網絡曾經被打入了冷宮&#xff0c;因為SVM派的崛起&#xff0c;SVM不了解的同學可以去google一下&#xff0…

JavaScript 基礎學習筆記(五):函數、作用域、匿名函數

目錄 一、函數 1.1 聲明和調用 1.2 形參和實參 1.3 返回值 二、作用域 2.1 全局作用域 2.2 局部作用域 三、匿名函數 3.1 函數表達式 3.2 立即執行函數 一、函數 理解函數的封裝特性&#xff0c;掌握函數的語法規則 1.1 聲明和調用 函數可以把具有相同或相似邏輯的代…

NLP_文本張量表示方法(代碼示例)

目標 了解什么是文本張量表示及其作用.文本張量表示的幾種方法及其實現. 1 文本張量表示 將一段文本使用張量進行表示&#xff0c;其中一般將詞匯為表示成向量&#xff0c;稱作詞向量&#xff0c;再由各個詞向量按順序組成矩陣形成文本表示. ["人生", "該&q…

無極低碼:五分鐘快速上手,開啟編程新時代

無極低碼平臺憑借其革命性的設計理念和強大的功能特性&#xff0c;正在徹底改變軟件開發的傳統格局。該平臺專為開發者、初創企業和各類研發團隊量身打造&#xff0c;旨在提供一種快速而高效的解決方案&#xff0c;以應對日益增長的業務需求和技術挑戰。 1.無極低碼的核心價值在…

2024《》

vue-cli到哪做了那些事 vue-cli是vue.js的腳手架&#xff0c;用于自動生成vue.jswebpack的項目模板&#xff0c;快速搭建Vue.js項目。 vue cli內置了webpack的一些功能&#xff0c;這些是用webpack打包時需要我們自己配置的&#xff0c;例如&#xff1a; 1.ES6代碼轉換成ES5代…

Linux 實現打印彩色進度條

文章目錄 預備知識一、理解回車換行二、認識行緩沖1、代碼一、二&#xff08;回車換行理解&#xff09;2、代碼三、四&#xff08;sleep函數和ffush函數理解&#xff09; 三、簡單倒計時1. 倒計時代碼2、效果展示 四、進度條1、效果展示2、進度條代碼makefileProcessBar.hProce…

tomcat 反向代理 自建博客 修改狀態頁 等

一 自建博客 隨后&#xff0c;拷貝到webapps下面 并且做軟連接 隨后重定向 并且下載 cat >/etc/yum.repos.d/mysql.repo <<EOF [mysql57-community] nameMySQL 5.7 Community Server baseurlhttp://repo.mysql.com/yum/mysql-5.7-community/el/7/x86_64/ enabled1 g…

團體程序設計天梯賽 L2-006 樹的遍歷

L2-006 樹的遍歷 分數 25 給定一棵二叉樹的后序遍歷和中序遍歷&#xff0c;請你輸出其層序遍歷的序列。這里假設鍵值都是互不相等的正整數。 輸入格式&#xff1a; 輸入第一行給出一個正整數N&#xff08;≤30&#xff09;&#xff0c;是二叉樹中結點的個數。第二行給出其后…

【Linux】Linux系統磁盤分區和掛載相關命令介紹

Linux系統磁盤分區和掛載相關命令介紹 文章目錄 Linux系統磁盤分區和掛載相關命令介紹磁盤分區1、使用fdisk創建分區2、使用parted創建分區 格式化分區分區掛載自動掛載其他常見&#xff08;用&#xff09;的磁盤相關命令 在Linux系統中&#xff0c;磁盤分區和磁盤掛載是管理存…

第十四屆藍橋杯大賽B組 JAVA 蝸牛 (遞歸剪枝)

題目描述&#xff1a; 這天&#xff0c;一只蝸牛來到了二維坐標系的原點。 在 x 軸上長有 n 根竹竿。它們平行于 y 軸&#xff0c;底部縱坐標為 0&#xff0c;橫坐標分別為 x1, x2, …, xn。竹竿的高度均為無限高&#xff0c;寬度可忽略。蝸牛想要從原點走到第 n 個竹竿的底部也…

全域電商數據集成管理與采集|API接口的采集與管理

如今&#xff0c;全渠道零售已是大勢所趨。企業電商經營的一大現狀就是數據分散各處&#xff0c;比如有來自電商平臺私域數據、品牌一方數據、公開的第三方行業數據與電商平臺C端頁面數據等等。如何集成全域數據日益成為企業數字化基建的難題。 當前電商數據集成的主流方案為人…

【基于Matlab GUI的語音降噪系統設計】

客戶不要了&#xff0c;掛網上吧&#xff0c;有需要自行下載~ 賺點辛苦費 ** 功能實現: ** 1、導入音頻文件/錄入音頻&#xff0c;能實現播放功能。 2、對導入/錄入的音頻信號進行時域和頻域分析&#xff0c;并制圖。 3、可在導入/錄入的音頻信號上加入噪聲&#xff0c;并能夠播…

Apache JMeter 5.6.3 安裝

源碼下載 curl -O https://dlcdn.apache.org//jmeter/source/apache-jmeter-5.6.3_src.zipJMeter 下載 curl -O https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.6.3.zipjmeter.properties 里 設置中文 windows系統上解壓&#xff0c;雙擊jmeter.bat 啟動 執行參…

【人工智能】DeepLearning學習路線及簡要說明

目錄 神經網絡 1.1 前饋神經網絡(FNN) 結構和工作原理 訓練過程 應用

架構設計方法(4A架構)-應用架構

1、應用架構&#xff08;AA&#xff09;&#xff1a;業務價值與產品之間的橋梁&#xff0c;是企業架構的一個子集 2、應用架構包含“應用系統模塊、應用服務、應用系統集成”3個關鍵要素 3、收集AS-IS應用架構&#xff0c;描繪現狀&#xff0c;并識別改進機會點 4、描述對新系統…