SparkSQL 之 Shuffle Join 內核原理及應用深度剖析-Spark商業源碼實戰

本套技術專欄是作者(秦凱新)平時工作的總結和升華,通過從真實商業環境抽取案例進行總結和分享,并給出商業應用的調優建議和集群環境容量規劃等內容,請持續關注本套博客。版權聲明:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,如有任何商業交流,可隨時聯系。

1 Spark SQL 堅實后盾DataFrame

  • DataFrame是一個分布式數據容器,更像傳統數據庫的二維表格,除了數據以外,還掌握數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持嵌套數據類型(struct、array和map)。
  • JSON schema自動推導
  • Hive風格分區表自動識別
  • 充分利用RCFile、ORC、Parquet等列式存儲格式的優勢,僅掃描查詢真正涉及的列,忽略其余列的數據。
  • 聚合統計函數支持

2 Spark SQL 源碼包結構(溯本逐源)

主要分為4類:

  • core模塊:處理數據的輸入輸出,比如:把不同數據源(RDD,json,Parquet等)獲取到數據,并將查詢結果輸出到DataFrame。
  • catalyst模塊:處理SQL語句的整個過程,包括解析,綁定,優化,物理計劃等查詢優化。
  • hive模塊:對hive數據進行處理。
  • hive-ThriftServer:提供CLI以及JDBC和ODBC接口。

3 Spark SQL catalyst模塊設計思路

(詳細請參看我的SparkSQL源碼解析內容)

catalyst主要組件有

  • sqlParse => sql語句的語法解析
  • Analyzer => 將不同來源的Unresolved Logical Plan和元數據(如hive metastore、Schema catalog)進行綁定,生成resolved Logical Plan
  • optimizer => 根據OptimizationRules,對resolvedLogicalPlan進行合并、列裁剪、過濾器下推等優化作業而轉換成optimized Logical Plan
  • Planner => LogicalPlan轉換成PhysicalPlan
  • CostModel => 根據過去的性能統計數據,選擇最佳的物理執行計劃

4 Hash Join的衍生(劍走偏鋒)

4.1 Hash join 設計思路剖析(總領全局)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。
  • 第二步:根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable,位于內存中。
  • 第三步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起
  • 總結 : hash join 只掃描兩表一次,可以認為運算復雜度為o(a+b),效率非常高。笛卡爾集運算復雜度為a*b。另外,構建的Hash Table最好能全部加載在內存,效率最高,這就決定了hash join算法只適合至少一個小表的join場景,對于兩個大表的join場景并不適用。

4.2 broadcast Hash join 設計思路剖析(大表join極小表)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。

  • 第二步: 先把小表廣播到所有大表分區所在節點,然后根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable

  • 第三步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起

  • 總結 : hash join 只掃描兩表一次,可以認為運算復雜度為o(a+b)。

  • 調優

     1 buildIter總體估計大小超過spark.sql.autoBroadcastJoinThreshold設定的值,即不滿足broadcast join條件2 開啟嘗試使用hash join的開關,spark.sql.join.preferSortMergeJoin=false3 每個分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值,即shuffle read階段每個分區來自buildIter的記錄要能放到內存中4 streamIter的大小是buildIter三倍以上
    復制代碼

4.2 shuffle Hash join 設計思路剖析(大表join小表)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。
  • 第二步: 將具有相同性質的(如Hash值相同)join key 進行Shuffle到同一個分區。
  • 第三步:先把小表廣播到所有大表分區所在節點,然后根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable
  • 第四步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起

5 Sort Merge join (橫行無敵)(大表join大表)

  • 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。
  • 第二步: 將具有相同性質的(如Hash值相同)join key 進行Shuffle到同一個分區。
  • 第三步: 對streamIter 和 buildIter在shuffle read過程中先排序,join匹配時按順序查找,匹配結束后不必重頭開始,利用shuffle sort特性,查找性能解決了大表對大表的情形。

6 Spark Join 類型詳解

6.0 準備數據集( Justin => 左表有,Rose =>右表有)

