列裁剪與分區裁剪
在生產環境中,會面臨列很多或者數據量很大時,如果使用select * 或者不指定分區進行全列或者全表掃描時效率很低。Hive在讀取數據時,可以只讀取查詢中所需要的列,忽視其他的列,這樣做可以節省讀取開銷(中間表存儲開銷和數據整合開銷)
1.列裁剪:在查詢時只讀取需要的列。避免select *
2.分區裁剪:在查詢中只讀取需要的分區。
遵循一個原則:盡量少的讀入數據,盡早地數據收斂!
分組聚合優化
Map端聚合
Hive中未經優化的分組聚合,是通過一個MapReduce Job實現的。Map端負責讀取數據,并按照分組字段分區,通過Shuffle,將數據發往Reduce端,各組數據在Reduce端完成最終的聚合運算。
Hive對分組聚合的優化主要圍繞著減少Shuffle數據量進行,具體做法是map-side聚合。所謂map-side聚合,在Hive的Map階段開啟預聚合,先在Map階段預聚合,然后在Reduce階段進行全局的聚合。map-side聚合能有效減少shuffle的數據量,提高分組聚合運算的效率。
通俗理解:假設有張8000w數據表,聚合后30組數據,有10個map,若是沒有開啟分組聚合,則會map將8000w條數據傳給reduce,開啟分組聚合后,就會每個map先進行分組,10個map各有30組,再將這30*10組數據從map傳給reduce,這樣效率就會大大增加
map-side 聚合相關的參數如下:
|
?默認為開啟狀態
|
默認為0.5和10w條
(會先從大的數據表內,先抽取10w數據進行檢測,判斷看(分組后的數據)/10w是否在0.5以下,若是則會啟用map的分組聚合)
|
優化案例
示例SQL
|
開啟分組聚合后,執行時間26s左右? application_1716866155638_175450??
關閉分組聚合后,執行時間在60s左右,效率提升了34s??application_1716866155638_175451
由上圖的詳細執行過程分析可知,開啟map聚合后,map輸出--reduce接受的數據是340,而關閉map分組聚合后,map數據--reduce接受的數據是8000w條,傳輸時間大大影響
大致的運行前后的步驟對比:
Count Distinct 的優化
在Hive中,DISTINCT關鍵字用于對查詢結果進行去重,以返回唯一的值。其主要作用是消除查詢結果中的重復記錄,使得返回的結果集中每個值只出現一次。
具體而言,當你在Hive中使用SELECT DISTINCT時,系統會對指定的列或表達式進行去重操作。盡管Hive中的DISTINCT關鍵字對于去重查詢是非常有用的,但在某些情況下可能存在一些缺點:性能開銷、數據傾斜、內存需求等。
group by 操作的具體實現原理。
1.map階段,將group by后的字段組合作為一個key,如果group by單個字段,那么key就一個。將group by之后要進行的聚合操作字段作為值,如果要進行count,則value是賦1;如要sum另一個字段,那么value就是該字段。
2.shuffle階段,按照key的不同分發到不同的reducer。注意此時可能因為key分布不均勻而出現數據傾斜的問題。這個問題是我們處理數據傾斜比較常規的查找原因的方法之一,也是我們解決數據傾斜的處理階段。(當執行過程中,出現其他任務都已完成,持續等待一個reudce過程的時候,就看出現了數據傾斜問題)
3.reduce階段,如果是count將相同key的值累加,如果是其他操作,按需要的聚合操作,得到結果。
distinct 的具體實現,當執行Distinct操作時,Hive會將操作轉化為一個MapReduce作業,并按照指定的列進行分組。在Map階段,每個Mapper會讀取輸入數據,并將指定的列作為輸出的key,然后,通過Shuffle過程將具有相同key的數據發送到同一個Reducer中。
當distinct一個字段時,這里會將group by的字段和distinct的字段組合在一起作為map輸出的key,value設置為1,同時將group by的字段定為分區鍵,這一步非常重要,這樣就可以將GroupBy字段作為reduce的key,在reduce階段,利用mapreduce的排序,輸入天然就是按照組合key排好序的。根據分區鍵將記錄分發到reduce端后,按順序取出組合鍵中的distinct字段,這時distinct字段也是排好序的。依次遍歷distinct字段,每找到一個不同值,計數器就自增1,即可得到count distinct結果。
count(distinct)全局合操作的時候,即使我們設定了reduce task的具體個數,例如set mapred.reduce.tasks=100;hive最終也只會啟動一個reducer。這就造成了所有map端傳來的數據都在一個tasks中執行,這唯一的Reduce Task需要Shuffle大量的數據,并且進行排序聚合等處理,這使得這個操作成為整個作業的IO和運算瓶頸。
?針對上述說的問題,我們可以修改對應的sql來進行優化, count+group by 或者sum+group by的方案來優化,在第一階段選出全部的非重復的字段id,在第二階段再對這些已消重的id進行計數
重到細粒度的(日),再聚合到粗粒度(省份)
(目前測試結果不能完全驗證如上理論,暫放,確定后再更新)
-- count(distinct) select count(distinct province_id) from ds_hive.ch12_order_detail_orc ; |
-- 優化版 count + group by?
|
第一階段我們可以通過增大Reduce的并發數,并發處理Map輸出。在第二階段,由于id已經消重,因此COUNT(*)操作在Map階段不需要輸出原id數據,只輸出一個合并后的計數即可。這樣即使第二階段Hive強制指定一個Reduce Task的時候,極少量的Map輸出數據也不會使單一的Reduce Task成為瓶頸。
其實在實際運行時,Hive還對這兩階段的作業做了額外的優化。它將第二個MapReduce作業Map中的Count過程移到了第一個作業的Reduce階段。這樣在第一階Reduce就可以輸出計數值,而不是消重的全部id。這一優化大幅地減少了第一個作業的Reduce輸出IO以及第二個作業Map的輸入數據量。最終在同樣的運行環境下優化后的語句可以說是大大提升了執行效率。
Join優化
Hive擁有多種join算法,包括Common Join,Map Join,Bucket Map Join,Sort Merge Buckt Map Join等,下面對每種join算法做簡要說明:
Common Join? (完整進行map-reduce階段)
Common Join是Hive中最穩定的join算法,其通過一個MapReduce Job完成一個join操作。Map端負責讀取join操作所需表的數據,并按照關聯字段進行分區,通過Shuffle,將其發送到Reduce端,相同key的數據在Reduce端完成最終的Join操作。
如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join。
整個過程包含Map、Shuffle、Reduce階段。
(1)Map階段
Step1: 讀取源表的數據,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;
Step2: Map輸出的value為join之后所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag信息,用于標明此value對應哪個表;
Step3: 按照key進行排序。
(2)Shuffle階段
根據key的值進行hash,并將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位于同一個reduce中。
(3)Reduce階段
根據key的值完成join操作,期間通過Tag來識別不同表中的數據。
舉個例子:
|
執行計劃如下,完整的common join 會完整的經過map-reduce階段
?執行過程如下:
JOIN操作涉及合并兩個或多個表的數據,以便通過共同的列值將它們關聯起來。這樣的關聯操作在處理大規模數據時可能會面臨一些性能挑戰,因此有必要進行優化。
性能開銷: JOIN操作通常涉及將分布在不同節點上的數據進行合并。在傳統的MapReduce執行環境中,這意味著需要進行數據的分發、排序和聚合操作,這些操作都會帶來較大的性能開銷。
Shuffle開銷: 在傳統的MapReduce中,JOIN操作的Shuffle階段涉及將相同鍵的數據合并到一起。這個過程需要大量的網絡通信和數據傳輸,尤其是當數據分布不均勻時。
內存消耗: 處理大規模數據的JOIN操作可能需要大量的內存,特別是在進行排序和合并時。這可能導致內存不足的問題,進而影響性能。
復雜度: JOIN操作可能涉及復雜的計算,特別是在關聯多個表或在多列上進行關聯時。這增加了查詢的復雜性,可能導致較長的執行時間。
Map Join (大表join小表)
Map Join算法可以通過兩個只有map階段的Job完成一個join操作。其適用場景為大表join小表。因為只經map+map階段,減少了shuffle的處理,reduce的讀取和處理過程,從而進行性能優化。若某join操作滿足要求,則:
第一個Job會讀取小表數據,將其制作為hash table,并上傳至Hadoop分布式緩存(本質上是上傳至每個執行任務的NodeManager節點本地磁盤)。
第二個Job會先從分布式緩存中讀取小表數據,并緩存在Map Task的內存中,然后掃描大表數據,這樣在map端即可完成關聯操作。
Map Join有兩種觸發方式,一種是用戶在SQL語句中增加hint提示,另外一種是Hive優化器根據參與join表的數據量大小,自動觸發。
1)Hint提示
用戶可通過如下方式,指定通過map join算法,并且將作為map join中的小表。這種方式已經過時,不推薦使用。
|
2)自動觸發
Hive在編譯SQL語句階段,起初所有的join操作均采用Common Join算法實現。
之后在物理優化階段,Hive會根據每個Common Join任務所需表的大小判斷該Common Join任務是否能夠轉換為Map Join任務,若滿足要求,便將Common Join任務自動轉換為Map Join任務。
但有些Common Join任務所需的表大小,在SQL的編譯階段是未知的(例如對子查詢進行join操作),所以這種Common Join任務是否能轉換成Map Join任務在編譯階是無法確定的。
針對這種情況,Hive會在編譯階段生成一個條件任務(Conditional Task),其下會包含一個計劃列表,計劃列表中包含轉換后的Map Join任務以及原有的Common Join任務。最終具體采用哪個計劃,是在運行時決定的。大致思路如下圖所示:
Map join自動轉換的具體判斷邏輯如下圖所示
參數如下:
|
3)示例SQL
explain |
參數設置為false(未優化):既有map,又有reduce,然后join是在reduce階段
執行完時間 161s
參數設置為true(優化):第一個map加載本地文件,第二個map進行join
執行時間35s
結論:未開啟mapjoin,進行commonjoin,執行時間161s,使用mapjoin,執行時間35s,執行效率大大提升
接著我們再來測試另外一個參數。調整hive.auto.convert.join.noconditionaltask.size參數(小于此設置的表會識別為小表
),使其小于t3 表 的大小
explain
|
第一個sql,因為設置了.noconditionaltask.size=252300,小于表的大小,最終選擇了commonjoin執行,第二遍我們關閉有條件執行,由于smalltable.filesize大于小表只有commonjoin,這時候調大set hive.mapjoin.smalltable.filesize=379000002;讓其小表大于smalltable.filesize,這時候最終會選擇mapjoin。
Bucket Map Join(大表join大表)
兩張表都相對較大,若采用普通的Map Join算法,則Map端需要較多的內存來緩存數據,當然可以選擇為Map段分配更多的內存,來保證任務運行成功。但是,Map端的內存不可能無上限的分配,所以當參與Join的表數據量均過大時,就可以考慮采用Bucket Map Join算法。Bucket Map Join是對Map Join算法的改進,其打破了Map Join只適用于大表join小表的限制,可用于大表join大表的場景。
Bucket Map Join的核心思想是:若能保證參與join的表均為分桶表,且關聯字段為分桶字段,且其中一張表的分桶數量是另外一張表分桶數量的整數倍,就能保證參與join的兩張表的分桶之間具有明確的關聯關系,所以就可以在兩表的分桶間進行Map Join操作了。這樣一來,第二個Job的Map端就無需再緩存小表的全表數據了,而只需緩存其所需的分桶即可。其原理如圖所示:
優化條件:
1) set hive.optimize.bucketmapjoin = true;
2) 一個表的bucket數是另一個表bucket數的整數倍
3) bucket列 == join列
4) 必須是應用在map join的場景中
Bucket Map Join不支持自動轉換,發須通過用戶在SQL語句中提供如下Hint提示,并配置如下相關參數,方可使用。
1)Hint提示
|
2)相關參數
|
Sort Merge Bucket Map Join(大表join大表)
Sort Merge Bucket Map Join(簡稱SMB Map Join)基于Bucket Map Join。SMB Map Join要求,參與join的表均為分桶表,且需保證分桶內的數據是有序的,且分桶字段、排序字段和關聯字段為相同字段,且其中一張表的分桶數量是另外一張表分桶數量的整數倍。
SMB Map Join同Bucket Join一樣,同樣是利用兩表各分桶之間的關聯關系,在分桶之間進行join操作,不同的是,分桶之間的join操作的實現原理。Bucket Map Join,兩個分桶之間的join實現原理為Hash Join算法;而SMB Map Join,兩個分桶之間的join實現原理為Sort Merge Join算法。
Hash Join和Sort Merge Join均為關系型數據庫中常見的Join實現算法。Hash Join的原理相對簡單,就是對參與join的一張表構建hash table,然后掃描另外一張表,然后進行逐行匹配。Sort Merge Join需要在兩張按照關聯字段排好序的表中進行,其原理如圖所示:
Hive中的SMB Map Join就是對兩個分桶的數據按照上述思路進行Join操作。可以看出,SMB Map Join與Bucket Map Join相比,在進行Join操作時,Map端是無需對整個Bucket構建hash table,也無需在Map端緩存整個Bucket數據的,每個Mapper只需按順序逐個key讀取兩個分桶的數據進行join即可。
Sort Merge Bucket Map Join有兩種觸發方式,包括Hint提示和自動轉換。Hint提示已過時,不推薦使用。下面是自動轉換的相關參數:
|
兩張表都相對較大,除了可以考慮采用Bucket Map Join算法,還可以考慮SMB Join。相較于Bucket Map Join,SMB Map Join對分桶大小是沒有要求的。
謂詞下推
謂詞下推(predicate pushdown)是指,盡量將過濾操作前移,以減少后續計算步驟的數據量。數倉實際開發中經常會涉及到多表關聯,這個時候就會涉及到on與where的使用。一般在面試的時候會提問:條件寫在where里和寫在on有什么區別?
相關參數為:
|
示例SQL語句
|
關閉謂詞下推優化
|
通過執行計劃可以看到,當我們把謂詞下推關閉以后,數據是所有數據關聯以后才進行過濾的,這樣如果量表數據量大,就大大降低了我們的執行效率
開啟謂詞下推優化
|
通過執行計劃可以看出,過濾操作位于執行計劃中的join操作之前。大大減少了關聯的數據量。對整體執行效率有很大提升。
開啟謂詞執行做關聯,優化一下SQL
|
通過執行計劃可以看出,t1和t2過濾操作都位于執行計劃中的join操作之前,對倆個表都是先過濾再關聯,效率更一步提升。
當我們使用左關聯的時候:1.所有條件寫在where中,只有左邊的條件先過濾;2.當所有條件寫在 on 里面只有右邊的條件起作用,3.為了可以讓條件都起作用,就把左表條件寫在where里,右邊條件寫在 on 里,兩者都先過濾。
結論:
- 對于Join(Inner Join)、Full outer Join,條件寫在on后面,還是where后面,性能上面沒有區別,join謂詞下推都生效,Full outer Join都不生效;
- 對于Left outer Join ,右側的表寫在on后面、左側的表寫在where后面,性能上有提高;
- 對于Right outer Join,左側的表寫在on后面、右側的表寫在where后面,性能上有提高;
合理選擇排序
order by
全局排序,只走一個reducer,當表數據量較大時容易計算不出來,性能不佳慎用,在嚴格模式下需要加limit
sort by
局部排序,即保證單個reduce內結果有序,但沒有全局排序的能力。
distribute by
按照指定的字段把數據劃分輸出到不同的reducer中,是控制數據如何從map端輸出到reduce端,hive會根據distribute by后面的字段和對應reducer的個數進行hash分發
cluster by
擁有distrubute by的能力,同時也擁有sort by的能力,所以可以理解cluster by是 distrubute by+sort by
實例代碼優化:
-- 優化前
select
id
,count(*) as cnt
from ds_hive.ch12_order_detail_orc t1
group by id
order by cnt
limit 100
;
執行時間109.9s
-- 優化后
select
id
,cnt
from
(select id ,count(*) as cntfrom ds_hive.ch12_order_detail_orc t1group by id
) t1
distribute by cnt
sort by id
limit 100
;
執行時間69s
通過優化前后時間對比,可以看到優化效果
注意實際企業運維可以通過參數 set hive.mapred.mode=strict 來設置嚴格模式,這個時候使用 orderby 全局排序必須加 limit;建議如果不是非要全局有序的話,局部有序的話建議使用 sortby,它會視情況啟動多個 reducer 進行排序,并且保證每個 reducer 內局部有序。為了控制map 端數據分配到 reducer 的 key,往往還要配合 distribute by 一同使用。如果不加 distribute by 的話,map 端數據就會隨機分配到 reducer。