第五章 實戰案例
5.1. 案例一
5.1.1. 案例介紹
MySQL數據庫中有兩張表:用戶表(users),訂單表(orders)。其中用戶表中存儲的是所有的用戶的信息,訂單表中存儲的是所有的訂單的信息。表結構如下:
-
用戶表 users:
- id:用戶id
- username:用戶名
- password:用戶密碼
- email:用戶郵箱
- phone:用戶手機號碼
- real_name:用戶的真實姓名
- registration_time:用戶的注冊時間
- last_login_time:用戶的上次登錄時間
- status:用戶的狀態(活躍、不活躍、凍結)
-
訂單表 orders:
- id:訂單ID
- user_id:用戶ID
- seller_id:賣家ID
- product_id:商品ID
- product_name:商品名稱
- product_price:商品單價
- quantity:購買數量
- total_price:訂單總價
- order_time:訂單時間
業務系統中,每天都有新的用戶注冊,每天也都在產生大量的訂單。今天公司剛剛搭建了數據倉庫,需要將已有的數據導入到Hive表中,此時需要將已有的數據全量的導入到Hive的表中。后續每天產生的新用戶注冊和新的訂單,增量的導入到對應的Hive表中。
5.1.2. 數據準備
MySQL中初始數據
-- Create the source (OLTP) database and switch to it.
CREATE DATABASE datax_shop;
USE datax_shop;

-- User table: one row per registered account.
DROP TABLE IF EXISTS users;
CREATE TABLE users (
    id                INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
    username          VARCHAR(50)  NOT NULL,
    password          VARCHAR(255) NOT NULL,
    email             VARCHAR(255) NOT NULL,
    phone             VARCHAR(20)  NOT NULL,
    real_name         VARCHAR(50)  NOT NULL,
    registration_time DATE NOT NULL,
    last_login_time   DATE NULL DEFAULT NULL,
    status            ENUM('active', 'inactive', 'frozen') NOT NULL DEFAULT 'active',
    PRIMARY KEY (id),
    UNIQUE KEY (username),
    UNIQUE KEY (email),
    UNIQUE KEY (phone)
);

-- Seed the user table with the pre-existing accounts.
INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
('johndoe','123456','johndoe@163.com','17767827612','John Doe','2020-12-12','2022-09-12'),
('janedoe','123123','janedoe@qq.com','18922783392','Jane Doe','2021-02-12','2022-12-10'),
('bobsmith','121212','bobsmith@126.com','17122811292','Bob Smith','2020-10-11','2022-01-15'),
('sarahlee','11111','sarahlee@qq.com','17122810911','Sarah Lee','2019-03-15','2022-02-15'),
('jimmychang','123121','jimmychang@qq.com','155514442134','Jimmy Chang','2022-12-11', NULL),
('alexjohnson','121212','alexjohnson@126.com','15522427212','Alex Johnson','2021-09-01', NULL);

-- Order table: one row per purchase.
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
    id            INT PRIMARY KEY AUTO_INCREMENT,
    user_id       INT NOT NULL,
    seller_id     INT NOT NULL,
    product_id    INT NOT NULL,
    product_name  VARCHAR(255) NOT NULL,
    product_price DECIMAL(10, 2) NOT NULL,
    quantity      INT NOT NULL,
    total_price   DECIMAL(10, 2) NOT NULL,
    order_time    DATE NOT NULL
);

-- Seed the order table with the pre-existing orders.
INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(1, 1, 12, '電動牙刷', 90, 1, 90, '2020-12-20'),
(1, 2, 15, '洗面奶', 45, 1, 45, '2020-12-20'),
(1, 3, 17, '面膜', 110, 2, 220, '2020-12-20'),
(2, 1, 11, 'iPad', 5990, 1, 5990, '2021-12-20'),
(2, 2, 19, 'iPhone數據線', 18, 1, 18, '2021-11-20'),
(3, 1, 20, 'iPhone手機殼', 80, 1, 80, '2020-12-20'),
(3, 2, 22, '榴蓮', 45, 4, 180, '2021-09-12'),
(3, 3, 23, '西瓜', 12, 5, 60, '2021-11-11'),
(4, 1, 4, '洗地機', 2990, 1, 2990, '2020-06-18'),
(4, 2, 7, '油污清潔劑', 78, 2, 156, '2020-07-11'),
(4, 3, 11, '鏡子', 10, 1, 10, '2020-06-20'),
(5, 1, 9, '健力寶', 48, 2, 96, '2022-12-20');
Hive表的創建
-- Create the warehouse database.
create database datax_shop;

