在本文中,我們將探討如何利用dbt項目的代碼庫來實現一個簡單的SQLMesh項目。本文的基礎是基于Udemy講師為dbt課程創建的示例項目,可以在這個GitHub repo中獲得。這個dbt項目是相對完整的示例,我們將使用它作為模板來演示SQLMesh(下一代數據轉換工具)的功能。
dbt示例項目在Snowflake中使用Airbnb數據集分析端到端的數據工程工作流。該項目包括將原始數據加載到Snowflake中,創建各種模型,并執行轉換以從數據中獲得有意義的見解。我們打算使用duckdb作為分析數據庫,方便讀者直接在本機上測試運行。
簡要說明Airbnb數據集
本項目中使用的Airbnb數據集由三個主表組成:raw_listings、raw_reviews和raw_hosts。這些表格包含有關Airbnb房源的信息、對房源的評論以及房東的詳細信息。理解這些表之間的關系和意義對于處理和分析數據至關重要。
-
raw_listings(原始房源信息):
? 描述:此表包含有關愛彼迎房源的詳細信息,包括房源 ID、網址、名稱、房型、最短入住天數、房東 ID、價格以及創建和更新的時間戳。
? 重要性:房源數據提供了愛彼迎上可用房源的全面視圖。它是與評論和房東數據相連接的核心表。 -
原始評論:
? 描述:此表包含愛彼迎房源的評論,包含房源 ID、評論日期、評論者姓名、評論內容和情感傾向等信息。
? 重要性:評論數據能提供有關客戶體驗和滿意度的見解。它通過房源 ID 與房源數據相關聯。 -
raw_hosts:
? 描述:此表包含有關房源的信息,包括房源 ID、名稱、超級房源狀態以及創建和更新的時間戳。
? 重要性:房源數據提供了有關提供房源的個人或實體的詳細信息。它通過 host_id 與房源數據相關聯。
項目最終目標
本項目的首要目標是分析滿月日期對愛彼迎(Airbnb)評分和評論的影響。通過利用房源、評論和房東之間的關系,我們旨在得出有關月相周期對客戶反饋和滿意度影響的有意義的見解。
實現SQLMesh項目的步驟
- 初始化SQLMesh項目
? 創建一個名為airbnb_sqlmesh的文件夾,并進入該文件夾。執行如下命令初始化SQLMesh項目:
python -m venv .venv
source .venv/bin/activate
pip install "sqlmesh"
sqlmesh init duckdb
-
項目結構
在初始化完成后,將會創建以下目錄和文件,從而為我們的 SQLMesh 項目提供組織結構:
? config.yaml:用于您項目的數據庫配置文件。
? models:存放 SQL 和 Python 模型的目錄。
? audits:存放共享審計的目錄。
? tests:存放單元測試的目錄。
? macros:存放宏的目錄。
? 刪除models、seeds以及tests目錄下的示例文件,后續我們會添加實際業務需要的模型。
-
項目配置
在配置文件中定義SQLMesh項目配置。yaml文件。該文件包含數據庫連接細節和模型默認值:
gateways:duckdb:connection:type: duckdbdatabase: dw.dbdefault_gateway: duckdbmodel_defaults:dialect: duckdbstart: 2025-03-17
-
加載原始數據
使用以下SQL腳本將原始數據加載到duckdb表中:
INSTALL httpfs;
LOAD httpfs;-- Create and load raw_listings table
CREATE OR REPLACE TABLE raw_listings (id integer,listing_url string,name string,room_type string,minimum_nights integer,host_id integer,price string,created_at datetime,updated_at datetime
);COPY raw_listings FROM 's3://dbtlearn/listings.csv' (ignore_errors true);-- Create and load raw_reviews table
CREATE OR REPLACE TABLE raw_reviews (listing_id integer,date datetime,reviewer_name string,comments string,sentiment string
);COPY raw_reviews FROM 's3://dbtlearn/reviews.csv' (ignore_errors true);-- Create and load raw_hosts table
CREATE OR REPLACE TABLE raw_hosts (id integer,name string,is_superhost string,created_at datetime,updated_at datetime
);COPY raw_hosts FROM 's3://dbtlearn/hosts.csv' (ignore_errors true);
創建模型
源(Source )模型
我們在 models/source 文件夾內的原始數據表基礎上創建了三個模型 src_hosts.sql、src_listings.sql 和 src_reviews.sql:
本項目中的源模型旨在對來自 Airbnb 數據集的原始數據進行標準化和準備,以便進一步處理和分析。它們充當中間層,通過重命名列、選擇相關字段以及標準化數據等方式將原始數據轉換為更易于使用的格式。所有源模型都被實現為視圖,確保數據干凈、結構化,并為后續在維度和事實模型中的轉換和分析做好準備。
src_listings.sql 模型:
MODEL ( name src.SRC_LISTINGS,kind view
);WITH mr_listings AS (SELECT * FROM main.RAW_LISTINGS
)
SELECTid AS listing_id,name AS listing_name,listing_url,room_type,minimum_nights,host_id,price AS price_str,created_at,updated_at
FROM mr_listings;
src_reviews.sql模型:
MODEL (name src.SRC_REVIEWS,kind view
);WITH mr_reviews AS (SELECT * FROM main.raw_reviews
)
SELECTlisting_id,date AS review_date,reviewer_name,comments AS review_text,sentiment AS review_sentiment
FROM mr_reviews;
src_hosts.sql模型:
MODEL (name src.SRC_HOSTS,kind view
);WITH mr_hosts AS (SELECT * FROM main.RAW_HOSTS
)
SELECTid AS host_id,name AS host_name,is_superhost,created_at,updated_at
FROM mr_hosts;
運行命令,生成源模型:
sqlmesh plan dev
生成src__dev schema以及三個視圖:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='src__dev';
┌───────────────┬──────────────┬──────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼──────────────┼────────────┤
│ dw │ src__dev │ src_hosts │ VIEW │
│ dw │ src__dev │ src_listings │ VIEW │
│ dw │ src__dev │ src_reviews │ VIEW │
└───────────────┴──────────────┴──────────────┴────────────┘
維度模型
在models/dim文件夾中創建維度模型作為視圖:
維度模型實現對源數據的清理、豐富和組合,以便進行詳細分析。它們處理空值、格式化字段和合并相關數據集。模型dim.dim_hosts_cleaned和dim. dim_listings_cleaned被具體化為視圖,而dim.dim_listings_w_hosts被具體化為一個完整的表,每次都完全重新加載。
dim.dim_hosts_cleaned模型:
MODEL (name dim.dim_hosts_cleansed,kind view
);WITH src_hosts AS (SELECT * FROM SOURCE.SRC_HOSTS
)
SELECThost_id,NVL(host_name, 'Anonymous') AS host_name,is_superhost,created_at,updated_at
FROM src_hosts;
dim. dim_listings_cleaned模型:
MODEL (name dim.dim_listings_cleansed,kind view
);WITH src_listings AS (SELECT * FROM SOURCE.SRC_LISTINGS
)
SELECTlisting_id,listing_name,room_type,CASE WHEN minimum_nights = 0 THEN 1 ELSE minimum_nights END AS minimum_nights,host_id,REPLACE(price_str, '$', '')::NUMERIC(10, 2) AS price,created_at,updated_at
FROM src_listings;
dim.dim_listings_w_hosts模型
MODEL (name dim.dim_listings_w_hosts,kind full
);WITH l AS (SELECT * FROM dim.dim_listings_cleansed
),
h AS (SELECT * FROM dim.dim_hosts_cleansed
)
SELECTl.listing_id,l.listing_name,l.room_type,l.minimum_nights,l.price,l.host_id,h.host_name,h.is_superhost AS host_is_superhost,l.created_at,GREATEST(l.updated_at, h.updated_at) AS updated_at
FROM l
LEFT JOIN h ON (h.host_id = l.host_id);
運行命令,生成維度表:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='dim__dev';
┌───────────────┬──────────────┬───────────────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼───────────────────────┼────────────┤
│ dw │ dim__dev │ dim_hosts_cleansed │ VIEW │
│ dw │ dim__dev │ dim_listings_cleansed │ VIEW │
│ dw │ dim__dev │ dim_listings_w_hosts │ VIEW │
└───────────────┴──────────────┴───────────────────────┴────────────┘
事實模型
Fact 模型會在 models/fct 文件夾內創建增量型 Fact 模型:
Fact 模型 fct.reviews 會處理并匯總來自源模型的評論數據。這是一個增量型模型,意味著它只會加載新數據,這極大地減少了每次模型運行所需的計算資源。在模型中,@start 和 @end_date 是 SQLMesh 宏,在 sqlmesh 計劃或運行期間會根據運行的適當開始和結束日期進行渲染。此外,@GENERATE_SURROGATE_KEY 宏用于根據給定的輸入列生成 MD5 哈希值,為數據生成一個替代鍵。
MODEL (name fct.reviews,kind INCREMENTAL_BY_TIME_RANGE (time_column review_date)
);WITH src_reviews AS (SELECT * FROM SOURCE.SRC_REVIEWS
)
SELECT@GENERATE_SURROGATE_KEY(listing_id, review_date, reviewer_name, review_text) AS review_id,*
FROM src_reviews
WHERE review_text IS NOT NULLAND review_date BETWEEN @start_date AND @end_date;
運行命令,生成事實表:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='fct__dev';
┌───────────────┬──────────────┬────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼────────────┼────────────┤
│ dw │ fct__dev │ reviews │ VIEW │
└───────────────┴──────────────┴────────────┴────────────┘
業務層模型
轉到業務層,我們需要分析滿月日期如何影響評審意見。為此,我們需要一個包含滿月日期的表。這可以使用SQLMesh中的種子文件來實現。從提供的鏈接下載種子文件,并將其放在SQLMesh項目的種子文件夾中。然后,創建一個名為full_moon_dates_seed.sql 的種子模型。
MODEL (name seed.full_moon_dates,kind SEED (path '../seeds/seed_full_moon_dates.csv')
);
運行命令,生成seed日期模型:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='seed__dev';
┌───────────────┬──────────────┬─────────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼─────────────────┼────────────┤
│ dw │ seed__dev │ full_moon_dates │ VIEW │
└───────────────┴──────────────┴─────────────────┴────────────┘
對于最終的模型,創建名為models/mart的文件夾,并在其中添加以下模型:
MODEL (name mart.mart_fullmoon_reviews,kind full
);WITH fct_reviews AS (SELECT * FROM fct.reviews
),
full_moon_dates AS (SELECT * FROM seed.full_moon_dates
)SELECTr.*,CASEWHEN fm.full_moon_date IS NULL THEN 'not full moon'ELSE 'full moon'END AS is_full_moon
FROMfct_reviews as rLEFT JOIN full_moon_dates as fmON ( r.review_date = DATE_ADD(strptime(fm.full_moon_date, '%Y-%m-%d'), INTERVAL 1 DAY) )
運行命令,生成模型:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='mart__dev';
┌───────────────┬──────────────┬───────────────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼───────────────────────┼────────────┤
│ dw │ mart__dev │ mart_fullmoon_reviews │ VIEW │
└───────────────┴──────────────┴───────────────────────┴────────────┘
一旦所有的模型都創建好了,項目結構應該是這樣的:
在開發模式下測試成功后,我們在生產模式下運行sqlmesh計劃:
sqlmesh plan prod
輸出結果:
sqlmesh plan prod`prod` environment will be initializedModels:
└── Added:├── dim.dim_hosts_cleansed├── dim.dim_listings_cleansed├── dim.dim_listings_w_hosts├── fct.reviews├── mart.mart_fullmoon_reviews├── seed.full_moon_dates├── src.src_hosts├── src.src_listings└── src.src_reviews
Apply - Virtual Update [y/n]:
選擇y,sqlmesh會自動創建生產環境所需的模型,SQLMesh不會像在開發環境中那樣重新處理所有數據。相反,它重用在開發環境中創建的模型。有關SQLMesh虛擬環境的更多信息,請參閱此鏈接。
最后通過命令sqlmesh ui
啟動web開發頁面,圖示如下: