引言
在數據工程領域,處理大規模數據集和高頻率數據更新是一項挑戰。SQLMesh作為一款強大的數據編排工具,提供了增量模型功能,幫助數據工程師高效地管理和更新數據。本文將詳細介紹如何使用SQLMesh創建和管理基于時間范圍的增量模型,涵蓋從開發到生產的完整工作流程。
需求背景
假設你是一名數據工程師,負責處理一家直接面向客戶銷售軟件的公司的數據。你每天需要處理數百萬筆銷售交易數據,并且需要將這些數據與產品使用數據進行關聯,以更好地理解銷售趨勢和產品使用情況。
你面臨以下挑戰:
- 如何處理延遲到達的數據?
- 如何處理UTC和PST時間戳的轉換?
- 應該在什么時間運行這些任務?
- 如何測試這些數據?
- 如何高效地運行增量更新?
- 如何處理邊緣情況下的歷史數據錯誤?
- 如何編寫單元測試?
- 如何確保生產環境的數據完整性?
本文將通過一個完整的示例,展示如何使用SQLMesh解決這些問題。
開發工作流程
在SQLMesh中,典型的開發工作流程如下:
sqlmesh plan dev
: 創建一個新的開發環境sqlmesh fetchdf
: 在開發環境中預覽數據sqlmesh create_external_models
: 自動生成原始源表的列級血緣文檔sqlmesh plan
: 將模型從開發環境推廣到生產環境sqlmesh plan dev --forward-only
: 在開發環境中進行代碼更改,并僅處理新數據sqlmesh fetchdf
: 在開發環境中預覽更改后的數據sqlmesh create_test
: 自動生成單元測試sqlmesh test
: 運行單元測試sqlmesh plan
: 將更改推廣到生產環境
環境設置
我們將從一個現有的SQLMesh項目開始,該項目已經包含一些生產模型。假設我們已經有以下原始數據表:
原始產品使用數據
product_id | customer_id | last_usage_date | usage_count | feature_utilization_score | user_segment |
---|---|---|---|---|---|
PROD-101 | CUST-001 | 2024-10-25 23:45:00+00 | 120 | 0.85 | enterprise |
PROD-103 | CUST-001 | 2024-10-27 12:30:00+00 | 95 | 0.75 | enterprise |
… | … | … | … | … | … |
原始銷售數據
transaction_id | product_id | customer_id | transaction_amount | transaction_timestamp | payment_method | currency |
---|---|---|---|---|---|---|
TX-001 | PROD-101 | CUST-001 | 99.99 | 2024-10-25 08:30:00+00 | credit_card | USD |
TX-002 | PROD-102 | CUST-002 | 149.99 | 2024-10-25 09:45:00+00 | paypal | USD |
… | … | … | … | … | … | … |
模型配置
我們將創建一個增量模型demo.incrementals_demo
,該模型按天分區,并處理銷售數據和產品使用數據的關聯。
MODEL(name="demo.incrementals_demo",kind=INCREMENTAL_BY_TIME_RANGE(time_column="transaction_date",lookback=2, # 處理過去2天的延遲數據),start="2024-10-25", # 不回填此日期之前的數據cron="@daily", # 每天午夜UTC運行grain="transaction_id", # 主鍵audits=[UNIQUE_VALUES(columns=("transaction_id",)),NOT_NULL(columns=("transaction_id",)),]
)WITH sales_data AS (SELECTtransaction_id,product_id,customer_id,transaction_amount,transaction_timestamp,payment_method,currencyFROM sqlmesh-public-demo.tcloud_raw_data.salesWHERE transaction_timestamp BETWEEN @start_dt AND @end_dt
),product_usage AS (SELECTproduct_id,customer_id,last_usage_date,usage_count,feature_utilization_score,user_segmentFROM sqlmesh-public-demo.tcloud_raw_data.product_usageWHERE last_usage_date BETWEEN DATE_SUB(@start_dt, INTERVAL 30 DAY) AND @end_dt
)SELECTs.transaction_id,s.product_id,s.customer_id,s.transaction_amount,DATE(s.transaction_timestamp) as transaction_date,DATETIME(s.transaction_timestamp, 'America/Los_Angeles') as transaction_timestamp_pst,s.payment_method,s.currency,p.last_usage_date,p.usage_count,p.feature_utilization_score,p.user_segment,CASEWHEN p.usage_count > 100 AND p.feature_utilization_score > 0.8 THEN 'Power User'WHEN p.usage_count > 50 THEN 'Regular User'WHEN p.usage_count IS NULL THEN 'New User'ELSE 'Light User'END as user_type,DATE_DIFF(s.transaction_timestamp, p.last_usage_date, DAY) as days_since_last_usage
FROM sales_data s
LEFT JOIN product_usage pON s.product_id = p.product_idAND s.customer_id = p.customer_id
創建模型
首次創建模型時,我們需要將其添加到開發環境中:
sqlmesh plan dev
按照提示輸入回填的起始和結束日期,SQLMesh將自動創建物理表并執行初始數據加載。
跟蹤列級血緣
SQLMesh可以自動生成外部模型文檔,記錄原始表的列信息和數據類型:
sqlmesh create_external_models
通過SQLMesh UI,可以直觀地查看列級血緣關系。
進行更改
假設我們需要調整“Power User”的定義,將閾值從100次使用調整為50次使用。我們可以使用--forward-only
標志,僅對新數據應用更改:
sqlmesh plan dev --forward-only
SQLMesh會生成一個預覽表,允許我們在開發環境中測試更改,而不會影響歷史數據。
添加單元測試
使用sqlmesh create_test
命令可以自動生成單元測試配置文件:
sqlmesh create_test demo.incrementals_demo \--query sqlmesh-public-demo.tcloud_raw_data.product_usage "select * from sqlmesh-public-demo.tcloud_raw_data.product_usage where customer_id='CUST-001'" \--query sqlmesh-public-demo.tcloud_raw_data.sales "select * from sqlmesh-public-demo.tcloud_raw_data.sales where customer_id='CUST-001'" \--var start_dt '2024-10-25' \--var end_dt '2024-10-27'
運行單元測試:
sqlmesh test
推廣到生產環境
確認開發環境中的更改無誤后,可以將其推廣到生產環境:
sqlmesh plan
SQLMesh會自動處理模式演進和數據回填,確保生產環境的數據完整性。
總結
通過本文的示例,我們展示了如何使用SQLMesh創建和管理基于時間范圍的增量模型。SQLMesh的優勢在于:
- 自動處理數據分區,提高查詢效率
- 支持增量更新,減少資源消耗
- 提供強大的測試和驗證工具,確保數據質量
- 簡化開發到生產的流程,減少人為錯誤
希望這篇指南能幫助你更好地理解和使用SQLMesh,提升數據工程的效率和準確性。
注意:本文基于SQLMesh官方文檔和示例編寫,實際操作中請參考最新版本的SQLMesh文檔。