本文是"Dagster Pipes教程"的第一部分,介紹如何通過Dagster資產調用外部Python腳本并集成到數據管道中。首先,創建Dagster資產subprocess_asset
,利用PipesSubprocessClient
資源執行外部腳本external_code.py
,實現跨進程的數據處理。通過dagster dev
啟動UI,可在Dagster界面中監控子進程的執行狀態和日志輸出,包括標準輸出(stdout)內容。本文詳細講解了資產定義、資源注入及命令執行的完整流程,為后續修改外部代碼以支持Dagster Pipes通信奠定基礎。此方法適用于需要將現有腳本集成到Dagster數據管道的場景,提升自動化與可觀測性。完成本部分后,讀者可繼續學習第二部分,掌握如何增強外部腳本與Dagster的交互能力。
教程概述
本教程將指導你完成以下步驟:
- 創建一個調用外部Python腳本的Dagster資產
- 定義必要的Dagster資源(resources)
- 在Dagster UI中運行并查看結果
前提條件
在開始之前,請確保你已經:
- 安裝了Dagster
- 創建了一個名為
external_code.py
的獨立Python腳本,內容如下:
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2],"item_id": [432, 878]})total_orders = len(orders_df)print(f"processing total {total_orders} orders")
第一步:定義Dagster資產
首先,在與external_code.py
相同的目錄下創建一個名為dagster_code.py
的新文件。
1.1 創建資產定義
將以下代碼復制到dagster_code.py
中:
import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()
代碼解析:
- 我們創建了一個名為
subprocess_asset
的資產 - 使用
AssetExecutionContext
作為上下文參數,它提供了系統信息如資源、配置和日志記錄 - 指定了
PipesSubprocessClient
資源 - 構建了一個命令列表來執行外部腳本
- 使用
pipes_subprocess_client.run()
方法在管道會話中同步執行子進程
1.2 從資產調用外部代碼
上述代碼中的關鍵部分是:
pipes_subprocess_client.run(command=cmd,context=context
).get_materialize_result()
這段代碼做了什么:
PipesSubprocessClient
資源暴露了一個run
方法- 當資產執行時,這個方法會在管道會話中同步執行子進程
- 返回一個
PipesClientCompletedInvocation
對象 - 可以使用
get_materialize_result()
方法訪問子進程報告的MaterializeResult
事件
第二步:定義Definitions對象
為了讓Dagster工具(如CLI、UI和Dagster+)能夠加載和訪問資產及子進程資源,我們需要創建一個Definitions
對象。
在dagster_code.py
文件末尾添加以下代碼:
from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)
此時,dagster_code.py
文件應該如下所示:
import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)
第三步:從Dagster UI運行子進程
現在,讓我們在Dagster UI中執行我們創建的子進程資產。
-
在新的命令行會話中運行以下命令啟動UI:
dagster dev -f dagster_code.py
-
點擊右上角的"Materialize"按鈕來運行你的代碼
-
導航到"Run details"頁面,在這里你可以看到運行的日志
-
在
external_code.py
中,我們有一個打印語句將輸出到stdout。Dagster會在UI的原始計算日志視圖中顯示這些內容。 -
要查看stdout日志,切換日志部分到stdout:
下一步
到目前為止,你已經創建了一個調用外部Python腳本的Dagster資產,在子進程中執行了代碼,并在Dagster UI中查看了結果。接下來,你將學習如何修改外部代碼以與Dagster Pipes配合工作,將信息發送回Dagster。
總結
通過本教程的第一部分,我們實現了:
- 創建了一個Dagster資產來調用外部Python腳本
- 配置了必要的資源來支持子進程執行
- 在Dagster UI中成功運行并查看了結果
這個基礎設置為你在后續步驟中實現更復雜的管道通信打下了良好的基礎。