Dagster中的Ops與Assets:數據管道構建的兩種選擇

Dagster是一個強大的數據編排平臺,它提供了多種工具來幫助數據工程師構建可靠的數據管道。在Dagster中,Ops和Assets是兩種核心概念,用于定義數據處理邏輯。本文將全面介紹Ops的概念、特性及其使用方法,特別補充了Op上下文和Op工廠等重要內容,并解釋為什么對于新用戶我們推薦優先使用Assets。

在這里插入圖片描述

什么是Ops?

Ops是Dagster中的基本計算單元,代表一個獨立的數據處理任務。每個Op應該執行相對簡單的任務,例如:

  • 從其他數據集派生新數據集
  • 執行數據庫查詢
  • 在遠程集群中啟動Spark作業
  • 查詢API并將結果存儲到數據倉庫
  • 發送電子郵件或Slack消息

Ops的核心特性

1. 靈活的執行策略

Ops是獨立于執行策略的邏輯單元,這使得它們可以在開發和生產環境之間無縫轉換。Ops可以組合成圖(graphs),并通過jobs綁定到適當的執行器上,實現單機執行或在集群中分布式執行。

2. 可插拔的外部系統集成

對于需要與外部系統交互的數據管道,Dagster提供了資源(resources)抽象層。你可以針對抽象資源(如數據庫)編寫Op邏輯,然后在job級別綁定具體的資源定義。這樣,開發階段可以使用本地替代方案,而生產環境則使用云服務。

3. 輸入和輸出管理

Ops具有明確的輸入和輸出,類似于Python函數的參數和返回值。這些輸入輸出可以附加Dagster類型進行運行時驗證,并可以通過IO Manager管理數據存儲,實現不同執行環境間的I/O策略切換和中間數據的高效緩存。

4. 配置能力

數據管道中的操作通常需要參數化配置。Ops允許通過配置模式(config schema)定義這些參數,使Ops更加靈活和可重用。例如,可以通過配置指定API端點:

from dagster import Configclass MyOpConfig(Config):api_endpoint: str@op
def my_configurable_op(config: MyOpConfig):data = requests.get(f"{config.api_endpoint}/data").json()return data

5. 事件流

Ops在執行過程中會發出一系列事件,包括默認事件(如開始執行)和通過事件API報告的自定義事件(如數據資產創建、數據質量檢查結果等)。這些事件流可以在Dagster UI中可視化,便于調試、檢查和實時監控。

6. 可測試性

Ops的設計使其易于測試,可以單獨測試或在管道中測試。資源API還允許在需要時替換外部系統(如數據庫)的存根(stub)。

定義和使用Ops

使用@op裝飾器定義Ops:

@op
def my_op():return "hello"

輸入和輸出

Ops通過參數接收輸入,通過返回值產生輸出:

@op
def add(a: int, b: int) -> int:return a + b

對于多輸出,可以使用Out對象:

@op(out={"sum": Out(), "product": Out()})
def math_ops(a: int, b: int):yield Output(a + b, "sum")yield Output(a * b, "product")

配置

為Ops添加配置:

from dagster import Config, opclass GreetingConfig(Config):name: str@op(config_schema=GreetingConfig)
def greet(context, config: GreetingConfig):context.log.info(f"Hello, {config.name}!")return f"Hello, {config.name}!"

Op上下文(Op Context)

在編寫Op時,用戶可以可選地提供一個上下文參數(通常命名為context)。當這個參數被提供時,Dagster會在Op執行時自動注入一個上下文對象,該對象提供了訪問系統信息的能力,如日志記錄器、當前運行ID等。

上下文對象的作用

  1. 日志記錄:通過context.log記錄不同級別的日志信息
  2. 訪問運行信息:獲取當前運行的ID、作業名稱等信息
  3. 資源訪問:在某些情況下可以訪問配置的資源
  4. 錯誤處理:提供更豐富的錯誤報告能力

使用示例

