文章目錄
- 前言
- 1. Common Join
- 2. Map Join
- 介紹:
- 使用方法:
- 限制:
- 3. Bucket Map Join
- 介紹:
- 好處:
- 使用條件:
- 使用方法:
- 4. Sort Merge Bucket Map Join
- 介紹:
- 如何使用:
- 5. Skew Join
- 介紹:
- 如何使用:
- 如何處理傾斜:
- 6. Left Semi Join
- 7. common join 的內部join
- 準備數據
- CUSTOMERS 數據
- 建表
- orders 表
- 建表
- 1. 內連接(Inner join)
- 2. 左外連接(Left Outer Join)
- 3. 右外連接(RIGHT OUTER JOIN)
- 4. 全外連接(FULL OUTER JOIN)
- 5. 笛卡爾積(CROSS JOIN)
- 特殊點
前言
- common join 主要是針對數據/業務邏輯的join。
Map join
,Bucket Map Join
,SMB Map Join
,Skew Join
是hive 針對特殊數據、場景 進行的優化。Left Semi Join
則是Sql
語句的優化,并且也可以應用上面的優化方案。
1. Common Join
如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join.
整個過程包含Map、Shuffle、Reduce階段。
- Map階段
讀取源表的數據,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;
Map輸出的value為join之后所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag信息,用于標明此value對應哪個表;
按照key進行排序
- Shuffle階段
根據key的值進行hash,并將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位于同一個reduce中
- Reduce階段
根據key的值完成join操作,期間通過Tag來識別不同表中的數據。
以下面的HQL為例,圖解其過程:
SELECT
a.id,a.dept,b.age
FROM a join b
ON (a.id = b.id);
最為普通的join策略,不受數據量的大小影響,也可以叫做reduce side join ,最沒效率的一種join 方式. 它由一個mapreduce job 完成.
首先將大表和小表分別進行map 操作, 在map shuffle 的階段每一個map output key 變成了table_name_tag_prefix + join_column_value , 但是在進行partition 的時候它仍然只使用join_column_value 進行hash.
每一個reduce 接受所有的map 傳過來的split , 在reducce 的shuffle 階段,它將map output key 前面的table_name_tag_prefix 給舍棄掉進行比較. 因為reduce 的個數可以由小表的大小進行決定,所以對于每一個節點的reduce 一定可以將小表的split 放入內存變成hashtable. 然后將大表的每一條記錄進行一條一條的比較.
2. Map Join
介紹:
MAP JION
會把小表全部加載到內存中,在map
階段直接拿另外一個表的數據和內存中表數據做匹配,由于在map
端是進行了join
操作,省去了reduce
運行的時間,算是hive中的一種優化。
如上圖中的流程,首先Task A在客戶端本地執行,負責掃描小表b的數據,將其轉換成一個HashTable的數據結構,并寫入本地的文件中,之后將該文件加載到DistributeCache中。
接下來的Task B任務是一個沒有Reduce的MapReduce,啟動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,并直接輸出結果,因為沒有Reduce,所以有多少個Map Task,就有多少個結果文件。
使用方法:
SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a JOIN b ON a.key = b.key;
前提b表是一張小表,具體小表有多小,由參數hive.mapjoin.smalltable.filesize來決定,默認值是25M。開啟hive.auto.convert.join=true參數時,默認值是false,滿足條件的話Hive在執行時候會自動轉化為MapJoin,或使用hint提示 /*+ mapjoin(table) */執行MapJoin。
具體參數:
1、小表自動選擇Mapjoin
set hive.auto.convert.join=true;
默認值:false。該參數為true時,Hive自動對左邊的表統計量,若是小表就加入內存,即對小表使用Map join
2、小表閥值
set hive.mapjoin.smalltable.filesize=25000000;
默認值:25M
hive.smalltable.filesize (replaced by hive.mapjoin.smalltable.filesize in Hive 0.8.1)
限制:
- 不支持以下內容。
unoin
后跟一個MapJoin
Lateral View
后跟MapJoin
Reduce Sink
(Group By/Join/Sort By/Cluster By/Distribute By)后跟
MapJoin`MapJoin``,然后是``Union
MapJoin`` 后跟
Join
MapJoin
其次是MapJoin
3. Bucket Map Join
介紹:
Bucket Map Join
也屬于是一個 map join
。 是hive join
的一個優化方案。主要是針對分桶表的優化。
在common join
中,如果表很大,MapReduce 中的Reducer
端就會負載過大。 因為從join
鍵和值接收所有數據,并且隨著更多數據shuffle
,性能也會下降。 因此,當我們join
在分桶列上分桶和連接的表時,我們可以使用 Hive Bucket Map Join 功能。
好處:
在分桶表中,數據是以桶的形式組織的。每個存儲桶都根據存儲桶鍵/列保存/包含某些行。因此這意味著當我們join時僅獲取所需的存儲桶,而不是在完整的表上獲取。
只有匹配的小表桶才會復制到每個mapper上。
使用條件:
- 當我們
join
的兩個表的join key
是bucket column
的時候,就可以使用。 - 一個表中的存儲桶數應是另一個表中的存儲桶數的倍數。
假設如果一個表有 2 個存儲桶,那么另一個表必須有 2 個存儲桶或 2 個存儲桶的倍數(2、4、6 等)。否則,將執行正常的內部 join。
- 開啟
set hive.optimize.bucketmapjoin = true;
配置。
使用方法:
假設有兩個表,table1 和 table2,并且兩個表的數據都使用emp_id
作為分桶鍵,存儲桶個數為 8 和 4 個。如果我們對這兩張表的 emp_id
列執行join
操作,并且如果可以將兩個表的 bucket1 發送到單個mapper
,我們可以實現大量的優化。這完全是通過執行 Hive 作業中的Bucket Map Join
完成的。
-- 創建 table1 表
hive> CREATE TABLE IF NOT EXISTS table1 (emp_id int, emp_name string, emp_city string, gender String) clustered by(emp_id) into 8 buckets row format delimited fields terminated BY ‘,’;-- 創建 table2 表
hive> CREATE TABLE IF NOT EXISTS table2 (emp_id int, job_title string) clustered by(emp_id) into 4 buckets row format delimited fields terminated BY ‘,’;-- 加載數據
hive> load data local inpath '/relativePath/data1.csv' into table table1;-- 加載數據
hive> load data local inpath '/relativePath/data2.csv' into table table2;-- 開啟優化
hive> set hive.optimize.bucketmapjoin=true;-- 查詢
hive> SELECT /*+ MAPJOIN(table2) */ table1.emp_id, table1.emp_name, table2.job_title FROM table1 JOIN table2 ON table1.emp_id = table2.emp_id;- table1 是張大表,table2 遠小于 table1.
每個 Mapper 進程從 Table2(較大的表)中拆分的文件時,都會檢索 Table1(較小的表)中唯一對應的存儲桶來完成join
任務。
4. Sort Merge Bucket Map Join
介紹:
Bucket Map Join 并沒有解決 map join 在小表必須完全裝載進內存的限制, 如果想要在一個reduce 節點的大表和小表都不用裝載進內存,必須使兩個表都在join key 上有序才行,你可以在建表的時候就指定 sorted by join key
或者使用index 的方式.
使用 SMB Map join 就要保證兩張表都是已經sort 過的。
如何使用:
開啟如下配置進行 SMB Map join:
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
有一個選項可以使用以下配置設置大表選擇策略:
set hive.auto.convert.sortmerge.join.bigtable.selection.policy
= org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
默認情況下,選擇策略為平均分區大小。與哈希和流式處理相比,大表選擇策略有助于確定僅為流式處理選擇哪個表。
可用的選擇策略包括:
org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJorg.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
5. Skew Join
介紹:
如何定義一張傾斜表:這張表的一個value
值于其他數據相比,其值在表中大量存在。
在 Hive 中,當表中一個或多個鍵的值明顯多于其他鍵時,就會發生傾斜聯接。這可能會導致性能問題,因為由于數據分布不均勻,聯接操作會變慢得多。為了解決此問題,Hive 提供了幾種可用于減少傾斜聯接和提高查詢性能的技術。
當我們join
的列中有一個包含傾斜數據的表時,我們可以使用傾斜連接功能。
如何使用:
hadoop 中默認的 Reducer
處理的數據大小為:
hive.exec.reducers.bytes.per.reducer = 1000000000
也就是每個節點的reduce 默認是處理1G大小的數據,如果你的join 操作也產生了數據傾斜,那么你可以在hive 中設定
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)
如何處理傾斜:
在運行時,會對數據進行掃描并檢測哪個key會出現傾斜,對于會傾斜的key,用map join做處理,不傾斜的key正常處理。
舉個栗子:
表 A 和表 B join,并且在 ID 為12345 時發生了數據傾斜,假設在表 B 中傾斜的數據量比表 A 少,則把 B 中所有的傾斜了的數據拿出來,存到內存中(可以用一個哈希表來存)。
對于表 A ,如果是傾斜的數據,則通過 B 存放在內存中的哈希表來 join;如果不是傾斜的 key,則按正常的 reduce 端 join 流程進行。
這樣就在map端完成了傾斜數據的處理,不會讓某一個reducer中數據量爆炸,從而拖累處理速度。
要查看語句是否用到了 Skew Join,可以 explain 一下你的 SQL,如果在 Join Operator 和 Reduce Operator Tree 下的 handleSkewJoin 為 true,那就是用了Skew Join啦。
6. Left Semi Join
hive 中沒有in/exist 這樣的子句,所以需要將這種類型的子句轉成left semi join. left semi join 是只傳遞表的join key給map 階段 , 如果key 足夠小還是執行map join, 如果不是則還是common join.
SELECT a.key, a.val
FROM a LEFT SEMI JOIN b ON (a.key = b.key)
如果被連接的表都很小,則可以將join
作為map join
作業來執行。如:
SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a JOIN b ON a.key = b.key
不需要Reducer。對于 A 的每個Mapper,B 被完全讀取。限制是不能執行全/右外連接 b 表。
同樣當我們的 join 的key
如果滿足 map join
, bucket Map join
, SMB Map join
條件,只需要我們開啟對應的配置,就可以自動進行優化。
7. common join 的內部join
常見的 common join 有如下幾種:
- Inner join
- Left Outer Join
- Right Outer Join
- Full Outer Join
- CROSS JOIN
準備數據
CUSTOMERS 數據
ID Name Age Address Salary
1 Ross 32 Ahmedabad 2000
2 Rachel 25 Delhi 1500
3 Chandler 23 Kota 2000
4 Monika 25 Mumbai 6500
5 Mike 27 Bhopal 8500
6 Phoebe 22 MP 4500
7 Joey 24 Indore 10000
建表
CREATE TABLE CUSTOMERS (ID INT PRIMARY KEY,Name VARCHAR(255),Age INT,Address VARCHAR(255),Salary INT
);-- 導入數據
INSERT INTO CUSTOMERS (ID, Name, Age, Address, Salary)
VALUES(1, 'Ross', 32, 'Ahmedabad', 2000),(2, 'Rachel', 25, 'Delhi', 1500),(3, 'Chandler', 23, 'Kota', 2000),(4, 'Monika', 25, 'Mumbai', 6500),(5, 'Mike', 27, 'Bhopal', 8500),(6, 'Phoebe', 22, 'MP', 4500),(7, 'Joey', 24, 'Indore', 10000);
orders 表
OID Date Customer_ID Amount
102 2016-10-08 00:00:00 3 3000
100 2016-10-08 00:00:00 3 1500
101 2016-11-20 00:00:00 2 1560
103 2015-05-20 00:00:00 4 2060
建表
-- 創建表
CREATE TABLE ORDERS (OID INT PRIMARY KEY,Date DATETIME,Customer_ID INT,Amount INT
);-- 插入數據
INSERT INTO ORDERS (OID, Date, Customer_ID, Amount)
VALUES(102, '2016-10-08 00:00:00', 3, 3000),(100, '2016-10-08 00:00:00', 3, 1500),(101, '2016-11-20 00:00:00', 2, 1560),(103, '2015-05-20 00:00:00', 4, 2060);
1. 內連接(Inner join)
內連接返回兩個表中滿足連接條件的行,丟棄不匹配的行
查詢語句:
SELECT c.ID, c.NAME, c.AGE, o.AMOUNT
FROM CUSTOMERS c JOIN ORDERS o
ON (c.ID = o.CUSTOMER_ID);
結果如下:
+----+----------+------+--------+
| ID | NAME | AGE | AMOUNT |
+----+----------+------+--------+
| 3 | Chandler | 23 | 1500 |
| 2 | Rachel | 25 | 1560 |
| 3 | Chandler | 23 | 3000 |
| 4 | Monika | 25 | 2060 |
+----+----------+------+--------+
2. 左外連接(Left Outer Join)
左外連接返回左表中所有行,以及滿足連接條件的右表中的匹配行。如果右表中沒有匹配行,則右表的結果為 NULL
查詢語句:
SELECT c.ID, c.NAME, o.AMOUNT, o.DATE
FROM CUSTOMERS c
LEFT OUTER JOIN ORDERS o
ON (c.ID = o.CUSTOMER_ID);
結果如下:
+----+----------+--------+---------------------+
| ID | NAME | AMOUNT | DATE |
+----+----------+--------+---------------------+
| 3 | Chandler | 1500 | 2016-10-08 00:00:00 |
| 2 | Rachel | 1560 | 2016-11-20 00:00:00 |
| 3 | Chandler | 3000 | 2016-10-08 00:00:00 |
| 4 | Monika | 2060 | 2015-05-20 00:00:00 |
| 1 | Ross | NULL | NULL |
| 5 | Mike | NULL | NULL |
| 6 | Phoebe | NULL | NULL |
| 7 | Joey | NULL | NULL |
+----+----------+--------+---------------------+
3. 右外連接(RIGHT OUTER JOIN)
右外連接與左外連接類似,但是返回右表中所有行,以及滿足連接條件的左表中的匹配行。如果左表中沒有匹配行,則左表的結果為 NULL。
查詢語句:
SELECT c.ID, c.NAME, o.AMOUNT, o.DATE FROM CUSTOMERS c RIGHT OUTER JOIN ORDERS o ON (c.ID = o.CUSTOMER_ID);
結果如下:
+------+----------+--------+---------------------+
| ID | NAME | AMOUNT | DATE |
+------+----------+--------+---------------------+
| 3 | Chandler | 1500 | 2016-10-08 00:00:00 |
| 2 | Rachel | 1560 | 2016-11-20 00:00:00 |
| 3 | Chandler | 3000 | 2016-10-08 00:00:00 |
| 4 | Monika | 2060 | 2015-05-20 00:00:00 |
+------+----------+--------+---------------------+
4. 全外連接(FULL OUTER JOIN)
全外連接返回左表和右表中所有行,同時在任何一方沒有匹配行的地方填充 NULL。
查詢語句:
SELECT c.ID, c.NAME, o.AMOUNT, o.DATE
FROM CUSTOMERS c
FULL OUTER JOIN ORDERS o
ON (c.ID = o.CUSTOMER_ID);
結果如下:
+------+----------+--------+---------------------+
| ID | NAME | AMOUNT | DATE |
+------+----------+--------+---------------------+
| 1 | Ross | NULL | NULL |
| 2 | Rachel | 1560 | 2016-11-20 00:00:00 |
| 3 | Chandler | 1500 | 2016-10-08 00:00:00 |
| 3 | Chandler | 3000 | 2016-10-08 00:00:00 |
| 4 | Monika | 2060 | 2015-05-20 00:00:00 |
| 5 | Mike | NULL | NULL |
| 6 | Phoebe | NULL | NULL |
| 7 | Joey | NULL | NULL |
+------+----------+--------+---------------------+
5. 笛卡爾積(CROSS JOIN)
這個查詢會返回 “employees” 表中的每一行與 “ORDERS” 表中的每一行的組合,從而產生兩個表的笛卡爾積。
請注意,笛卡爾積可能會產生非常大的結果集,因此在使用 CROSS JOIN 時需要謹慎。在實際使用中,應確保只在需要時使用笛卡爾積操作,以避免性能問題。
查詢語句:
SELECT c.*, o.*
FROM CUSTOMERS c
CROSS JOIN ORDERS o;
c.ID | c.Name | c.Age | c.Address | c.Salary | o.OID | o.Date | o.Customer_ID | o.Amount
-----|---------|-------|-------------|----------|-------|---------------------|--------------|----------
1 | Ross | 32 | Ahmedabad | 2000.00 | 102 | 2016-10-08 00:00:00 | 3 | 3000
1 | Ross | 32 | Ahmedabad | 2000.00 | 100 | 2016-10-08 00:00:00 | 3 | 1500
1 | Ross | 32 | Ahmedabad | 2000.00 | 101 | 2016-11-20 00:00:00 | 2 | 1560
1 | Ross | 32 | Ahmedabad | 2000.00 | 103 | 2015-05-20 00:00:00 | 4 | 2060
... ... ... ... ... ... ... ... ...
特殊點
對于inner join 左表和右表 當有非等值過濾條件時,可以按照過濾條件進行過濾。
-
左表有過濾條件,只過濾左表,右表不過濾
-
右表有過濾條件,只過濾右表,左表不過濾
-
左右表有過濾條件,同時過濾
案例如下:
explain dependency select * from student_part a inner join student_part_parquet b
on a.s_name = b.s_name and a.s_age <23;
左4 右7
explain dependency select * from student_part a inner join student_part_parquet b
on a.s_name = b.s_name and b.s_age <23;
左7 右4
explain dependency select * from student_part a inner join student_part_parquet b
on a.s_name = b.s_name and b.s_age <23 and a.s_age >24;
左2 右3
對于 left join
-
只過濾左表 左表無過濾 右表有過濾
-
只過濾右表 左表無過濾 右表有過濾
-
同時過濾左右表 左表無過濾 右表有過濾
只過濾左表 —結果 左7 右 7 無過濾
explain dependency select * from student_part a left outer join student_part_parquet b
on a.s_name = b.s_name and a.s_age <23;
只過濾右表 —結果 左7 右 3 左表無過濾,右表有過濾
explain dependency select * from student_part a left outer join student_part_parquet b
on a.s_name = b.s_name and b.s_age <23;
同時過濾左右表 —結果 左7 右 3 左表無過濾,右表有過濾
explain dependency select * from student_part a left outer join student_part_parquet b
on a.s_name = b.s_name and b.s_age <23 and a.s_age >25;
所以建議盡早在子查詢中,提前過濾數據。