Hive
文章目錄
- Hive
- 6.1 Hive的架構
- 6.2 HQL轉換為MR流程
- 6.3 Hive和數據庫比較
- 6.4 內部表和外部表
- 6.5 系統函數
- 6.6 自定義UDF、UDTF函數
- 6.7 窗口函數
- 6.8 Hive優化
- 6.8.1 分組聚合
- 6.8.2 Map Join
- 6.8.3 SMB Map Join
- 6.8.4 Reduce并行度
- 6.8.5 小文件合并
- 6.8.6 謂詞下推
- 6.8.7 并行執行
- 6.8.8 CBO優化
- 6.8.9 列式存儲
- 6.8.10 壓縮
- 6.8.11 分區和分桶
- 6.8.12 更換引擎
- 6.8.13 幾十張表join 如何優化
- 6.9 Hive解決數據傾斜方法
- 6.10 Hive的數據中含有字段的分隔符怎么處理?
- 6.11 MySQL元數據備份
- 6.12 如何創建二級分區表?
- 6.13 Union與Union all區別
6.1 Hive的架構
6.2 HQL轉換為MR流程
1)解析器(SQLParser):將SQL字符串轉換成抽象語法樹(AST)
(2)語義分析器(Semantic Analyzer):將AST進一步抽象為QueryBlock(可以理解為一個子查詢劃分成一個QueryBlock)
(2)邏輯計劃生成器(Logical Plan Gen):由QueryBlock生成邏輯計劃
(3)邏輯優化器(Logical Optimizer):對邏輯計劃進行優化
(4)物理計劃生成器(Physical Plan Gen):根據優化后的邏輯計劃生成物理計劃
(5)物理優化器(Physical Optimizer):對物理計劃進行優化
(6)執行器(Execution):執行該計劃,得到查詢結果并返回給客戶端
6.3 Hive和數據庫比較
Hive 和數據庫除了擁有類似的查詢語言,再無類似之處。
1)數據存儲位置
Hive 存儲在 HDFS 。數據庫將數據保存在塊設備或者本地文件系統中。
2)數據更新
Hive中不建議對數據的改寫。而數據庫中的數據通常是需要經常進行修改的。
3)執行延遲
Hive 執行延遲較高。數據庫的執行延遲較低。當然,這個是有條件的,即數據規模較小,當數據規模大到超過數據庫的處理能力的時候,Hive的并行計算顯然能體現出優勢。
4)數據規模
Hive支持很大規模的數據計算;數據庫可以支持的數據規模較小。
6.4 內部表和外部表
元數據、原始數據
1)刪除數據時
內部表:元數據、原始數據,全刪除
外部表:元數據 只刪除
2)在公司生產環境下,什么時候創建內部表,什么時候創建外部表?
在公司中絕大多數場景都是外部表。
自己使用的臨時表,才會創建內部表;
6.5 系統函數
1)數值函數
(1)round:四舍五入;(2)ceil:向上取整;(3)floor:向下取整
2)字符串函數
(1)substring:截取字符串;(2)replace:替換;(3)regexp_replace:正則替換
(4)regexp:正則匹配;(5)repeat:重復字符串;(6)split:字符串切割
(7)nvl:替換null值;(8)concat:拼接字符串;
(9)concat_ws:以指定分隔符拼接字符串或者字符串數組;
(10)get_json_object:解析JSON字符串
3)日期函數
(1)unix_timestamp:返回當前或指定時間的時間戳
(2)from_unixtime:轉化UNIX時間戳(從 1970-01-01 00:00:00 UTC 到指定時間的秒數)到當前時區的時間格式
(3)current_date:當前日期
(4)current_timestamp:當前的日期加時間,并且精確的毫秒
(5)month:獲取日期中的月;(6)day:獲取日期中的日
(7)datediff:兩個日期相差的天數(結束日期減去開始日期的天數)
(8)date_add:日期加天數;(9)date_sub:日期減天數
(10)date_format:將標準日期解析成指定格式字符串
4)流程控制函數
(1)case when:條件判斷函數
(2)if:條件判斷,類似于Java中三元運算符
5)集合函數
(1)array:聲明array集合
(2)map:創建map集合
(3)named_struct:聲明struct的屬性和值
(4)size:集合中元素的個數
(5)map_keys:返回map中的key
(6)map_values:返回map中的value
(7)array_contains:判斷array中是否包含某個元素
(8)sort_array:將array中的元素排序
6)聚合函數
(1)collect_list:收集并形成list集合,結果不去重
(2)collect_set:收集并形成set集合,結果去重
6.6 自定義UDF、UDTF函數
1)在項目中是否自定義過UDF、UDTF函數,以及用他們處理了什么問題,及自定義步驟?
(1)目前項目中邏輯不是特別復雜就沒有用自定義UDF和UDTF
(2)自定義UDF:繼承G…UDF,重寫核心方法evaluate
(3)自定義UDTF:繼承自GenericUDTF,重寫3個方法:initialize(自定義輸出的列名和類型),process(將結果返回forward(result)),close
2)企業中一般什么場景下使用UDF/UDTF?
(1)因為自定義函數,可以將自定函數內部任意計算過程打印輸出,方便調試。
(2)引入第三方jar包時,也需要。
6.7 窗口函數
一般在場景題中出現手寫:分組TopN、行轉列、列轉行。
1)聚合函數
max:最大值。
min:最小值。
sum:求和。
avg:平均值。
count:計數。
2)跨行取值函數
(1)lead和lag
注:lag和lead函數不支持自定義窗口。
(2)first_value和last_value
3)排名函數
注:rank 、dense_rank、row_number不支持自定義窗口。
6.8 Hive優化
6.8.1 分組聚合
一個分組聚合的查詢語句,默認是通過一個MapReduce Job完成的。Map端負責讀取數據,并按照分組字段分區,通過Shuffle,將數據發往Reduce端,各組數據在Reduce端完成最終的聚合運算。
分組聚合的優化主要圍繞著減少Shuffle數據量進行,具體做法是map-side聚合。所謂map-side聚合,就是在map端維護一個Hash Table,利用其完成部分的聚合,然后將部分聚合的結果,按照分組字段分區,發送至Reduce端,完成最終的聚合。
相關參數如下:
--啟用map-side聚合,默認是true
set hive.map.aggr=true;--用于檢測源表數據是否適合進行map-side聚合。檢測的方法是:先對若干條數據進行map-side聚合,若聚合后的條數和聚合前的條數比值小于該值,則認為該表適合進行map-side聚合;否則,認為該表數據不適合進行map-side聚合,后續數據便不再進行map-side聚合。
set hive.map.aggr.hash.min.reduction=0.5;--用于檢測源表是否適合map-side聚合的條數。
set hive.groupby.mapaggr.checkinterval=100000;--map-side聚合所用的hash table,占用map task堆內存的最大比例,若超出該值,則會對hash table進行一次flush。
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
6.8.2 Map Join
Hive中默認最穩定的Join算法是Common Join。其通過一個MapReduce Job完成一個Join操作。Map端負責讀取Join操作所需表的數據,并按照關聯字段進行分區,通過Shuffle,將其發送到Reduce端,相同key的數據在Reduce端完成最終的Join操作。
優化Join的最為常用的手段就是Map Join,其可通過兩個只有Map階段的Job完成一個join操作。第一個Job會讀取小表數據,將其制作為Hash Table,并上傳至Hadoop分布式緩存(本質上是上傳至HDFS)。第二個Job會先從分布式緩存中讀取小表數據,并緩存在Map Task的內存中,然后掃描大表數據,這樣在map端即可完成關聯操作。
注:由于Map Join需要緩存整個小標的數據,故只適用于大表Join小表的場景。
相關參數如下:
--啟動Map Join自動轉換
set hive.auto.convert.join=true;
--開啟無條件轉Map Join
set hive.auto.convert.join.noconditionaltask=true;
--無條件轉Map Join小表閾值,默認值10M,推薦設置為Map Task總內存的三分之一到二分之一
set hive.auto.convert.join.noconditionaltask.size=10000000;
6.8.3 SMB Map Join
上節提到,Map Join只適用于大表Join小表的場景。若想提高大表Join大表的計算效率,可使用Sort Merge Bucket Map Join。
需要注意的是SMB Map Join有如下要求:
(1)參與Join的表均為分桶表,且分桶字段為Join的關聯字段。
(2)兩表分桶數呈倍數關系。
(3)數據在分桶內是按關聯字段有序的。
SMB Join的核心原理如下:只要保證了上述三點要求的前兩點,就能保證參與Join的兩張表的分桶之間具有明確的關聯關系,因此就可以在兩表的分桶間進行Join操作了。
若能保證第三點,也就是參與Join的數據是有序的,這樣就能使用數據庫中常用的Join算法之一——Sort Merge Join了,Merge Join原理如下:
在滿足了上述三點要求之后,就能使用SMB Map Join了。
由于SMB Map Join無需構建Hash Table也無需緩存小表數據,故其對內存要求很低。適用于大表Join大表的場景。
6.8.4 Reduce并行度
Reduce端的并行度,也就是Reduce個數,可由用戶自己指定,也可由Hive自行根據該MR Job輸入的文件大小進行估算。
Reduce端的并行度的相關參數如下:
--指定Reduce端并行度,默認值為-1,表示用戶未指定
set mapreduce.job.reduces;
--Reduce端并行度最大值
set hive.exec.reducers.max;
--單個Reduce Task計算的數據量,用于估算Reduce并行度
set hive.exec.reducers.bytes.per.reducer;
Reduce端并行度的確定邏輯如下:
若指定參數mapreduce.job.reduces的值為一個非負整數,則Reduce并行度為指定值。否則,Hive自行估算Reduce并行度,估算邏輯如下:
假設Job輸入的文件大小為totalInputBytes
參數hive.exec.reducers.bytes.per.reducer的值為bytesPerReducer。
參數hive.exec.reducers.max的值為maxReducers。
則Reduce端的并行度為:
根據上述描述,可以看出,Hive自行估算Reduce并行度時,是以整個MR Job輸入的文件大小作為依據的。因此,在某些情況下其估計的并行度很可能并不準確,此時就需要用戶根據實際情況來指定Reduce并行度了。
需要說明的是:若使用Tez或者是Spark引擎,Hive可根據計算統計信息(Statistics)估算Reduce并行度,其估算的結果相對更加準確。
6.8.5 小文件合并
若Hive的Reduce并行度設置不合理,或者估算不合理,就可能導致計算結果出現大量的小文件。該問題可由小文件合并任務解決。其原理是根據計算任務輸出文件的平均大小進行判斷,若符合條件,則單獨啟動一個額外的任務進行合并。
相關參數為:
–開啟合并map only任務輸出的小文件
set hive.merge.mapfiles=true;--開啟合并map reduce任務輸出的小文件
set hive.merge.mapredfiles=true;--合并后的文件大小
set hive.merge.size.per.task=256000000;--觸發小文件合并任務的閾值,若某計算任務輸出的文件平均大小低于該值,則觸發合并
set hive.merge.smallfiles.avgsize=16000000;
6.8.6 謂詞下推
謂詞下推(predicate pushdown)是指,盡量將過濾操作前移,以減少后續計算步驟的數據量。開啟謂詞下推優化后,無需調整SQL語句,Hive就會自動將過濾操作盡可能的前移動。
相關參數為:
--是否啟動謂詞下推(predicate pushdown)優化
set hive.optimize.ppd = true;
6.8.7 并行執行
Hive會將一個SQL語句轉化成一個或者多個Stage,每個Stage對應一個MR Job。默認情況下,Hive同時只會執行一個Stage。但是某SQL語句可能會包含多個Stage,但這多個Stage可能并非完全互相依賴,也就是說有些Stage是可以并行執行的。此處提到的并行執行就是指這些Stage的并行執行。相關參數如下:
--啟用并行執行優化,默認是關閉的
set hive.exec.parallel=true; --同一個sql允許最大并行度,默認為8
set hive.exec.parallel.thread.number=8;
6.8.8 CBO優化
CBO是指Cost based Optimizer,即基于計算成本的優化。
在Hive中,計算成本模型考慮到了:數據的行數、CPU、本地IO、HDFS IO、網絡IO等方面。Hive會計算同一SQL語句的不同執行計劃的計算成本,并選出成本最低的執行計劃。目前CBO在Hive的MR引擎下主要用于Join的優化,例如多表Join的Join順序。
相關參數為:
--是否啟用cbo優化
set hive.cbo.enable=true;
6.8.9 列式存儲
采用ORC列式存儲加快查詢速度。
id name age
1 zs 18
2 lishi 19
行:1 zs 18 2 lishi 19
列:1 2 zs lishi 18 19
select name from user
6.8.10 壓縮
壓縮減少磁盤IO:因為Hive底層計算引擎默認是MR,可以在Map輸出端采用Snappy壓縮。
Map(Snappy ) Reduce
6.8.11 分區和分桶
(1)創建分區表 防止后續全表掃描
(2)創建分桶表 對未知的復雜的數據進行提前采樣
6.8.12 更換引擎
1)MR/Tez/Spark區別:
MR引擎:多Job串聯,基于磁盤,落盤的地方比較多。雖然慢,但一定能跑出結果。一般處理,周、月、年指標。
Spark引擎:雖然在Shuffle過程中也落盤,但是并不是所有算子都需要Shuffle,尤其是多算子過程,中間過程不落盤 DAG有向無環圖。 兼顧了可靠性和效率。一般處理天指標。
2)Tez引擎的優點
(1)使用DAG描述任務,可以減少MR中不必要的中間節點,從而減少磁盤IO和網絡IO。
(2)可更好的利用集群資源,例如Container重用、根據集群資源計算初始任務的并行度等。
(3)可在任務運行時,根據具體數據量,動態的調整后續任務的并行度。
6.8.13 幾十張表join 如何優化
(1)減少join的表數量:不影響業務前提,可以考慮將一些表進行預處理和合并,從而減少join操作。
(2)使用Map Join:將小表加載到內存中,從而避免了Reduce操作,提高了性能。通過設置hive.auto.convert.join為true來啟用自動Map Join。
(3)使用Bucketed Map Join:通過設置hive.optimize.bucketmapjoin為true來啟用Bucketed Map Join。
(4)使用Sort Merge Join:這種方式在Map階段完成排序,從而減少了Reduce階段的計算量。通過設置hive.auto.convert.sortmerge.join為true來啟用。
(5)控制Reduce任務數量:通過合理設置hive.exec.reducers.bytes.per.reducer和mapreduce.job.reduces參數來控制Reduce任務的數量。
(6)過濾不需要的數據:join操作之前,盡量過濾掉不需要的數據,從而提高性能。
(7)選擇合適的join順序:將小表放在前面可以減少中間結果的數據量,提高性能。
(8)使用分區:可以考慮使用分區技術。只需要讀取與查詢條件匹配的分區數據,從而減少數據量和計算量。
(9)使用壓縮:通過對數據進行壓縮,可以減少磁盤和網絡IO,提高性能。注意選擇合適的壓縮格式和壓縮級別。
(10)調整Hive配置參數:根據集群的硬件資源和實際需求,合理調整Hive的配置參數,如內存、CPU、IO等,以提高性能。
6.9 Hive解決數據傾斜方法
數據傾斜問題,通常是指參與計算的數據分布不均,即某個key或者某些key的數據量遠超其他key,導致在shuffle階段,大量相同key的數據被發往同一個Reduce,進而導致該Reduce所需的時間遠超其他Reduce,成為整個任務的瓶頸。以下為生產環境中數據傾斜的現象:
Hive中的數據傾斜常出現在分組聚合和join操作的場景中,下面分別介紹在上述兩種場景下的優化思路。
1)分組聚合導致的數據傾斜
前文提到過,Hive中的分組聚合是由一個MapReduce Job完成的。Map端負責讀取數據,并按照分組字段分區,通過Shuffle,將數據發往Reduce端,各組數據在Reduce端完成最終的聚合運算。若group by分組字段的值分布不均,就可能導致大量相同的key進入同一Reduce,從而導致數據傾斜。
由分組聚合導致的數據傾斜問題,有如下解決思路:
(1)判斷傾斜的值是否為null
若傾斜的值為null,可考慮最終結果是否需要這部分數據,若不需要,只要提前將null過濾掉,就能解決問題。若需要保留這部分數據,考慮以下思路。
(2)Map-Side聚合
開啟Map-Side聚合后,數據會現在Map端完成部分聚合工作。這樣一來即便原始數據是傾斜的,經過Map端的初步聚合后,發往Reduce的數據也就不再傾斜了。最佳狀態下,Map端聚合能完全屏蔽數據傾斜問題。
相關參數如下:
set hive.map.aggr=true;
set hive.map.aggr.hash.min.reduction=0.5;
set hive.groupby.mapaggr.checkinterval=100000;
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
(3)Skew-GroupBy優化
Skew-GroupBy是Hive提供的一個專門用來解決分組聚合導致的數據傾斜問題的方案。其原理是啟動兩個MR任務,第一個MR按照隨機數分區,將數據分散發送到Reduce,并完成部分聚合,第二個MR按照分組字段分區,完成最終聚合。
相關參數如下:
--啟用分組聚合數據傾斜優化
set hive.groupby.skewindata=true;
2)Join導致的數據傾斜
若Join操作使用的是Common Join算法,就會通過一個MapReduce Job完成計算。Map端負責讀取Join操作所需表的數據,并按照關聯字段進行分區,通過Shuffle,將其發送到Reduce端,相同key的數據在Reduce端完成最終的Join操作。
如果關聯字段的值分布不均,就可能導致大量相同的key進入同一Reduce,從而導致數據傾斜問題。
由Join導致的數據傾斜問題,有如下解決思路:
(1)Map Join
使用Map Join算法,Join操作僅在Map端就能完成,沒有Shuffle操作,沒有Reduce階段,自然不會產生Reduce端的數據傾斜。該方案適用于大表Join小表時發生數據傾斜的場景。
相關參數如下:
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=10000000;
(2)Skew Join
若參與Join的兩表均為大表,Map Join就難以應對了。此時可考慮Skew Join,其核心原理是Skew Join的原理是,為傾斜的大key單獨啟動一個Map Join任務進行計算,其余key進行正常的Common Join。原理圖如下:
相關參數如下:
--啟用skew join優化
set hive.optimize.skewjoin=true;
--觸發skew join的閾值,若某個key的行數超過該參數值,則觸發
set hive.skewjoin.key=100000;
3)調整SQL語句
若參與Join的兩表均為大表,其中一張表的數據是傾斜的,此時也可通過以下方式對SQL語句進行相應的調整。
假設原始SQL語句如下:A,B兩表均為大表,且其中一張表的數據是傾斜的。
hive (default)>
select
*
from A
join B
on A.id=B.id;
其Join過程如下:
圖中1001為傾斜的大key,可以看到,其被發往了同一個Reduce進行處理。
調整之后的SQL語句執行計劃如下圖所示:
調整SQL語句如下:
hive (default)>
select*
from(select --打散操作concat(id,'_',cast(rand()*2 as int)) id,valuefrom A
)ta
join(select --擴容操作concat(id,'_',1) id,valuefrom Bunion allselectconcat(id,'_',2) id,valuefrom B
)tbon ta.id=tb.id;
6.10 Hive的數據中含有字段的分隔符怎么處理?
Hive 默認的字段分隔符為Ascii碼的控制符\001(^A),建表的時候用fields terminated by ‘\001’。注意:如果采用\t或者\001等為分隔符,需要要求前端埋點和JavaEE后臺傳遞過來的數據必須不能出現該分隔符,通過代碼規范約束。
一旦傳輸過來的數據含有分隔符,需要在前一級數據中轉義或者替換(ETL)。通常采用Sqoop和DataX在同步數據時預處理。
6.11 MySQL元數據備份
元數據備份(重點,如數據損壞,可能整個集群無法運行,至少要保證每日零點之后備份到其它服務器兩個復本)。
(1)MySQL備份數據腳本(建議每天定時執行一次備份元數據)
#/bin/bash
#常量設置
MYSQL_HOST='hadoop102'
MYSQL_USER='root'
MYSQL_PASSWORD='000000'
/# 備份目錄,需提前創建
BACKUP_DIR='/root/mysql-backup'
/# 備份天數,超過這個值,最舊的備份會被刪除
FILE_ROLL_COUNT='7'/# 備份MySQL數據庫
[ -d "${BACKUP_DIR}" ] || exit 1
mysqldump \
--all-databases \
--opt \
--single-transaction \
--source-data=2 \
--default-character-set=utf8 \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}" | gzip > "${BACKUP_DIR}/$(date +%F).gz"if [ "$(ls "${BACKUP_DIR}" | wc -l )" -gt "${FILE_ROLL_COUNT}" ]
thenls "${BACKUP_DIR}" | sort |sed -n 1p | xargs -I {} -n1 rm -rf "${BACKUP_DIR}"/{}
fi
(2)MySQL恢復數據腳本
#/bin/bash
#常量設置
MYSQL_HOST='hadoop102'
MYSQL_USER='root'
MYSQL_PASSWORD='000000'
BACKUP_DIR='/root/mysql-backup'
# 恢復指定日期,不指定就恢復最新數據
RESTORE_DATE=''[ "${RESTORE_DATE}" ] && BACKUP_FILE="${RESTORE_DATE}.gz" || BACKUP_FILE="$(ls ${BACKUP_DIR} | sort -r | sed -n 1p)"
gunzip "${BACKUP_DIR}/${BACKUP_FILE}" --stdout | mysql \
-h"${MYSQL_HOST}" \
-u"${MYSQL_USER}" \
-p"${MYSQL_PASSWORD}"
6.12 如何創建二級分區表?
create table dept_partition2(deptno int, -- 部門編號dname string, -- 部門名稱
)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';
6.13 Union與Union all區別
(1)union會將聯合的結果集去重
(2)union all不會對結果集去重