from dagster import op, OpExecutionContext@op
def context_op(context: OpExecutionContext):# 記錄info級別日志context.log.info(f"My run ID is {context.run_id}")# 記錄debug級別日志(默認可能不顯示)context.log.debug("This is a debug message")# 在實際業務邏輯中使用上下文try:result = do_something()return resultexcept Exception as e:context.log.error(f"Operation failed: {str(e)}")raise

Op工廠模式

在實際項目中,我們經常需要創建多個相似的Ops,或者需要動態生成Ops。這時,Op工廠模式就非常有用。Op工廠允許我們通過函數來生成Ops,而不是為每個Op手動編寫裝飾器。

工廠模式的應用場景

  1. 參數化Op創建:當需要創建多個相似但配置不同的Ops時
  2. 動態Op生成:根據運行時條件或配置動態生成Ops
  3. 代碼復用:避免重復的Op定義代碼

創建Op工廠

from dagster import op, OpDefinitiondef create_math_op(name: str, operation):"""創建數學運算Op的工廠函數Args:name (str): 新Op的名稱operation (callable): 數學運算函數Returns:OpDefinition: 生成的Op定義"""@op(name=name)def math_op(a: float, b: float) -> float:return operation(a, b)return math_op# 使用工廠創建具體的Ops
add_op = create_math_op("add", lambda a, b: a + b)
multiply_op = create_math_op("multiply", lambda a, b: a * b)# 或者更復雜的工廠函數
def advanced_op_factory(config_schema=None, tags=None):"""更高級的Op工廠,支持配置和標簽Args:config_schema: Op的配置模式tags: 要附加到Op的標簽Returns:函數:接受compute函數并返回OpDefinition"""def decorator(compute_fn):op_def = op(name=compute_fn.__name__,config_schema=config_schema,tags=tags)(compute_fn)return op_defreturn decorator# 使用高級工廠
@advanced_op_factory(config_schema={"precision": int},tags={"team": "analytics"}
)
def divide(a: float, b: float, context) -> float:precision = context.op_config["precision"]result = a / breturn round(result, precision)

工廠模式的注意事項

  1. 性能考慮:工廠模式會引入額外的函數調用層,但在大多數情況下影響可以忽略
  2. 類型提示:使用工廠創建的Ops可能需要額外的類型提示處理
  3. 文檔:確保為工廠函數和生成的Ops提供清晰的文檔

為什么推薦Assets而非Ops?

雖然Ops功能強大,但對于新用戶我們推薦優先使用Assets:

  1. 更高級的抽象:Assets提供了數據版本控制、血緣追蹤和自動緩存等高級功能
  2. 聲明式API:Assets允許以更聲明式的方式定義數據管道
  3. 內置集成:Assets與Dagster的其他功能(如調度、監控)有更好的集成
  4. 簡化復雜性:對于大多數用例,Assets可以簡化數據管道的定義和維護

結論

Ops是Dagster中強大的計算單元,適合處理復雜的數據處理邏輯。然而,對于新用戶或構建標準數據管道的場景,Assets提供了更高級的抽象和更簡化的開發體驗。隨著對Dagster的深入理解,用戶可以根據需要選擇使用Ops來處理更復雜的場景。

無論選擇哪種方式,Dagster都提供了豐富的工具和靈活性來構建可靠、可維護的數據管道。特別是Op上下文和Op工廠模式等高級特性,為復雜的數據工程需求提供了強大的支持。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/81516.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/81516.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/81516.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

參數包展開到初始化列表

上次寫過參數包展開和靜態斷言的使用——Accumulator-CSDN博客&#xff0c;數組是靜態定義的&#xff0c;并且遞歸展開參數包。這里改用動態數組&#xff0c;并且將參數包展開到初始化列表中&#xff0c;成為一個動態數組。 #include <stdio.h> #include <vector>…

React18組件通信與插槽

1、為DOM組件設置Props 在react中jsx中的標簽屬性被稱為Props DOM組件的類屬性&#xff0c;為了防止與js中的class屬性沖突改成了className DOM組件的style屬性 import image from "./logo.svg"; function App() {const imgStyleObj {width: 200,height: 200,};re…

