Dagster Pipes系列-1:調用外部Python腳本

本文是"Dagster Pipes教程"的第一部分,介紹如何通過Dagster資產調用外部Python腳本并集成到數據管道中。首先,創建Dagster資產subprocess_asset,利用PipesSubprocessClient資源執行外部腳本external_code.py,實現跨進程的數據處理。通過dagster dev啟動UI,可在Dagster界面中監控子進程的執行狀態和日志輸出,包括標準輸出(stdout)內容。本文詳細講解了資產定義、資源注入及命令執行的完整流程,為后續修改外部代碼以支持Dagster Pipes通信奠定基礎。此方法適用于需要將現有腳本集成到Dagster數據管道的場景,提升自動化與可觀測性。完成本部分后,讀者可繼續學習第二部分,掌握如何增強外部腳本與Dagster的交互能力。

教程概述

本教程將指導你完成以下步驟:

  1. 創建一個調用外部Python腳本的Dagster資產
  2. 定義必要的Dagster資源(resources)
  3. 在Dagster UI中運行并查看結果
    在這里插入圖片描述

前提條件

在開始之前,請確保你已經:

  • 安裝了Dagster
  • 創建了一個名為external_code.py的獨立Python腳本,內容如下:
import pandas as pddef main():orders_df = pd.DataFrame({"order_id": [1, 2],"item_id": [432, 878]})total_orders = len(orders_df)print(f"processing total {total_orders} orders")

第一步:定義Dagster資產

首先,在與external_code.py相同的目錄下創建一個名為dagster_code.py的新文件。

1.1 創建資產定義

將以下代碼復制到dagster_code.py中:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()

代碼解析:

  • 我們創建了一個名為subprocess_asset的資產
  • 使用AssetExecutionContext作為上下文參數,它提供了系統信息如資源、配置和日志記錄
  • 指定了PipesSubprocessClient資源
  • 構建了一個命令列表來執行外部腳本
  • 使用pipes_subprocess_client.run()方法在管道會話中同步執行子進程

1.2 從資產調用外部代碼

上述代碼中的關鍵部分是:

pipes_subprocess_client.run(command=cmd,context=context
).get_materialize_result()

這段代碼做了什么:

  • PipesSubprocessClient資源暴露了一個run方法
  • 當資產執行時,這個方法會在管道會話中同步執行子進程
  • 返回一個PipesClientCompletedInvocation對象
  • 可以使用get_materialize_result()方法訪問子進程報告的MaterializeResult事件

第二步:定義Definitions對象

為了讓Dagster工具(如CLI、UI和Dagster+)能夠加載和訪問資產及子進程資源,我們需要創建一個Definitions對象。

dagster_code.py文件末尾添加以下代碼:

from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

此時,dagster_code.py文件應該如下所示:

import shutil
import dagster as dg@dg.asset
def subprocess_asset(context: dg.AssetExecutionContext,pipes_subprocess_client: dg.PipesSubprocessClient
) -> dg.MaterializeResult:cmd = [shutil.which("python"),dg.file_relative_path(__file__, "external_code.py")]return pipes_subprocess_client.run(command=cmd,context=context).get_materialize_result()from dagster import Definitionsdefs = Definitions(assets=[subprocess_asset],resources={"pipes_subprocess_client": dg.PipesSubprocessClient()}
)

第三步:從Dagster UI運行子進程

現在,讓我們在Dagster UI中執行我們創建的子進程資產。

  1. 在新的命令行會話中運行以下命令啟動UI:

    dagster dev -f dagster_code.py
    
  2. 點擊右上角的"Materialize"按鈕來運行你的代碼

  3. 導航到"Run details"頁面,在這里你可以看到運行的日志

  4. external_code.py中,我們有一個打印語句將輸出到stdout。Dagster會在UI的原始計算日志視圖中顯示這些內容。

  5. 要查看stdout日志,切換日志部分到stdout:

在這里插入圖片描述

下一步

到目前為止,你已經創建了一個調用外部Python腳本的Dagster資產,在子進程中執行了代碼,并在Dagster UI中查看了結果。接下來,你將學習如何修改外部代碼以與Dagster Pipes配合工作,將信息發送回Dagster。

