目錄
一、前言
二、hive執行計劃
2.1 hive explain簡介
2.1.1 語法格式
2.1.2 查詢計劃階段說明
2.2?操作演示
2.2.1 不加條件的查詢計劃分析
2.2.2?帶條件的查詢計劃分析
三、MapReduce屬性優化
3.1 本地模式
3.1.1 本地模式參數設置
3.1.2?本地模式操作演示
3.2?JVM重用
3.2.1 什么是JVM重用
3.3?并行執行
四、join優化
4.1 hive sql的join執行簡介
4.2?Map Join
4.2.1 執行原理
4.2.2 使用方式
4.3?Reduce Join
4.3.1 使用場景
4.3.2 執行原理
4.3.3?使用方式
4.4 Bucket Join
4.4.1 使用場景
4.4.2 執行原理
4.4.3?使用方式1
4.4.4?使用方式2
五、優化器
5.1 關聯優化
5.2 優化器引擎
5.2.1 背景
5.2.2?優化器引擎 ——?RBO
5.2.3?優化器引擎 ——?CBO
5.3?Analyze分析器
5.3.1 Analyze 功能
5.3.1 Analyze 語法
5.3.2 實際操作
六、謂詞下推
6.1 謂詞下推概述
6.1.1 謂詞下推案例總結
6.2?謂詞下推常用規則總結
七、數據傾斜
7.1 概述
7.2 數據傾斜場景一
7.2.1 方案一
7.2.2?方案二
7.2.3?方案三
7.3??數據傾斜場景二
7.3.1 方案一
7.3.2?方案二
7.3.3 方案三
一、前言
上一篇,我們分享了hive表數據常用的優化策略,本篇再從hive的job執行層面來聊聊可以優化的常用的一些手段。
二、hive執行計劃
在正式分享job優化之前,有必要先了解下hive的一條sql執行時經歷的事情,即explain執行計劃,在學習mysql的時候,DBA或者開發人員經常通過explain關鍵字來分析一條慢sql的執行計劃,從而指導sql優化。
2.1 hive explain簡介
HiveQL,是一種類SQL語言,從編程語言規范來說是一種聲明式語言,用戶會根據查詢需求提交聲明式的HQL查詢,而Hive會根據底層計算引擎將其轉化成Mapreduce/Tez/Spark的job;
hive explain 補充說明:
- 使用hive的explain命令可以幫助用戶了解一條HQL語句在底層的實現過程,通俗來說就是Hive打算如何去做這件事;
- explain會解析HQL語句,將整個HQL語句的實現步驟、依賴關系、實現過程都會進行解析返回,可以了解一條HQL語句在底層是如何實現數據的查詢及處理的過程,輔助用戶對Hive進行優化;
- 官網:hive sql官網地址;
2.1.1 語法格式
EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY|AUTHORIZATION|] query
參數說明:
- FORMATTED:對執行計劃進行格式化,返回JSON格式的執行計劃;
- EXTENDED:提供一些額外的信息,比如文件的路徑信息;
- DEPENDENCY:以JSON格式返回查詢所依賴的表和分區的列表 ;
- AUTHORIZATION:列出需要被授權的條目,包括輸入與輸出;
2.1.2 查詢計劃階段說明
每個查詢計劃由以下幾個部分組成:
The Abstract Syntax Tree for the query
抽象語法樹(AST):Hive使用Antlr解析生成器,可以自動地將HQL生成為抽象語法樹
The dependencies between the different stages of the plan
Stage依賴關系:會列出運行查詢劃分的stage階段以及之間的依賴關系
The description of each of the stages
Stage內容:包含了每個stage非常重要的信息,比如運行時的operator和sort orders等具體的信息
2.2?操作演示
2.2.1 不加條件的查詢計劃分析
使用explain關鍵分析如下查詢sql
explain extended select * from tb_login;
執行結果如下,重點關注下圖中,TableScan后面的信息
2.2.2?帶條件的查詢計劃分析
執行下面的sql,根據主鍵字段查詢
explain extended select count(*) from tb_emp01 where empno=7369;
使用explain分析執行計劃,此時將會看到更復雜的執行語法樹;
關于執行計劃的結果分析中的各個參數作用,可以結合官網或者其他資料進行深入的學習,這里就不過多展開了;
三、MapReduce屬性優化
通過上面的執行計劃分析,其實可以知道,hive在大多數情況下,其得到的查詢結果底層會走MR的job,所以可以理解為,MR與hive結合相關的屬性調優也是整個job優化中非常重要的內容。
3.1 本地模式
在使用Hive過程中,有一些數據量不大的表也會轉換為MapReduce處理,提交到集群時,需要申請資源,等待資源分配,啟動JVM進程,再運行Task,一系列的過程比較繁瑣,本身數據量并不大,提交到YARN運行返回會導致性能較差的問題。
Hive為了解決這個問題,延用了MapReduce中的設計,提供本地計算模式,允許程序不提交給YARN,直接在本地運行,以便于提高小數據量程序的性能。
3.1.1 本地模式參數設置
如果要使用本地模式,只需要開啟下面的參數即可,也就是說,當開啟這個參數之后,hive在提交一條查詢sql之后,會被MR自動識別并解析到,從而走本地模式;
-- 開啟本地模式
set hive.exec.mode.local.auto = true;
注意:該參數默認為false,要使用的話需要手動開啟,如下即顯示了hive要是走本地模式的幾個條件,即條件都滿足的情況下才會走本地模式;
3.1.2?本地模式操作演示
設置參數
set hive.exec.mode.local.auto = true;
執行查詢
select count(*) from tb_emp01;
可以看到,在這種情況下,140多萬的數據只需要不到2秒的時間就統計出了結果?,同時在執行日志中,也看到job的執行中使用了local,即走了本地模式;
3.2?JVM重用
Hadoop默認會為每個Task啟動一個JVM來運行,而在JVM啟動時內存開銷大,Job數據量大的情況,如果單個Task數據量比較小,也會申請JVM,這就導致了資源緊張及浪費的情況;
3.2.1 什么是JVM重用
JVM重用可以使得JVM實例在同一個job中重新使用N次,當一個Task運行結束以后,JVM不會進行釋放,而是繼續供下一個Task運行,直到運行了N個Task以后,就會釋放;
N的值可以在Hadoop的mapred-site.xml文件中進行配置,通常在10-20之間;
參數設置如下
-- Hadoop3之前的配置,在mapred-site.xml中添加以下參數
-- Hadoop3中已不再支持該選項
mapreduce.job.jvm.numtasks=10
3.3?并行執行
Hive在實現HQL計算運行時,會解析為多個Stage,有時候Stage彼此之間有依賴關系,只能挨個執行,但是在一些別的場景下,很多的Stage之間是沒有依賴關系的;
例如Union語句,Join語句等等,這些Stage沒有依賴關系,但是Hive依舊默認挨個執行每個Stage,這樣會導致性能非常差,我們可以通過修改參數,開啟并行執行,當多個Stage之間沒有依賴關系時,允許多個Stage并行執行,提高性能。
并行執行關鍵參數設置
-- 開啟Stage并行化,默認為false
SET hive.exec.parallel=true;
-- 指定并行化線程數,默認為8
SET hive.exec.parallel.thread.number=16;
四、join優化
join在mysql的日常編寫sql過程中可以說是非常常見了,也是數據分析處理過程中必不可少的操作,Hive同樣支持Join的語法;
4.1 hive sql的join執行簡介
Hive Join的底層是通過MapReduce來實現的,Hive實現Join時,為了提高MapReduce的性能,提供了多種Join方案來實現;
例如:適合小表Join大表的Map Join,大表Join大表的Reduce Join,以及大表Join的優化方案Bucket Join等。
4.2?Map Join
即沒有reduce階段的task的場景,適合于小表join大表或者小表Join小表,如下圖執行流程所示,在這種情況下,減少了reduce階段task帶來的執行時的資源開銷;
4.2.1 執行原理
將小的那份數據給每個MapTask的內存都放一份完整的數據,大的數據每個部分都可以與小數據的完整數據進行join 底層不需要經過shuffle,需要占用內存空間存放小的數據文件
4.2.2 使用方式
盡量使用Map Join來實現Join過程,Hive中默認自動開啟了Map Join:hive.auto.convert.join=true,Hive中小表的大小限制,在不同的版本中主要設置參數如下:
-- 2.0版本之前的控制屬性
hive.mapjoin.smalltable.filesize=25M
-- 2.0版本開始由以下參數控制
hive.auto.convert.join.noconditionaltask.size=512000000
4.3?Reduce Join
如果map端的join處理不了的情況下,比如兩個join表的數據量都比較大的時候,就要考慮使用Reduce Join;
4.3.1 使用場景
適合于大表Join大表
4.3.2 執行原理
如下圖所示,將兩張表的數據在shuffle階段利用shuffle的分組來將數據按照關聯字段進行合并 必須經過shuffle,利用Shuffle過程中的分組來實現關聯;
4.3.3?使用方式
Hive會自動判斷是否滿足Map Join,如果不滿足Map Join,則自動執行Reduce Join
4.4 Bucket Join
顧名思義,即利用分桶表進行join,從之前的學習中我們了解到,使用分桶表,在進行join操作的時候,可以減少笛卡爾乘積的值,從而達到提升性能的目的;
4.4.1 使用場景
適合于大表Join大表
4.4.2 執行原理
將兩張表按照相同的規則將數據劃分 根據對應的規則的數據進行join 減少了比較次數,提高了性能,如下圖所示;
4.4.3?使用方式1
基于某個字段,語法:clustered by colName,參數設置
set hive.optimize.bucketmapjoin = true;
使用要求:分桶字段 = Join字段 ,桶的個數相等或者成倍數
4.4.4?使用方式2
使用Sort Merge Bucket Join(SMB),基于有序的數據Join,語法:
clustered by colName sorted by (colName)
具體參數設置
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
使用要求
分桶字段 = Join字段 = 排序字段 ,桶的個數相等或者成倍數
五、優化器
5.1 關聯優化
當一個程序中如果有一些操作彼此之間有關聯性,是可以在一個MapReduce中實現的,但是Hive會不智能的選擇,Hive會使用兩個MapReduce來完成這兩個操作。
例如:當我們執行 ?select …… from table group by id order by id desc。該SQL語句轉換為MapReduce時,我們可以有兩種方案來實現:
方案一
- 第一個MapReduce做group by,經過shuffle階段對id做分組;
- 第二個MapReduce對第一個MapReduce的結果做order by,經過shuffle階段對id進行排序;
方案二
- 因為都是對id處理,可以使用一個MapReduce的shuffle既可以做分組也可以排序;
在這種場景下,Hive會默認選擇用第一種方案來實現,這樣會導致性能相對較差,可以在Hive中開啟關聯優化,對有關聯關系的操作進行解析時,可以盡量放在同一個MapReduce中實現,關鍵配置參數如下:
set hive.optimize.correlation=true;
5.2 優化器引擎
5.2.1 背景
Hive默認的優化器在解析一些聚合統計類的處理時,底層解析的方案有時候不是最佳的方案,例如:當前有一張表【共1000條數據】,id構建了索引,id =100值有900條,我們的需求是:
查詢所有id = 100的數據,SQL語句為:select * from table where id = 100;
對于這個需求,可以考慮下面的方案進行實現
方案一
由于id這一列構建了索引,索引默認的優化器引擎RBO,會選擇先從索引中查詢id = 100的值所在的位置,再根據索引記錄位置去讀取對應的數據,但是這并不是最佳的執行方案。
方案二
有id=100的值有900條,占了總數據的90%,這時候是沒有必要檢索索引以后再檢索數據的,可以直接檢索數據返回,這樣的效率會更高,更節省資源,這種方式就是CBO優化器引擎會選擇的方案。
5.2.2?優化器引擎 ——?RBO
rule basic optimise
基于規則的優化器,根據設定好的規則來對程序進行優化
5.2.3?優化器引擎 ——?CBO
cost basic optimise
基于代價的優化器,根據不同場景所需要付出的代價來合適選擇優化的方案,對數據的分布的信息【數值出現的次數,條數,分布】來綜合判斷用哪種處理的方案是最佳方案
Hive中支持RBO與CBO這兩種引擎,默認使用的是RBO優化器引擎,很明顯CBO引擎更加智能,所以在使用Hive時,我們可以配置底層的優化器引擎為CBO引擎,配置參數如下:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true; set hive.stats.fetch.column.stats=true;
思考:CBO引擎是基于代價的優化引擎,那么CBO如何知道每種方案的計算代價的呢?接下來就要說說Analyze分析器了。
5.3?Analyze分析器
5.3.1 Analyze 功能
用于提前運行一個MapReduce程序將表或者分區的信息構建一些元數據【表的信息、分區信息、列的信息】,搭配CBO引擎一起使用。
5.3.1 Analyze 語法
構建分區信息元數據
ANALYZE TABLE tablename
[PARTITION(partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [noscan];
構建列的元數據
ANALYZE TABLE tablename
[PARTITION(partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS FOR COLUMNS ( columns name1, columns name2...) [noscan];
查看元數據
DESC FORMATTED [tablename] [columnname];
5.3.2 實際操作
比如我們按照下面的操作步驟分析?tb_login_part 這張表
--分析優化器
use tb_part; -- 構建表中分區數據的元數據信息
ANALYZE TABLE tb_login_part PARTITION(logindate) COMPUTE STATISTICS;-- 構建表中列的數據的元數據信息
ANALYZE TABLE tb_login_part COMPUTE STATISTICS FOR COLUMNS userid;-- 查看構建的列的元數據
desc formatted tb_login_part userid;
可以看到如下信息
六、謂詞下推
什么是謂詞
用來描述或判定客體性質、特征或者客體之間關系的詞項。比如"3 大于 2"中"大于"是一個謂詞。
6.1 謂詞下推概述
謂詞下推Predicate Pushdown(PPD)基本思想:將過濾表達式盡可能移動至靠近數據源的位置,以使真正執行時能直接跳過無關的數據。簡單點說就是在不影響最終結果的情況下,盡量將過濾條件提前執行。
Hive中謂詞下推后,過濾條件會下推到map端,提前執行過濾,減少map到reduce的傳輸數據,提升整體性能。
開啟下面的參數來使用(默認開啟)
hive.optimize.ppd=true;
比如有下面的2個執行sql,哪一個更好呢?推薦形式1的寫法,先過濾再join;
select a.id,a.value1,b.value2 from table1 a
join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
on (a.id=c.id);select a.id,a.value1,b.value2 from table1 a
join table2 b on a.id=b.id
where b.ds>='20181201' and b.ds<'20190101';
6.1.1 謂詞下推案例總結
通過下面這些日常經常可能涉及到的sql,可以檢驗自己在編寫sql的時候有沒有滿足謂詞下推的規則;
6.2?謂詞下推常用規則總結
下面整理了一些關聯sql中關于謂詞下推的優化參考規則;
- 對于Join(Inner Join)、Full outer Join,條件寫在on后面,還是where后面,性能上面沒有區別;
- 對于Left outer Join ,右側的表寫在on后面、左側的表寫在where后面,性能上有提高;
- 對于Right outer Join,左側的表寫在on后面、右側的表寫在where后面,性能上有提高;
- 當條件分散在兩個表時,謂詞下推可按上述結論2和3自由組合;
七、數據傾斜
7.1 概述
分布式計算中最常見的,最容易遇到的問題就是數據傾斜,數據傾斜的現象是:
當提交運行一個程序時,這個程序的大多數的Task都已經運行結束了,只有某一個Task一直在運行,遲遲不能結束,導致整體的進度卡在99%或者100%,這時候就可以判定程序出現了數據傾斜的問題。
而造成數據傾斜的原因大多是:數據分配的不均勻,比如下面這張圖,太多相同的單詞被分配到某一個任務上去處理,導致這個任務所在的節點壓力非常大,這個任務的執行將會非常慢,這就是發生了數據傾斜;
7.2 數據傾斜場景一
當程序中出現group by或者count(distinct)等分組聚合的場景時,如果數據本身是傾斜的,根據MapReduce的Hash分區規則,肯定會出現數據傾斜的現象。
根本原因是因為分區規則導致的,所以可以通過以下幾種方案來解決group by導致的數據傾斜的問題。
7.2.1 方案一
開啟Map端聚合,通過減少shuffle數據量和Reducer階段的執行時間,避免每個Task數據差異過大導致數據傾斜;
hive.map.aggr=true;
7.2.2?方案二
實現隨機分區,比如下面這種寫法,distribute by用于指定底層按照哪個字段作為Key實現分區、分組等 通過rank函數隨機值實現隨機分區,避免數據傾斜;
select * from table distribute by rand();
7.2.3?方案三
數據傾斜時自動負載均衡,通過開啟如下參數
?hive.groupby.skewindata=true;
開啟該參數以后,當前程序會自動通過兩個MapReduce來運行
- 第一個MapReduce自動進行隨機分布到Reducer中,每個Reducer做部分聚合操作,輸出結果;
- 第二個MapReduce將上一步聚合的結果再按照業務(group by key)進行處理,保證相同的分布到一起,最終聚合得到結果;
7.3??數據傾斜場景二
Join操作時,如果兩張表比較大,無法實現Map Join,只能走Reduce Join,那么當關聯字段中某一種值過多的時候依舊會導致數據傾斜的問題,面對Join產生的數據傾斜,核心的思想是盡量避免Reduce Join的產生,優先使用Map Join來實現;
但往往很多的Join場景不滿足Map Join的需求,那么可以以下幾種方案來解決Join產生的數據傾斜問題:
7.3.1 方案一
提前過濾,將大數據變成小數據,實現Map Join,如下sql
select a.id,a.value1,b.value2 from table1 a
join (select b.* from table2 b where b.ds>='20181201' and b.ds<'20190101') c
on (a.id=c.id);
7.3.2?方案二
使用Bucket Join
如果使用方案一,過濾后的數據依舊是一張大表,那么最后的Join依舊是一個Reduce Join,這種場景下,可以將兩張表的數據構建為桶表,實現Bucket Map Join,避免數據傾斜;
7.3.3 方案三
使用Skew Join,Skew Join是Hive中一種專門為了避免數據傾斜而設計的特殊的Join過程
這種Join的原理是將Map Join和Reduce Join進行合并,如果某個值出現了數據傾斜,就會將產生數據傾斜的數據單獨使用Map Join來實現 ,其他沒有產生數據傾斜的數據由Reduce Join來實現,這樣就避免了Reduce Join中產生數據傾斜的問題 最終將Map Join的結果和Reduce Join的結果進行Union合并;
Skew Joi實現原理示意圖
Skew Join使用需要開啟下面的參數配置
-- 開啟運行過程中skewjoin
set hive.optimize.skewjoin=true;-- 如果這個key的出現的次數超過這個范圍
set hive.skewjoin.key=100000;-- 在編譯時判斷是否會產生數據傾斜
set hive.optimize.skewjoin.compiletime=true;-- 不合并,提升性能
set hive.optimize.union.remove=true;-- 如果Hive的底層走的是MapReduce,必須開啟這個屬性,才能實現不合并
set mapreduce.input.fileinputformat.input.dir.recursive=true;