一、主題
- hive表的數據壓縮和文件存儲格式
- hive的自定義UDF函數
- hive的JDBC代碼操作
- hive的SerDe介紹和使用
- hive的優化
二、要點
1. hive表的文件存儲格式
Hive支持的存儲數的格式主要有:TEXTFILE(行式存儲) 、SEQUENCEFILE(行式存儲)、ORC(列式存儲)、PARQUET(列式存儲)。
1、列式存儲和行式存儲
上圖左邊為邏輯表,右邊第一個為行式存儲,第二個為列式存儲。
行存儲的特點: 查詢滿足條件的一整行數據的時候,列存儲則需要去每個聚集的字段找到對應的每個列的值,行存儲只需要找到其中一個值,其余的值都在相鄰地方,所以此時行存儲查詢的速度更快。select *
列存儲的特點: 因為每個字段的數據聚集存儲,在查詢只需要少數幾個字段的時候,能大大減少讀取的數據量;每個字段的數據類型一定是相同的,列式存儲可以針對性的設計更好的設計壓縮算法。 select 某些字段效率更高
TEXTFILE和SEQUENCEFILE的存儲格式都是基于行存儲的;
ORC和PARQUET是基于列式存儲的。
2 、TEXTFILE格式
默認格式,數據不做壓縮,磁盤開銷大,數據解析開銷大。可結合Gzip、Bzip2使用(系統自動檢查,執行查詢時自動解壓),但使用這種方式,hive不會對數據進行切分,從而無法對數據進行并行操作。
????3 、ORC格式
Orc (Optimized Row Columnar)是hive 0.11版里引入的新的存儲格式。
可以看到每個Orc文件由1個或多個stripe組成,每個stripe250MB大小,這個Stripe實際相當于RowGroup概念,不過大小由4MB->250MB,這樣能提升順序讀的吞吐率。每個Stripe里有三部分組成,分別是Index Data,Row Data,Stripe Footer:
一個orc文件可以分為若干個Stripe
一個stripe可以分為三個部分
indexData:某些列的索引數據
rowData :真正的數據存儲
StripFooter:stripe的元數據信息
1)Index Data:一個輕量級的index,默認是每隔1W行做一個索引。這里做的索引只是記錄某行的各字段在Row Data中的offset。
? 2)Row Data:存的是具體的數據,先取部分行,然后對這些行按列進行存儲。對每個列進行了編碼,分成多個Stream來存儲。
? 3)Stripe Footer:存的是各個stripe的元數據信息
每個文件有一個File Footer,這里面存的是每個Stripe的行數,每個Column的數據類型信息等;每個文件的尾部是一個PostScript,這里面記錄了整個文件的壓縮類型以及FileFooter的長度信息等。在讀取文件時,會seek到文件尾部讀PostScript,從里面解析到File Footer長度,再讀FileFooter,從里面解析到各個Stripe信息,再讀各個Stripe,即從后往前讀。
??4 、PARQUET格式
Parquet是面向分析型業務的列式存儲格式,由Twitter和Cloudera合作開發,2015年5月從Apache的孵化器里畢業成為Apache頂級項目。
Parquet文件是以二進制方式存儲的,所以是不可以直接讀取的,文件中包括該文件的數據和元數據,因此Parquet格式文件是自解析的。
通常情況下,在存儲Parquet數據的時候會按照Block大小設置行組的大小,由于一般情況下每一個Mapper任務處理數據的最小單位是一個Block,這樣可以把每一個行組由一個Mapper任務處理,增大任務執行并行度。Parquet文件的格式如下圖所示。
上圖展示了一個Parquet文件的內容,一個文件中可以存儲多個行組,文件的首位都是該文件的Magic Code,用于校驗它是否是一個Parquet文件,Footer length記錄了文件元數據的大小,通過該值和文件長度可以計算出元數據的偏移量,文件的元數據中包括每一個行組的元數據信息和該文件存儲數據的Schema信息。除了文件中每一個行組的元數據,每一頁的開始都會存儲該頁的元數據,在Parquet中,有三種類型的頁:數據頁、字典頁和索引頁。數據頁用于存儲當前行組中該列的值,字典頁存儲該列值的編碼字典,每一個列塊中最多包含一個字典頁,索引頁用來存儲當前行組下該列的索引,目前Parquet中還不支持索引頁。
5 主流文件存儲格式對比實驗
從存儲文件的壓縮比和查詢速度兩個角度對比。
存儲文件的壓縮比測試:
測試數據 參見log.data
1)TextFile
(1)創建表,存儲數據格式為TEXTFILE
use myhive;
create table log_text (
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE ;
(2)向表中加載數據
load data local inpath '/kkb/install/hivedatas/log.data' into table log_text ;
(3)查看表中數據大小,大小為18.1M
dfs -du -h /user/hive/warehouse/myhive.db/log_text;
18.1 M /user/hive/warehouse/log_text/log.data
2)ORC
(1)創建表,存儲數據格式為ORC
create table log_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc ;
(2)向表中加載數據
insert into table log_orc select * from log_text ;
(3)查看表中數據大小
dfs -du -h /user/hive/warehouse/myhive.db/log_orc;2.8 M /user/hive/warehouse/log_orc/123456_0
orc這種存儲格式,默認使用了zlib壓縮方式來對數據進行壓縮,所以數據會變成了2.8M,非常小
3)Parquet
(1)創建表,存儲數據格式為parquet
create table log_parquet(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS PARQUET ;
(2)向表中加載數據
insert into table log_parquet select * from log_text ;
(3)查看表中數據大小
dfs -du -h /user/hive/warehouse/myhive.db/log_parquet;13.1 M /user/hive/warehouse/log_parquet/123456_0
- ??存儲文件的壓縮比總結:
ORC > Parquet > textFile
存儲文件的查詢速度測試:
1)TextFile
hive (default)> select count(*) from log_text;
_c0
100000
Time taken: 21.54 seconds, Fetched: 1 row(s) 2)ORC
hive (default)> select count(*) from log_orc;
_c0
100000
Time taken: 20.867 seconds, Fetched: 1 row(s) 3)Parquet
hive (default)> select count(*) from log_parquet;
_c0
100000
Time taken: 22.922 seconds, Fetched: 1 row(s)
- ??存儲文件的查詢速度總結:
ORC > TextFile > Parquet
2、存儲和壓縮結合
官網:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC
ORC存儲方式的壓縮:
Key | Default | Notes |
---|---|---|
orc.compress | ZLIB | high level compression (one of NONE, ZLIB, SNAPPY) |
orc.compress.size | 262,144 | number of bytes in each compression chunk |
orc.stripe.size | 67,108,864 | number of bytes in each stripe |
orc.row.index.stride | 10,000 | number of rows between index entries (must be >= 1000) |
orc.create.index | true | whether to create row indexes |
orc.bloom.filter.columns | “” | comma separated list of column names for which bloom filter should be created |
orc.bloom.filter.fpp | 0.05 | false positive probability for bloom filter (must >0.0 and <1.0) |
1)創建一個非壓縮的的ORC存儲方式
(1)建表語句
create table log_orc_none(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="NONE");
(2)插入數據
insert into table log_orc_none select * from log_text ;
(3)查看插入后數據
dfs -du -h /user/hive/warehouse/myhive.db/log_orc_none;7.7 M /user/hive/warehouse/log_orc_none/123456_0
2)創建一個SNAPPY壓縮的ORC存儲方式
(1)建表語句
create table log_orc_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
(2)插入數據
insert into table log_orc_snappy select * from log_text ;
(3)查看插入后數據
dfs -du -h /user/hive/warehouse/myhive.db/log_orc_snappy ;
3.8 M /user/hive/warehouse/log_orc_snappy/123456_0
3)上一節中默認創建的ORC存儲方式,導入數據后的大小為
2.8 M /user/hive/warehouse/log_orc/123456_0
比Snappy壓縮的還小。原因是orc存儲文件默認采用ZLIB壓縮。比snappy壓縮的小。
4)存儲方式和壓縮總結:
? 在實際的項目開發當中,hive表的數據存儲格式一般選擇:orc或parquet。壓縮方式一般選擇snappy。
??3. hive的SerDe
1 hive的SerDe是什么
? **Serde是 Serializer/Deserializer的簡寫。hive使用Serde進行行對象的序列與反序列化。**最后實現把文件內容映射到 hive 表中的字段數據類型。
? 為了更好的闡述使用 SerDe 的場景,我們需要了解一下 Hive 是如何讀數據的(類似于 HDFS 中數據的讀寫操作):
HDFS files –> InputFileFormat –> <key, value> –> Deserializer –> Row objectRow object –> Serializer –> <key, value> –> OutputFileFormat –> HDFS files
2 hive的SerDe(Serialize/Deserialize) 類型
- Hive 中內置org.apache.hadoop.hive.serde2 庫,內部封裝了很多不同的SerDe類型。
- hive創建表時, 通過自定義的SerDe或使用Hive內置的SerDe類型指定數據的序列化和反序列化方式。
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
-
如上創建表語句, 使用row format 參數說明SerDe的類型。
-
你可以創建表時使用用戶自定義的Serde或者native Serde, 如果 ROW FORMAT沒有指定或者指定了 ROW FORMAT DELIMITED就會使用native Serde。
-
Hive SerDes:
- Avro (Hive 0.9.1 and later)
- ORC (Hive 0.11 and later)
- RegEx
- Thrift
- Parquet (Hive 0.13 and later)
- CSV (Hive 0.14 and later)
- MultiDelimitSerDe
3 企業實戰
1 通過MultiDelimitSerDe 解決多字符分割場景
- 1、創建表
use myhive;
create table t1 (id String, name string)
row format serde 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
WITH SERDEPROPERTIES ("field.delim"="##");
- 2、準備數據 t1.txt
cd /kkb/install/hivedatas
vim t1.txt1##xiaoming
2##xiaowang
3##xiaozhang
- 3、加載數據
load data local inpath '/kkb/install/hivedatas/t1.txt' into table t1;
- 4、查詢數據
0: jdbc:hive2://node1:10000> select * from t1;
+--------+------------+--+
| t1.id | t1.name |
+--------+------------+--+
| 1 | xiaoming |
| 2 | xiaowang |
| 3 | xiaozhang |
+--------+------------+--+
2 通過RegexSerDe 解決多字符分割場景
- 1、創建表
create table t2(id int, name string)
row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "^(.*)\\#\\#(.*)$");
- 2、準備數據 t1.txt
1##xiaoming
2##xiaowang
3##xiaozhang
- 3、加載數據
load data local inpath '/kkb/install/hivedatas/t1.txt' into table t2;
- 4、查詢數據
0: jdbc:hive2://node1:10000> select * from t2;
+--------+------------+--+
| t2.id | t2.name |
+--------+------------+--+
| 1 | xiaoming |
| 2 | xiaowang |
| 3 | xiaozhang |
+--------+------------+--+
??????4. hive的企業級調優
1、Fetch抓取
-
Fetch抓取是指,Hive中對某些情況的查詢可以不必使用MapReduce計算
- 例如:select * from score;
- 在這種情況下,Hive可以簡單地讀取employee對應的存儲目錄下的文件,然后輸出查詢結果到控制臺
-
在hive-default.xml.template文件中 hive.fetch.task.conversion默認是more,老版本hive默認是minimal,該屬性修改為more以后,在全局查找、字段查找、limit查找等都不走mapreduce。
-
案例實操
- 把 hive.fetch.task.conversion設置成**none**,然后執行查詢語句,都會執行mapreduce程序
set hive.fetch.task.conversion=none; select * from score; select s_id from score; select s_id from score limit 3;
- 把hive.fetch.task.conversion設置成==more==,然后執行查詢語句,如下查詢方式都不會執行mapreduce程序。
set hive.fetch.task.conversion=more; select * from score; select s_id from score; select s_id from score limit 3;
2、本地模式
-
在Hive客戶端測試時,默認情況下是啟用hadoop的job模式,把任務提交到集群中運行,這樣會導致計算非常緩慢;
-
Hive可以通過本地模式在單臺機器上處理任務。對于小數據集,執行時間可以明顯被縮短。
-
案例實操
--開啟本地模式,并執行查詢語句 set hive.exec.mode.local.auto=true; //開啟本地mr--設置local mr的最大輸入數據量,當輸入數據量小于這個值時采用local mr的方式, --默認為134217728,即128M set hive.exec.mode.local.auto.inputbytes.max=50000000;--設置local mr的最大輸入文件個數,當輸入文件個數小于這個值時采用local mr的方式, --默認為4 set hive.exec.mode.local.auto.input.files.max=5;--執行查詢的sql語句 select * from student cluster by s_id;
--關閉本地運行模式
set hive.exec.mode.local.auto=false;
select * from student cluster by s_id;
3、表的優化
1 小表、大表 join
-
將key相對分散,并且數據量小的表放在join的左邊,這樣可以有效減少內存溢出錯誤發生的幾率;再進一步,可以使用map join讓小的維度表(1000條以下的記錄條數)先進內存。在map端完成reduce。
select count(distinct s_id) from score;select count(s_id) from score group by s_id; 在map端進行聚合,效率更高
-
實際測試發現:新版的hive已經對小表 join 大表和大表 join 小表進行了優化。小表放在左邊和右邊已經沒有明顯區別。
-
多個表關聯時,最好分拆成小段,避免大sql(無法控制中間Job)
2 大表 join 大表
-
1.空 key 過濾
-
有時join超時是因為某些key對應的數據太多,而相同key對應的數據都會發送到相同的reducer上,從而導致內存不夠。
-
此時我們應該仔細分析這些異常的key,很多情況下,這些key對應的數據是異常數據,我們需要在SQL語句中進行過濾。
-
測試環境準備:
use myhive; create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';load data local inpath '/kkb/install/hivedatas/hive_big_table/*' into table ori; load data local inpath '/kkb/install/hivedatas/hive_have_null_id/*' into table nullidtable;
過濾空key與不過濾空key的結果比較
不過濾: INSERT OVERWRITE TABLE jointable SELECT a.* FROM nullidtable a JOIN ori b ON a.id = b.id; 結果: No rows affected (152.135 seconds)過濾: INSERT OVERWRITE TABLE jointable SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id; 結果: No rows affected (141.585 seconds)
-
-
2、空 key 轉換
-
有時雖然某個 key 為空對應的數據很多,但是相應的數據不是異常數據,必須要包含在 join 的結果中,此時我們可以表 a 中 key 為空的字段賦一個隨機的值,使得數據隨機均勻地分不到不同的 reducer 上。
不隨機分布:
set hive.exec.reducers.bytes.per.reducer=32123456; set mapreduce.job.reduces=7; INSERT OVERWRITE TABLE jointable SELECT a.* FROM nullidtable a LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id; No rows affected (41.668 seconds)
結果:這樣的后果就是所有為null值的id全部都變成了相同的字符串,及其容易造成數據的傾斜(所有的key相同,相同key的數據會到同一個reduce當中去)
為了解決這種情況,我們可以通過hive的rand函數,隨記的給每一個為空的id賦上一個隨機值,這樣就不會造成數據傾斜
? 隨機分布:
set hive.exec.reducers.bytes.per.reducer=32123456; set mapreduce.job.reduces=7; INSERT OVERWRITE TABLE jointable SELECT a.* FROM nullidtable a LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;No rows affected (42.594 seconds)
-
3、大表join小表與小表join大表實測
需求:測試大表JOIN小表和小表JOIN大表的效率 (新的版本當中已經沒有區別了,舊的版本當中需要使用小表)
(1)建大表、小表和JOIN后表的語句
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';create table jointable2(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
(2)分別向大表和小表中導入數據
hive (default)> load data local inpath '/kkb/install/hivedatas/big_data' into table bigtable;hive (default)>load data local inpath '/kkb/install/hivedatas/small_data' into table smalltable;
????3 map join
-
如果不指定MapJoin 或者不符合 MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join。容易發生數據傾斜。可以用 MapJoin 把小表全部加載到內存在map端進行join,避免reducer處理。
-
1、開啟MapJoin參數設置
--默認為true set hive.auto.convert.join = true;
-
2、大表小表的閾值設置(默認25M一下認為是小表)
set hive.mapjoin.smalltable.filesize=26214400;
- 3、MapJoin工作機制
首先是Task A,它是一個Local Task(在客戶端本地執行的Task),負責掃描小表b的數據,將其轉換成一個HashTable的數據結構,并寫入本地的文件中,之后將該文件加載到DistributeCache中。
接下來是Task B,該任務是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,并直接輸出結果。
由于MapJoin沒有Reduce,所以由Map直接輸出結果文件,有多少個Map Task,就有多少個結果文件。
案例實操:
(1)開啟Mapjoin功能
set hive.auto.convert.join = true; 默認為true
(2)執行小表JOIN大表語句
INSERT OVERWRITE TABLE jointable2
SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
FROM smalltable s
JOIN bigtable b
ON s.id = b.id;Time taken: 31.814 seconds
(3)執行大表JOIN小表語句
INSERT OVERWRITE TABLE jointable2
SELECT b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
FROM bigtable b
JOIN smalltable s
ON s.id = b.id;Time taken: 28.46 seconds
4 group By
-
默認情況下,Map階段同一Key數據分發給一個reduce,當一個key數據過大時就傾斜了。
-
并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端進行部分聚合,最后在Reduce端得出最終結果。
-
開啟Map端聚合參數設置
--是否在Map端進行聚合,默認為True set hive.map.aggr = true; --在Map端進行聚合操作的條目數目 set hive.groupby.mapaggr.checkinterval = 100000; --有數據傾斜的時候進行負載均衡(默認是false) set hive.groupby.skewindata = true;當選項設定為 true,生成的查詢計劃會有兩個MR Job。第一個MR Job中,Map的輸出結果會隨機分布到Reduce中,每個Reduce做部分聚合操作,并輸出結果,這樣處理的結果是相同的Group By Key有可能被分發到不同的Reduce中,從而達到負載均衡的目的;第二個MR Job再根據預處理的數據結果按照Group By Key分布到Reduce中(這個過程可以保證相同的Group By Key被分布到同一個Reduce中),最后完成最終的聚合操作。
5 count(distinct)
-
數據量小的時候無所謂,數據量大的情況下,由于count distinct 操作需要用一個reduce Task來完成,這一個Reduce需要處理的數據量太大,就會導致整個Job很難完成,一般count distinct使用先group by 再count的方式替換
環境準備:
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';load data local inpath '/kkb/install/hivedatas/data/100萬條大表數據(id除以10取整)/bigtable' into table bigtable;--每個reduce任務處理的數據量 默認256000000(256M)set hive.exec.reducers.bytes.per.reducer=32123456;select count(distinct ip ) from log_text;轉換成set hive.exec.reducers.bytes.per.reducer=32123456;select count(ip) from (select ip from log_text group by ip) t;雖然會多用一個Job來完成,但在數據量大的情況下,這個絕對是值得的。
6 笛卡爾積
- 盡量避免笛卡爾積,即避免join的時候不加on條件,或者無效的on條件
- Hive只能使用1個reducer來完成笛卡爾積。
????4、使用分區剪裁、列剪裁
- 盡可能早地過濾掉盡可能多的數據量,避免大量數據流入外層SQL。
- 列剪裁
- 只獲取需要的列的數據,減少數據輸入。
- 分區裁剪
- 分區在hive實質上是目錄,分區裁剪可以方便直接地過濾掉大部分數據。
- 盡量使用分區過濾,少用select *
? 環境準備:
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';load data local inpath '/home/admin/softwares/data/加遞增id的原始數據/ori' into table ori;load data local inpath '/home/admin/softwares/data/100萬條大表數據(id除以10取整)/bigtable' into table bigtable;
先關聯再Where:
SELECT a.id
FROM bigtable a
LEFT JOIN ori b ON a.id = b.id
WHERE b.id <= 10;
正確的寫法是寫在ON后面:先Where再關聯
SELECT a.id
FROM ori a
LEFT JOIN bigtable b ON (a.id <= 10 AND a.id = b.id);
或者直接寫成子查詢:
SELECT a.id
FROM bigtable a
RIGHT JOIN (SELECT id
FROM ori
WHERE id <= 10
) b ON a.id = b.id;
5、并行執行
- 把一個sql語句中沒有相互依賴的階段并行去運行。提高集群資源利用率
--開啟并行執行
set hive.exec.parallel=true;
--同一個sql允許最大并行度,默認為8。
set hive.exec.parallel.thread.number=16;
6、嚴格模式
-
Hive提供了一個嚴格模式,可以防止用戶執行那些可能意想不到的不好的影響的查詢。
-
通過設置屬性hive.mapred.mode值為默認是非嚴格模式nonstrict 。開啟嚴格模式需要修改hive.mapred.mode值為strict,開啟嚴格模式可以禁止3種類型的查詢。
--設置非嚴格模式(默認) set hive.mapred.mode=nonstrict;--設置嚴格模式 set hive.mapred.mode=strict;
-
(1)對于分區表,除非where語句中含有分區字段過濾條件來限制范圍,否則不允許執行
--設置嚴格模式下 執行sql語句報錯; 非嚴格模式下是可以的 select * from order_partition;異常信息:Error: Error while compiling statement: FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "order_partition" Table "order_partition"
-
(2)對于使用了order by語句的查詢,要求必須使用limit語句
--設置嚴格模式下 執行sql語句報錯; 非嚴格模式下是可以的 select * from order_partition where month='2019-03' order by order_price; 異常信息:Error: Error while compiling statement: FAILED: SemanticException 1:61 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'order_price'
-
(3)限制笛卡爾積的查詢
- 嚴格模式下,避免出現笛卡爾積的查詢
7、JVM重用
-
JVM重用是Hadoop調優參數的內容,其對Hive的性能具有非常大的影響,特別是對于很難避免小文件的場景或task特別多的場景,這類場景大多數執行時間都很短。
Hadoop的默認配置通常是使用派生JVM來執行map和Reduce任務的。這時JVM的啟動過程可能會造成相當大的開銷,尤其是執行的job包含有成百上千task任務的情況。JVM重用可以使得JVM實例在同一個job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中進行配置。通常在10-20之間,具體多少需要根據具體業務場景測試得出。
<property><name>mapreduce.job.jvm.numtasks</name><value>10</value><description>How many tasks to run per jvm. If set to -1, there isno limit. </description> </property>
我們也可以在hive當中通過
set mapred.job.reuse.jvm.num.tasks=10;
這個設置來設置我們的jvm重用
這個功能的缺點是,開啟JVM重用將一直占用使用到的task插槽,以便進行重用,直到任務完成后才能釋放。如果某個“不平衡的”job中有某幾個reduce task執行的時間要比其他Reduce task消耗的時間多的多的話,那么保留的插槽就會一直空閑著卻無法被其他的job使用,直到所有的task都結束了才會釋放。
8、推測執行
-
在分布式集群環境下,因為程序Bug(包括Hadoop本身的bug),負載不均衡或者資源分布不均等原因,會造成同一個作業的多個任務之間運行速度不一致,有些任務的運行速度可能明顯慢于其他任務(比如一個作業的某個任務進度只有50%,而其他所有任務已經運行完畢),則這些任務會拖慢作業的整體執行進度。為了避免這種情況發生,Hadoop采用了推測執行(Speculative Execution)機制,它根據一定的法則推測出“拖后腿”的任務,并為這樣的任務啟動一個備份任務,讓該任務與原始任務同時處理同一份數據,并最終選用最先成功運行完成任務的計算結果作為最終結果。
設置開啟推測執行參數:Hadoop的mapred-site.xml文件中進行配置
<property><name>mapreduce.map.speculative</name><value>true</value><description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property><property><name>mapreduce.reduce.speculative</name><value>true</value><description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>
不過hive本身也提供了配置項來控制reduce-side的推測執行:
<property><name>hive.mapred.reduce.tasks.speculative.execution</name><value>true</value><description>Whether speculative execution for reducers should be turned on. </description></property>
關于調優這些推測執行變量,還很難給一個具體的建議。如果用戶對于運行時的偏差非常敏感的話,那么可以將這些功能關閉掉。如果用戶因為輸入數據量很大而需要執行長時間的map或者Reduce task的話,那么啟動推測執行造成的浪費是非常巨大大。
9、壓縮
? 參見數據的壓縮
-
Hive表中間數據壓縮
#設置為true為激活中間數據壓縮功能,默認是false,沒有開啟 set hive.exec.compress.intermediate=true; #設置中間數據的壓縮算法 set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;
-
Hive表最終輸出結果壓縮
set hive.exec.compress.output=true; set mapred.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;
10、使用EXPLAIN(執行計劃)
查看hql執行計劃
????11、數據傾斜
1 合理設置Map數
-
- 通常情況下,作業會通過input的目錄產生一個或者多個map任務。
主要的決定因素有:input的文件總個數,input的文件大小,集群設置的文件塊大小。舉例: a) 假設input目錄下有1個文件a,大小為780M,那么hadoop會將該文件a分隔成7個塊(6個128m的塊和1個12m的塊),從而產生7個map數。 b) 假設input目錄下有3個文件a,b,c大小分別為10m,20m,150m,那么hadoop會分隔成4個塊(10m,20m,128m,22m),從而產生4個map數。即,如果文件大于塊大小(128m),那么會拆分,如果小于塊大小,則把該文件當成一個塊。
-
2) 是不是map數越多越好?
答案是否定的。如果一個任務有很多小文件(遠遠小于塊大小128m),則每個小文件也會被當做一個塊,用一個map任務來完成,而一個map任務啟動和初始化的時間遠遠大于邏輯處理的時間,就會造成很大的資源浪費。而且,同時可執行的map數是受限的。
-
3) 是不是保證每個map處理接近128m的文件塊,就高枕無憂了?
答案也是不一定。比如有一個127m的文件,正常會用一個map去完成,但這個文件只有一個或者兩個小字段,卻有幾千萬的記錄,如果map處理的邏輯比較復雜,用一個map任務去做,肯定也比較耗時。針對上面的問題2和3,我們需要采取兩種方式來解決:即減少map數和增加map數;
2 小文件合并
-
在map執行前合并小文件,減少map數:
-
CombineHiveInputFormat 具有對小文件進行合并的功能(系統默認的格式)
set mapred.max.split.size=112345600; set mapred.min.split.size.per.node=112345600; set mapred.min.split.size.per.rack=112345600; set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
這個參數表示執行前進行小文件合并,前面三個參數確定合并文件塊的大小,大于文件塊大小128m的,按照128m來分隔,小于128m,大于100m的,按照100m來分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),進行合并。
3 復雜文件增加Map數
-
當input的文件都很大,任務邏輯復雜,map執行非常慢的時候,可以考慮增加Map數,來使得每個map處理的數據量減少,從而提高任務的執行效率。
-
增加map的方法為
- 根據 ==computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))==公式
- 調整maxSize最大值。讓maxSize最大值低于blocksize就可以增加map的個數。
mapreduce.input.fileinputformat.split.minsize=1 默認值為1mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默認值Long.MAXValue因此,默認情況下,切片大小=blocksize maxsize(切片最大值): 參數如果調到比blocksize小,則會讓切片變小,而且就等于配置的這個參數的值。minsize(切片最小值): 參數調的比blockSize大,則可以讓切片變得比blocksize還大。
- 例如
--設置maxsize大小為10M,也就是說一個fileSplit的大小為10M set mapreduce.input.fileinputformat.split.maxsize=10485760;
4 合理設置Reduce數
-
1、調整reduce個數方法一
-
1)每個Reduce處理的數據量默認是256MB
set hive.exec.reducers.bytes.per.reducer=256000000;
-
- 每個任務最大的reduce數,默認為1009
set hive.exec.reducers.max=1009;
-
- 計算reducer數的公式
N=min(參數2,總輸入數據量/參數1)
-
-
2、調整reduce個數方法二
--設置每一個job中reduce個數 set mapreduce.job.reduces=3;
-
3、reduce個數并不是越多越好
-
過多的啟動和初始化reduce也會消耗時間和資源;
-
同時過多的reduce會生成很多個文件,也有可能出現小文件問題
-