總結

通過本教程的第一部分,我們實現了:

  • 創建了一個Dagster資產來調用外部Python腳本
  • 配置了必要的資源來支持子進程執行
  • 在Dagster UI中成功運行并查看了結果

這個基礎設置為你在后續步驟中實現更復雜的管道通信打下了良好的基礎。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/80803.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/80803.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/80803.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【SQL系列】多表關聯更新

💝💝💝歡迎來到我的博客,很高興能夠在這里和您見面!希望您在這里可以感受到一份輕松愉快的氛圍,不僅可以獲得有趣的內容和知識,也可以暢所欲言、分享您的想法和見解。 推薦:kwan 的首頁,持續學…

C++進階學習:STL常用容器--map/multimap容器

1. map 容器基本概念 map 中所有元素都是 pair pair 中第一個元素為 key (鍵值) 起到索引運用 第二個元素為 value(實值) 所有元素都會根據元素的鍵值自動排序 本質: map/multimap 屬于關聯式容器 底層結構是用二…

let,const,var關鍵字的區別

let,const,var關鍵字 let,const,var都存在變量提升 它們都存在變量提升但是稍微有點不同 var變量聲明會被提升到作用域的頂部,并且會被初始化為 undefinedlet 和 const:變量聲明也會被提升到作用域的頂部,但不會被初…

Nuitka 已經不再安全? Nuitka/Cython 打包應用逆向工具 -- pymodhook

pymodhook是一個記錄任意對Python模塊的調用的庫,用于Python逆向分析。 pymodhook庫類似于Android的xposed框架,但不僅能記錄函數的調用參數和返回值,還能記錄模塊的類的任意方法調用,以及任意派生對象的訪問,基于pyob…

path環境變量滿了如何處理,分割 PATH 到 Path1 和 Path2

要正確設置 Path1 的值,你需要將現有的 PATH 環境變量 中的部分路徑復制到 Path1 和 Path2 中。以下是詳細步驟: 步驟 1:獲取當前 PATH 的值 打開環境變量窗口: 按 Win R,輸入 sysdm.cpl,點擊 確定。在 系…

SEMI E40-0200 STANDARD FOR PROCESSING MANAGEMENT(加工管理標準)-(一)

1 目的 物料(例如晶圓)加工在設備中的自動化管理與控制是實現工廠自動化的關鍵要素。本標準針對半導體制造環境中與設備內部物料處理相關的通信需求進行了規范。本標準規定了在加工單元接收到的指定材料所應適用的加工方法(例如Etch腔室需要Run哪支Recipe)。它闡述了物料加工的…

【Hadoop】集群搭建實戰:超詳細保姆級教程

🐇明明跟你說過:個人主頁 🏅個人專欄:《大數據前沿:技術與應用并進》🏅 🔖行路有良友,便是天堂🔖 目錄 一、引言 1、Hadoop簡介 2、Hadoop集群概念 3、 Hadoop 集…

阿里云人工智能大模型通義千問Qwen3開發部署

本文主要描述阿里云人工智能大模型開源社區ModelScope提供的通義千問Qwen3開發部署。 與阿里云一起 輕松實現數智化 讓算力成為公共服務:用大規模的通用計算,幫助客戶做從前不能做的事情,做從前做不到的規模。讓數據成為生產資料:…

24.(vue3.x+vite)引入組件并動態掛載(mount)

示例截圖 組件代碼: <template><div><div>{{message }}</div>

《Python星球日記》 第56天:循環神經網絡(RNN)入門

名人說:路漫漫其修遠兮,吾將上下而求索。—— 屈原《離騷》 創作者:Code_流蘇(CSDN)(一個喜歡古詩詞和編程的Coder??) 目錄 一、序列數據的特點與挑戰1. 什么是序列數據?2. 序列數據的挑戰二、RNN 的基本結構與前向傳播1. RNN的核心理念2. RNN的數學表達3. RNN的前向傳…

手寫 vue 源碼 === computed 實現