-- User table: target for full + incremental user imports.
-- NOTE(review): the row-format delimiters below are not used by ORC
-- storage (ORC is self-describing); kept to match the original DDL.
drop table if exists datax_shop.users;
create table datax_shop.users (
    id                int,
    username          string,
    password          string,
    email             string,
    phone             string,
    real_name         string,
    registration_time string,
    last_login_time   string,
    status            string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orcfile;

-- Order table, partitioned by the order's year and month so each
-- day's increment lands in its own partition directory.
drop table if exists datax_shop.orders;
create table datax_shop.orders (
    id            int,
    user_id       int,
    seller_id     int,
    product_id    int,
    product_name  string,
    product_price double,
    quantity      int,
    total_price   double,
    order_time    string
)
partitioned by (year string, month string)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orcfile;
5.1.3. 數據全量導入
用戶表全量導入
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","username","password","email","phone","real_name","registration_time","last_login_time","status"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["users"]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/users","fileName": "original","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "username", "type": "string"},{"name": "password", "type": "string"},{"name": "email", "type": "string"},{"name": "phone", "type": "string"},{"name": "real_name", "type": "string"},{"name": "registration_time", "type": "string"},{"name": "last_login_time", "type": "string"},{"name": "status", "type": "string"}]}}}]}
}
訂單表全量導入
訂單表在全量導入的時候,因為要按照訂單創建時候的日期作為分區的字段。所以需要創建一張臨時表,先將MySQL中的訂單數據全量的導入到這個臨時表中,然后再從這個臨時表加載到訂單表的指定分區中。
-- Staging table for the one-off full load of orders. It carries the
-- same columns as orders plus trailing year/month columns, which the
-- dynamic-partition insert later uses as partition values. Stored as
-- plain text so DataX's hdfswriter can write it with fileType "text".
drop table if exists datax_shop.orders_origin;
create table datax_shop.orders_origin (
    id            int,
    user_id       int,
    seller_id     int,
    product_id    int,
    product_name  string,
    product_price double,
    quantity      int,
    total_price   double,
    order_time    string,
    year          string,
    month         string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n';
創建數據同步方案,同步MySQL的訂單數據到這個臨時表中
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["orders"]}],"column": ["id","user_id","seller_id","product_id","product_name","product_price","quantity","total_price","order_time","year(order_time)","lpad(month(order_time), 2, 0)"]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/orders_origin/","fileName": "orders_origin","writeMode": "append","fieldDelimiter": "\t","fileType": "text","column": [{"name": "id", "type": "int"},{"name": "user_id", "type": "int"},{"name": "seller_id", "type": "int"},{"name": "product_id", "type": "int"},{"name": "product_name", "type": "string"},{"name": "product_price", "type": "double"},{"name": "quantity", "type": "int"},{"name": "total_price", "type": "double"},{"name": "order_time", "type": "string"},{"name": "year", "type": "string"},{"name": "month", "type": "string"}]}}}]}
}
加載數據,到orders表的對應分區中
-- Allow fully dynamic partition inserts (no static partition column).
set hive.exec.dynamic.partition.mode=nonstrict;

