基于 Apache Dolphinscheduler3.1.9中的Task 處理流程解析

實現一個調度任務,可能很簡單。但是如何讓工作流下的任務跑得更好、更快、更穩定、更具有擴展性,同時可視化,是值得我們去思考得問題。

Apache DolphinScheduler是一個分布式和可擴展的開源工作流協調平臺,具有強大的DAG可視化界面,廣泛應用于數據集成、數據分析和大規模數據遷移。

Master 整體啟動流程

    @PostConstructpublic void run() throws SchedulerException {// init rpc serverthis.masterRPCServer.start();// install task pluginthis.taskPluginManager.loadPlugin();// self tolerantthis.masterRegistryClient.start();this.masterRegistryClient.setRegistryStoppable(this);this.masterSchedulerBootstrap.init();// 處理任務的核心 重點是處理任務this.masterSchedulerBootstrap.start();// 事件執行啟動this.eventExecuteService.start();this.failoverExecuteThread.start();this.schedulerApi.start();Runtime.getRuntime().addShutdownHook(new Thread(() -> {if (!ServerLifeCycleManager.isStopped()) {close("MasterServer shutdownHook");}}));}

上面的代碼主要做的事情:

初始化rpc的服務端,也即netty的服務端,為處理請求做好鋪墊
安裝Task插件,task插件主要為業務需要集成的SPI信息
master注冊客戶端啟動
初始化master定時引導
master定時引導啟動
事件執行啟動
失敗執行線程啟動
定時任務api啟動

下面我們重點看這兩段代碼:

this.masterSchedulerBootstrap.start();
this.eventExecuteService.start();

this.masterSchedulerBootstrap.start()

任務的來源在t_ds_command表里面,因此需要先取出指令信息,然后將指令轉處理實例,遍歷處理實例,創建新的工作流線程。

(1) 將處理實例id和處理線程放入到processInstanceExecCacheManager中。

(2) 添加工作流運行狀態和實例id放入到workflowEventQueue隊列中。

因此可以看到消費的poolEvent,可以看到workflowEventQueue.poolEvent() 本質就是workflowEventLooper.start()啟動消費。

此時我們可以看到workflowEventLooper.start(),處理放入到workflowEventQueue中的事件event,也即workflowEventQueue.take(),獲取工作流處理器,處理事件。

Run中的核心處理:

劉亞洲

此時先執行工作流的流程,核心方法:workflowExecuteRunnable::call

workflowEventLooper.start()啟動做的事情:

將任務放入到優先任務隊列之后,就可以進行消費隊列中的任務了。

ProcessInstance任務放入的核心

ProcessInstance 啟動入口點為:org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#call

根據工作流線程狀態可分為:

1)創建過程中的邏輯:

2)初始化DAG的過程:

3)初始化隊列的過程:

4)工作流狀態成功:

生成者消費者模型的產生是優先任務隊列的放入和優先任務隊列的消費。

因此可以看到兩者在轉換的過程之后基于Netty做了任務的轉發操作,從而在Netty中做指令處理,從而完成消費,最終流轉到具體的Task。

消費任務:TaskPriorityQueueConsumer

核心方法:this.batchDispatch(fetchTaskNum)

netty服務端消費任務消息

實質是放入任務線程,也即:

workerManager.offer(workerTaskExecuteRunnable)

處理消費在run方法里面:

waitSubmitQueue.take()

處理任務的核心:此時會具體流轉到具體的任務,執行處理

此時完成任務的適配業務任務的處理,最終實現任務的處理。

this.eventExecuteService.start()

針對任務處理的狀態進行處理。

stateEventHandler.handleStateEvent(this, stateEvent)

其主要過程是添加狀態事件和消費狀態事件,重點看隊列的生產和消費。

分析的思路和上面的隊列模式差不多,這里不展開了。

總結

從上面我們可以看到生產者消費者模型、線程模型在DS3.1.9版本中使用非常的多,也是我們去了解處理的思路的點。

同時對應任務的處理,為了保證任務的高效處理,使用了Netty來處理任務。

