分區是Dagster中的核心抽象概念,它允許我們管理大型數據集、處理增量更新并提高管道性能。本文將詳細介紹如何創建和實現基于時間和類別的分區資產。
什么是分區?
分區是將數據集劃分為更小、更易管理的部分的技術。在Dagster中,分區可以基于時間、類別或其他自定義邏輯創建,從而優化數據處理流程。
創建時間分區資產
基于時間的月度分區
首先,我們將創建一個按月份分區的資產,用于計算每個銷售代表的月度績效:
monthly_partition = dg.MonthlyPartitionsDefinition(start_date="2023-01-01")@dg.asset(partitions_def=monthly_partition,compute_kind="duckdb",group_name="analysis",deps=[joined_data]
)
def monthly_sales_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):partition_date_str = context.partition_keymonth_to_fetch = partition_date_str[:-3] # 格式化為YYYY-MMwith duckdb.get_connection() as conn:# 創建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS monthly_sales_performance (partition_date varchar,rep_name varchar,product varchar,total_dollar_amount double);""")# 刪除該月已有數據conn.execute(f"""DELETE FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""")# 插入新數據conn.execute(f"""INSERT INTO monthly_sales_performanceSELECT '{month_to_fetch}' AS partition_date, rep_name, product_name AS product, SUM(dollar_amount) AS total_dollar_amountFROM joined_dataWHERE strftime(date, '%Y-%m') = '{month_to_fetch}'GROUP BY '{month_to_fetch}', rep_name, product_name;""")# 預覽數據preview_query = f"SELECT * FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM monthly_sales_performance WHERE partition_date = '{month_to_fetch}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})
創建類別分區資產
基于產品類別的分區
接下來,我們創建一個基于預定義產品類別的靜態分區資產:
product_category_partition = dg.StaticPartitionsDefinition(["Electronics", "Books", "Home and Garden", "Clothing"
])@dg.asset(deps=[joined_data],partitions_def=product_category_partition,group_name="analysis",compute_kind="duckdb"
)
def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource):product_category_str = context.partition_keywith duckdb.get_connection() as conn:# 創建表(如果不存在)conn.execute("""CREATE TABLE IF NOT EXISTS product_performance (product_category varchar,product_name varchar,total_dollar_amount double,total_units_sold double);""")# 刪除該類別已有數據conn.execute(f"""DELETE FROM product_performance WHERE product_category = '{product_category_str}';""")# 插入新數據conn.execute(f"""INSERT INTO product_performanceSELECT '{product_category_str}' AS product_category, product_name, SUM(dollar_amount) AS total_dollar_amount, SUM(quantity) AS total_units_soldFROM joined_dataWHERE category = '{product_category_str}'GROUP BY '{product_category_str}', product_name;""")# 預覽數據preview_query = f"SELECT * FROM product_performance WHERE product_category = '{product_category_str}';"preview_df = conn.execute(preview_query).fetchdf()row_count = conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone()[0] if conn.execute(f"""SELECT COUNT(*) FROM product_performance WHERE product_category = '{product_category_str}';""").fetchone() else 0return dg.MaterializeResult(metadata={"row_count": dg.MetadataValue.int(row_count),"preview": dg.MetadataValue.md(preview_df.to_markdown(index=False))})
將分區資產添加到Definitions
完成資產定義后,需要將它們添加到Dagster的Definitions對象中:
defs = dg.Definitions(assets=[products, sales_reps, sales_data, joined_data, monthly_sales_performance, product_performance,], asset_checks=[missing_dimension_check],resources={"duckdb": DuckDBResource(database="data/mydb.duckdb")}
)
物化分區資產
在Dagster UI中操作這些分區資產的步驟:
- 導航到"Assets"頁面
- 點擊"Reload definitions"重新加載定義
- 選擇"monthly_sales_performance"資產,然后點擊"Materialize selected"
- 確保選擇所有分區
- 啟動回填(backfill)作業
- 選擇"product_performance"資產,然后點擊"Materialize selected"
- 確保選擇所有分區
- 啟動回填作業
下一步計劃
現在我們已經建立了ETL管道的主要資產,下一步可以考慮:
- 添加自動化調度
- 實現數據質量監控
- 添加異常處理機制
- 優化查詢性能
- 擴展更多維度的分析
通過合理使用分區技術,我們可以顯著提高Dagster管道的性能和可維護性,特別是在處理大規模數據集時。