實現一個調度任務,可能很簡單。但是如何讓工作流下的任務跑得更好、更快、更穩定、更具有擴展性,同時可視化,是值得我們去思考得問題。
Apache DolphinScheduler是一個分布式和可擴展的開源工作流協調平臺,具有強大的DAG可視化界面,廣泛應用于數據集成、數據分析和大規模數據遷移。
Master 整體啟動流程
@PostConstructpublic void run() throws SchedulerException {// init rpc serverthis.masterRPCServer.start();// install task pluginthis.taskPluginManager.loadPlugin();// self tolerantthis.masterRegistryClient.start();this.masterRegistryClient.setRegistryStoppable(this);this.masterSchedulerBootstrap.init();// 處理任務的核心 重點是處理任務this.masterSchedulerBootstrap.start();// 事件執行啟動this.eventExecuteService.start();this.failoverExecuteThread.start();this.schedulerApi.start();Runtime.getRuntime().addShutdownHook(new Thread(() -> {if (!ServerLifeCycleManager.isStopped()) {close("MasterServer shutdownHook");}}));}
上面的代碼主要做的事情:
初始化rpc的服務端,也即netty的服務端,為處理請求做好鋪墊
安裝Task插件,task插件主要為業務需要集成的SPI信息
master注冊客戶端啟動
初始化master定時引導
master定時引導啟動
事件執行啟動
失敗執行線程啟動
定時任務api啟動
下面我們重點看這兩段代碼:
this.masterSchedulerBootstrap.start();
this.eventExecuteService.start();
this.masterSchedulerBootstrap.start()
任務的來源在t_ds_command
表里面,因此需要先取出指令信息,然后將指令轉處理實例,遍歷處理實例,創建新的工作流線程。
(1) 將處理實例id
和處理線程放入到processInstanceExecCacheManager
中。
(2) 添加工作流運行狀態和實例id
放入到workflowEventQueue
隊列中。
因此可以看到消費的poolEvent
,可以看到workflowEventQueue.poolEvent()
本質就是workflowEventLooper.start()
啟動消費。
此時我們可以看到workflowEventLooper.start()
,處理放入到workflowEventQueue
中的事件event,也即workflowEventQueue.take()
,獲取工作流處理器,處理事件。
Run中的核心處理:
劉亞洲
此時先執行工作流的流程,核心方法:workflowExecuteRunnable::call
。
workflowEventLooper.start()
啟動做的事情:
將任務放入到優先任務隊列之后,就可以進行消費隊列中的任務了。
ProcessInstance任務放入的核心
ProcessInstance 啟動入口點為:org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable#call
。
根據工作流線程狀態可分為:
1)創建過程中的邏輯:
2)初始化DAG的過程:
3)初始化隊列的過程:
4)工作流狀態成功:
生成者消費者模型的產生是優先任務隊列的放入和優先任務隊列的消費。
因此可以看到兩者在轉換的過程之后基于Netty做了任務的轉發操作,從而在Netty中做指令處理,從而完成消費,最終流轉到具體的Task。
消費任務:TaskPriorityQueueConsumer
核心方法:this.batchDispatch(fetchTaskNum)
netty服務端消費任務消息
實質是放入任務線程,也即:
workerManager.offer(workerTaskExecuteRunnable)
處理消費在run方法里面:
waitSubmitQueue.take()
處理任務的核心:此時會具體流轉到具體的任務,執行處理
此時完成任務的適配業務任務的處理,最終實現任務的處理。
this.eventExecuteService.start()
針對任務處理的狀態進行處理。
stateEventHandler.handleStateEvent(this, stateEvent)
其主要過程是添加狀態事件和消費狀態事件,重點看隊列的生產和消費。
分析的思路和上面的隊列模式差不多,這里不展開了。
總結
從上面我們可以看到生產者消費者模型、線程模型在DS3.1.9版本中使用非常的多,也是我們去了解處理的思路的點。
同時對應任務的處理,為了保證任務的高效處理,使用了Netty來處理任務。
總體來說,代碼寫得還是很不錯的,值得我們去學習。同時還使用了很多設計模式,比如SPI、工廠模式、狀態模式等等。
參考:
dolphinscheduler文檔:https://dolphinscheduler.apache.org/zh-cn github地址:https://github.com/apache/dolphinscheduler
本文由 白鯨開源科技 提供發布支持!