生產級編排AI工作流套件:Flyte全面使用指南 — Core concepts Launch plans
Flyte 是一個開源編排器,用于構建生產級數據和機器學習流水線。它以 Kubernetes 作為底層平臺,注重可擴展性和可重復性。借助 Flyte,用戶團隊可以使用 Python SDK 構建流水線,并將其無縫部署在云端和本地環境中,從而實現分布式處理和高效的資源利用。
文中內容僅限技術學習與代碼實踐參考,市場存在不確定性,技術分析需謹慎驗證,不構成任何投資建議。
啟動計劃
啟動計劃是工作流調用的模板。它將以下元素整合在一起:
- 一個工作流
- 啟動該工作流所需的(可能不完整)輸入參數集合
- 可選的通知和調度計劃
當調用時,啟動計劃會傳遞輸入參數來啟動工作流。如果啟動計劃未包含所有必需的工作流輸入參數,則需要在執行時提供額外的輸入參數。
默認啟動計劃
每個工作流自動附帶一個_默認啟動計劃_。該啟動計劃不定義任何默認輸入參數,因此所有參數必須在執行時提供。默認啟動計劃始終與其對應工作流同名。
啟動計劃具有版本控制
與任務和工作流一樣,啟動計劃具有版本控制。可以更新啟動計劃來更改輸入參數集合、調度計劃或通知配置。每次更新都會創建新的啟動計劃版本。
自定義啟動計劃
除了默認啟動計劃外,可以為任何工作流定義額外的啟動計劃。通常,一個工作流可以關聯多個啟動計劃,但每個啟動計劃只能關聯一個特定工作流。
查看工作流的啟動計劃
要查看指定工作流的啟動計劃,在UI中導航至工作流頁面并點擊Launch Workflow。從Launch Plan下拉菜單中可選擇用于啟動工作流的啟動計劃。默認情況下會選中默認啟動計劃。如果未為該工作流定義任何自定義啟動計劃,則僅顯示默認計劃。若已定義自定義啟動計劃,它們將與默認計劃一起顯示在下拉菜單中。更多細節請參考運行啟動計劃。
注冊啟動計劃
通過命令行注冊啟動計劃
大多數情況下,啟動計劃與項目代碼中的工作流和任務一起定義,并通過CLI與其他實體一起批量注冊(參見運行代碼)。
使用FlyteRemote
在Python中注冊啟動計劃
與所有Flyte命令行操作類似,您也可以通過編程方式使用FlyteRemote
注冊啟動計劃,具體方法是調用FlyteRemote.register_launch_plan
。
注冊結果
當上述代碼注冊到Flyte時,會創建四個對象:
- 任務
workflows.launch_plan_example.my_task
- 工作流
workflows.launch_plan_example.my_workflow
- 默認啟動計劃
workflows.launch_plan_example.my_workflow
(注意其名稱與工作流相同) - 自定義啟動計劃
my_workflow_custom_lp
(即我們在代碼中定義的那個)
修改啟動計劃
通過修改代碼中的定義并重新注冊來更改啟動計劃。當重新注冊具有相同項目(project)、域(domain)和名稱的啟動計劃時,將創建該啟動計劃的新版本。
定義啟動計劃
您可以使用 LaunchPlan
類 定義啟動計劃。
以下是一個定義啟動計劃的簡單示例:
import flytekit as fl@fl.workflow
def my_workflow(a: int, b: str) -> str:return f"Result: {a} and {b}"# 創建默認啟動計劃
default_lp = @fl.LaunchPlan.get_or_create(workflow=my_workflow)# 創建命名啟動計劃
named_lp = @fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_custom_launch_plan"
)
默認與固定輸入
默認輸入可在執行時被覆蓋,而固定輸入不可修改。
import flytekit as fl# 帶默認輸入的啟動計劃
lp_with_defaults = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_defaults",default_inputs={"a": 42, "b": "default_value"}
)# 帶固定輸入的啟動計劃
lp_with_fixed = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_fixed",fixed_inputs={"a": 100} # 'a' 將始終為 100,只有 'b' 可被指定
)# 組合默認與固定輸入
lp_combined = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="combined_inputs",default_inputs={"b": "default_string"},fixed_inputs={"a": 200}
)
定時執行
import fl
from datetime import timedelta
from flytekit.core.schedule import CronSchedule, FixedRate# 使用 cron 調度(每周一 UTC 時間 10:00 AM 運行)
cron_lp = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="weekly_monday",default_inputs={"a": 1, "b": "weekly"},schedule=CronSchedule(schedule="0 10 * * 1", # Cron 表達式: 分鐘 小時 日 月 周幾kickoff_time_input_arg=None)
)# 使用固定頻率調度(每 6 小時運行)
fixed_rate_lp = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="every_six_hours",default_inputs={"a": 1, "b": "periodic"},schedule=FixedRate(duration=timedelta(hours=6))
)
標簽與注解
標簽和注解有助于組織管理,可用于過濾或添加元數據。
import fl
from flytekit.models.common import Labels, Annotations# 添加標簽和注解
lp_with_metadata = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_metadata",default_inputs={"a": 1, "b": "metadata"},labels=Labels({"team": "data-science", "env": "staging"}),annotations=Annotations({"description": "測試用啟動計劃", "owner": "jane.doe"})
)
執行參數
import fl# 設置最大并行度限制并發任務執行
lp_with_parallelism = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_parallelism",default_inputs={"a": 1, "b": "parallel"},max_parallelism=10 # 最多允許 10 個任務節點并發執行
)# 禁用該啟動計劃的執行緩存
lp_no_cache = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="no_cache",default_inputs={"a": 1, "b": "fresh"},overwrite_cache=True # 總是全新執行,忽略緩存結果
)# 注冊時自動激活
lp_auto_activate = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="auto_active",default_inputs={"a": 1, "b": "active"},auto_activate=True # 注冊后立即激活啟動計劃
)
安全與認證
我們可以覆蓋用于執行啟動計劃的認證角色(IAM 角色或 Kubernetes 服務賬戶)。
import fl
from flytekit.models.common import AuthRole
from flytekit import SecurityContext# 為啟動計劃設置認證角色
lp_with_auth = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_auth",default_inputs={"a": 1, "b": "secure"},auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/my-execution-role")
)# 設置安全上下文
lp_with_security = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_security",default_inputs={"a": 1, "b": "context"},security_context=SecurityContext(run_as=SecurityContext.K8sServiceAccount(name="my-service-account"))
)
原始輸出數據配置
from flytekit.models.common import RawOutputDataConfig# 配置大型輸出存儲位置
lp_with_output_config = LaunchPlan.get_or_create(workflow=my_workflow,name="with_output_config",default_inputs={"a": 1, "b": "output"},raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://my-bucket/workflow-outputs/")
)
完整整合示例
以下是一個較為全面的示例。該自定義啟動計劃包含:
comprehensive_lp = LaunchPlan.get_or_create(workflow=my_workflow,name="comprehensive_example",default_inputs={"b": "configurable"},fixed_inputs={"a": 42},schedule=CronSchedule(schedule="0 9 * * *"), # 每日 UTC 時間 9:00 AMnotifications=[\Notification(\phases=["SUCCEEDED", "FAILED"],\email=EmailNotification(recipients_email=["team@example.com"])\)\],labels=Labels({"env": "production", "team": "data"}),annotations=Annotations({"description": "每日數據處理"}),max_parallelism=20,overwrite_cache=False,auto_activate=True,auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/workflow-role"),raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://results-bucket/daily-run/")
)
這些示例展示了 Flyte 中啟動計劃的靈活性,您可以根據工作流需求自定義執行參數、輸入、調度等多種配置。
查看啟動計劃
通過用戶界面查看啟動計劃
在側邊欄選擇Launch Plans,將顯示項目與域中所有已注冊啟動計劃的列表:
您可以通過以下方式篩選:
- 按名稱搜索啟動計劃
- 過濾僅顯示已歸檔的啟動計劃
啟動計劃表格列定義如下:
- 名稱:啟動計劃的名稱。點擊可查看具體詳情
- 觸發器:
- 若啟動計劃處于激活狀態,會顯示綠色Active徽章。激活狀態下,所有關聯的調度將生效并按計劃觸發
- 顯示是否包含觸發器。可通過右上角Has Triggers復選框過濾含觸發器的啟動計劃
- 最后執行:最近一次執行的時間戳(包含調度觸發、手動觸發等所有方式)
- 最近10次執行:以可視化方式展示最近10次執行記錄(包含所有觸發方式)
點擊列表條目可進入具體啟動計劃視圖:
在此界面可查看:
- 啟動計劃詳情(最新版本):
- 預期輸入:啟動計劃的輸入輸出類型
- 固定輸入:顯示預定義的輸入值(如有)
- 啟動計劃版本:該計劃的所有歷史版本列表
- 所有執行記錄:該計劃的所有執行歷史
右上角顯示激活狀態(若激活則顯示具體激活版本),并提供版本切換或完全停用的控制選項。詳見激活與停用
通過uctl
命令行查看啟動計劃
查看項目與域中所有啟動計劃:
$ uctl get launchplans \--project <project-id> \--domain <domain>
查看具體啟動計劃:
$ uctl get launchplan \--project <project-id> \--domain <domain> \<launch-plan-name>
更多詳情請參考Uctl CLI文檔
通過Python的FlyteRemote
查看啟動計劃
使用FlyteRemote.client.list_launch_plans_paginated
方法獲取啟動計劃列表。
通知
一個啟動計劃(launch plan)可以關聯一個或多個通知,當該啟動計劃關聯的工作流(workflow)執行完成時,這些通知會被觸發。
共有三種類型的通知:
Email
: 向指定收件人發送電子郵件PagerDuty
: 向配置的PagerDuty服務發送通知(需指定接收方)。PagerDuty將根據您的配置轉發通知Slack
: 向指定Slack頻道的關聯郵箱地址發送通知。此功能要求預先配置Slack賬戶以接收通知
可以根據工作流執行的不同最終狀態發送對應的通知。可選狀態包括:
WorkflowExecutionPhase.ABORTED
(執行中止)WorkflowExecutionPhase.FAILED
(執行失敗)WorkflowExecutionPhase.SUCCEEDED
(執行成功)WorkflowExecutionPhase.TIMED_OUT
(執行超時)
示例:
from datetime import datetimeimport flytekit as flfrom flytekit import (WorkflowExecutionPhase,Email,PagerDuty,Slack
)@fl.task
def add_numbers(a: int, b: int, c: int) -> int:return a + b + c@fl.task
def generate_message(s: int, kickoff_time: datetime) -> str:return f"sum: {s} at {kickoff_time}"@fl.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime) -> str:return generate_message(add_numbers(a, b, c),kickoff_time,)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},notifications=[\Email(\phases=[WorkflowExecutionPhase.FAILED],\recipients_email=["me@example.com", "you@example.com"],\),\PagerDuty(\phases=[WorkflowExecutionPhase.SUCCEEDED],\recipients_email=["myboss@example.com"],\),\Slack(\phases=[\WorkflowExecutionPhase.SUCCEEDED,\WorkflowExecutionPhase.ABORTED,\WorkflowExecutionPhase.TIMED_OUT,\],\recipients_email=["your_slack_channel_email"],\),\],
)
調度計劃
啟動計劃允許您對工作流的定時調用進行調度。一個啟動計劃可以關聯一個或多個調度方案,但同一時間最多只能有一個調度處于激活狀態。如果在啟動計劃上激活了調度,系統將按照預定時間自動調用工作流,并使用啟動計劃提供的輸入參數。
要為啟動計劃添加調度方案,請按以下方式向啟動計劃添加調度對象:
from datetime import timedeltaimport flytekit as fl
from flytekit import FixedRate@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:return my_task(a=a, b=b, c=c)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=FixedRate(duration=timedelta(minutes=10))
)
這里我們指定了FixedRate調度方案,系統將每10分鐘調用一次工作流。固定頻率調度也可以使用天數或小時數來定義。
或者,您也可以指定CronSchedule:
import flytekit as fl
from flytekit import CronSchedule@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:return my_task(a=a, b=b, c=c)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=CronSchedule(schedule="*/10 * * * *")
)
kickoff_time_input_arg
FixedRate
和CronSchedule
都可以接受名為kickoff_time_input_arg
的可選參數。
該參數用于指定工作流輸入參數的名稱。每次系統通過此調度調用工作流時,調用的時間將通過指定的參數傳遞給工作流。例如:
from datetime import datetime, timedeltaimport flytekit as fl
from flytekit import FixedRate@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime ) -> str:return f"sum: {my_task(a=a, b=b, c=c)} at {kickoff_time}"fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=FixedRate(duration=timedelta(minutes=10),kickoff_time_input_arg="kickoff_time")
)
在此示例中,每次調度調用my_workflow
時,調用時間都會通過kickoff_time
參數傳遞。
激活與停用
您可以為啟動計劃設置激活/停用狀態。具體規則如下:
-
在具有相同名稱的啟動計劃版本中,最多只能有一個版本處于激活狀態,其他所有版本均為停用狀態
-
若激活包含執行計劃的啟動計劃版本,其關聯的執行計劃也會自動激活,工作流將按照該計劃自動觸發
-
當包含執行計劃的啟動計劃版本處于停用狀態時,其關聯的執行計劃也會停用,不會用于觸發工作流程
未關聯執行計劃的啟動計劃也可以設置激活版本。對于此類非計劃型啟動計劃,激活狀態可作為版本標識,用于區分不同版本。例如,管理邏輯可依據此狀態決定使用哪個版本進行新調用。
新注冊的啟動計劃首個版本默認為停用狀態。若該版本包含執行計劃,該計劃同樣處于停用狀態。一旦激活,該版本將保持激活狀態,即使后續注冊新版本也不會改變其狀態。
包含執行計劃的啟動計劃版本可通過以下方式激活:用戶界面、uctl
命令行工具或FlyteRemote
通過用戶界面激活/停用啟動計劃
激活操作步驟:
- 進入啟動計劃視圖
- 點擊屏幕右上角的添加激活啟動計劃按鈕:
- 在彈出的模態框中選擇要激活的版本:
該列表僅顯示包含執行計劃的版本。注意同一時間只能激活一個版本(即最多一個執行計劃)。
選擇版本后點擊更新即可激活該版本及其執行計劃。系統將根據執行計劃定期觸發工作流。
注意:
- 未關聯執行計劃的啟動計劃無法通過UI激活
- UI不支持管理無執行計劃的啟動計劃狀態,需使用
uctl
或FlyteRemote
停用操作步驟:
- 定位到包含激活計劃的啟動計劃
- 點擊Active launch plan旁的**…**圖標
- 選擇"停用"選項:
- 在確認模態框中完成停用操作
注意:
- 未關聯執行計劃的啟動計劃無法通過UI停用
- 需使用
uctl
或FlyteRemote
管理無執行計劃的啟動計劃狀態
使用uctl
命令行工具管理啟動計劃狀態
激活命令:
$ uctl update launchplan \--activate \--project <project-id> \--domain <domain> \<launch-plan-name> \--version <launch-plan-version>
停用命令:
$ uctl update launchplan \--deactivate \--project <project-id> \--domain <domain> \<launch-plan-name> \--version <launch-plan-version>
詳細說明請參考Uctl CLI文檔
使用Python的FlyteRemote
管理啟動計劃狀態
激活示例代碼:
from union.remote import FlyteRemote
from flytekit.configuration import Configremote = FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(ame=<launch-plan-name>, version=<launch-plan-version>).id
remote.client.update_launch_plan(launch_plan.id, "ACTIVE")
停用示例代碼:
from union.remote import FlyteRemote
from flytekit.remote import Configremote = FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(ame=<launch-plan-name>, version=<launch-plan-version>)
remote.client.update_launch_plan(launch_plan.id, "INACTIVE")
運行啟動計劃
在用戶界面中運行啟動計劃
要調用啟動計劃,請進入工作流列表,選擇目標工作流,點擊啟動工作流。在新執行對話框中,從啟動計劃下拉菜單中選擇目標啟動計劃,然后點擊啟動。
使用 uctl
命令行運行啟動計劃
要通過命令行調用啟動計劃,首先生成啟動計劃的執行規范文件:
$ uctl get launchplan \--project <project-id>--domain <domain> \<launch-plan-name> \--execFile <execution-spec-file-name>.yaml
然后使用以下命令執行啟動計劃:
$ uctl create execution \--project <project-id> \--domain <domain> \--execFile <execution-spec-file-name>.yaml
更多細節請參閱 Uctl CLI。
使用 FlyteRemote
在 Python 中運行啟動計劃
以下代碼使用 FlyteRemote
執行啟動計劃:
import flytekit as fl
from flytekit.remote import Configremote = fl.FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(name=<launch-plan-name>, version=<launch-plan-version>)
remote.execute(launch_plan, inputs=<inputs>)
更多細節請參閱 FlyteRemote。
子啟動計劃
上述調用示例假設您希望將啟動計劃作為項目中的頂級實體運行。但您也可以從_工作流內部_調用啟動計劃,創建_子啟動計劃_。這將使被調用的啟動計劃觸發其對應工作流,并向該工作流傳遞指定的參數。
這與子工作流的情況不同——當您在一個工作流函數中調用另一個工作流函數時,子工作流會成為父工作流執行圖的一部分,并共享相同的 execution ID 和執行上下文。而調用子啟動計劃時,會啟動一個完整的頂級工作流,該工作流擁有獨立的 execution ID 和執行上下文。
更多細節請參閱子工作流與子啟動計劃。
引用啟動計劃
引用啟動計劃是指引用先前已定義、序列化并注冊的啟動計劃。您可以跨項目引用其他啟動計劃,創建使用他人聲明的啟動計劃的工作流。
創建引用啟動計劃時,請務必驗證工作流接口是否與引用工作流的接口一致。
引用啟動計劃無法在本地運行。若需本地測試,請使用模擬實現。
示例
本例演示如何為Flytesnacks倉庫中的simple_wf
工作流創建引用啟動計劃。
-
克隆Flytesnacks倉庫:
git clone git@github.com:flyteorg/flytesnacks.git
-
進入
basics
目錄:cd flytesnacks/examples/basics
-
注冊
simple_wf
工作流:pyflyte register --project flytesnacks --domain development --version v1 basics/workflow.py.
-
創建
simple_wf_ref_lp.py
文件并復制以下代碼:import flytekit as fl from flytekit import reference_launch_plan@reference_launch_plan(project="flytesnacks",domain="development",name="basics.workflow.simple_wf",version="v1", )def simple_wf_lp(x: list[int], y: list[int] ) -> float:return 1.0@fl.workflow def run_simple_wf() -> float:x = [-8, 2, 4]y = [-2, 4, 7]return simple_wf_lp(x=x, y=y)
-
注冊
run_simple_wf
工作流:pyflyte register simple_wf_ref_lp.py
-
在Flyte UI中運行
run_simple_wf
工作流
風險提示與免責聲明
本文內容基于公開信息研究整理,不構成任何形式的投資建議。歷史表現不應作為未來收益保證,市場存在不可預見的波動風險。投資者需結合自身財務狀況及風險承受能力獨立決策,并自行承擔交易結果。作者及發布方不對任何依據本文操作導致的損失承擔法律責任。市場有風險,投資須謹慎。