FATE Flow 源碼解析 - 作業提交處理流程

背景介紹

FATE 是隱私計算中最有名的開源項目了,從 star 的數量上來看也可以看出來。截止 2023 年 3 月共收獲 4.9k 個 star,但是 FATE 一直被認為代碼框架復雜,難以理解,作為一個相關的從業者,后續會持續對 FATE 項目的源碼進行解析,方便對隱私計算感興趣的后來者提供一點點幫助。

本文主要基于 FATE-Flow 2022 年 12 月發布的版本 v1.10.0,后續的版本可能略有差異。針對 FATE-Flow 的代碼,基于 v1.10.0 的做了一個代碼注解的倉庫,方便查看具體的代碼 https://github.com/hustyichi/FATE-Flow

Fate-Flow 基礎介紹

FATE-Flow 是 FATE 項目的重要組成部分,主要用于實現作業的調度,整體的設計可以查看 官方文檔
FATE 中提交的訓練作業會提交給 Fate-Flow,由 FATE-Flow 統一進行調度執行,最終生成所需訓練結果

Fate-Flow 是作為一個 Web 服務對外提供服務的,對應的初始啟動文件為 FATE-Flow/python/fate_flow/fate_flow_server.py ,熟悉 flask 的可以看到,最終就是調用 run_simple 創建了一個 Web 服務,根據 app 可以找到 Web 服務的主要代碼都在 FATE-Flow/python/fate_flow/apps 目錄下,路由注冊的代碼如下所示:

# 注冊 HTTP 路由,將 Fate-Flow/python/fate_flow/apps 以及 Fate-Flow/python/fate_flow/scheduling_apps 下所有 python 文件client_urls_prefix = [register_page(path)for path in search_pages_path(Path(__file__).parent)
]
scheduling_urls_prefix = [register_page(path)for path in search_pages_path(Path(__file__).parent.parent / 'scheduling_apps')
]

可以看到注冊的路由主要就是 apps 目錄與 scheduling_apps 目錄下的路由。

一個注意點:FATE-Flow 是沒辦法獨立運行的,需要作為 FATE 的一部分執行。 FATE-Flow 項目部分依賴的代碼,比如 fate_arch 是存在于 FATE 工程下,對應的路徑為 FATE/python/fate_arch ,找不到代碼時可以聯合 FATE 代碼倉庫進行閱讀

作業處理流程

作為一個作業調度的服務,最重要的就是完整的處理流程,先厘清這個主線,其他分支就更容易理解了,主要流程如下所示:

作業提交

作業提交是通過 FATE-Flow/python/fate_flow/apps/job_app.py 中的 submit_job 進行提交的,主要的處理都是通過 DAGScheduler.submit() 來完成的,簡化版本的代碼如下所示:


def submit(cls, submit_job_conf: JobConfigurationBase, job_id: str = None):# 沒有 id 時默認生成唯一 idif not job_id:job_id = job_utils.generate_job_id()submit_result = {"job_id": job_id}job = Job()job.f_job_id = job_idjob.f_dsl = dsljob.f_train_runtime_conf = train_runtime_confjob.f_roles = runtime_conf["role"]job.f_initiator_role = job_initiator["role"]job.f_initiator_party_id = job_initiator["party_id"]job.f_role = job_initiator["role"]job.f_party_id = job_initiator["party_id"]# 通知各個站點 (party) 去創建對應的作業 job 以及對應的任務 tasksstatus_code, response = FederatedScheduler.create_job(job=job)# 更新 job 狀態為 WAITINGjob.f_status = JobStatus.WAITING# 將 job 狀態同步給各個站點(party)status_code, response = FederatedScheduler.sync_job_status(job=job)return submit_result

可以看到提交作業時,主要是在數據庫中的作業表 Job 生成對應的記錄,并將作業的數據與狀態同步給各個站點,并根據作業 job 的信息初始化生成對應的任務 task,最終實際執行時是以任務為單位進行的

資源申請

