Spark Join類型
1. Inner Join (內連接)
- 示例:
val result = df1.join(df2, df1("id") === df2("id"), "inner")
- 執行邏輯:只返回那些在兩個表中都有匹配的行。
2. Left Join (左外連接)
- 示例:
val result = df1.join(df2, df1("id") === df2("id"), "left")
- 執行邏輯:返回左表的所有記錄,并且右表的匹配行,若右表沒有匹配行則返回
null
。
3. Right Join (右外連接)
- 示例:
val result = df1.join(df2, df1("id") === df2("id"), "right")
- 執行邏輯:返回右表的所有記錄,并且左表的匹配行,若左表沒有匹配行則返回
null
。
4. Full Join (全外連接)
- 示例:
val result = df1.join(df2, df1("id") === df2("id"), "outer")
- 執行邏輯:返回左表和右表的所有記錄,若某一方沒有匹配,另一方則填充
null
。
5. Left Semi Join (左半連接)
- 示例:
val result = df1.join(df2, df1("id") === df2("id"), "left_semi")
- 執行邏輯:返回左表中與右表匹配的行,只返回左表數據,不返回右表。
6. Left Anti Join (左反連接)
- 示例:
val result = df1.join(df2, df1("id") === df2("id"), "left_anti")
- 執行邏輯:返回左表中不與右表匹配的行。
7. Cross Join (笛卡爾積連接)
- 示例:
val result = df1.crossJoin(df2)
- 執行邏輯:返回兩表的笛卡爾積,左表的每一行與右表的每一行組合。
Spark Join實現方式
在 Spark 中,
Join
操作有多種實現方式,每種方式的實現原理、適用場景和執行性能有所不同。接下來我們詳細討論以下幾種常見的
Join
實現方式:
1. CPJ(Cartesion Product Join)笛卡爾積連接
工作原理:
- 笛卡爾積連接是最基礎的連接方式,它將兩個數據集的每一條記錄與另一個數據集的每一條記錄進行配對,從而生成一個新的結果集。這個操作是非常低效的,因為它會產生
N * M
條記錄(N
和M
分別是兩個數據集的行數)。- 這種方式不需要連接條件,因此通常不是我們期望的連接類型。
執行性能:
- 效率低:當兩個數據集的大小很大時,計算量將急劇增加。通常,笛卡爾積連接僅在明確需要時使用(例如,計算所有可能的配對)。
Spark 選擇笛卡爾積的情況:
- 笛卡爾積連接在 Spark 中通常是顯式調用
crossJoin()
時使用。2. SMJ(Shuffle Sort Merge Join)排序歸并連接
工作原理:
- 排序歸并連接首先對兩個數據集按照連接鍵進行排序,然后使用
merge
操作將排序后的數據集進行合并。數據集會被按連接鍵進行shuffle
,然后在每個分區內執行歸并操作。- 這種方法非常適合處理大規模的分布式數據,尤其是當兩個數據集都很大并且有良好的分區時。
執行性能:
- 效率較高:適合大數據量的連接,尤其當連接鍵有排序特性時。
- 由于需要對數據進行排序和
shuffle
,這會增加網絡和磁盤的 I/O 成本。Spark 選擇 SMJ 的情況:
- 當數據集較大并且 Spark 能夠進行有效的
shuffle
操作時,Spark 會選擇SMJ
。- 如果連接的表已經分區或有排序字段,則 Spark 會優先選擇該方式。
3. SHJ(Shuffle Hash Join)哈希連接
工作原理:
- 哈希連接(SHJ) 是一種基于哈希表的連接方式。其基本思想是將一個表(通常是較小的表)哈希到內存中,然后通過哈希表查找另一個表的匹配記錄。該方法特別適合處理大規模的數據集,尤其是當連接的兩個數據集都比較大時,或者當連接鍵不具有順序或排序特性時。
- 執行步驟:
- 分區階段(Shuffle):首先,Spark 會將兩個數據集根據連接鍵進行
shuffle
(重分區),確保具有相同連接鍵的記錄被發送到同一個節點。此時,數據會按照連接鍵進行重分區。- 構建哈希表:選擇較小的表(通常是內表),在每個節點上對該表進行哈希,構建哈希表。哈希表存儲連接鍵及其對應的記錄。
- 匹配查找:然后,在同一個節點上掃描較大的表(外表),對于每一條記錄,使用相同的連接鍵查找哈希表中的匹配項。如果匹配,則生成結果。
執行性能:
- 高效:相比傳統的嵌套循環連接(
NLJ
),哈希連接通常在處理大數據集時更為高效,特別是當連接條件是等值連接時。Spark 選擇 SHJ 的情況:
- 外表大小至少是內表的3倍且內表的數據分片平均大小要小于廣播變量閾值,Spark 會選擇 Shuffle Hash Join。
4. BNLJ(Broadcast Nested Loop Join)廣播嵌套循環連接
工作原理:
- 廣播嵌套循環連接是嵌套循環連接的一種優化形式,針對連接的一個表較小的情況。它首先將較小的表(通常是內表)廣播到所有執行節點,然后對大表(通常是外表)進行掃描。在每個節點上,將小表加載到內存中,并在每個分區上與外表進行連接。
執行性能:
- 高效:相比于傳統的嵌套循環連接(
Nested Loop Join
),廣播嵌套循環連接的效率較高,因為它通過將小表廣播到每個節點,避免了全局的shuffle
操作,減少了數據傳輸的延遲。- 適合當一個表非常小(例如,
broadcast()
小表時)時,執行性能特別好。Spark 選擇 BNLJ 的情況:
- Spark 會自動選擇 Broadcast Nested Loop Join,當數據集中的一個表較小(可以放入內存)時,Spark 會選擇該表進行廣播,從而提高連接操作的性能。通常,Spark
會根據表的大小和內存限制來決定是否使用廣播join
。5. BHJ(Broadcast Hash Join)廣播哈希連接
工作原理:
- 廣播哈希連接通過將一個小表廣播到所有執行節點,從而避免了全局的
shuffle
操作。大的數據集會被分配到多個節點,而小的數據集會被廣播到每個節點。- 這種方式非常高效,適用于連接一個大表和一個小表的情況。
執行性能:
- 效率非常高:適用于大表和小表連接,避免了大規模的
shuffle
操作。- 適合當一個表非常小(例如,
broadcast()
小表時)時,執行性能特別好。Spark 選擇 BHJ 的情況:
- 如果其中一個表很小,Spark 會選擇
BHJ
,因為將小表廣播到所有節點可以大大減少shuffle
的開銷。Spark 如何選擇
Join
策略?1. 等值 Join
在等值數據關聯中,Spark 會嘗試按照以下順序選擇最優的連接策略:
- BHJ(Broadcast Hash Join)
- SMJ(Shuffle Sort Merge Join)
- SHJ(Shuffle Hash Join)
適用場景:
- BHJ(Broadcast Hash Join): 連接類型不能是全連接(Full Outer Join),基表需要足夠小,能夠放入內存并通過廣播發送到所有節點。
- SMJ(Shuffle Sort Merge Join)與 SHJ(Shuffle Hash Join):支持所有連接類型,如Full Outer Join,Anti join
為什么SHJ比SMJ執行效率高,排名卻不如SMJ靠前
- 相比 SHJ,Spark優先選擇SMJ的原因在于,SMJ的實現方式更加穩定,更不容易OOM
- 在 Spark 中,SHJ(Shuffle Hash Join) 策略要想被選中,需要滿足以下兩個先決條件:
- a. 外表大小至少是內表的 3 倍:只有當內外表的尺寸懸殊到一定程度時,SHJ 的性能優勢才會明顯超過 SMJ。
- b. 內表的數據分片平均大小要小于廣播變量閾值:內表的數據分片必須足夠小,以便能夠通過廣播傳遞到各個節點,而不引起內存溢出或性能問題。
- 相比 SHJ,SMJ沒有這么多的附加條件,無論是單表排序,還是兩表做歸并關聯,都可以借助磁盤來完成。內存中放不下的數據,可以臨時溢出到磁盤
2. 非等值 Join
- 在非等值數據關聯中,Spark可選的Join策略只有BNLJ(Broadcast Nested Loop Join)和CPJ(Cartesion Product Join),BNLJ適合內表滿足廣播情況,否則只能用CPJ兜底