文章目錄
- 前言
- 一、JobRegistryHelper 作用:
- 二、JobRegistryHelper 源碼介紹:
- 2.1 初始化start() 方法:
- 2.1.1 registryOrRemoveThreadPool 執行器注冊和移除:
- 2.1.2 registryMonitorThread 執行器注冊監控線程:
- 2.2 toStop() 釋放資源:
- 2.3 執行器的注冊和移除:
- 2.3 registry 執行器的注冊:
- 2.3 registryRemove執行器的移除:
- 三、擴展:
- 3.1 守護線程 Thread:
- 總結
前言
本文對JobRegistryHelper 工作內容進行介紹。
一、JobRegistryHelper 作用:
JobRegistryHelper是xxl-job-admin中的一個工具類,用于注冊并管理各個任務執行器的信息。具體作用包括:
- 注冊任務執行器:JobRegistryHelper可以將任務執行器的信息注冊到注冊中心,例如Zookeeper等,以便管理和監控任務執行器的運行狀態。
- 監控任務執行器:JobRegistryHelper可以監控任務執行器的健康狀態,包括可用性、負載情況等,確保任務執行器能夠正常執行任務。
- 發現任務執行器:JobRegistryHelper可以幫助xxl-job-admin發現注冊的任務執行器,以便將任務分配給合適的執行器執行。
- 注銷任務執行器:JobRegistryHelper還可以在任務執行器下線或停止運行時,及時將其從注冊中心注銷,以確保系統能夠正確管理和調度任務。
總之,JobRegistryHelper在xxl-job-admin中扮演著任務執行器注冊、監控和發現的重要角色,保證了任務的正常執行和系統的穩定運行。
二、JobRegistryHelper 源碼介紹:
2.1 初始化start() 方法:
由于start() 方法內容較多,故拆為 2.1.1 和 2.1.2 進行介紹
2.1.1 registryOrRemoveThreadPool 執行器注冊和移除:
private ThreadPoolExecutor registryOrRemoveThreadPool = null;
private Thread registryMonitorThread;
private volatile boolean toStop = false;public void start(){// for registry or remove 執行器注冊和移除 線程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});}
2.1.2 registryMonitorThread 執行器注冊監控線程:
// for monitor
registryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {// 死循環 停止標識 當容器停止時 toStop 會被置為 true 這樣就跳出循環try {// auto registry group 獲取自動注冊的執行器 (addressType執行器地址類型:0=自動注冊、1=手動錄入)// 通過頁面收到進行添加的執行器 不進行處理/*** SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.address_type = #{addressType}ORDER BY t.app_name, t.title, t.id ASC**/List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// remove dead address (admin/executor)// 獲取更新時間超過 90s 的執行器 進行刪除(超過90s 任務執行器已經下線)// RegistryConfig.DEAD_TIMEOUT 30*3=90/*** SELECT t.idFROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {// 移除移除的執行器/*** DELETE FROM xxl_job_registryWHERE id in<foreach collection="ids" item="item" open="(" close=")" separator="," >#{item}</foreach>**/XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)// 獲取更新時間在 90 s 全部執行器/*** SELECT <include refid="Base_Column_List" />FROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ > ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {// 判斷 執行是否是自動注冊的if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {// RegistType{ EXECUTOR, ADMIN } EXECUTOR 自動注冊,ADMIN 手動注冊// 獲取執行器的名字 執行器項目中設置的 xxl.job.executor.appname value 值String appname = item.getRegistryKey();// 按照執行器名稱 進行數據分組List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}// 放入執行器的的地址 key : 執行器名稱 ,value: 執行器地址appAddressMap.put(appname, registryList);}}}// fresh group address 刷新執行器地址for (XxlJobGroup group: groupList) {// 根據執行器項目名稱 獲取對應的執行器地址List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();// 多個執行器則使用, 進行分隔for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}// 更新 執行器地址group.setAddressList(addressListStr);group.setUpdateTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {// 每隔 30s 執行一次 BEAT_TIMEOUT = 30TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}
});
// 設置registryMonitorThread 守護線程,線程的名字,以及啟動線程
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
執行器集群情況下的注冊地址:
2.2 toStop() 釋放資源:
public void toStop(){// registryMonitorThread while 循環標識符設置為true 結束死循環toStop = true;// stop registryOrRemoveThreadPool 停掉線程registryOrRemoveThreadPool.shutdownNow();// stop monitir (interrupt and wait)registryMonitorThread.interrupt();try {// 等待registryMonitorThread 的任務執行完成registryMonitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}
}
2.3 執行器的注冊和移除:
2.3 registry 執行器的注冊:
public ReturnT<String> registry(RegistryParam registryParam) {// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}// async execute 交由 registryOrRemoveThreadPool 線程池 執行注冊任務
registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 更新執行器信息/*** UPDATE xxl_job_registrySET `update_time` = #{updateTime}WHERE `registry_group` = #{registryGroup}AND `registry_key` = #{registryKey}AND `registry_value` = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// 如果更新的條數小于1 證明執行器不在數據庫 xxl_job_registry 中存在if (ret < 1) {// 插入一條新的呃執行器數據XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// fresh 刷新 xxl_job_group 的執行器地址(目前此方法并沒有實現具體的業務)freshGroupRegistryInfo(registryParam);}}
});
// 返回給執行器客戶端項目 注冊成功標識
return ReturnT.SUCCESS;
}private void freshGroupRegistryInfo(RegistryParam registryParam){// Under consideration, prevent affecting core tables
}
2.3 registryRemove執行器的移除:
public ReturnT<String> registryRemove(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute 交由 registryOrRemoveThreadPool 線程池 執行任務registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 刪除失效的執行器/*** DELETE FROM xxl_job_registryWHERE registry_group = #{registryGroup}AND registry_key = #{registryKey}AND registry_value = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());if (ret > 0) {// fresh 刪除后更新 xxl_job_group 的執行器地址(目前此方法并沒有實現具體的業務)freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;
}
三、擴展:
3.1 守護線程 Thread:
將一個線程設置為守護線程(daemon thread)的作用是告訴JVM,如果所有的非守護線程都執行完畢了,那么守護線程也應該隨著退出。換句話說,當所有的非守護線程都執行完畢時,守護線程會自動結束,不會影響JVM的正常關閉。
設置一個線程為守護線程通常用于執行一些后臺任務,這些任務不需要操縱UI或與用戶進行交互,而且這些任務不是應用程序的關鍵部分。通過將這些線程設置為守護線程,可以避免這些線程持續運行導致程序無法正常結束的情況。
需要注意的是,守護線程所執行的任務可能會在任何時刻被JVM終止,因此對于一些需要確保執行完整性的任務,不應該將其設置為守護線程。常見的例子是Timer和TimerTask,如果Timer是守護線程,那么可能在主線程結束后,Timer任務就被終止了。
總結
本文對 JobRegistryHelper 的工作內容進行介紹。