提交后作業的狀態變為 WAITING,在 DAGScheduler.run_do() 對 WAITING 狀態的作業進行了處理,可以看到如下所示:

def run_do(self):# 默認處理 WAITING 狀態的第一個創建的 job 進行處理,會分配必要的資源,處理結束狀態變為 RUNNINGjobs = JobSaver.query_job(is_initiator=True, status=JobStatus.WAITING, order_by="create_time", reverse=False)if len(jobs):job = jobs[0]self.schedule_waiting_jobs(job=job, lock=True)

可以看到實際處理的方法是 schedule_waiting_jobs() 方法,對應的代碼如下所示:

def schedule_waiting_jobs(cls, job):job_id, initiator_role, initiator_party_id, = job.f_job_id, job.f_initiator_role, job.f_initiator_party_id,# 檢查作業的前置依賴關系dependence_status_code, federated_dependence_response = FederatedScheduler.dependence_for_job(job=job)if dependence_status_code == FederatedSchedulingStatusCode.SUCCESS:# 申請相關資源apply_status_code, federated_response = FederatedScheduler.resource_for_job(job=job, operation_type=ResourceOperation.APPLY)if apply_status_code == FederatedSchedulingStatusCode.SUCCESS:# 啟動 job 執行,狀態更新至 RUNNINGcls.start_job(job_id=job_id, initiator_role=initiator_role, initiator_party_id=initiator_party_id)

在此階段,會申請作業執行所需的資源,資源申請時會調用依次調用各個站點對應的接口,分配必要的 CPU 與內存資源,對應的接口為 FATE-Flow/python/fate_flow/scheduling_apps/party_app.py 中的 /<job_id>/<role>/<party_id>/resource/apply 接口,最終調用 FATE-Flow/python/fate_flow/manager/resource_manager.py 中的 resource_for_job() 方法執行資源的獲取,此時會基于數據庫表 EngineRegistry 去做資源的動態分配限制。具體的分配策略的實現后續專門介紹,這邊就不具體展開了。
可以理解為這個階段結束,作業執行所需的資源就已經被占用,從而保證后續作業的順利執行

實際執行

實際作業的執行是在 DAGScheduler.run_do()中完成的,處理的狀態是在 RUNNING,可以看到如下所示:

def run_do(self):# 默認處理所有 RUNNING 狀態的 jobjobs = JobSaver.query_job(is_initiator=True, status=JobStatus.RUNNING, order_by="create_time", reverse=False)for job in jobs:self.schedule_running_job(job=job, lock=True)

可以看到實際的作業執行是在 schedule_running_job() 中完成的,此方法真正的任務執行是通過調用 TaskScheduler.schedule() 完成的,對應的代碼如下所示:


def schedule(cls, job, dsl_parser, canceled=False):initiator_tasks_group = JobSaver.get_tasks_asc(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id)waiting_tasks = []# 獲取就緒的 tasksfor initiator_task in initiator_tasks_group.values():if initiator_task.f_status == TaskStatus.WAITING:waiting_tasks.append(initiator_task)# 執行所有就緒的 tasksfor waiting_task in waiting_tasks:status_code = cls.start_task(job=job, task=waiting_task)def start_task(cls, job, task):# 申請 task 相關的資源apply_status = ResourceManager.apply_for_task_resource(task_info=task.to_human_model_dict(only_primary_with=["status"]))if not apply_status:return SchedulingStatusCode.NO_RESOURCE# 更新狀態為 RUNNING , 并同步給各個站點task.f_status = TaskStatus.RUNNINGupdate_status = JobSaver.update_task_status(task_info=task.to_human_model_dict(only_primary_with=["status"]))FederatedScheduler.sync_task_status(job=job, task=task)# 實際調用參與方執行 taskstatus_code, response = FederatedScheduler.start_task(job=job, task=task)

可以看到作業的執行事實上是依次獲取所有就緒的任務 Task,然后執行 FederatedScheduler.start_task() 去執行 task 做成所需完成的功能,而 FederatedScheduler.start_task() 事實上就是發起一次請求調用 party_app.py 中的 /<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/start 接口完成的
實際的任務執行是在 TaskController.start_task() 中完成,對應的代碼如下所示:

def start_task(cls, job_id, component_name, task_id, task_version, role, party_id, **kwargs):task_info = {"job_id": job_id,"task_id": task_id,"task_version": task_version,"role": role,"party_id": party_id,}# 根據 id 獲取對應的任務task = JobSaver.query_task(task_id=task_id, task_version=task_version, role=role, party_id=party_id)[0]task_info["engine_conf"] = {"computing_engine": run_parameters.computing_engine}# 根據執行環境選擇對應的 engine,目前主要是 eggroll 和 spark,默認是 eggrollbackend_engine = build_engine(run_parameters.computing_engine)# 實際執行對應的任務,對于 eggroll,會啟動新進程執行 python/fate_flow/worker/task_executor.py 腳本run_info = backend_engine.run(task=task,run_parameters=run_parameters,run_parameters_path=run_parameters_path,config_dir=config_dir,log_dir=job_utils.get_job_log_directory(job_id, role, party_id, component_name),cwd_dir=job_utils.get_job_directory(job_id, role, party_id, component_name),user_name=kwargs.get("user_id"))# 更新 task 相關的執行情況,執行正常的情況下狀態為 RUNNINGtask_info.update(run_info)task_info["start_time"] = current_timestamp()cls.update_task(task_info=task_info)task_info["party_status"] = TaskStatus.RUNNINGcls.update_task_status(task_info=task_info)

簡單理解任務 Task 最終只是根據作業在獨立進程中完成特定命令的執行,最終作業就是一系列任務的執行的組合。當所有任務完成時,作業也就完成了

進度更新

前面提到作業 job 的執行事實上僅僅是一系列對應的任務 task 的執行,因此 FATE-Flow 的進度更新也是根據任務 task 的完成的數量占所有 task 的數量來確定的。具體的代碼如下:

def schedule_running_job(cls, job: Job, force_sync_status=False):# 調度 job 進行執行task_scheduling_status_code, auto_rerun_tasks, tasks = TaskScheduler.schedule(job=job, dsl_parser=dsl_parser, canceled=job.f_cancel_signal)# 更新 job 執行的進度以及狀態tasks_status = dict([(task.f_component_name, task.f_status) for task in tasks])new_job_status = cls.calculate_job_status(task_scheduling_status_code=task_scheduling_status_code, tasks_status=tasks_status.values())# 根據 job 中已完成 task 的數量與總 task 的數量確定完成的進度total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status)new_progress = float(finished_count) / total * 100if new_job_status != job.f_status or new_progress != job.f_progress:# 通知參與方更新 job 執行的進度信息if int(new_progress) - job.f_progress > 0:job.f_progress = new_progressFederatedScheduler.sync_job(job=job, update_fields=["progress"])cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])# 有狀態變化時通知相關方更新 job 狀態信息if new_job_status != job.f_status:job.f_status = new_job_statusFederatedScheduler.sync_job_status(job=job)cls.update_job_on_initiator(initiator_job=job, update_fields=["status"])# 處理結束,執行必要資源回收if EndStatus.contains(job.f_status):cls.finish(job=job, end_status=job.f_status)

可以看到最終就是調用 calculate_job_progress() 計算特定作業 job 中任務 task 完成的數量,最終確定完成的進度。
所有的處理處理結束時,調用 finish() 執行必要的資源回收

總結

本文對 FATE-Flow 的作業的完整執行流程進行了梳理,為了簡化刪除了大量異常分支的處理,有興趣的可以結合實際的 FATE-Flow v1.10.0 的源碼進行查看,應該會更有裨益

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/44414.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/44414.shtml
英文地址,請注明出處:http://en.pswp.cn/web/44414.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

React@16.x(56)Redux@4.x(5)- 實現 createStore

目錄 1&#xff0c;分析2&#xff0c;實現2.1&#xff0c;基礎實現2.2&#xff0c;優化2.2.1&#xff0c;隨機字符串2.2.2&#xff0c;action 的判斷2.2.2&#xff0c;監聽器的優化 2.3&#xff0c;最終形態 1&#xff0c;分析 createStore()&#xff0c;參數1為 reducer&…

