在現代數據工程中,自動化和監控是確保數據管道高效運行的關鍵因素。Dagster作為一款強大的數據編排工具,提供了多種方式來實現這些目標。本文將深入探討如何使用Dagster Pipes修改外部代碼,以實現日志記錄、結構化元數據報告以及資產檢查等功能。
什么是Dagster Pipes?
Dagster Pipes是Dagster提供的一種機制,允許你在Dagster之外運行的代碼與Dagster內部的工作流進行交互。通過Dagster Pipes,你可以將現有的腳本或應用程序集成到Dagster的數據管道中,并實現信息的雙向流動。這不僅提高了代碼的復用性,還增強了管道的可監控性和可維護性。
修改外部代碼的步驟
假設我們有一個獨立的Python腳本external_code.py
,我們希望將其與Dagster集成,并實現日志記錄和結構化元數據的報告。同時,我們還有一個Dagster定義文件dagster_code.py
,其中包含了一個Dagster資產和其他相關定義。
步驟1:在外部代碼中引入Dagster上下文
首先,我們需要在external_code.py
中引入Dagster Pipes的相關模塊,并初始化Dagster Pipes上下文。這可以通過調用open_dagster_pipes()
函數來實現,該函數會返回一個上下文管理器,用于管理Dagster Pipes連接的生命周期。
from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")
步驟2:發送日志消息到Dagster
接下來,我們可以使用context.log
方法將日志消息發送回Dagster。這比直接打印到標準輸出更加靈活,因為日志消息可以在Dagster UI中進行過濾和查看。
def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")
在Dagster UI的Run details頁面中,你可以通過選擇日志級別來過濾出info級別的日志消息。
步驟3:發送結構化元數據到Dagster
除了日志消息,我們還可以發送結構化元數據到Dagster。這對于報告資產的狀態、數據質量檢查結果等信息非常有用。
報告資產物化
我們可以使用context.report_asset_materialization
方法來報告資產物化的元數據。例如,我們可以報告處理的總訂單數。
def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
報告資產檢查
如果我們的資產有定義數據質量檢查,我們還可以通過context.report_asset_check
方法來報告檢查的結果。
def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 報告數據質量檢查結果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)
在Dagster UI中,你可以在Asset Details頁面的Events和Checks標簽頁中查看這些事件和檢查結果。
完整代碼示例
外部代碼 external_code.py
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 報告數據質量檢查結果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)
Dagster代碼 dagster_code.py
import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 報告數據質量檢查結果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)
總結
通過上述步驟,我們成功地將一個獨立的Python腳本與Dagster集成,并實現了日志記錄和結構化元數據的報告。這不僅提高了代碼的可維護性,還增強了數據管道的監控能力。你可以進一步探索Dagster Pipes的其他功能,如自定義協議和與其他系統的集成,以滿足更復雜的需求。