Dagster Pipes系列-2:增強外部腳本與Dagster的交互能力

在現代數據工程中,自動化和監控是確保數據管道高效運行的關鍵因素。Dagster作為一款強大的數據編排工具,提供了多種方式來實現這些目標。本文將深入探討如何使用Dagster Pipes修改外部代碼,以實現日志記錄、結構化元數據報告以及資產檢查等功能。

什么是Dagster Pipes?

Dagster Pipes是Dagster提供的一種機制,允許你在Dagster之外運行的代碼與Dagster內部的工作流進行交互。通過Dagster Pipes,你可以將現有的腳本或應用程序集成到Dagster的數據管道中,并實現信息的雙向流動。這不僅提高了代碼的復用性,還增強了管道的可監控性和可維護性。

在這里插入圖片描述

修改外部代碼的步驟

假設我們有一個獨立的Python腳本external_code.py,我們希望將其與Dagster集成,并實現日志記錄和結構化元數據的報告。同時,我們還有一個Dagster定義文件dagster_code.py,其中包含了一個Dagster資產和其他相關定義。

步驟1:在外部代碼中引入Dagster上下文

首先,我們需要在external_code.py中引入Dagster Pipes的相關模塊,并初始化Dagster Pipes上下文。這可以通過調用open_dagster_pipes()函數來實現,該函數會返回一個上下文管理器,用于管理Dagster Pipes連接的生命周期。

from dagster_pipes import PipesContext, open_dagster_pipes
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:print(f"processing total {total_orders} orders")

步驟2:發送日志消息到Dagster

接下來,我們可以使用context.log方法將日志消息發送回Dagster。這比直接打印到標準輸出更加靈活,因為日志消息可以在Dagster UI中進行過濾和查看。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")

在Dagster UI的Run details頁面中,你可以通過選擇日志級別來過濾出info級別的日志消息。
在這里插入圖片描述

步驟3:發送結構化元數據到Dagster

除了日志消息,我們還可以發送結構化元數據到Dagster。這對于報告資產的狀態、數據質量檢查結果等信息非常有用。

報告資產物化

我們可以使用context.report_asset_materialization方法來報告資產物化的元數據。例如,我們可以報告處理的總訂單數。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})
報告資產檢查

如果我們的資產有定義數據質量檢查,我們還可以通過context.report_asset_check方法來報告檢查的結果。

def main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 報告數據質量檢查結果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

在Dagster UI中,你可以在Asset Details頁面的Events和Checks標簽頁中查看這些事件和檢查結果。
在這里插入圖片描述

完整代碼示例

外部代碼 external_code.py

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 報告數據質量檢查結果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)

Dagster代碼 dagster_code.py

import shutil
import dagster as dg
import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipesdef main():orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})total_orders = len(orders_df)# 獲取Dagster Pipes上下文with open_dagster_pipes() as context:context.log.info(f"processing total {total_orders} orders")context.report_asset_materialization(metadata={"total_orders": total_orders})# 報告數據質量檢查結果context.report_asset_check(passed=orders_df[["item_id"]].notnull().all().bool(),check_name="no_empty_order_check",)@dg.asset(check_specs=[dg.AssetCheckSpec(name="no_empty_order_check", asset="subprocess_asset")],
)
def subprocess_asset(context: dg.AssetExecutionContext, pipes_subprocess_client: dg.PipesSubprocessClient
):cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py"),]return pipes_subprocess_client.run(command=cmd, context=context).get_materialize_result()defs = dg.Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()},
)

總結

通過上述步驟,我們成功地將一個獨立的Python腳本與Dagster集成,并實現了日志記錄和結構化元數據的報告。這不僅提高了代碼的可維護性,還增強了數據管道的監控能力。你可以進一步探索Dagster Pipes的其他功能,如自定義協議和與其他系統的集成,以滿足更復雜的需求。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/80722.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/80722.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/80722.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

C++類和對象進階 —— 與數據結構的結合

