本套技術專欄是作者(秦凱新)平時工作的總結和升華,通過從真實商業環境抽取案例進行總結和分享,并給出商業應用的調優建議和集群環境容量規劃等內容,請持續關注本套博客。版權聲明:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,如有任何商業交流,可隨時聯系。
1 Spark SQL 堅實后盾DataFrame
- DataFrame是一個分布式數據容器,更像傳統數據庫的二維表格,除了數據以外,還掌握數據的結構信息,即schema。同時,與Hive類似,DataFrame也支持嵌套數據類型(struct、array和map)。
- JSON schema自動推導
- Hive風格分區表自動識別
- 充分利用RCFile、ORC、Parquet等列式存儲格式的優勢,僅掃描查詢真正涉及的列,忽略其余列的數據。
- 聚合統計函數支持
2 Spark SQL 源碼包結構(溯本逐源)
主要分為4類:
- core模塊:處理數據的輸入輸出,比如:把不同數據源(RDD,json,Parquet等)獲取到數據,并將查詢結果輸出到DataFrame。
- catalyst模塊:處理SQL語句的整個過程,包括解析,綁定,優化,物理計劃等查詢優化。
- hive模塊:對hive數據進行處理。
- hive-ThriftServer:提供CLI以及JDBC和ODBC接口。
3 Spark SQL catalyst模塊設計思路
(詳細請參看我的SparkSQL源碼解析內容)
catalyst主要組件有
- sqlParse => sql語句的語法解析
- Analyzer => 將不同來源的Unresolved Logical Plan和元數據(如hive metastore、Schema catalog)進行綁定,生成resolved Logical Plan
- optimizer => 根據OptimizationRules,對resolvedLogicalPlan進行合并、列裁剪、過濾器下推等優化作業而轉換成optimized Logical Plan
- Planner => LogicalPlan轉換成PhysicalPlan
- CostModel => 根據過去的性能統計數據,選擇最佳的物理執行計劃
4 Hash Join的衍生(劍走偏鋒)
4.1 Hash join 設計思路剖析(總領全局)
- 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。
- 第二步:根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable,位于內存中。
- 第三步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起
- 總結 : hash join 只掃描兩表一次,可以認為運算復雜度為o(a+b),效率非常高。笛卡爾集運算復雜度為a*b。另外,構建的Hash Table最好能全部加載在內存,效率最高,這就決定了hash join算法只適合至少一個小表的join場景,對于兩個大表的join場景并不適用。
4.2 broadcast Hash join 設計思路剖析(大表join極小表)
-
第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。
-
第二步: 先把小表廣播到所有大表分區所在節點,然后根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable
-
第三步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起
-
總結 : hash join 只掃描兩表一次,可以認為運算復雜度為o(a+b)。
-
調優
1 buildIter總體估計大小超過spark.sql.autoBroadcastJoinThreshold設定的值,即不滿足broadcast join條件2 開啟嘗試使用hash join的開關,spark.sql.join.preferSortMergeJoin=false3 每個分區的平均大小不超過spark.sql.autoBroadcastJoinThreshold設定的值,即shuffle read階段每個分區來自buildIter的記錄要能放到內存中4 streamIter的大小是buildIter三倍以上 復制代碼
4.2 shuffle Hash join 設計思路剖析(大表join小表)
- 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。
- 第二步: 將具有相同性質的(如Hash值相同)join key 進行Shuffle到同一個分區。
- 第三步:先把小表廣播到所有大表分區所在節點,然后根據buildIter Table的join key構建Hash Table,把每一行記錄都存進HashTable
- 第四步:掃描streamIter Table 每一行數據,使用相同的hash函數匹配 Hash Table中的記錄,匹配成功之后再檢查join key 是否相等,最后join在一起
5 Sort Merge join (橫行無敵)(大表join大表)
- 第一步:一般情況下,streamIter為大表,buildIter為小表,不用關心哪個表為streamIter,哪個表為buildIter,這個spark會根據join語句自動幫我們完成。
- 第二步: 將具有相同性質的(如Hash值相同)join key 進行Shuffle到同一個分區。
- 第三步: 對streamIter 和 buildIter在shuffle read過程中先排序,join匹配時按順序查找,匹配結束后不必重頭開始,利用shuffle sort特性,查找性能解決了大表對大表的情形。
6 Spark Join 類型詳解
6.0 準備數據集( Justin => 左表有,Rose =>右表有)
學習 Python中單引號,雙引號,3個單引號及3個雙引號的區別請參考:https://blog.csdn.net/woainishifu/article/details/76105667from pyspark.sql.types import * >>> rdd1 = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)]
park.createDataFrame(rdd, schema)
df.show()>>> schema = StructType([ StructField("id", IntegerType(), True), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ])
>>> df = spark.createDataFrame(rdd, schema)
>>> df.show()+---+------+---+
| id| name|age|
+---+------+---+
| 1| Alice| 18|
| 2| Andy| 19|
| 3| Bob| 17|
| 4|Justin| 21|
| 5| Cindy| 20|
+---+------+---+>>> rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)])
show()>>> schema2 = StructType([ StructField("name", StringType(), True), StructField("height", IntegerType(), True) ])
>>> df2 = spark.createDataFrame(rdd2, schema2)
>>> df2.show()
+-----+------+
| name|height|
+-----+------+
|Alice| 160|
| Andy| 159|
| Bob| 170|
|Cindy| 165|
| Rose| 160|
+-----+------+
復制代碼
6.1 inner join
-
inner join是一定要找到左右表中滿足join key 條件的記錄,join key都存在的情形。
df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()df.join(df3, ["id", "name"], "inner").select(df.id, df.name,"age", "height").orderBy(df.id).show()df.join(df3, ["id", "name"], "inner").select(df.id, df['name'],"age", "height").orderBy(df.id).show()>>> df.join(df2, "name", "inner").select("id", df.name, "age", "height").orderBy("id").show()+---+-----+---+------+| id| name|age|height|+---+-----+---+------+| 1|Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 5|Cindy| 20| 165|+---+-----+---+------+ 復制代碼
6.2 left outer join
-
left outer join是以左表為準,在右表中查找匹配的記錄,如果查找失敗,左表行Row不變,右表一行Row中所有字段都為null的記錄。
-
要求:左表是streamIter,右表是buildIter
df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()+---+------+---+------+| id| name|age|height|+---+------+---+------+| 1| Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 4|Justin| 21| null|| 5| Cindy| 20| 165|+---+------+---+------+ 復制代碼
6.3 right outer join
-
right outer join是以右表為準,在左表中查找匹配的記錄,如果查找失敗,右表行Row不變,左表一行Row中所有字段都為null的記錄。
-
要求:右表是streamIter,左表是buildIter
df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()>>> df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()+----+-----+----+------+| id| name| age|height|+----+-----+----+------+|null| Rose|null| 160|| 1|Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 5|Cindy| 20| 165|+----+-----+----+------+ 復制代碼
6.4 full outer join
-
full outer join僅采用sort merge join實現,左邊和右表既要作為streamIter,又要作為buildIter
-
左表和右表已經排好序,首先分別順序取出左表和右表中的一條記錄,比較key,如果key相等,則joinrowA和rowB,并將rowA和rowB分別更新到左表和右表的下一條記錄。
-
如果keyA<keyB,說明右表中沒有與左表rowA對應的記錄,那么joinrowA與nullRow。
-
將rowA更新到左表的下一條記錄;如果keyA>keyB,則說明左表中沒有與右表rowB對應的記錄,那么joinnullRow與rowB。
-
將rowB更新到右表的下一條記錄。如此循環遍歷直到左表和右表的記錄全部處理完。
>>> df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id").show()+----+------+----+------+| id| name| age|height|+----+------+----+------+|null| Rose|null| 160|| 1| Alice| 18| 160|| 2| Andy| 19| 159|| 3| Bob| 17| 170|| 4|Justin| 21| null|| 5| Cindy| 20| 165|+----+------+----+------+ 復制代碼
6.5 left semi join
left semi join是以左表為準,在右表中查找匹配的記錄,如果查找成功,則僅返回左表Row的記錄,否則返回null。
6.6 left anti join
left anti join與left semi join相反,是以左表為準,在右表中查找匹配的記錄,如果查找成功,則返回null,否則僅返回左邊的記錄
6.6 row_number().over()
from pyspark.sql.types import *
from pyspark.sql import Window
from pyspark.sql.functions import *
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])df = spark.createDataFrame(rdd, schema)
df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).show()+---+------+---+---+| id| name|age| rn|+---+------+---+---+| 1| Alice| 18| 1|| 1| Cindy| 20| 2|| 1|Justin| 21| 3|| 3| Bob| 17| 1|| 2| Andy| 19| 1|+---+------+---+---+df.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()+---+------+---+---+| id| name|age| rn|+---+------+---+---+| 3| Bob| 17| 1|| 1| Alice| 18| 1|| 2| Andy| 19| 1|| 1| Cindy| 20| 2|| 1|Justin| 21| 3|+---+------+---+---+
復制代碼
7 結語
一直想深入挖掘一下SparkSQL內部join原理,終于有時間詳細的理一下 Shuffle Join 。作者還準備進一步研究Spark SQL 內核原理,敬請期待我的Spark SQL源碼剖析系列。大數據商業實戰社區微信公眾號即將開啟,敬請關注,謝謝!