GTS-400 系列運動控制器板(十四)----軟限位使用

運動控制器函數庫的使用 運動控制器驅動程序、dll 文件、例程、Demo 等相關文件請通過固高科技官網下載,網 址為:www.googoltech.com.cn/pro_view-3.html 1 Windows 系統下動態鏈接庫的使用 在 Windows 系統下使用運動控制器,首先要安裝驅動程序。在安裝前需要提前下載運動…

C++ 開發指針問題:E0158 表達式必須為左值或函數指示符

問題與處理策略 問題描述 int* ptr &10;執行上述代碼&#xff0c;報如下錯誤 E0158 表達式必須為左值或函數指示符 C2101 常量上的“&”問題原因 10 是一個字面常量&#xff0c;常量是臨時值&#xff0c;編譯器不會為它們分配可尋址的內存空間 & 取地址運算符…

前端面經-VUE3篇(二)--vue3組件知識(二)依賴注入、異步組件、生命周期、組合式函數、插件

目錄 一、依賴注入 1、 依賴注入是什么&#xff1f; 2、最基礎的使用 3、為什么使用依賴注入&#xff1f; 4、 使用 Symbol 作注入名 二、異步組件 1、什么是異步組件&#xff1f; 2、最基礎用法&#xff1a;defineAsyncComponent 3、在模板中使用異步組件 4、配置加載狀態…

頭歌數據庫課程實驗(索引與數據庫完整性)

第1關&#xff1a;創建一般索引 任務描述 本關任務&#xff1a;為 student 表按姓名升序建立索引&#xff0c;索引名為 idx_sname。 相關知識 為了完成本關任務&#xff0c;你需要掌握&#xff1a; 索引是什么&#xff1b; 索引的分類&#xff1b; 索引的創建和刪除&#…

Socket 編程 UDP

Socket 編程 UDP UDP 網絡編程V1 版本 - echo serverV2 版本 - DictServerV3 版本 - 簡單聊天室 補充參考內容地址轉換函數關于 inet_ntoa UDP 網絡編程 聲明&#xff1a;下面代碼的驗證都是用Windows作為客戶端的&#xff0c;如果你有兩臺云服務器可以直接粘貼我在Linux下的客…

c++ 二級指針 vs 指針引用

二級指針 vs 指針引用&#xff1a;深入對比與分析 在C中&#xff0c;二級指針和指針引用都可以用于修改外部指針&#xff0c;但它們在語法、安全性和使用場景上有重要區別。下面我將從多個維度進行詳細對比。 1. 基本概念 1.1 二級指針 (Pointer to Pointer) int a 10; in…

【Hive入門】Hive與Spark SQL深度集成:通過Spark ThriftServer高效查詢Hive表

目錄 引言 1 Spark ThriftServer架構解析 1.1 核心組件與工作原理 1.2 與傳統HiveServer2的對比 2 Spark ThriftServer部署指南 2.1 環境準備與啟動流程 2.1.1 前置條件檢查 2.1.2 服務啟動流程 2.2 高可用部署方案 2.2.1 基于ZooKeeper的HA架構 3 性能優化實戰 3.…

[面試]SoC驗證工程師面試常見問題(二)

SoC驗證工程師面試常見問題(二) 摘要:面試SoC驗證工程師時,SystemVerilog (SV) 和 UVM (Universal Verification Methodology) 是核心技能,而AXI總線是現代SoC中最常見的接口協議之一,因此也是必考點。以下是可能被問到的問題及優質答案的詳細列表: 一、 System…

vue3 css模擬語音通話不同語音、正在加載等的效果

實現效果如下&#xff1a; 在不同的時間&#xff0c;顯示不一樣的效果&#xff08;大小是一樣的&#xff0c;截圖時尺寸發生了變化&#xff09; 具體實現代碼如下&#xff1a; <script setup> import {ref} from "vue";const max_hight ref(40px) const min…