學習 Python中單引號,雙引號,3個單引號及3個雙引號的區別請參考:https://blog.csdn.net/woainishifu/article/details/76105667from pyspark.sql.types import * >>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)]
park.createDataFrame(rdd, schema)
df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ]) 
>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()+---+------+---+
| id|  name|age|
+---+------+---+
|  1| Alice| 18|
|  2|  Andy| 19|
|  3|   Bob| 17|
|  4|Justin| 21|
|  5| Cindy| 20|
+---+------+---+>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)]) 
show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ]) 
>>> df2 = spark.createDataFrame(rdd2, schema2) 
>>> df2.show()
+-----+------+
| name|height|
+-----+------+
|Alice|   160|
| Andy|   159|
|  Bob|   170|
|Cindy|   165|
| Rose|   160|
+-----+------+
復制代碼

6.1 inner join

  • inner join是一定要找到左右表中滿足join key 條件的記錄,join key都存在的情形。

      df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()>>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()+---+-----+---+------+| id| name|age|height|+---+-----+---+------+|  1|Alice| 18|   160||  2| Andy| 19|   159||  3|  Bob| 17|   170||  5|Cindy| 20|   165|+---+-----+---+------+   
    復制代碼

6.2 left outer join

  • left outer join是以左表為準,在右表中查找匹配的記錄,如果查找失敗,左表行Row不變,右表一行Row中所有字段都為null的記錄。

  • 要求:左表是streamIter,右表是buildIter

          df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()+---+------+---+------+| id|  name|age|height|+---+------+---+------+|  1| Alice| 18|   160||  2|  Andy| 19|   159||  3|   Bob| 17|   170||  4|Justin| 21|  null||  5| Cindy| 20|   165|+---+------+---+------+
    復制代碼

6.3 right outer join

  • right outer join是以右表為準,在左表中查找匹配的記錄,如果查找失敗,右表行Row不變,左表一行Row中所有字段都為null的記錄。

  • 要求:右表是streamIter,左表是buildIter

          df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()+----+-----+----+------+|  id| name| age|height|+----+-----+----+------+|null| Rose|null|   160||   1|Alice|  18|   160||   2| Andy|  19|   159||   3|  Bob|  17|   170||   5|Cindy|  20|   165|+----+-----+----+------+
    復制代碼

6.4 full outer join

  • full outer join僅采用sort merge join實現,左邊和右表既要作為streamIter,又要作為buildIter

  • 左表和右表已經排好序,首先分別順序取出左表和右表中的一條記錄,比較key,如果key相等,則joinrowA和rowB,并將rowA和rowB分別更新到左表和右表的下一條記錄。

  • 如果keyA<keyB,說明右表中沒有與左表rowA對應的記錄,那么joinrowA與nullRow。

  • 將rowA更新到左表的下一條記錄;如果keyA>keyB,則說明左表中沒有與右表rowB對應的記錄,那么joinnullRow與rowB。

  • 將rowB更新到右表的下一條記錄。如此循環遍歷直到左表和右表的記錄全部處理完。

      >>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()+----+------+----+------+|  id|  name| age|height|+----+------+----+------+|null|  Rose|null|   160||   1| Alice|  18|   160||   2|  Andy|  19|   159||   3|   Bob|  17|   170||   4|Justin|  21|  null||   5| Cindy|  20|   165|+----+------+----+------+
    復制代碼

6.5 left semi join

left semi join是以左表為準,在右表中查找匹配的記錄,如果查找成功,則僅返回左表Row的記錄,否則返回null。

6.6 left anti join

left anti join與left semi join相反,是以左表為準,在右表中查找匹配的記錄,如果查找成功,則返回null,否則僅返回左邊的記錄

6.6 row_number().over()

from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import *
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])df = spark.createDataFrame(rdd, schema)
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()+---+------+---+---+| id|  name|age| rn|+---+------+---+---+|  1| Alice| 18|  1||  1| Cindy| 20|  2||  1|Justin| 21|  3||  3|   Bob| 17|  1||  2|  Andy| 19|  1|+---+------+---+---+df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()+---+------+---+---+| id|  name|age| rn|+---+------+---+---+|  3|   Bob| 17|  1||  1| Alice| 18|  1||  2|  Andy| 19|  1||  1| Cindy| 20|  2||  1|Justin| 21|  3|+---+------+---+---+
復制代碼

