原理篇-- 定時任務xxl-job-服務端(admin)項目啟動過程--JobRegistryHelper 初始化 (4)

文章目錄

  • 前言
  • 一、JobRegistryHelper 作用:
  • 二、JobRegistryHelper 源碼介紹:
    • 2.1 初始化start() 方法:
      • 2.1.1 registryOrRemoveThreadPool 執行器注冊和移除:
      • 2.1.2 registryMonitorThread 執行器注冊監控線程:
    • 2.2 toStop() 釋放資源:
    • 2.3 執行器的注冊和移除:
      • 2.3 registry 執行器的注冊:
      • 2.3 registryRemove執行器的移除:
  • 三、擴展:
    • 3.1 守護線程 Thread:
  • 總結


前言

本文對JobRegistryHelper 工作內容進行介紹。


一、JobRegistryHelper 作用:

JobRegistryHelper是xxl-job-admin中的一個工具類,用于注冊并管理各個任務執行器的信息。具體作用包括:

  1. 注冊任務執行器:JobRegistryHelper可以將任務執行器的信息注冊到注冊中心,例如Zookeeper等,以便管理和監控任務執行器的運行狀態。
  2. 監控任務執行器:JobRegistryHelper可以監控任務執行器的健康狀態,包括可用性、負載情況等,確保任務執行器能夠正常執行任務。
  3. 發現任務執行器:JobRegistryHelper可以幫助xxl-job-admin發現注冊的任務執行器,以便將任務分配給合適的執行器執行。
  4. 注銷任務執行器:JobRegistryHelper還可以在任務執行器下線或停止運行時,及時將其從注冊中心注銷,以確保系統能夠正確管理和調度任務。

總之,JobRegistryHelper在xxl-job-admin中扮演著任務執行器注冊、監控和發現的重要角色,保證了任務的正常執行和系統的穩定運行。

二、JobRegistryHelper 源碼介紹:

2.1 初始化start() 方法:

由于start() 方法內容較多,故拆為 2.1.1 和 2.1.2 進行介紹

2.1.1 registryOrRemoveThreadPool 執行器注冊和移除:

private ThreadPoolExecutor registryOrRemoveThreadPool = null;
private Thread registryMonitorThread;
private volatile boolean toStop = false;public void start(){// for registry or remove 執行器注冊和移除 線程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(2,10,30L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {r.run();logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");}});}

2.1.2 registryMonitorThread 執行器注冊監控線程:

// for monitor
registryMonitorThread = new Thread(new Runnable() {@Overridepublic void run() {while (!toStop) {// 死循環 停止標識 當容器停止時 toStop 會被置為 true 這樣就跳出循環try {// auto registry group 獲取自動注冊的執行器  (addressType執行器地址類型:0=自動注冊、1=手動錄入)// 通過頁面收到進行添加的執行器 不進行處理/*** 	SELECT <include refid="Base_Column_List" />FROM xxl_job_group AS tWHERE t.address_type = #{addressType}ORDER BY t.app_name, t.title, t.id ASC**/List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);if (groupList!=null && !groupList.isEmpty()) {// remove dead address (admin/executor)//  獲取更新時間超過 90s 的執行器 進行刪除(超過90s 任務執行器已經下線)// RegistryConfig.DEAD_TIMEOUT 30*3=90/*** 	SELECT t.idFROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ < ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());if (ids!=null && ids.size()>0) {// 移除移除的執行器/*** DELETE FROM xxl_job_registryWHERE id in<foreach collection="ids" item="item" open="(" close=")" separator="," >#{item}</foreach>**/XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);}// fresh online address (admin/executor)// 獲取更新時間在 90 s 全部執行器/*** SELECT <include refid="Base_Column_List" />FROM xxl_job_registry AS tWHERE t.update_time <![CDATA[ > ]]> DATE_ADD(#{nowTime},INTERVAL -#{timeout} SECOND)**/HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());if (list != null) {for (XxlJobRegistry item: list) {// 判斷 執行是否是自動注冊的if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {//  RegistType{ EXECUTOR, ADMIN } EXECUTOR 自動注冊,ADMIN 手動注冊//  獲取執行器的名字 執行器項目中設置的 xxl.job.executor.appname value 值String appname = item.getRegistryKey();// 按照執行器名稱 進行數據分組List<String> registryList = appAddressMap.get(appname);if (registryList == null) {registryList = new ArrayList<String>();}if (!registryList.contains(item.getRegistryValue())) {registryList.add(item.getRegistryValue());}// 放入執行器的的地址 key : 執行器名稱 ,value: 執行器地址appAddressMap.put(appname, registryList);}}}// fresh group address 刷新執行器地址for (XxlJobGroup group: groupList) {// 根據執行器項目名稱 獲取對應的執行器地址List<String> registryList = appAddressMap.get(group.getAppname());String addressListStr = null;if (registryList!=null && !registryList.isEmpty()) {Collections.sort(registryList);StringBuilder addressListSB = new StringBuilder();// 多個執行器則使用, 進行分隔for (String item:registryList) {addressListSB.append(item).append(",");}addressListStr = addressListSB.toString();addressListStr = addressListStr.substring(0, addressListStr.length()-1);}// 更新 執行器地址group.setAddressList(addressListStr);group.setUpdateTime(new Date());XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);}}} catch (Exception e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}try {// 每隔 30s 執行一次  BEAT_TIMEOUT = 30TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);}}}logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");}
});
// 設置registryMonitorThread 守護線程,線程的名字,以及啟動線程
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();

執行器集群情況下的注冊地址:
在這里插入圖片描述

2.2 toStop() 釋放資源:

public void toStop(){// registryMonitorThread while 循環標識符設置為true 結束死循環toStop = true;// stop registryOrRemoveThreadPool 停掉線程registryOrRemoveThreadPool.shutdownNow();// stop monitir (interrupt and wait)registryMonitorThread.interrupt();try {// 等待registryMonitorThread 的任務執行完成registryMonitorThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}
}

2.3 執行器的注冊和移除:

2.3 registry 執行器的注冊:

public ReturnT<String> registry(RegistryParam registryParam) {// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}// async execute 交由 registryOrRemoveThreadPool 線程池 執行注冊任務
registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 更新執行器信息/***  UPDATE xxl_job_registrySET `update_time` = #{updateTime}WHERE `registry_group` = #{registryGroup}AND `registry_key` = #{registryKey}AND `registry_value` = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// 如果更新的條數小于1 證明執行器不在數據庫 xxl_job_registry 中存在if (ret < 1) {// 插入一條新的呃執行器數據XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());// fresh 刷新 xxl_job_group 的執行器地址(目前此方法并沒有實現具體的業務)freshGroupRegistryInfo(registryParam);}}
});
// 返回給執行器客戶端項目 注冊成功標識
return ReturnT.SUCCESS;
}private void freshGroupRegistryInfo(RegistryParam registryParam){// Under consideration, prevent affecting core tables
}

2.3 registryRemove執行器的移除:

public ReturnT<String> registryRemove(RegistryParam registryParam) {// validif (!StringUtils.hasText(registryParam.getRegistryGroup())|| !StringUtils.hasText(registryParam.getRegistryKey())|| !StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");}// async execute 交由 registryOrRemoveThreadPool 線程池 執行任務registryOrRemoveThreadPool.execute(new Runnable() {@Overridepublic void run() {// 刪除失效的執行器/*** 	DELETE FROM xxl_job_registryWHERE registry_group = #{registryGroup}AND registry_key = #{registryKey}AND registry_value = #{registryValue}**/int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());if (ret > 0) {// fresh 刪除后更新 xxl_job_group 的執行器地址(目前此方法并沒有實現具體的業務)freshGroupRegistryInfo(registryParam);}}});return ReturnT.SUCCESS;
}

三、擴展:

3.1 守護線程 Thread:

將一個線程設置為守護線程(daemon thread)的作用是告訴JVM,如果所有的非守護線程都執行完畢了,那么守護線程也應該隨著退出。換句話說,當所有的非守護線程都執行完畢時,守護線程會自動結束,不會影響JVM的正常關閉。

設置一個線程為守護線程通常用于執行一些后臺任務,這些任務不需要操縱UI或與用戶進行交互,而且這些任務不是應用程序的關鍵部分。通過將這些線程設置為守護線程,可以避免這些線程持續運行導致程序無法正常結束的情況。

需要注意的是,守護線程所執行的任務可能會在任何時刻被JVM終止,因此對于一些需要確保執行完整性的任務,不應該將其設置為守護線程。常見的例子是Timer和TimerTask,如果Timer是守護線程,那么可能在主線程結束后,Timer任務就被終止了。


總結

本文對 JobRegistryHelper 的工作內容進行介紹。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/718683.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/718683.shtml
英文地址,請注明出處:http://en.pswp.cn/news/718683.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

折線圖實現柱狀陰影背景的demo

這個是一個由官網的基礎折線圖實現的流程&#xff0c;將涉及到的知識點附上個人淺薄的見解&#xff0c;源碼在最后&#xff0c;需要的可自取。 折線圖 成果展示代碼注解參數backgroundColordataZoomlegendtitlexAxisyAxisgridseries 源碼 成果展示 官網的基礎折線圖&#xff…

貓耳語音下載(mediadown)

貓耳語音下載(mediadown) 一、介紹 貓耳語音下載,能夠幫助你下載貓耳音頻節目。如果你是會員,它還能幫你下載會員節目。 二、下載地址 下載:貓耳語音下載(mediadown) 百度網盤下載:貓耳語音下載(mediadown) 三、安裝教程 將下載的文件解壓到D:\xibinhui,D:\Pr…

Unity RectTransform·屏幕坐標轉換

RectTransform轉屏幕坐標 分兩種情況 Canvas渲染模式為Overlay時&#xff0c;使用此方式 public Rect GetScreenCoordinatesOfCorners(RectTransform rt) {var worldCorners new Vector3[4];rt.GetWorldCorners(worldCorners);var result new Rect(worldCorners[0].x,world…

Manomotion 實現AR手勢互動-解決手勢無效的問題

之前就玩過 Manomotion &#xff0c;現在有新需求&#xff0c;重新接入發現不能用了&#xff0c;不管什么辦法&#xff0c;都識別不了手勢&#xff0c;我記得當初是直接調用就可以的。 經過研究發現&#xff0c;新版本SDK改了寫法。下邊就寫一下新版本的調用&#xff0c;并且實…

好書推薦 《Excel函數與公式應用大全for Excel 365 Excel 2021》

一.基本介紹 1.什么是 Excel? Excel 是微軟公司開發的一款電子表格軟件&#xff0c;是 Microsoft Office 套件的一部分。它被廣泛用于數據處理、分析、可視化和管理等方面。Excel 提供了豐富的功能&#xff0c;使用戶能夠創建、編輯、存儲和分享各種類型的數據表格。 2.Exc…

Golang Channel 詳細原理和使用技巧

1.簡介 Channel(一般簡寫為 chan) 管道提供了一種機制:它在兩個并發執行的協程之間進行同步&#xff0c;并通過傳遞與該管道元素類型相符的值來進行通信,它是Golang在語言層面提供的goroutine間的通信方式.通過Channel在不同的 goroutine中交換數據&#xff0c;在goroutine之間…

代碼隨想錄Day66 | 圖的DFS與BFS

代碼隨想錄Day66 | 圖的DFS與BFS DFS797.所有可能的路徑無向圖和有向圖的處理 BFS200.島嶼數量 DFS 文檔講解&#xff1a;代碼隨想錄 視頻講解&#xff1a; 狀態 本質上就是回溯算法。 void dfs(參數) {if (終止條件) {存放結果;return;}for (選擇&#xff1a;本節點所連接的…

『運維備忘錄』之 Shell 內置命令大集合

前言 在 Shell 中&#xff0c;有許多內置命令可用于執行各種任務&#xff0c;包括文件操作、進程管理、環境變量設置等。本文將詳細介紹一些常見的Shell內置命令及其示例用法。 命令描述alias創建命令別名&#xff0c;用于將命令或命令組合關聯到用戶自定義名稱bg將作業放入后…

Qt textBrowser的Html相關

Qt textBrowser的Html相關 Qt textBrowser的Html相關 Qt textBrowser的Html相關 一開始就想要一個簡單的功能&#xff0c;點一下按鈕&#xff0c;添加的文字居中顯示&#xff0c;再點一下按鈕&#xff0c;添加的文字變更顏色居右顯示。 但是&#xff1a; ui->textEdit-&g…

WordPress免費的遠程圖片本地化下載插件nicen-localize-image

nicen-localize-image&#xff08;可在wordpress插件市場搜索下載&#xff09;&#xff0c;是一款用于本地化文章外部圖片的插件&#xff0c;支持如下功能&#xff1a; 文章發布前通過編輯器插件本地化 文章手動發布時自動本地化 文章定時發布時自動本地化 針對已發布的文章…

python類的屬性、方法、靜態方法、靜態方法類內部的調用、直接調用與實例化調用

設計者&#xff1a;ISDF工軟未來 版本&#xff1a;v1.0 日期&#xff1a;2024/3/4 class Restaurant:餐館類def __init__(self,restaurant_name,cuisine_type):#類的屬性self.restaurant_name restaurant_nameself.cuisine_type cuisine_type# self.stregth_level 0def desc…

基于機器學習的密碼強度檢測

項目簡介 利用機器學習對提供的數據集預測用戶輸入的密碼是否為弱密碼。 原始數據集只包含關于弱密碼的信息&#xff0c;并沒有包含強密碼的數據或分類器&#xff0c;這意味著模型無法學習到強密碼的規律!!! 我之所以這樣設計這個示例&#xff0c;其目的是為了向你展示模型的…

python65-Python的循環之for表達式

for表達式用于利用其他區間、元組、列表等可迭代對象創建新的列表。for 表達式的語法格式如下: [表達式 for 循環計數器 in 可迭代對象] 從上面的語法格式可以看出,for表達式與普通for循環的區別 1)在for關鍵字之前定義一個表達式,該表達式通常會包含循環計數器 2)for …

@RestController和@Controller的區別

RestController和Controller是Spring框架中兩個常用的注解&#xff0c;用于標識Controller類 1.Controller&#xff1a;使用Controller注解標記的類是一個典型的MVC控制器&#xff08;controller&#xff09;&#xff0c;用于處理請求并返回視圖。通常在該類的方法上使用Reque…

alzet供應商你值得擁有

在20世紀70年代&#xff0c;ALZE公司研發出來一款巧妙的藥物輸送裝置——Alzet osmotic pump。這款產品如膠囊般精致小巧&#xff0c;它既有膠囊的外表&#xff0c;也具有膠囊的作用。在Alzet osmotic pump中藏有可以裝配藥物溶液的空間。此款膠囊泵如同一個小投遞員&#xff0…

Vue整合three.js 環境從頭搭建 詳細教程

目錄 一、創建Vue項目 二、搭建three.js 三、引入擴展庫OrbitControls 首先我們要有nodejs環境,這里我們用的是17.1.0版本。 一、創建Vue項目 第一步:終端命令創建vue項目 npm create vite@latest

AI蠕蟲病毒威脅升級,揭示AI安全新危機

一組研究人員成功研發出首個能夠通過電子郵件客戶端竊取數據、傳播惡意軟件以及向他人發送垃圾郵件的AI蠕蟲&#xff0c;并在使用流行的大規模語言模型&#xff08;LLMs&#xff09;的測試環境中展示了其按設計功能運作的能力。基于他們的研究成果&#xff0c;研究人員向生成式…

用pyinstaller打包python代碼為exe可執行文件并在其他電腦運行的方法

本文介紹基于Python語言中的pyinstaller模塊&#xff0c;將寫好的.py格式的Python代碼及其所用到的所有第三方庫打包&#xff0c;生成.exe格式的可執行文件&#xff0c;從而方便地在其他環境、其他電腦中直接執行這一可執行文件的方法。 有時&#xff0c;我們希望將自己電腦上的…

python筆記:super(),__getitem()__,__call()__

目錄 Q1&#xff1a;super(NetworkBlock, self).__init__()什么意思 Q2&#xff1a;__call__() 類的實例圓括號自動調用 Q3&#xff1a;__getitem__() 類的實例 方括號自動調用 Q4&#xff1a;沒有雙下劃線方法時&#xff0c;普通的調用類的方法 總結 Q1&#xff1a;super…

Python 全棧系列230 輕全局函數服務 GFGoLite

說明 為了將GlobalFunc推向正常的應用軌道&#xff0c;構造一個功能簡單的服務。 內容 1 工作模式 Server模式&#xff1a;以API服務方式執行 Worker模式: 以Worker方式執行。通過left/right文件夾和rsync方式執行任務并寫結果。 2 構造方法 重載和執行&#xff1b;從標…