KeyPresser 一款自動化按鍵工具

1. 簡介 KeyPresser 是一款自動化按鍵工具,它可以與窗口交互,并支持后臺運行, 無需保持被控窗口在前臺運行。用戶可以選擇要操作的目標窗口,并通過勾選復選框來控制要發送哪些按鍵消息。可以從組合框中選擇所需的按鍵,并在編輯框中輸入時間間隔以控制按鍵發送之間的延遲。程…

ai之paddleOCR 識別PDF python312和paddle版本沖突 GLIBCXX_3.4.30

這里寫自定義目錄標題 問題一**解決方案****方法 1&#xff1a;使用符號鏈接將系統庫鏈接到 Conda 環境** **補充說明****驗證修復結果** 問題二&#xff1a;**問題根源****解決方案****1. 確認 TensorRT 安裝狀態****2. 安裝 TensorRT 并配置環境變量****3. 驗證 TensorRT 與 …

【RabbitMQ】 RabbitMQ快速上手

文章目錄 一、RabbitMQ 核心概念1.1 Producer和Consumer2.2 Connection和Channel2.3 Virtual host2.4 Queue2.5 Exchange2.6 RabbitMQ工作流程 二、AMQP協議三 、web界面操作4.1 用戶相關操作4.2 虛擬主機相關操作 四、RabbitMQ快速入門4.1 引入依賴4.2 編寫生產者代碼4.2.1 創…

Beatoven AI 自動生成音樂

Beatoven AI 自動生成音樂 文章目錄 Beatoven AI 自動生成音樂一、源代碼二、準備工作1. 安裝 Python 環境2. 安裝依賴庫 三、配置 API 密鑰四、運行腳本示例一&#xff1a;使用默認參數示例二&#xff1a;生成一段電影預告片風格音樂&#xff08;30秒&#xff09; 五、生成結果…

筆試專題(十四)

文章目錄 mari和shiny題解代碼 體操隊形題解代碼 二叉樹中的最大路徑和題解代碼 mari和shiny 題目鏈接 題解 1. 可以用多狀態的線性dp 2. 細節處理&#xff1a;使用long long 存儲個數 3. 空間優化&#xff1a;只需要考慮等于’s’&#xff0c;‘sh’&#xff0c;shy’的情況…

LeetCode —— 94. 二叉樹的中序遍歷

&#x1f636;?&#x1f32b;?&#x1f636;?&#x1f32b;?&#x1f636;?&#x1f32b;?&#x1f636;?&#x1f32b;?Take your time ! &#x1f636;?&#x1f32b;?&#x1f636;?&#x1f32b;?&#x1f636;?&#x1f32b;?&#x1f636;?&#x1f32b;?…

conda相關操作

安裝torch 直接使用conda install torch1.12.0會報錯&#xff0c;因為 Conda 通常使用 pytorch 作為包名&#xff08;而非 torch&#xff09; 正確使用方法&#xff1a; conda install pytorch1.12.0 -c pytorch使用 pip 安裝 pip install torch1.12.0在 Conda 中查看可安裝…

【Java面試筆記:進階】26.如何監控和診斷JVM堆內和堆外內存使用?

監控和診斷JVM內存使用是優化性能和解決內存問題的關鍵。 1.JVM內存監控與診斷方法 1.圖形化工具 JConsole:提供圖形化界面,可直接連接到Java進程,查看內存使用情況。VisualVM:功能強大的圖形化工具,但注意從Oracle JDK 9開始不再包含在JDK安裝包中。Java Mission Contr…

AVIOContext 再學習

這個目前階段用的不多&#xff0c;暫時不要花費太多精力。 url 的格式不同&#xff0c;使用的傳輸層協議也不同。這塊看代碼還沒看到自己想的這樣。 目前看的信息是&#xff1a;avformatContext 的 io_open 回調函數 在默認情況下叫 io_open_default&#xff0c;在解復用的 av…