從零構建Dagster分區管道:時間+類別分區實戰案例

分區是Dagster中的核心抽象概念,它允許我們管理大型數據集、處理增量更新并提高管道性能。本文將詳細介紹如何創建和實現基于時間和類別的分區資產。

在這里插入圖片描述

什么是分區?

分區是將數據集劃分為更小、更易管理的部分的技術。在Dagster中,分區可以基于時間、類別或其他自定義邏輯創建,從而優化數據處理流程。

創建時間分區資產

基于時間的月度分區

首先,我們將創建一個按月份分區的資產,用于計算每個銷售代表的月度績效:

monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2023-01-01")@dg.asset(partitions_def=monthly_partition,compute_kind="duckdb",group_name="analysis",deps=[joined_data]
)
def monthly_sales_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):partition_date_str = context.partition_keymonth_to_fetch = partition_date_str[:-3]  # 格式化為YYYY-MMwith duckdb.get_connection() as conn:# 創建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS monthly_sales_performance (partition_date varchar,rep_name varchar,product varchar,total_dollar_amount double);""")# 刪除該月已有數據conn.execute(f"""DELETE FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""")# 插入新數據conn.execute(f"""INSERT INTO monthly_sales_performanceSELECT '{month_to_fetch}' AS partition_date, rep_name, product_name AS product, SUM(dollar_amount) AS total_dollar_amountFROM joined_dataWHERE strftime(date, '%Y-%m') = '{month_to_fetch}'GROUP BY '{month_to_fetch}', rep_name, product_name;""")# 預覽數據preview_query = f"SELECT * FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})

創建類別分區資產

基于產品類別的分區

接下來,我們創建一個基于預定義產品類別的靜態分區資產:

product_category_partition = dg.StaticPartitionsDefinition(["Electronics", "Books", "Home and Garden", "Clothing"
])@dg.asset(deps=[joined_data],partitions_def=product_category_partition,group_name="analysis",compute_kind="duckdb"
)
def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):product_category_str = context.partition_keywith duckdb.get_connection() as conn:# 創建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS product_performance (product_category varchar,product_name varchar,total_dollar_amount double,total_units_sold double);""")# 刪除該類別已有數據conn.execute(f"""DELETE FROM product_performance WHERE product_category = '{product_category_str}';""")# 插入新數據conn.execute(f"""INSERT INTO product_performanceSELECT '{product_category_str}' AS product_category, product_name, SUM(dollar_amount) AS total_dollar_amount, SUM(quantity) AS total_units_soldFROM joined_dataWHERE category = '{product_category_str}'GROUP BY '{product_category_str}', product_name;""")# 預覽數據preview_query = f"SELECT * FROM product_performance WHERE product_category = '{product_category_str}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})

將分區資產添加到Definitions

完成資產定義后,需要將它們添加到Dagster的Definitions對象中:

defs = dg.Definitions(assets=[products, sales_reps, sales_data, joined_data, monthly_sales_performance, product_performance,], asset_checks=[missing_dimension_check],resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}
)

物化分區資產

在Dagster UI中操作這些分區資產的步驟:

  1. 導航到"Assets"頁面
  2. 點擊"Reload definitions"重新加載定義
  3. 選擇"monthly_sales_performance"資產,然后點擊"Materialize selected"
    • 確保選擇所有分區
    • 啟動回填(backfill)作業
  4. 選擇"product_performance"資產,然后點擊"Materialize selected"
    • 確保選擇所有分區
    • 啟動回填作業

下一步計劃

現在我們已經建立了ETL管道的主要資產,下一步可以考慮:

  1. 添加自動化調度
  2. 實現數據質量監控
  3. 添加異常處理機制
  4. 優化查詢性能
  5. 擴展更多維度的分析

通過合理使用分區技術,我們可以顯著提高Dagster管道的性能和可維護性,特別是在處理大規模數據集時。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/78731.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/78731.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/78731.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Cursor:AI時代的智能編輯器

在開發者社區掀起熱潮的Cursor,正以破竹之勢重塑編程工具格局。這款基于VS Code的AI優先編輯器,不僅延續了經典IDE的穩定基因,更通過深度集成的智能能力,將開發效率推向全新維度。2023年Anysphere公司獲得的6000萬美元A輪融資&…

SpringMVC再復習1

一、三層架構 表現層(WEB 層) 定義 :是應用程序與客戶端進行交互的最外層,主要負責接收用戶的請求,并將處理結果顯示給用戶。 作用 :在 Spring MVC 中,表現層通常采用 MVC 設計模式來構建。 技…

Centos 7系統 寶塔部署Tomcat項目(保姆級教程)

再看文章之前默認已經安裝好系統,可能是云系統,或者是虛擬機。 寶塔安裝 這個比較簡單,參考這個老哥的即可: https://blog.csdn.net/weixin_42753193/article/details/125959289 環境配置 進入寶塔面板之后會出現環境安裝&…

Nginx核心功能

目錄 一:基于授權的訪問控制 1:基于授權的訪問控制簡介 2:基于授權的訪問控制步驟 (1)使用htpasswd 生成用戶認證文件 (2)修改密碼文件權限為400,將所有者改為nginx,…

AnimateCC基礎教學:漫天繁星-由DeepSeek輔助完成

1.界面及元件抓圖: 2.核心代碼: // 初始化設置 var stars []; var stars2 []; var numStars 100; var stageWidth stage.canvas.width; var stageHeight stage.canvas.height; console.log(stageWidth, stageHeight);// 創建星星函數 function createStar() {var star n…