總體來說,代碼寫得還是很不錯的,值得我們去學習。同時還使用了很多設計模式,比如SPI、工廠模式、狀態模式等等。

參考:

dolphinscheduler文檔:https://dolphinscheduler.apache.org/zh-cn github地址:https://github.com/apache/dolphinscheduler

本文由 白鯨開源科技 提供發布支持!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/62141.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/62141.shtml
英文地址,請注明出處:http://en.pswp.cn/web/62141.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

藍橋杯2117砍竹子(簡單易懂 包看包會版)

問題描述 這天, 小明在砍竹子, 他面前有 n 棵竹子排成一排, 一開始第 i 棵竹子的 高度為 hi?. 他覺得一棵一棵砍太慢了, 決定使用魔法來砍竹子。魔法可以對連續的一 段相同高度的竹子使用, 假設這一段竹子的高度為 H, 那么 用一次魔法可以 把這一段竹子的高度都變為 ?H2?…

如何進行 JavaScript 性能優化?

要進行 JavaScript 性能優化,我們可以從多個角度進行思考,主要包括減少頁面渲染時間、減少內存占用、優化代碼執行效率等。以下是優化的一些方法,并結合實際項目代碼示例講解。 目錄結構 減少 DOM 操作 緩存 DOM 元素批量更新 DOM 優化 Jav…

CTF-PWN: 全保護下格式化字符串利用 [第一屆“吾杯”網絡安全技能大賽 如果能重來] 賽后學習(不會)

通過網盤分享的文件:如果能重來.zip 鏈接: https://pan.baidu.com/s/1XKIJx32nWVcSpKiWFQGpYA?pwd1111 提取碼: 1111 --來自百度網盤超級會員v2的分享漏洞分析 格式化字符串漏洞,在printf(format); __int64 sub_13D7() {char format[56]; // [rsp10h] [rbp-40h]…

selenium-常見問題解決方案匯總

selenium-常見問題解決方案 selenium版本selenium代理本地瀏覽器頁面Selenium之多窗口句柄的切換 selenium版本 selenium版本為: 3.141.0 注:selenium4x跟selenium3x會有不同的使用方法, selenium代理本地瀏覽器頁面 利用 Selenium 庫實現對 Google C…

Flask使用長連接

Flask使用flask_socketio實現websocket Python中的單例模式 在HTTP通信中,連接復用(Connection Reuse)是一個重要的概念,它允許客戶端和服務器在同一個TCP連接上發送和接收多個HTTP請求/響應,而不是為每個新的請求/響…

雨晨 26100.2454 Windows 11 24H2 專業工作站 極簡純凈版

文件: 雨晨 26100.2454 Windows 11 24H2 專業工作站極簡 install.esd 大小: 1947043502 字節 修改時間: 2024年12月6日, 星期五, 16:38:37 MD5: 339B7FDCA0130D432A0E98957738A9DD SHA1: 2978AE0CEAF02E52EC4135200D4BDBC861E07BE8 CRC32: 8C329C89 簡述: 由YCDIS…

[docker中首次配置git環境與時間同步問題]

11月沒寫東西,12月初趕緊水一篇。 剛開始搭建docker服務器時,網上找一堆指令配置好git后,再次新建容器后忘記怎么配了,,這次記錄下。 一、git ssh指令法,該方法不用每次提交時輸入密碼 前期準備&#xff0…

MongoDB性能監控工具

mongostat mongostat是MongoDB自帶的監控工具,其可以提供數據庫節點或者整個集群當前的狀態視圖。該功能的設計非常類似于Linux系統中的vmstat命令,可以呈現出實時的狀態變化。不同的是,mongostat所監視的對象是數據庫進程。mongostat常用于…

linux下的python打包

linux下的python打包 一、pyinstaller 優點:打包簡單,將整個運行環境進行打包 缺點:打包文件大、臃腫、啟動慢 安裝pyinstaller包 pip install pyinstaller 打包一個文件 pyinstaller -D app.py會在當前路徑中生成build、dist文件夾還有…

Python模塊之random、hashlib、json、time等內置模塊語法學習