目錄 計算屬性的基本概念 計算屬性的核心實現 ComputedRefImpl 類的實現 ReactiveEffect 與計算屬性的關系 計算屬性的工作流程 1. 創建計算屬性 2. 依賴收集過程 3. 嵌套 effect 的處理 4. 更新過程 嵌套 effect 關系圖解 依賴關系建立過程 代碼實現分析 1. 創建…

【Lattice FPGA 開發】Diamond在線調試Reveal邏輯亂跳的解決

在Vivado中在always塊中寫邏輯時如果出現always塊中的異步復位敏感詞在塊內部未使用的情況&#xff0c;如下例的rst&#xff1a; always (posedge clk or posedge rst) begin if(~tx_sense_flag)o_rd_adr < d1;else if((o_rd_adr d94) & (bit_cnt d7))o_rd_adr <…

【hadoop】Sqoop數據遷移工具的安裝部署

一、Sqoop安裝與配置 步驟&#xff1a; 1、使用XFTP將Sqoop安裝包sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz發送到master機器的主目錄。 2、解壓安裝包&#xff1a; tar -zxvf ~/sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz 3、修改文件夾的名字&#xff0c;將其改為s…

BUUCTF——PYWebsite

BUUCTF——PYWebsite 進入靶場 看看基本信息 沒有什么信息 掃個目錄看看 http://node5.buuoj.cn:28115/.DS_Store http://node5.buuoj.cn:28115/flag.php http://node5.buuoj.cn:28115/index.html訪問flag.php 提示保存購買者的IP 抓包看看 直接XFF偽造一下 X-Forwarded-F…

基于Qt開發的多線程TCP服務端

目錄 一、Qt TCP服務端開發環境準備1. 項目配置2. 核心類說明二、服務端搭建步驟詳解步驟1:初始化服務端對象步驟2:啟動端口監聽步驟3:處理客戶端連接三、數據通信與狀態管理1. 數據收發實現2. 客戶端狀態監控四、進階功能擴展1. 多客戶端并發處理2. 心跳檢測機制五、調試與…

【Tools】VScode使用CMake構建項目

這里寫目錄標題 vscode 使用 CMake**安裝插件**新建CMake項目 vscode 使用 CMake 安裝插件 CMake和CMake Tools c等等 CMake插件主要功能是CMake語法高亮、自動補全CMake Tools的功能主要是結合VSCode IDE使用CMake這個工具&#xff0c;比如生成CMake項目、構建CMake項目等…

neo4j圖數據庫基本概念和向量使用

一.節點 1.新建節點 create (n:GroupProduct {name:都邦高保額團意險,description: "保險產品名稱"} ) return n CREATE&#xff1a;Neo4j 的關鍵字&#xff0c;用于創建新節點或關系。 (n:GroupProduct)&#xff1a; n 是節點的臨時別名&#xff08;變量名&#…

2025年滲透測試面試題總結-滲透測試紅隊面試八(題目+回答)

網絡安全領域各種資源&#xff0c;學習文檔&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各種好玩的項目及好用的工具&#xff0c;歡迎關注。 目錄 滲透測試紅隊面試八 二百一十一、常見中間件解析漏洞利用方式 二百一十二、MySQL用戶密碼存儲與加密 …

大語言模型主流架構解析:從 Transformer 到 GPT、BERT

&#x1f4cc; 友情提示&#xff1a; 本文內容由銀河易創AI&#xff08;https://ai.eaigx.com&#xff09;創作平臺的gpt-4-turbo模型生成&#xff0c;旨在提供技術參考與靈感啟發。文中觀點或代碼示例需結合實際情況驗證&#xff0c;建議讀者通過官方文檔或實踐進一步確認其準…

Java設計模式之裝飾器模式:從基礎到高級的全面解析(萬字解析)

裝飾器模式(Decorator Pattern)是一種結構型設計模式,它允許向一個現有的對象添加新的功能,同時又不改變其結構。這種模式創建了一個裝飾類,用來包裝原有的類,并在保持類方法簽名完整性的前提下,提供了額外的功能。 一、裝飾器模式基礎概念 1.1 什么是裝飾器模式 裝飾…