通過DeepSeek大語言模型控制panda機械臂,聽懂人話,擬人性回答。智能機械臂助手又進一步啦

文章目錄 前言環境配置運行測試報錯 前言 通過使用智能化的工作流控制系統來精確操控機械臂,不僅能夠基于預設算法可靠地規劃每個動作步驟的執行順序和力度,確保作業流程的標準化和可重復性,還能通過模塊化的程序設計思路靈活地在原有工作流中…

分享一款免費的 AI 工作流平臺

分享一款 AI 工作流/任務流平臺,通過直觀的流程圖設計,輕松實現復雜業務流程的自動化與可視化,無縫集成 AI 大模型、AI 生圖、數據庫、條件分支、并行節點、自定義任務節點等等。 效果圖: 官網體驗地址:https://www.…

前端開發本地配置 HTTPS 全面詳細教程

分為兩步:生成證書、本地服務配置使用證書一、HTTPS 的基本概念 HTTPS 是一種安全的 HTTP 協議,它通過 SSL/TLS 對數據進行加密,確保數據在傳輸過程中不被竊取或篡改。在前端開發中,某些功能(如 Geolocation API、Web…

day10 python機器學習全流程實踐

在機器學習的實踐中,數據預處理與模型構建是極為關鍵的環節。本文將回顧數據預處理的全流程,并基于處理后的數據完成簡單的機器學習建模與評估,暫不涉及復雜的調參過程。 一、預處理流程回顧 機器學習的成功,很大程度上依賴于高…

4月28號

初認web前端: web標準: HTML:

【Linux系統】systemV共享內存

system V共享內存 在Linux系統中,共享內存是一種高效的進程間通信(IPC)機制,它允許兩個或者多個進程共享同一塊物理內存區域,這些進程可以將這塊區域映射到自己的虛擬地址空間中。 共享內存區是最快的IPC形式。一旦這…

(七)RestAPI 毛子(Http 緩存/樂觀鎖/Polly/Rate limiting/異步大文件上傳)

文章目錄 項目地址一、Http Cache1.1 服務注冊1.2 Validation with ETag1. 添加ETagMiddleware中間件2. 創建內存ETag存儲器3. 服務注冊4. 測試二、使用ETag實現樂觀鎖2.1 添加樂觀鎖方法2.2 修改Controller2.3 測試樂觀鎖三、Rate Limiting3.1 添加速率控制服務1. 在Program里…

2025.4.26_STM32_SPI

1.SPI簡介 2.硬件電路 所有SPI設備的SCK(時鐘)、MOSI(主機輸出從機輸入)、MISO(主機輸入從機輸出)分別連在一起。SCK線只能被主機控制,和I2C相同。 主機另外引出多條SS控制線,分別接到各從機的SS引腳 (SS不用的時候為高電平,當主機需要選中某…

JAVA:單例模式

單例模式是設計模式之一 設計模式,就像古代打仗,我們都聽過孫子兵法,把計謀概括下來弄成一種模式,形成一種套路。 軟件開發中也有很多場景,多數類似的問題場景,解決方案就形成固定的模式,單例…

腦機接口:重塑人類未來的神經增強革命

引言 人類對大腦的探索從未停止,而腦機接口(Brain-Computer Interface, BCI)的崛起,正在將科幻電影中的“意念操控”變為現實。 這項技術通過解碼腦電信號,實現人腦與外部設備的直接交互,不僅為醫療康復帶來…

從SOA到微服務:架構演進之路與實踐示例

一、架構演進背景 在軟件開發領域,架構風格隨著業務需求和技術發展不斷演進。從早期的單體架構,到面向服務架構(SOA),再到如今的微服務架構,每一次變革都是為了解決當時面臨的核心問題。 二、SOA架構解析 2.1 SOA核心概念 SOA&…

可靈AI 2.0上線:重新定義AI創作?好萊塢級特效觸手可及

2025年4月15日,快手正式發布可靈AI 2.0,這款被譽為“讓好萊塢特效師顫抖”的AI工具,以物理引擎級動態生成和電影語言自由操控兩大核心技術,徹底顛覆了內容創作的想象邊界。上線24小時內,全球用戶已用它生成超過100萬條…

Mysql存儲引擎、鎖機制

Mysql存儲引擎 InnoDB?(MySQL 5.5 及以后版本中的默認存儲引擎) ??事務支持??:支持 ??ACID 事務??,適合需要高可靠性的場景(如支付、訂單)。 ??鎖機制??:默認使用 ??行級鎖??…

飛蛾撲火算法優化+Transformer四模型回歸打包(內含MFO-Transformer-LSTM及單獨模型)

飛蛾撲火算法優化Transformer四模型回歸打包(內含MFO-Transformer-LSTM及單獨模型) 目錄 飛蛾撲火算法優化Transformer四模型回歸打包(內含MFO-Transformer-LSTM及單獨模型)預測效果基本介紹程序設計參考資料 預測效果 基本介紹 …

音視頻開發---視頻編碼基礎

一、視頻編碼的必要性 1. 存儲與傳輸成本高 未經編碼壓縮的原始視頻的數據量極大,例如:一般電影的亮度信號采樣頻率為13.5MHz;色度信號的頻帶通常為亮度信號的一半或更少,為6.75MHz或3.375MHz。以4:2:2的采樣頻率為例,Y信號采用13.5MHz,色度信號U和V采用6.75MHz采樣,…