背景介紹
FATE 是隱私計算中最有名的開源項目了,從 star 的數量上來看也可以看出來。截止 2023 年 3 月共收獲 4.9k 個 star,但是 FATE 一直被認為代碼框架復雜,難以理解,作為一個相關的從業者,后續會持續對 FATE 項目的源碼進行解析,方便對隱私計算感興趣的后來者提供一點點幫助。
本文主要基于 FATE-Flow 2022 年 12 月發布的版本 v1.10.0,后續的版本可能略有差異。針對 FATE-Flow 的代碼,基于 v1.10.0 的做了一個代碼注解的倉庫,方便查看具體的代碼 https://github.com/hustyichi/FATE-Flow
Fate-Flow 基礎介紹
FATE-Flow 是 FATE 項目的重要組成部分,主要用于實現作業的調度,整體的設計可以查看 官方文檔
FATE 中提交的訓練作業會提交給 Fate-Flow,由 FATE-Flow 統一進行調度執行,最終生成所需訓練結果
Fate-Flow 是作為一個 Web 服務對外提供服務的,對應的初始啟動文件為 FATE-Flow/python/fate_flow/fate_flow_server.py
,熟悉 flask 的可以看到,最終就是調用 run_simple
創建了一個 Web 服務,根據 app 可以找到 Web 服務的主要代碼都在 FATE-Flow/python/fate_flow/apps
目錄下,路由注冊的代碼如下所示:
# 注冊 HTTP 路由,將 Fate-Flow/python/fate_flow/apps 以及 Fate-Flow/python/fate_flow/scheduling_apps 下所有 python 文件client_urls_prefix = [register_page(path)for path in search_pages_path(Path(__file__).parent)
]
scheduling_urls_prefix = [register_page(path)for path in search_pages_path(Path(__file__).parent.parent / 'scheduling_apps')
]
可以看到注冊的路由主要就是 apps 目錄與 scheduling_apps 目錄下的路由。
一個注意點:FATE-Flow 是沒辦法獨立運行的,需要作為 FATE 的一部分執行。 FATE-Flow 項目部分依賴的代碼,比如 fate_arch
是存在于 FATE 工程下,對應的路徑為 FATE/python/fate_arch
,找不到代碼時可以聯合 FATE 代碼倉庫進行閱讀
作業處理流程
作為一個作業調度的服務,最重要的就是完整的處理流程,先厘清這個主線,其他分支就更容易理解了,主要流程如下所示:
作業提交
作業提交是通過 FATE-Flow/python/fate_flow/apps/job_app.py
中的 submit_job
進行提交的,主要的處理都是通過 DAGScheduler.submit()
來完成的,簡化版本的代碼如下所示:
def submit(cls, submit_job_conf: JobConfigurationBase, job_id: str = None):# 沒有 id 時默認生成唯一 idif not job_id:job_id = job_utils.generate_job_id()submit_result = {"job_id": job_id}job = Job()job.f_job_id = job_idjob.f_dsl = dsljob.f_train_runtime_conf = train_runtime_confjob.f_roles = runtime_conf["role"]job.f_initiator_role = job_initiator["role"]job.f_initiator_party_id = job_initiator["party_id"]job.f_role = job_initiator["role"]job.f_party_id = job_initiator["party_id"]# 通知各個站點 (party) 去創建對應的作業 job 以及對應的任務 tasksstatus_code, response = FederatedScheduler.create_job(job=job)# 更新 job 狀態為 WAITINGjob.f_status = JobStatus.WAITING# 將 job 狀態同步給各個站點(party)status_code, response = FederatedScheduler.sync_job_status(job=job)return submit_result
可以看到提交作業時,主要是在數據庫中的作業表 Job 生成對應的記錄,并將作業的數據與狀態同步給各個站點,并根據作業 job 的信息初始化生成對應的任務 task,最終實際執行時是以任務為單位進行的
資源申請
提交后作業的狀態變為 WAITING,在 DAGScheduler.run_do()
對 WAITING 狀態的作業進行了處理,可以看到如下所示:
def run_do(self):# 默認處理 WAITING 狀態的第一個創建的 job 進行處理,會分配必要的資源,處理結束狀態變為 RUNNINGjobs = JobSaver.query_job(is_initiator=True, status=JobStatus.WAITING, order_by="create_time", reverse=False)if len(jobs):job = jobs[0]self.schedule_waiting_jobs(job=job, lock=True)
可以看到實際處理的方法是 schedule_waiting_jobs() 方法,對應的代碼如下所示:
def schedule_waiting_jobs(cls, job):job_id, initiator_role, initiator_party_id, = job.f_job_id, job.f_initiator_role, job.f_initiator_party_id,# 檢查作業的前置依賴關系dependence_status_code, federated_dependence_response = FederatedScheduler.dependence_for_job(job=job)if dependence_status_code == FederatedSchedulingStatusCode.SUCCESS:# 申請相關資源apply_status_code, federated_response = FederatedScheduler.resource_for_job(job=job, operation_type=ResourceOperation.APPLY)if apply_status_code == FederatedSchedulingStatusCode.SUCCESS:# 啟動 job 執行,狀態更新至 RUNNINGcls.start_job(job_id=job_id, initiator_role=initiator_role, initiator_party_id=initiator_party_id)
在此階段,會申請作業執行所需的資源,資源申請時會調用依次調用各個站點對應的接口,分配必要的 CPU 與內存資源,對應的接口為 FATE-Flow/python/fate_flow/scheduling_apps/party_app.py
中的 /<job_id>/<role>/<party_id>/resource/apply
接口,最終調用 FATE-Flow/python/fate_flow/manager/resource_manager.py
中的 resource_for_job()
方法執行資源的獲取,此時會基于數據庫表 EngineRegistry
去做資源的動態分配限制。具體的分配策略的實現后續專門介紹,這邊就不具體展開了。
可以理解為這個階段結束,作業執行所需的資源就已經被占用,從而保證后續作業的順利執行
實際執行
實際作業的執行是在 DAGScheduler.run_do()
中完成的,處理的狀態是在 RUNNING,可以看到如下所示:
def run_do(self):# 默認處理所有 RUNNING 狀態的 jobjobs = JobSaver.query_job(is_initiator=True, status=JobStatus.RUNNING, order_by="create_time", reverse=False)for job in jobs:self.schedule_running_job(job=job, lock=True)
可以看到實際的作業執行是在 schedule_running_job()
中完成的,此方法真正的任務執行是通過調用 TaskScheduler.schedule()
完成的,對應的代碼如下所示:
def schedule(cls, job, dsl_parser, canceled=False):initiator_tasks_group = JobSaver.get_tasks_asc(job_id=job.f_job_id, role=job.f_role, party_id=job.f_party_id)waiting_tasks = []# 獲取就緒的 tasksfor initiator_task in initiator_tasks_group.values():if initiator_task.f_status == TaskStatus.WAITING:waiting_tasks.append(initiator_task)# 執行所有就緒的 tasksfor waiting_task in waiting_tasks:status_code = cls.start_task(job=job, task=waiting_task)def start_task(cls, job, task):# 申請 task 相關的資源apply_status = ResourceManager.apply_for_task_resource(task_info=task.to_human_model_dict(only_primary_with=["status"]))if not apply_status:return SchedulingStatusCode.NO_RESOURCE# 更新狀態為 RUNNING , 并同步給各個站點task.f_status = TaskStatus.RUNNINGupdate_status = JobSaver.update_task_status(task_info=task.to_human_model_dict(only_primary_with=["status"]))FederatedScheduler.sync_task_status(job=job, task=task)# 實際調用參與方執行 taskstatus_code, response = FederatedScheduler.start_task(job=job, task=task)
可以看到作業的執行事實上是依次獲取所有就緒的任務 Task,然后執行 FederatedScheduler.start_task()
去執行 task 做成所需完成的功能,而 FederatedScheduler.start_task()
事實上就是發起一次請求調用 party_app.py
中的 /<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/start
接口完成的
實際的任務執行是在 TaskController.start_task()
中完成,對應的代碼如下所示:
def start_task(cls, job_id, component_name, task_id, task_version, role, party_id, **kwargs):task_info = {"job_id": job_id,"task_id": task_id,"task_version": task_version,"role": role,"party_id": party_id,}# 根據 id 獲取對應的任務task = JobSaver.query_task(task_id=task_id, task_version=task_version, role=role, party_id=party_id)[0]task_info["engine_conf"] = {"computing_engine": run_parameters.computing_engine}# 根據執行環境選擇對應的 engine,目前主要是 eggroll 和 spark,默認是 eggrollbackend_engine = build_engine(run_parameters.computing_engine)# 實際執行對應的任務,對于 eggroll,會啟動新進程執行 python/fate_flow/worker/task_executor.py 腳本run_info = backend_engine.run(task=task,run_parameters=run_parameters,run_parameters_path=run_parameters_path,config_dir=config_dir,log_dir=job_utils.get_job_log_directory(job_id, role, party_id, component_name),cwd_dir=job_utils.get_job_directory(job_id, role, party_id, component_name),user_name=kwargs.get("user_id"))# 更新 task 相關的執行情況,執行正常的情況下狀態為 RUNNINGtask_info.update(run_info)task_info["start_time"] = current_timestamp()cls.update_task(task_info=task_info)task_info["party_status"] = TaskStatus.RUNNINGcls.update_task_status(task_info=task_info)
簡單理解任務 Task 最終只是根據作業在獨立進程中完成特定命令的執行,最終作業就是一系列任務的執行的組合。當所有任務完成時,作業也就完成了
進度更新
前面提到作業 job 的執行事實上僅僅是一系列對應的任務 task 的執行,因此 FATE-Flow 的進度更新也是根據任務 task 的完成的數量占所有 task 的數量來確定的。具體的代碼如下:
def schedule_running_job(cls, job: Job, force_sync_status=False):# 調度 job 進行執行task_scheduling_status_code, auto_rerun_tasks, tasks = TaskScheduler.schedule(job=job, dsl_parser=dsl_parser, canceled=job.f_cancel_signal)# 更新 job 執行的進度以及狀態tasks_status = dict([(task.f_component_name, task.f_status) for task in tasks])new_job_status = cls.calculate_job_status(task_scheduling_status_code=task_scheduling_status_code, tasks_status=tasks_status.values())# 根據 job 中已完成 task 的數量與總 task 的數量確定完成的進度total, finished_count = cls.calculate_job_progress(tasks_status=tasks_status)new_progress = float(finished_count) / total * 100if new_job_status != job.f_status or new_progress != job.f_progress:# 通知參與方更新 job 執行的進度信息if int(new_progress) - job.f_progress > 0:job.f_progress = new_progressFederatedScheduler.sync_job(job=job, update_fields=["progress"])cls.update_job_on_initiator(initiator_job=job, update_fields=["progress"])# 有狀態變化時通知相關方更新 job 狀態信息if new_job_status != job.f_status:job.f_status = new_job_statusFederatedScheduler.sync_job_status(job=job)cls.update_job_on_initiator(initiator_job=job, update_fields=["status"])# 處理結束,執行必要資源回收if EndStatus.contains(job.f_status):cls.finish(job=job, end_status=job.f_status)
可以看到最終就是調用 calculate_job_progress()
計算特定作業 job 中任務 task 完成的數量,最終確定完成的進度。
所有的處理處理結束時,調用 finish()
執行必要的資源回收
總結
本文對 FATE-Flow 的作業的完整執行流程進行了梳理,為了簡化刪除了大量異常分支的處理,有興趣的可以結合實際的 FATE-Flow v1.10.0 的源碼進行查看,應該會更有裨益