-- Move the staged rows into the partitioned orders table; the trailing
-- year/month columns of orders_origin supply the partition values.
insert into datax_shop.orders partition(year, month)
select * from datax_shop.orders_origin;
5.1.4. 增量數據導入
用戶表增量導入
在現有數據全量導入到Hive表中之后,每日新增的數據只需要增量導入到Hive即可。此時我們就可以按照用戶注冊的時間來確定需要將什么數據導入到Hive的用戶表中。
首先,我們在將現有的數據全量的導入到Hive之后,模擬新用戶的注冊。
INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
('natalielin','121212','natalielin@qq.com','17788889999','Natalie Lin','2023-01-01', NULL),
('harrytran','123123','harrytran@126.com','17666228192','Harry Tran','2023-01-01', NULL),
('gracewang','313131','gracewang@163.com','18872631272','Grace Wang','2023-01-01', NULL),
('peterlee','123123','peterlee@qq.com','19822781829','Peter Lee','2023-01-01',NULL);
現在我們需要將在 2023-01-01 注冊的用戶,增量導入到Hive的用戶表中。
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","username","password","email","phone","real_name","registration_time","last_login_time","status"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["users"]}],"where": "registration_time = '$date'"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/users","fileName": "append","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "username", "type": "string"},{"name": "password", "type": "string"},{"name": "email", "type": "string"},{"name": "phone", "type": "string"},{"name": "real_name", "type": "string"},{"name": "registration_time", "type": "string"},{"name": "last_login_time", "type": "string"},{"name": "status", "type": "string"}]}}}]}
}
在上述的數據同步方案中,我們使用了變量 date,用來表示需要增量導入的用戶的注冊日期。在使用這個數據同步方案的時候,需要通過命令行參數指定變量 date 的值:
python3 /usr/local/datax/bin/datax.py -p "-Ddate=2023-01-01" incr-users.json
訂單表增量導入
在現有的所有訂單數據全量導入到Hive的訂單表后,每天仍然會有大量的訂單數據生成。此時我們只需要按照天為單位,將某一天產生的所有的數據增量導入到Hive的訂單表中,并放置在指定的分區位置即可。
首先,在現有的訂單數據全量導入到Hive的訂單表之后,我們來模擬一些新增的訂單信息
INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(6, 1, 110, '大米', 90, 1, 90, '2023-01-01'),
(6, 2, 120, '護手霜', 20, 2, 40, '2023-01-01'),
(6, 3, 130, '地板', 120, 5, 600, '2023-01-01'),
(7, 1, 140, '筒燈', 100, 10, 1000, '2023-01-01'),
(7, 2, 150, '假發', 2000, 1, 2000, '2023-01-01'),
(7, 3, 160, '牛奶', 100, 2, 200, '2023-01-01'),
(8, 1, 170, '百褶裙', 1000, 2, 2000, '2023-01-01'),
(8, 2, 180, '真絲絲巾', 300, 2, 600, '2023-01-01'),
(8, 3, 190, '太陽鏡', 250, 1, 250, '2023-01-01'),
(9, 1, 200, '遮陽傘', 120, 1, 120, '2023-01-01'),
(9, 2, 210, '盆栽', 220, 5, 1100, '2023-01-01'),
(10, 1, 220, '口琴', 50, 1, 50, '2023-01-01'),
(10, 2, 230, 'RIO', 12, 10, 120, '2023-01-01');
現在我們需要將某一天的數據增量的導入到Hive對應的分區中,其實這個過程就是使用hdfswriter
將增量的數據直接寫入到HDFS的指定分區目錄下即可。但是需要保證這個分區已經被創建出來了。
-- Check whether the target partition already exists (empty result = absent).
show partitions datax_shop.orders partition(year='2023', month='01');

-- Create the partition when the check above returned nothing.
alter table datax_shop.orders add partition(year='2023', month='01');
分區創建完成之后,就可以將某天新增的數據同步到對應的分區目錄了
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","user_id","seller_id","product_id","product_name","product_price","quantity","total_price","order_time"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["orders"]}],"where": "order_time = '$date'"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/orders/year=$year/month=$month","fileName": "append","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "user_id", "type": "int"},{"name": "seller_id", "type": "int"},{"name": "product_id", "type": "int"},{"name": "product_name", "type": "string"},{"name": "product_price", "type": "double"},{"name": "quantity", "type": "int"},{"name": "total_price", "type": "double"},{"name": "order_time", "type": "string"}]}}}]}
}
在上述的數據同步方案中,我們定義了三個變量:date、year、month,分別用來控制從MySQL數據庫導入哪一天的數據,以及數據存放到Hive的哪個分區。因此我們在使用這個配置同步數據的時候,需要指定這三個變量的值:
python3 /usr/local/datax/bin/datax.py -p "-Ddate=2023-01-01 -Dyear=2023 -Dmonth=01" incr-orders.json
5.1.5. 腳本調度
我們已經實現了將指定日期(2023-01-01)的增量的數據導入到Hive對應的數據表中,但是這樣做不夠靈活,我們不能每一次在需要增量導入的時候都去執行上述的一個個命令。為了能夠更加方便的同步數據,以及可以定時的進行調度,我們可以將其做成一個腳本,在需要的時候直接調用即可。
shell腳本
#!/bin/bash
# Incremental import of one day's users and orders from MySQL into Hive
# via DataX. Intended to run daily (e.g. from cron) for yesterday's data.

