對于緩慢變化的維度表,如客戶表,員工表,為了不丟失歷史數據,又不至于太浪費存儲空間,我們采用拉鏈表實現。
實現過程如下:
1、采集初始數據:
1.1 從mysql導出數據到hdfs
/data/dolphinscheduler/loadMysqlTable.sh "${dbConnect}" "select ${slctColums} from loan.${tableName} t where update_time < date_add('${bizDate}', INTERVAL 1 DAY) or update_time is null" "/dmp/${DMP_DB}" "${srcSystem}" "${bizDate}"
關于loadMysqlTable.sh腳本可以看我的另一篇文章:
https://blog.csdn.net/weixin_45357522/article/details/149720803
其中,變量的示例:
${slctColums}:“t.id, t.name, t.age …”
${dbConnect}:“-h host1 -uroot -ppwd123”
${tableName}: “t_customer”
${bizDate}: “2025-07-30”
${srcSystem}: “crm”
${DMP_DB}: 數據中臺的hive db name, 如:“rdm”
這里要注意的是sql語句加入一個條件,update_time < date_add(‘${bizDate}’, INTERVAL 1 DAY) or update_time is null,只把T-1以前更新的數據導出來,當時更新的數據要等到第二天才導出,以免重復導入,產生假的變更記錄。
1.2 基于上傳到hdfs的文件創建一個臨時表
1.3 從臨時表中過濾掉非法數據,如id為空的數據,加載到ODS
1.4 從ods表加載到dwd,并初始化valid_from = ‘1900-01-01’, valid_to = ‘2999-12-31’
insert overwrite table ${DMP_DB}.dwd_dim_${tableName}
SELECT ${slctColums}, '1900-01-01', '2999-12-31'
from ${DMP_DB}.ods_${srcSystem}_${tableName} t;
其中,${slctColums}的示例:t.id, t.name, t.age …
dwd表最后兩個字段分別是valid_from, valid_to, 日期型
表結構示例:
CREATE TABLE cust.dwd_dim_loan_individual_customer (id varchar(32) COMMENT '個人客戶id',code varchar(50) COMMENT '客戶編號',name varchar(100) COMMENT '客戶名稱',status tinyint COMMENT '客戶狀態',id_type int COMMENT '證件類型 id_type',id_code varchar(22) COMMENT '證件號碼',id_expire_date timestamp COMMENT '身份證有效期到',gender tinyint COMMENT '性別',age TINYINT COMMENT '年齡',valid_from date COMMENT '生效日期(含)',valid_to date COMMENT '失效日期(不含)'
) COMMENT '個人客戶'
STORED AS parquet
location '/dmp/cust/dwd/dim/loan_individual_customer';
取數邏輯:
select ... from table1
where valid_from <= current_date and current_date < valid_to
2、增量數據加載
2.1 從mysql導出數據到hdfs
/data/dolphinscheduler/loadMysqlTable.sh "${dbConnect}" "select ${slctColums} from loan.${tableName} t where update_time < date_add('${bizDate}', INTERVAL 1 DAY) and update_time >= '${bizDate}'" "/dmp/${DMP_DB}" "${srcSystem}" "${bizDate}"
其中,只取更新時間是T-1的數據,T日更新數據留到下一天處理。
2.2 基于上傳到hdfs的文件創建一個臨時表
2.3 從臨時表中過濾掉非法數據,如id為空的數據,加載到ODS
2.4 從ods表加載到dwd
這步比較關鍵,有些邏輯:
insert overwrite table ${DMP_DB}.dwd_dim_${tableName}
SELECT ${slctColums}, valid_from,
CASE WHEN t.valid_to = to_date('2999-12-31') AND B.id IS NOT NULLTHEN date_add('${bizDate}', 1) -- 有新記錄匹配到了,把valid_to設置為下一天ELSE t.valid_to -- 沒有新記錄匹配到,valid_to不變END AS valid_to
FROM cust.dwd_dim_loan_individual_customer t
LEFT JOIN cust.ods_yecai_loan_individual_customer AS b
ON t.id = b.id where valid_from != date_add('${bizDate}', 1) -- 重跑這個條件用于濾除當天先前加入的數據
UNION all -- 把新記錄加進去
SELECT ${slctColums},date_add('${bizDate}', 1) AS valid_from, to_date('2999-12-31') AS valid_to
FROM ${DMP_DB}.ods_${srcSystem}_${tableName} AS t
重要優化就是舊表和新表只join一次!!!
示例數據:
a. 老數據
id | name | age | valid_from | valid_to |
---|---|---|---|---|
1 | 張三 | 25 | 1900-01-01 | 2999-12-31 |
2 | 李四 | 45 | 1900-01-01 | 2999-12-31 |
b.新數據
id | name | age |
---|---|---|
1 | 張三 | 26 |
c. 合并后的數據
id | name | age | valid_from | valid_to |
---|---|---|---|---|
1 | 張三 | 25 | 1900-01-01 | 2025-07-31 |
1 | 張三 | 26 | 2025-07-31 | 2999-12-31 |
2 | 李四 | 45 | 1900-01-01 | 2999-12-31 |
sql中“where valid_from != date_add(‘${bizDate}’, 1)”的作用是在重跑數據時把當天新加入的數據濾除掉