Python內置模塊語法學習 random、hashlib、json、time、datetime、os等內置模塊語法學習 模塊 簡單理解為就是一個.py后綴的一個文件 分為三種: 內置模塊:python自帶,可調用第三方模塊:別人設計的,可調用自定義模塊…

從ctfwiki開始的pwn之旅 5.ret2csu

ret2csu 原理 在 64 位程序中,函數的前 6 個參數是通過寄存器傳遞的,但是大多數時候,我們很難找到每一個寄存器對應的 gadgets。 這時候,我們可以利用 x64 下的 __libc_csu_init 中的 gadgets。這個函數是用來對 libc 進行初始…

Ceph對象存儲

Ceph對象存儲1.概念對象存儲(Object Storage)是一種用于存儲大量非結構化數據的架構模型它使用簡單的HTTP或HTTPS協議進行文件訪問,而不是傳統的文件系統API與傳統的文件系統存儲方式不同,對象存儲不是將數據存儲在目錄或文件夾中…

嵌入式藍橋杯學習拓展 LCD翻轉顯示

通過配置SS和GS兩個標志位,實現掃描方向的切換。 將lcd.c的REG_932X_Init函數進行部分修改。 將LCD_WriteReg(R1, 0x0000);修改為LCD_WriteReg(R1,0x0100); 將LCD_WriteReg(R96, 0x2700); 修改為LCD_WriteReg(R96, 0xA700); void REG_932X_Init1(void) {LCD_Wr…

小程序 —— Day1

組件 — view和scroll-view view 類似于HTML中的div,是一個塊級元素 案例:通過view組件實現頁面的基礎布局 scroll-view 可滾動的視圖區域,用來實現滾動列表效果 案例:實現縱向滾動效果 scroll-x屬性:允許橫向滾動…

git pull error: cannot lock ref

Git: cannot lock ref ‘refs/remotes/origin/feature/xxx’: refs/remotes/origin/feature/xxx/car’ exists; cannot create refs/remotes/origin/feature/xxx git remote prune origin重新整理服務端和本地的關聯關系即可

pubmed關鍵詞搜索技能1:待更新

1,白話變為領域內學術詞: 例如,我想要做蛋白質糖基化修飾以功能,這個領域課題,則 第一性原理,首先是拆分詞匯:糖基化(一般比蛋白質、修飾、功能要在title中更常見,或者是…

iPhone手機清理軟件:相冊清理大師推薦

隨著智能手機成為我們日常生活的必需品,手機中的數據日益膨脹,尤其是照片和視頻這類容易積累的文件。對于iPhone用戶來說,管理這些文件,特別是清理相冊變得尤為重要。本文將介紹一款備受推崇的iPhone手機清理軟件——CleanMyPhone…

SpringBoot 開源停車場管理收費系統

一、下載項目文件 下載源碼項目文件口令: 【前端小程序地址】(3.0):伏脂火器白澤知洞座/~6f8d356LNL~:/【后臺管理地址】(3.0):伏脂火器仇恨篆洞座/~0f4a356Ks2~:/【崗亭端地址】(3.0):動作火器智匯堂多好/~dd69356K6r~:/復制口令…

網絡原理之 TCP 協議

目錄 1. TCP 協議格式 2. TCP 原理 (1) 確認應答 (2) 超時重傳 (3) 連接管理 a) 三次握手 b) 四次揮手 (4) 滑動窗口 (5) 流量控制 (6) 擁塞控制 (7) 延時應答 (8) 捎帶應答 3. TCP 特性 4. 異常情況的處理 1) 進程崩潰 2) 主機關機 (正常流程) 3) 主機掉電 (…

STM32使用RCC(Reset Clock Contorl,復位時鐘控制器)配置時鐘以及時鐘樹

RCC主要作用 設置系統時鐘SYSCLK(System Clock)頻率;設置AHB、APB2、APB1以及各個外設分頻因子,從而設置HCLK、PCLK2、PCLK1以及各個外設的時鐘頻率;控制AHB、APB2、APB1這三條總線時鐘以及每個外設的時鐘開啟&#xf…