7 結語

一直想深入挖掘一下SparkSQL內部join原理,終于有時間詳細的理一下 Shuffle Join 。作者還準備進一步研究Spark SQL 內核原理,敬請期待我的Spark SQL源碼剖析系列。大數據商業實戰社區微信公眾號即將開啟,敬請關注,謝謝!

秦凱新 于深圳 201811200130

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/276878.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/276878.shtml
英文地址,請注明出處:http://en.pswp.cn/news/276878.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Python標準庫之csv(1)

1.Python處理csv文件之csv.writer() import csvdef csv_write(path,data):with open(path,w,encodingutf-8,newline) as f:writer csv.writer(f,dialectexcel)for row in data:writer.writerow(row)return True 調用上面的函數 data [[Name,Height],[Keys,176cm],[HongPing,1…

python自動化測試腳本可以測php嗎_請對比分析一下php的自動化測試與python的自動化測試...

Unit Level Test: Python: doctest, nose PHP: PHPUnit Behaviour Driven Test (Cucumber-style): Python: Lettuce, Behave PHP: Behat Behaviour Driven Test (Spec-style): Python: spec PHP: PHPSpec Acceptance Test (Selenium): Python: 有官方的Selenium binding。 PHP:…

簡單易變的CSS陰影效果

厭倦了在圖片處理軟件上給每張圖片加上邊框修飾&#xff1f;讓CSS幫你一把吧&#xff01;嘿嘿&#xff0c;看看下面的幾張效果圖&#xff0c;邊框都不是用圖片做的&#xff0c;很方便吧&#xff1f; 文字塊的應用效果 NARROW This is the text that goes in the middle. MEDIUM…

我用代碼來給你們分析一個賺錢的技巧

2019獨角獸企業重金招聘Python工程師標準>>> 賺錢是個俗氣的話題&#xff0c;但又是人人都繞不開的事情。我今天來“科學”地觸碰下這個話題。 談賺錢&#xff0c;就會談到理財、投資&#xff0c;談到炒股。有這樣一個笑話&#xff1a; 問&#xff1a;如何成為百萬富…

idea中自動deployment的步驟

轉載于:https://www.cnblogs.com/littlehb/p/11322666.html

python怎么編輯文件_如何使用python中的方法對文件進行修改文件名

在使用python語言中的方法操作文件時&#xff0c;打開方法可以直接使用open&#xff0c;但是對文件重命名需要調用os模塊中的方法&#xff0c;刪除文件也是工具/原料 python 編輯器 截圖工具 臺式機 方法/步驟 1 進入到python安裝文件目錄&#xff0c;新建txt文件kou.txt2 打開…

球迷必備Euro Cup Mobile 2008 !-dopod touch diamond試用之歐洲杯

歐洲杯從6月8日開始&#xff0c;到現在已經進行了半個多月了。到今天為止已經進入到了尾聲&#xff0c;也到了激戰正酣的時刻&#xff01;(相信在國足出線無望后大伙的目光都聚集到了歐洲杯上) 但是平時上班忙&#xff0c;晚上也沒法熬夜看球&#xff0c;哥們心理著急呀。白天上…

【工具】switchhost

1.前提 主要功能切換host 2.下載路徑 https://oldj.github.io/SwitchHosts/ 3.使用略&#xff08;太簡單&#xff09;轉載于:https://www.cnblogs.com/totoro-cat/p/9987101.html

C# ?. 判斷Null值

有一句代碼&#xff1a; Html.DisplayFor(modelItem > item.SellDate, "RegularDate") RegularDate.cshtml 內容如下&#xff1a; model System.DateTime Model.ToString("yyyy/MM/dd") 目的是將數據庫里的 DateTime 顯示為完整日期&#xff0c;如 2019…

MOSS站點的FORM認證修改小結

項目中&#xff0c;將moss站點修改成form認證的方法&#xff0c;園子里面已經很多了&#xff0c;我就不再重提&#xff0c;其中有1點有些文章沒有提及&#xff0c;但是實際操作中又是比較重要的&#xff1a;在管理中心的web.config中添加roleManager之后&#xff0c;一定要將ht…

python中意外縮進是什么意思_如何處理python中的“意外縮進”?

慕工程0101907 Python在行的開頭使用間距來確定代碼塊何時開始和結束。你可以得到的錯誤是&#xff1a;意外的縮進。這行代碼在開始時比前一行有更多空格&#xff0c;但前一行不是子塊的開頭&#xff08;例如if / while / for語句&#xff09;。塊中的所有代碼行必須以完全相同…

HDU 1042 N!(高精度階乘、大數乘法)

N! Time Limit: 10000/5000 MS (Java/Others) Memory Limit: 262144/262144 K (Java/Others)Total Submission(s): 100274 Accepted Submission(s): 30006 Problem Description Given an integer N(0 ≤ N ≤ 10000), your task is to calculate N!Input One N in one li…

設計模式學習筆記九:原型模式(Prototype Pattern)

1&#xff0e;概述 意圖&#xff1a;我們將已經存在的對象作為原型&#xff0c;用戶可以通過復制這些原型創建新的對象。 使用場合&#xff1a;當一個系統應該獨立于產品的創建、構造和表示時&#xff0c;可以使用原型模式。在原型模式中&#xff0c;產品的創建和初始化…

Centos7上安裝docker

步驟&#xff1a;1、Docker 要求 CentOS 系統的內核版本高于 3.10 &#xff0c;查看本頁面的前提條件來驗證你的CentOS 版本是否支持 Docker 。通過 uname -r 命令查看你當前的內核版本2、使用 root 權限登錄 Centos。確保 yum 包更新到最新。 &#xff08;這個可能需要幾分鐘的…

pythonista3安裝stash_Pythonista下stash安裝教程

前言 “StaSh is a serious attempt to implement a Bash-like shell for Pythonista.” StaSh是一個Pythonista環境下的仿shell程序&#xff0c;Sta來自于Pythonista的后三個字母&#xff0c;Sh即shell縮寫。除了能完成shell的基本功能外&#xff0c;最主要的功能還有實現pip安…

通過java類的反射機制獲取類的屬性類型

import java.lang.reflect.Field;import java.lang.reflect.Method; Class<?> clsClass.forName(className);//通過類的名稱反射類//Class<?> cls Object.getClass();Field field cls.getDeclaredField("name");//根據屬性名稱獲取單個屬性if (field…

建立合理的索引提高SQL Server的性能

在應用系統中,尤其在聯機事務處理系統中,對數據查詢及處理速度已成為衡量應用系統成敗的標準。而采用索引來加快數據處理速度也成為廣大數據庫用戶所接受的優化方法。 在良好的數據庫設計基礎上&#xff0c;能有效地使用索引是SQL Server取得高性能的基礎&#xff0c;SQL Serv…

c++ map用法_Python的 5 種高級用法,效率提升沒毛病

原創&#xff1a;機器之心(ID&#xff1a;almosthuman2014)任何編程語言的高級特征通常都是通過大量的使用經驗才發現的。比如你在編寫一個復雜的項目&#xff0c;并在 stackoverflow 上尋找某個問題的答案。然后你突然發現了一個非常優雅的解決方案&#xff0c;它使用了你從不…

非對稱加密算法RSA加密傳輸數據python3源代碼實現

2019獨角獸企業重金招聘Python工程師標準>>> import rsa# RSA 算法規定&#xff1a; # 待加密的字節數不能超過密鑰的長度值除以 8 再減去 11NBIT 4096 CAN_ENCODE_LEN NBIT // 8 - 11 PER_ENCODE_LEN CAN_ENCODE_LEN - (CAN_ENCODE_LEN % 2) PER_DECODE_LEN CA…

(Microsoft) Visual Studio LightSwitch

在藍色小鋪&#xff0c;聽到了前輩 阿源哥哥提到 Visual Studio LightSwitch"號稱" 可以快速開發桌面、云端的應用程序。http://www.microsoft.com/visualstudio/en-us/lightswitch &#xff08;這里也提供下載&#xff09; 原廠提供的圖片&#xff1a; 跟「正…