# Date of the data to sync: yesterday in production, pinned for the demo.
# dt=`date -d yesterday +"%Y-%m-%d"`
dt='2023-01-01'

# Derive the year and zero-padded month from the YYYY-MM-DD date.
year=${dt:0:4}
month=${dt:5:2}

# Installation path of DataX.
DATAX_HOME=/usr/local/datax
# Directory holding the DataX job JSON files.
JOBS_HOME=/root/datax-example

# 1. Incrementally import users registered on $dt.
python $DATAX_HOME/bin/datax.py -p "-Ddate=$dt" $JOBS_HOME/incr-users.json

# 2. Make sure the target Hive partition exists before hdfswriter writes
#    into its directory. `show partitions ... partition(...)` prints
#    nothing when the partition is absent, so an empty result means we
#    must create it. (The original `if [ ! hive -e ... ]` never executed
#    hive at all: `[` treats its arguments as strings, not a command.)
if [ -z "$(hive -e "show partitions datax_shop.orders partition(year='$year', month='$month')")" ]; then
    hive -e "alter table datax_shop.orders add partition(year='$year', month='$month')"
fi

# 3. Incrementally import orders placed on $dt into that partition.
python $DATAX_HOME/bin/datax.py -p "-Ddate=$dt -Dyear=$year -Dmonth=$month" $JOBS_HOME/incr-orders.json
python腳本
# @Author :
# @Company :
#
# Incremental import of one day's users and orders from MySQL into Hive
# via DataX. Mirrors the shell script version: run DataX for users,
# ensure the target Hive partition exists, then run DataX for orders.
# (The original listing had multiple compound statements collapsed onto
# single lines, which is not valid Python; reconstructed here.)

import datetime
import os

# PyHive is used to query/create Hive partitions over HiveServer2.
# Install prerequisites if missing:
#   yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi
#   pip3 install thrift
#   pip3 install sasl
#   pip3 install thrift-sasl
#   pip3 install pyhive
# PyHive talks to Hive's ThriftServer, so HiveServer2 must be running:
#   nohup hive --service hiveserver2 > /var/log/hiveserver2 2>&1 &
from pyhive import hive

# Home paths for DataX and for the job JSON files.
DATAX_HOME = "/usr/local/datax"
JOBS_HOME = "/root/datax-example"

# Job configuration file names for the two incremental syncs.
INCR_USERS = "incr-users.json"
INCR_ORDERS = "incr-orders.json"

# Date being synced (pin a fixed date for testing via the commented line).
# now = datetime.datetime(2023, 1, 1)
now = datetime.datetime.now()
dt = str(now.date())
year = f"{now.year:0>4}"
month = f"{now.month:0>2}"

# Incrementally import users registered on `dt`.
os.system(f'python {DATAX_HOME}/bin/datax.py -p "-Ddate={dt}" {JOBS_HOME}/{INCR_USERS}')

# Ensure the target partition of orders exists; create it when absent.
with hive.Connection(host="192.168.10.111", port=10000, username="root", database="datax_shop") as conn:
    with conn.cursor() as cursor:
        cursor.execute(f"show partitions orders partition(year='{year}', month='{month}')")
        partitions = cursor.fetchone()
        if partitions is None:
            # No row returned: the partition does not exist yet.
            cursor.execute(f"alter table orders add partition(year='{year}', month='{month}')")

# Incrementally import orders placed on `dt` into that partition.
os.system(f'python {DATAX_HOME}/bin/datax.py -p "-Ddate={dt} -Dyear={year} -Dmonth={month}" {JOBS_HOME}/{INCR_ORDERS}')