1. 維度主題表數據導出
1.1 PostgreSQL介紹
PostgreSQL 是一個功能強大的開源對象關系數據庫系統,它使用和擴展了 SQL 語言,并結合了許多安全存儲和擴展最復雜數據工作負載的功能。
官方網址:PostgreSQL: The world's most advanced open source database 中文文檔:http://www.postgres.cn/docs/14/index.html
PostgreSQL數據庫是目前功能最強大的開源數據庫,它是最接近工業標準SQL92的查詢語言,至少實現了SQL:2011標準中要求的179項主要功能中的160項(注:目前沒有哪個數據庫管理系統能完全實現SQL:2011標準中的所有主要功能)。
1.2 PostgreSQL基本操作
-
登錄客戶端
#psql -h 服務器 -p 端口地址 -d 數據庫 -U 用戶名 ? ?
psql -h 127.0.0.1 -p 5432 -d postgres -U postgres
#密碼:itcast123
-
基本增刪改查:
select datname from pg_database; 或 \l ? 查看所有的庫 ? ? ? ?
切換數據庫: \c 數據庫名稱
查看所有表: \d
查看表結構: SELECT column_name FROM information_schema.columns WHERE table_name ='table_name';
其他操作跟MySQL大體類似。詳情可參考拓展資料
1.3 PostgreSQL中創建結果表
所有的DWD層表都需要進行導出,這里以dwd_dim_goods_i表為例,進行演示。 其他建表語句詳見 資料/代碼/pg建表/postsql_dw_dim.sql
創建數據庫: CREATE DATABASE dw;
通過datagrip切換到public下: 在datagrip中,點擊右上方的數據庫選項
創建表操作:
建表的時候學會使用快捷鍵
alt+鼠標左鍵: 字符多選
alt+鼠標左鍵 然后 再ctrl+shift+右鍵→ : 選多個單詞
home: 快速到行開頭
end: 快速到行結尾
CREATE TABLE IF NOT EXISTS dwd_dim_goods_i(goods_id ? ? ? ? ? ? ? INT ? ? ? ? ? ,goods_no ? ? ? ? ? ? ? VARCHAR(50) ? ,goods_name ? ? ? ? ? ? VARCHAR(200) ,first_category_id ? ? ? INT ? ? ? ? ? ,first_category_no ? ? ? VARCHAR(50) ? ,first_category_name ? ? VARCHAR(50) ? ,second_category_id ? ? INT ? ? ? ? ? ,second_category_no ? ? VARCHAR(50) ? ,second_category_name ? VARCHAR(50) ? ,third_category_id ? ? ? INT ? ? ? ? ? ,third_category_no ? ? ? VARCHAR(50) ? ,third_category_name ? ? VARCHAR(50) ? ,brand_no ? ? ? ? ? ? ? VARCHAR(50) ? ,spec ? ? ? ? ? ? ? ? ? VARCHAR(50) ? ,sale_unit ? ? ? ? ? ? ? VARCHAR(50) ? ,life_cycle_status ? ? ? VARCHAR(50) ? ,tax_rate_status ? ? ? ? INT ? ? ? ? ? ,tax_rate ? ? ? ? ? ? ? VARCHAR(50) ? ,tax_value ? ? ? ? ? ? ? DECIMAL(27, 3),order_multiple ? ? ? ? DECIMAL(27, 2),pack_qty ? ? ? ? ? ? ? DECIMAL(27, 3),split_type ? ? ? ? ? ? VARCHAR(50) ? ,is_sell_by_piece ? ? ? INT ? ? ? ? ? ,is_self_support ? ? ? ? INT ? ? ? ? ? ,is_variable_price ? ? ? INT ? ? ? ? ? ,is_double_measurement ? INT ? ? ? ? ? ,is_must_sell ? ? ? ? ? INT ? ? ? ? ? ,is_seasonal ? ? ? ? ? ? INT ? ? ? ? ? ,seasonal_start_time ? ? VARCHAR(50) ? ,seasonal_end_time ? ? ? VARCHAR(50) ? ,is_deleted ? ? ? ? ? ? INT ? ? ? ? ? ,goods_type ? ? ? ? ? ? VARCHAR(50) ? ,create_time ? ? ? ? ? ? TIMESTAMP ? ? ,update_time ? ? ? ? ? ? TIMESTAMP ? ? ,PRIMARY KEY (goods_no)
);
COMMENT on table dwd_dim_goods_i is '商品表';
COMMENT on column dwd_dim_goods_i.goods_id ? ? ? ? ? ? ? ? ? ? is '商品ID';
COMMENT on column dwd_dim_goods_i.goods_no ? ? ? ? ? ? ? ? ? ? is '商品編碼';
COMMENT on column dwd_dim_goods_i.goods_name ? ? ? ? ? ? ? ? ? is '名稱';
COMMENT on column dwd_dim_goods_i.first_category_id ? ? ? ? ? ? is '一級分類ID';
COMMENT on column dwd_dim_goods_i.first_category_no ? ? ? ? ? ? is '一級分類編碼';
COMMENT on column dwd_dim_goods_i.first_category_name ? ? ? ? ? is '一級分類';
COMMENT on column dwd_dim_goods_i.second_category_id ? ? ? ? ? is '二級分類ID';
COMMENT on column dwd_dim_goods_i.second_category_no ? ? ? ? ? is '二級分類編碼';
COMMENT on column dwd_dim_goods_i.second_category_name ? ? ? ? is '二級分類';
COMMENT on column dwd_dim_goods_i.third_category_id ? ? ? ? ? ? is '三級分類ID';
COMMENT on column dwd_dim_goods_i.third_category_no ? ? ? ? ? ? is '三級分類編碼';
COMMENT on column dwd_dim_goods_i.third_category_name ? ? ? ? ? is '三級分類';
COMMENT on column dwd_dim_goods_i.brand_no ? ? ? ? ? ? ? ? ? ? is '品牌編號';
COMMENT on column dwd_dim_goods_i.spec ? ? ? ? ? ? ? ? ? ? ? ? is '商品規格';
COMMENT on column dwd_dim_goods_i.sale_unit ? ? ? ? ? ? ? ? ? ? is '銷售單位';
COMMENT on column dwd_dim_goods_i.life_cycle_status ? ? ? ? ? ? is '生命周期狀態';
COMMENT on column dwd_dim_goods_i.tax_rate_status ? ? ? ? ? ? ? is '稅率審核狀態 (0:未提交審核 1:待財務審核 2:稅率已審核 3:未通過)';
COMMENT on column dwd_dim_goods_i.tax_rate ? ? ? ? ? ? ? ? ? ? is '稅率code';
COMMENT on column dwd_dim_goods_i.tax_value ? ? ? ? ? ? ? ? ? ? ? ? ? ? is '稅率';
COMMENT on column dwd_dim_goods_i.order_multiple ? ? ? ? ? ? ? ? ? ? ? is '訂貨倍數';
COMMENT on column dwd_dim_goods_i.pack_qty ? ? ? ? ? ? ? ? ? ? ? ? ? ? is '箱裝數量';
COMMENT on column dwd_dim_goods_i.split_type ? ? ? ? ? ? ? ? ? is '分割屬性';
COMMENT on column dwd_dim_goods_i.is_sell_by_piece ? ? ? ? ? ? is '是否拆零,0:不拆;1:拆';
COMMENT on column dwd_dim_goods_i.is_self_support ? ? ? ? ? ? ? is '是否自營 0:非自營;1:自營';
COMMENT on column dwd_dim_goods_i.is_variable_price ? ? ? ? ? ? is '分店可變價 0:不可;1:可以';
COMMENT on column dwd_dim_goods_i.is_double_measurement ? ? ? ? is '是否雙計量商品 0:否;1:是';
COMMENT on column dwd_dim_goods_i.is_must_sell ? ? ? ? ? ? ? ? is '必賣品 0:非;1:是';
COMMENT on column dwd_dim_goods_i.is_seasonal ? ? ? ? ? ? ? ? ? is '季節性商品 0:非;1:是';
COMMENT on column dwd_dim_goods_i.seasonal_start_time ? ? ? ? ? is '季節性開始時間';
COMMENT on column dwd_dim_goods_i.seasonal_end_time ? ? ? ? ? ? is '季節性結束時間';
COMMENT on column dwd_dim_goods_i.is_deleted ? ? ? ? ? ? ? ? ? is '是否刪除0:正常;1:刪除';
COMMENT on column dwd_dim_goods_i.goods_type ? ? ? ? ? ? ? ? ? is '商品類型 1-國產食品 2-進口食品 3-國產非食品 4-進口非食品';
COMMENT on column dwd_dim_goods_i.create_time ? ? ? ? ? ? ? ? ? ? is '該記錄創建時間';
COMMENT on column dwd_dim_goods_i.update_time ? ? ? ? ? ? ? ? ? ? is '該記錄最后更新時間';
1.4 基于DataX完成數據導出
新建postgresql-dw數據源
構建任務
hive中以-i結尾的維表是有分區的,每個分區保存一個快照,而postgresql中只保留最新的快照數據。所以構建reader讀取hive表時不需要dt字段,導入到postgresql時,默認只導入最新的快照。 另外,為了防止postgresql中的歷史數據有臟數據,在導入之前可以先清空數據。所以在構建postgresql writer時,需要加上前置sql:truncate table public.dwd_dim_goods_i。 ? 操作如下:注意:在構建reader時,要指定導出的分區,指定的方式是在path中通過傳參的方式,${partition}在運行時動態指定。 這個案例中path為:/user/hive/warehouse/dim.db/dwd_dim_goods_i/${partition} ?
依次點擊構建、選擇模板。
編輯任務:
注意:如果是以-f結尾的維表,因為沒有分區,在指定path路徑以及在最后指定參數時,都不需要考慮分區。
執行任務:
按照以上步驟配置完dwd層所有維表導出任務的配置。
2. 基于海豚調度完成維度主題上線
2.1 DS的基本介紹
DS是apache旗下的頂級開源項目, 是一款工作流的任務調度的系統, 可以對工作流的定時周期化的調度工作, DS早期來自于國內的易觀大數據公司開發, 最終貢獻給Apache
2.2 DS的架構
針對DS的架構流程, 要求: 整個過程能夠講的出來
通過UI進行工作流的配置操作, 配置完成后, 將其提交執行, 此時執行請求會被API服務接收到, 接收到后, 隨機選擇一臺Master來完成任務的處理(DAG, 任務分配, 資源處理)(底層最終是有對應scheduler具體完成)(Master是去中心化的),完成分配后, 將對應執行的任務交給對應worker(從節點)來執行, worker對應有一個logger服務進行日志的記錄, 在執行過程中, 通過logger實時查看執行日志, 當執行完成后, 通知Master, Master進行狀態變更,同時告警服務實時監控狀態, 一旦發現狀態出現異常, 會立即根據所匹配的告警方案, 通知給相關的人員
2.3 如何啟動DS
cd /export/server/dolphinscheduler/
./bin/start-all.sh
如何訪問DS:
訪問地址: http://192.168.88.80:12345/dolphinscheduler/ui/view/login/index.html ? 用戶名: admin 密碼: dolphinscheduler123
2.4 DS的安全中心
2.4.1 隊列和租戶
2.4.2 用戶管理
2.4.3 告警組
2.4.4 worker分組
一般在安裝DS的時候會直接配置好
2.4.5 權限管理
2.5 項目和調度操作[練習]
創建項目
創建工作流
-
創建目錄節點:
-
創建文件節點:
-
建立連接:
保存工作流:
上線運行工作流
-
注意如下配置選項:
配置解釋如下:
點擊上線工作流
查看工作流狀態:
2.6 數據源中心
作用: 用于配置在工作流中需要連接各個數據源信息
比如: 工作流中需要直接連接HIVE,那么我們就可以配置一個HIVE的數據源
連接HIVE的數據源:
2.7 進行部署上線操作
注意: 從業務庫 –> ODS層操作, 是由DataX-Web進行周期調度執行處理, 每天凌晨20分開始運行, 此部分我們不需要在DS中配置
本次上線: 需要將從ODS –> DWD –> 數據導出 整個流程需要在DS中進行配置
注意: worker分組, 必須只能選擇hadoop01(因為DataX Hive 都在Hadoop01節點上, Hadoop02沒有的)
start的shell節點
-
1- 創建一個 start的shell節點, 表示整個工作流的開始
無分區表
ODS層到DWD層
-
2- 配置 ODS層 到 DWD層相關SQL語句 (以其中一個表詳細記錄)
insert overwrite table dim.dwd_dim_date_f
selecttrade_date,year_code,month_code,day_code,quanter_code,quanter_name,week_trade_date,month_trade_date,week_end_date,month_end_date,last_week_trade_date,last_month_trade_date,last_week_end_date,last_month_end_date,year_week_code,week_day_code,day_year_num,month_days,is_weekend,days_after1,days_after2,days_after3,days_after4,days_after5,days_after6,days_after7
from dim.ods_dim_date_f
-- 開啟動態分區方案
-- 開啟非嚴格模式
set hive.exec.dynamic.partition.mode=nonstrict;
-- 開啟動態分區支持(默認true)
set hive.exec.dynamic.partition=true;
-- 設置各個節點生成動態分區的最大數量: 默認為100個 (一般在生產環境中, 都需要調整更大)
set hive.exec.max.dynamic.partitions.pernode=10000;
-- 設置最大生成動態分區的數量: 默認為1000 (一般在生產環境中, 都需要調整更大)
set hive.exec.max.dynamic.partitions=100000;
-- hive一次性最大能夠創建多少個文件: 默認為10w
set hive.exec.max.created.files=150000;
--hive壓縮
--開啟中間結果壓縮
set hive.exec.compress.intermediate=true;
--開啟最終結果壓縮
set hive.exec.compress.output=true;
--寫入時壓縮生效
set hive.exec.orc.compression.strategy=COMPRESSION;
連線:
DWD層導出到PG
思路: 在DataX中配置每一個表從dwd到PG的Json文件, 然后通過shell命令執行調度即可
cd /export/server/datax/job/
mkdir -p dim_dwd2pg_job
cd dim_dwd2pg_job/
配置json文件:
vim hive2pg_dwd_dim_date_f.json
輸入 i 進入插入模式:對應的Json內容, 可以直接從datax-web中獲取對應json內容 (注意: 需要將密碼修改回來, 不要使用加密, 因為加密的是datax-web加的, 與datax沒關系)
接下來直接在DS中配置使用即可
JOB_DIR="/export/server/datax/job/自己job目錄名稱/自己json文件名稱.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_date_f"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py $JOB_DIR
elseecho "路徑不存在"
fi
上線運行
嘗試上線運行, 查看是否可以導入以及是否可以導出數據
建議: 可以將pg中對應表數據清空表以及HIVE中表數據清空掉
保存上線, 運行
驗證: 通過DS的狀態以及通過hive表和pg表查看是否成功
有分區表
ODS層到DWD層:
DWD層導出到PG
cd /export/server/datax/job/dim_dwd2pg_job/vim hive2pg_dwd_dim_source_type_map_i.json
輸入i進入插入模式添加json配置: 此配置直接從datax-web中獲取, 注意更改用戶和密碼
partition="dt=${inputdate}"
JOB_DIR="/export/server/datax/job/自己job目錄名稱/自己json文件名稱.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_source_type_map_i/${partition}"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py -p "-Dpartition=$partition" $JOB_DIR
elseecho "路徑不存在"
fi
點擊保存, 設置全局參數: inputdate
上線運行
上線, 測試 查看是否可以正常導入:
完整工作流圖
DS的定時操作:
設置, 定時的狀態是下線狀態, 需要將其調整為上線
附錄:
hive參數配置
說明: 發現在執行數據導入到各個層次的時候, 需要在執行SQL之前, 添加很多的set的參數, 而且每個表的參數基本是一樣的, 此時可以嘗試將其配置到HIVE的公共部分
-
選擇Hive,點擊配置,搜索hive-site,然后選擇hive-site.xml 的 HiveServer2 高級配置代碼段(安全閥),然后點擊加號,將參數進行一個一個的配置
配置后, 點擊保存更改,然后重啟相關服務
pg所有腳本
以job目錄為: dim_job為例
hive2pg_dwd_dim_date_f
JOB_DIR="/export/server/datax/job/dim_job/dwd_dim_date_f.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_date_f"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py $JOB_DIR
elseecho "路徑不存在"
fi
hive2pg_dwd_dim_category_statistics_i
partition="dt=${inputdate}"
JOB_DIR="/export/server/datax/job/dim_job/dwd_dim_category_statistics_i.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_category_statistics_i/${partition}"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py -p "-Dpartition=$partition" $JOB_DIR
elseecho "路徑不存在"
fi
hive2pg_dwd_dim_goods_i
partition="dt=${inputdate}"
JOB_DIR="/export/server/datax/job/dim_job/dwd_dim_goods_i.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_goods_i/${partition}"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py -p "-Dpartition=$partition" $JOB_DIR
elseecho "路徑不存在"
fi
hive2pg_dwd_dim_store_goods_i
partition="dt=${inputdate}"
JOB_DIR="/export/server/datax/job/dim_job/dwd_dim_store_goods_i.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_store_goods_i/${partition}"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py -p "-Dpartition=$partition" $JOB_DIR
elseecho "路徑不存在"
fi
hive2pg_dwd_dim_store_clear_goods_i
partition="dt=${inputdate}"
JOB_DIR="/export/server/datax/job/dim_job/dwd_dim_store_clear_goods_i.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_store_clear_goods_i/${partition}"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py -p "-Dpartition=$partition" $JOB_DIR
elseecho "路徑不存在"
fi
hive2pg_dwd_dim_source_type_map_i
partition="dt=${inputdate}"
JOB_DIR="/export/server/datax/job/dim_job/dwd_dim_source_type_map_i.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_source_type_map_i/${partition}"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py -p "-Dpartition=$partition" $JOB_DIR
elseecho "路徑不存在"
fi
hive2pg_dwd_dim_store_i
partition="dt=${inputdate}"
JOB_DIR="/export/server/datax/job/dim_job/dwd_dim_store_i.json"
hdfs_path="/user/hive/warehouse/dim.db/dwd_dim_store_i/${partition}"
if hdfs dfs -test -e "$hdfs_path";
thenpython /export/server/datax/bin/datax.py -p "-Dpartition=$partition" $JOB_DIR
elseecho "路徑不存在"
fi