0601STM32TIM

TOC 分為四部分&#xff0c;八小節 一部分&#xff1a;主要講定時器基本定時的功能&#xff0c;也就是定一個事件&#xff0c;讓定時器每隔這個時間產生一個中斷&#xff0c;來實現每隔一個固定時間來執行一段程序的目的&#xff0c;比如做一個時鐘、秒表&#xff0c;或者使用一…

【Linux】1w詳解如何實現一個簡單的shell

目錄 實現思路 1. 交互 獲取命令行 2. 子串分割 解析命令行 3. 指令的判斷 內建命令 4. 普通命令的執行 補充&#xff1a;vim 文本替換 整體代碼 重點思考 1.getenv和putenv是什么意思 2.代碼extern char **environ; 3.內建命令是什么 4.lastcode WEXITSTATUS(sta…

Java-final關鍵字詳解

Java-final關鍵字詳解 一、引言 二、什么是 final 關鍵字&#xff1f; 三、final 變量 final 局部變量 final 實例變量 final 靜態變量 四、final 方法 五、final 類 六、final 關鍵字的實際應用 1. 定義常量 2. 防止方法被重寫 3. 創建不可變類 4. 優化性能 七、…

切割01串(牛客小白月賽98)

題意&#xff1a; 給三個整數n&#xff0c;l&#xff0c;r&#xff0c;和一個字符串s&#xff0c;滿足l<|c0-c1|<r就可以切成字符串a和字符串b&#xff0c;c0為字符串a左側出現0的次數&#xff0c;c1為字符串b右側出現1的次數&#xff0c;求最多切割次數 知識點&#x…

Onnx 1-深度學習-概述1

Onnx 1-深度學習-概述1 一: Onnx 概念1> Onnx 介紹2> Onnx 的作用3> Onnx 應用場景4> Onnx 文件格式1. Protobuf 特點2. onnx.proto3協議3> Onnx 模型基本操作二:Onnx API1> 算子詳解2> Onnx 算子介紹三: Onnx 模型1> Onnx 函數功能

昇思學習打卡-8-計算機視覺/FCN圖像語義分割

目錄 FCN介紹FCN所用的技術訓練數據的可視化模型訓練模型推理FCN的優點和不足優點不足 FCN介紹 FCN主要用于圖像分割領域&#xff0c;是一種端到端的分割方法&#xff0c;是深度學習應用在圖像語義分割的開山之作。通過進行像素級的預測直接得出與原圖大小相等的label map。因…

【C++基礎】初識C++(2)--引用、const、inline、nullptr

目錄 一、引用 1.1 引用的概念和定義 1.2 引用的特性 1.3引用的使用 1.4 const引用 1.5 指針和引用的關系 二、inline 三、nullptr 一、引用 1.1 引用的概念和定義 引?不是新定義?個變量&#xff0c;?是給已存在變量取了?個別名&#xff0c;編譯器不會為引?…

微軟的人工智能語音生成器在測試中達到與人類同等水平

微軟公司開發了一種新的神經編解碼語言模型 Vall-E&#xff0c;在自然度、語音魯棒性和說話者相似性方面都超越了以前的成果。它是同類產品中第一個在兩個流行基準測試中達到人類同等水平的產品&#xff0c;而且顯然非常逼真&#xff0c;以至于微軟不打算向公眾開放。 VALL-E …

Node.js 模塊系統

Node.js 模塊系統 Node.js 的模塊系統是其核心特性之一,它允許開發者將代碼組織成可重用的模塊。這種系統促進了代碼的模塊化,使得大型應用程序的構建和管理變得更加容易。本文將深入探討 Node.js 的模塊系統,包括其工作原理、如何創建和使用模塊,以及模塊系統的優勢和局限…

【每日一練】python類和對象現實舉例詳細講解

""" 本節課程目的&#xff1a; 1.掌握類描述現實世界實物思想 2.掌握類和對象的關系 3.理解什么事面向對象 """ #比如設計一個鬧鐘&#xff0c;在這里就新建一個類 class Clock:idNone #鬧鐘的序列號&#xff0c;也就是類的屬性priceNone #鬧…

Git最常用操作速查表

Git常用操作 文章目錄 Git常用操作1. 克隆/拉取2. 分支操作1. 查看分支2. 創建分支3. 切換到分支4. 刪除分支5. 刪除遠程分支6. 推送分支到遠程 3. 暫存庫操作4. Git團隊規范1. 原則2. 分支設計3. commit備注一般規范 1. 克隆/拉取 git clone xxx 從遠程倉庫克隆 git rebase…

【開源之美】:WinMerge Files

一、引言 強大的windows端文件比較工具&#xff0c;跟Beyond Compare相比&#xff0c;更為強大。但是這里我們推薦他的原因&#xff0c;不僅是因為作為一個使用的工具&#xff0c;主要是因為他開源&#xff0c;可以通過調試優秀的源代碼&#xff0c;進一步的提升C項目設計和編…

Alternative to Receptive field in Transformers and what factors impact it

題意&#xff1a;Transformer中感受野的替代概念及其影響因素 問題背景&#xff1a; I have two transformer networks. One with 3 heads per attention and 15 layers in total and second one with 5 heads per layer and 30 layers in total. Given an arbitrary set of d…

什么是數據模型?數據模型與數據治理有什么關系?

在企業數據治理的廣闊領域中&#xff0c;首要且關鍵的一步是明確溝通數據治理的需求。這包括對企業所持有的數據種類、數據存儲位置、以及當前數據管理的具體情況有一個清晰的了解和記錄。了解企業的數據資產是制定有效數據治理策略的基礎。企業需要識別和盤點所有類型的數據資…

AIGC產品經理學習路徑

基礎篇&#xff08;課時 2 &#xff09; AIGC 行業視角 AIGC 的行業發展演進&#xff1a;傳統模型/深度學習/大模型 AIGC 的產品設計演進&#xff1a;AI Embedded / AI Copilot / AI Agen AIGC 的行業產業全景圖 AIGC 的產品應用全景圖 AIGC 職業視角 AI 產品經理/ AIGC…

2974.最小數字游戲

1.題目描述 你有一個下標從 0 開始、長度為 偶數 的整數數組 nums &#xff0c;同時還有一個空數組 arr 。Alice 和 Bob 決定玩一個游戲&#xff0c;游戲中每一輪 Alice 和 Bob 都會各自執行一次操作。游戲規則如下&#xff1a; 每一輪&#xff0c;Alice 先從 nums 中移除一個 …

Spring MVC 全面指南:從入門到精通的詳細解析

引言&#xff1a; Spring MVC&#xff0c;作為Spring框架的一個重要模塊&#xff0c;為構建Web應用提供了強大的功能和靈活性。無論是初學者還是有一定經驗的開發者&#xff0c;掌握Spring MVC都將顯著提升你的Web開發技能。本文旨在為初學者提供一個全面且易于理解的學習路徑…

數據建設實踐之大數據平臺(五)安裝hive

安裝hive 上傳安裝包到/opt/software目錄并解壓 [bigdata@node101 software]$ tar -zxvf hive-3.1.3-with-spark-3.3.1.tar.gz -C /opt/services [bigdata@node101 services]$ mv apache-hive-3.1.3-bin apache-hive-3.1.3 配置環境變量 export JAVA_HOME=/opt/services…

Debezium系列之:驗證mysql、mariadb等兼容mysql協議數據庫賬號權限

Debezium系列之:驗證mysql、mariadb等兼容mysql協議數據庫賬號權限 一、數據庫需要開啟binlog二、創建賬號和賬號需要賦予的權限三、賬號具有權限查看日志信息四、驗證賬號權限五、驗證賬號能否執行show master status六、驗證數據庫是否開啟binlog一、數據庫需要開啟binlog …