🎁個人主頁:工藤新一 🔍系列專欄:C面向對象(類和對象篇) 🌟心中的天空之城,終會照亮我前方的路 🎉歡迎大家點贊👍評論📝收藏?文章 文章目錄 […

Java中進階并發編程

第一章、并發編程的挑戰 并發和并行:指多線程或多進程 線程的本質:操作系統能夠進行運算調度的最小單位,是進程(Process)中的實際工作單元 進程的本質:操作系統進行資源分配和調度的基本單位&#xff0c…

《 指針變量類型與內存訪問:揭秘背后的奧秘》

🚀個人主頁:BabyZZの秘密日記 📖收入專欄:C語言 🌍文章目入 一、指針變量類型的基本概念二、指針類型與內存訪問字節數的關系(一)整型指針(二)字符型指針(三&…

mapbox進階,使用mapbox-plugins插件加載餅狀圖

????? 主頁: gis分享者 ????? 感謝各位大佬 點贊?? 收藏? 留言?? 加關注?! ????? 收錄于專欄:mapbox 從入門到精通 文章目錄 一、??前言1.1 ??mapboxgl.Map 地圖對象1.1 ??mapboxgl.Map style屬性二、??使用mapbox-plugins插件加載餅狀圖1. ?…

GraphicLayer與BusineDataLayer層級控制

補充說明: 當參與層級控制的元素是點型元素時,是無法參與ZIndex層級控制的,此時可以換個解決方案 1.給不同的高度值實現,元素間的層級控制覆蓋 import * as mars3d from "mars3d"export let map // mars3d.Map三維地…

uniapp 百家云直播插件打包失敗

打包錯誤日志 Android自有證書 打包失敗 錯誤日志: https://app.liuyingyong.cn/build/errorLog/cf41a610-effe-11ef-88db-05262d4c3e5d原因:需要導入插件依賴 依賴地址:https://ext.dcloud.net.cn/plugin?id16289 百家云直播插件地址 直播插…

【C++】”如虎添翼“:模板初階

泛型編程: C中一種使用模板來實現代碼重用和類型安全的編程范式。它允許程序員編寫與數據類型無關的代碼,從而可以用相同的代碼邏輯處理不同的數據類型。模板是泛型編程的基礎 模板分為兩類: 函數模板:代表了一個函數家族&#x…

十五、多態與虛函數

十五、多態與虛函數 15.1 引言 面向對象編程的基本特征:數據抽象(封裝)、繼承、多態基于對象:我們創建類和對象,并向這些對象發送消息多態(Polymorphism):指的是相同的接口、不同的…

點云特征提取的兩大經典范式:Voxel-based 與 Pillar-based

點云特征提取的兩大經典范式:Voxel-based 與 Pillar-based 在點云處理領域,尤其是針對 3D 目標檢測任務,特征提取是核心環節之一。目前,Voxel-based(體素化)和 Pillar-based(柱狀化&#xff09…

前蘋果首席設計官回顧了其在蘋果的設計生涯、公司文化、標志性產品的背后故事

每周跟蹤AI熱點新聞動向和震撼發展 想要探索生成式人工智能的前沿進展嗎?訂閱我們的簡報,深入解析最新的技術突破、實際應用案例和未來的趨勢。與全球數同行一同,從行業內部的深度分析和實用指南中受益。不要錯過這個機會,成為AI領…

web 自動化之 selenium 元素四大操作三大切換等待

文章目錄 一、元素的四大操作二、三大切換&等待1、切換窗口:當定位的元素不在當前窗口,則需要切換窗口2、切換iframe:當定位的元素在frame/iframe,則需要切換3、切換彈出窗口 一、元素的四大操作 1、輸入 2、點擊 3、獲取文本 4、獲取屬…

window server 2012安裝sql server2008 r2

執行sql server2008 r2安裝目錄下的setup 選擇運行程序而不獲取幫助 然后就是讓人絕望的 只能先搞這個了,F*微軟,自家軟件不讓正常安裝 打開服務器管理器->添加角色和功能->選擇Web 服務(IIS)->添加.NET Framework3.5 然…

【K8S學習之生命周期鉤子】詳細了解 postStart 和 preStop 生命周期鉤子

0. 參考 Kubernetes容器生命周期 —— 鉤子函數詳解(postStart、preStop) - 人艱不拆_zmc - 博客園詳解Kubernetes Pod優雅退出 - 人艱不拆_zmc - 博客園 1. Kubernetes 生命周期鉤子概述 在 Kubernetes 中,生命周期鉤子(Lifec…

測試文章標題01

模型上下文協議(Model Context Protocol, MCP)深度解析 一、MCP的核心概念 模型上下文協議(Model Context Protocol, MCP)是一種用于規范機器學習模型與外部環境交互的標準化框架。其核心目標是通過定義統一的接口和數據格式&am…

kubuntu系統詳解

Kubuntu 系統深度解析(從系統架構到用戶體驗) 一、定位與核心特性 Kubuntu 是 Ubuntu 的官方 KDE 衍生版,基于 Ubuntu 的穩定底層(Debian 技術棧),搭載 KDE Plasma 桌面環境,主打 “功能豐富、…

cURL:通過URL傳輸數據的命令行工具庫介紹

文章目錄 1. 什么是 curl?2. 下載與安裝 curl3. curl 的常見用法3.1 獲取網頁內容3.2 下載文件3.3 發送 POST 請求(帶表單數據)3.4 發送帶 JSON 的 POST 請求 1. 什么是 curl? cURL(CommandLine URL)是非常…

從零搭建AI工作站:Gemma3大模型本地部署+WebUI配置全套方案

文章目錄 前言1. 安裝Ollama2.Gemma3模型安裝與運行3. 安裝Open WebUI圖形化界面3.1 Open WebUI安裝運行3.2 添加模型3.3 多模態測試 4. 安裝內網穿透工具5. 配置固定公網地址總結 前言 如今各家的AI大模型廝殺得如火如荼,每天都有新的突破。今天我要給大家安利一款…

Element Plus對話框(ElDialog)全面指南:打造靈活彈窗交互

📌 開篇導語 對話框是Web應用中實現用戶交互的核心組件之一,常用于信息確認、表單提交或詳情展示。Element Plus的ElDialog組件以高擴展性和優雅動效著稱,支持高度定制化開發。本文將從基礎配置到進階技巧,手把手教你掌握對話框組…

解決WSL、Ubuntu的.ico圖標不正確顯示縮略圖

解決WSL、Ubuntu的.ico圖標不正確顯示縮略圖 問題描述 Win10系統中由于更新了某些軟件,篡改了默認的圖像顯示軟件,導致WSL等軟件未能成功顯示圖標,表現如下: 解決方法 將ico文件的默認打開方式更改為“畫圖”,如下…

[數據結構高階]并查集初識、手撕、可以解決哪類問題?

標題:[數據結構高階]并查集初識、手撕、可以解決哪類問題? 水墨不寫bug 文章目錄 一、認識并查集二、模擬實現并查集三、用并查集解決問題1、[省份的數量](https://leetcode.cn/problems/number-of-provinces/)2、[等式方程的可滿足性](https://leetcode…