參考文檔及示例代碼均基于pyspark==3.1.2
- 1.什么是RDD?
- 2.job、stage、task如何劃分?
- 3.什么是寬窄依賴?
- 4.spark有哪幾種部署模式?
- 5.spark中的算子分為哪些類型,舉例說明。
- 6.cache、persist、checkpoint的區別,及各自的使用場景?
- 7.廣播變量與累加器。
- 8.reduceByKey與groupByKey的區別?
- 9.spark數據傾斜及通用調優。
- 10.map與flatMap區別?
- 11.spark中的shuffle有哪幾種方式?
1.什么是RDD?
RDD,彈性分布式數據集(Resilient Distributed Datasets),即一個分布于多個節點機器上的數據集合。為開發人員提供編程抽象,具有只讀的特點。這里只讀的意思是,當對RDD中的數據修改時,并不修改原RDD,而是返回一個新的RDD。注意RDD本身并不保存數據,只是定義了一組計算規則。
RDD中的彈性體現在:
1)容錯性:包括基于血緣關系的容錯和自動失敗重試的容錯。
- 血緣關系的容錯:RDD中一個分區的數據丟失,可以通過RDD間的血緣關系重新計算得到該分區的數據。單個節點的故障不影響其他節點的任務處理。
- 自動失敗重試的容錯:包括task失敗重試和stage失敗重試,由spark自動支持。且stage失敗重試時只重試任務失敗的分區,而不是全部計算。
2)計算存儲方面:內存和磁盤空間的自動切換和管理。包括計算過程中RDD的存儲,及持久化時持久化級別的動態管理。
- 計算過程中RDD的存儲:當內存使用完畢時自動溢寫磁盤,使得內存較小時也可以處理大數據量。
- 持久化方面:開發者可以自定義選擇持久化級別,包括持久化內存,持久化磁盤,持久化內存磁盤相結合的方式。
3)計算過程中可動態調整分區(repartition、coalesce)。
2.job、stage、task如何劃分?
job:應用程序中每遇到一個action算子就會劃分為一個job。
stage:一個job任務中從后往前劃分,分區間每產生了shuffle也就是寬依賴則劃分為一個stage,stage的劃分體現了spark的pipeline思想,即數據在內存中盡可能的往后多計算,減少磁盤或者網絡IO。
task:RDD中一個分區對應一個task。
3.什么是寬窄依賴?
根據分區之間是否產生shuffle來確定。
寬依賴:上游一個分區的數據被打散到下游的多個分區,1:N
窄依賴:上游一個分區的數據全部進入到下游的一個分區,可以是1:1,也可以是N:1
4.spark有哪幾種部署模式?
1.Local:本地模式,運行在單個機器,一般用作測試環境。
2.Standalone:一個基于Master+Slaves的資源調度集群。spark任務提交給Master調度管理,是spark自帶的一個調度系統。
3.Yarn:spark客戶端直接連接yarn,不需要額外構建spark集群。有yarn-client和yarn-cluster兩種模式,主要區別在于:driver程序的運行節點。yarn-client時driver運行在本地提交任務的客戶端,yarn-cluster是driver運行在集群中隨機的任一節點。
4.Mesos:比較少用,不了解。
5.K8s:spark后續高版本新增支持。
5.spark中的算子分為哪些類型,舉例說明。
spark中算子類型分為兩類:
1)轉換算子(Transformation):惰性求值,需要action算子進行觸發才會執行。返回一個新的RDD。不負責數據存儲,只是定義了一個計算規則。
- map:對RDD中的每個元素應用規則。
filter:對RDD中的每個元素按規則過濾。
groupByKey:將相同key的數據合并。
glom:將RDD中的每個分區合并為一個列表。
union:合并兩個RDD。
simple:抽樣。
注:關于持久化類算子,也有人叫控制算子(cache、persist、checkpoint),嚴格意義上也屬于轉換算子,需要動作算子才能觸發。2)動作算子(Action):觸發spark任務執行,立即構建DAG有向無環圖,不返回RDD,返回RDD的結果或者沒有返回值。
- collect:以數組形式獲取RDD中所有元素。
count:獲取RDD中元素個數。
first:獲取RDD中的第一個元素,等價于take(1)。
take:通過指定參數n獲取RDD中前n個元素。
top:通過指定參數n獲取RDD中排序后的前n個元素。更多RDD相關API參考官方文檔:https://spark.apache.org/docs/3.1.2/api/python/reference/pyspark.html#rdd-apis
6.cache、persist、checkpoint的區別,及各自的使用場景?
共同點:1)都用來做持久化,避免多個action算子對同一個RDD的重復計算。2)都遵循spark的惰性執行策略,需要通過action算子觸發執行。
區別:
- cache:僅持久化到內存,MEMORY_ONLY級別。等價于persist的默認持久化級別。
- persist:默認持久化到內存(MEMORY_ONLY),但同時支持開發者自定義存儲級別,例如僅磁盤(DISK_ONLY),磁盤內存結合(MEMORY_AND_DISK)。
更多的存儲級別設置及使用場景參考:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#rdd-persistence- checkpoint:將數據持久化到節點指定路徑中(sc.setCheckpointDir方法設置),如果執行模式是cluster則檢查點路徑必須為HDFS路徑。該方法與上述兩種方法最大的不同點在于會截斷RDD的血緣關系,而上述兩種方法不會截斷血緣關系,只是起到了緩存數據避免重復計算的作用。checkpoint實際使用中有兩點需要注意:1)checkpoint之前不要觸發RDD的動作算子,否則會截斷血緣關系,導致checkpoint重新計算時找不到血緣鏈條從而保存不到數據。2)checkpoint前最好將需要保存的RDD通過cache或者persist緩存一下,避免RDD的重復計算。
7.廣播變量與累加器。
廣播變量和累加器是spark中提供的兩種共享變量,分別用來解決廣播通信和任務結果匯總的兩種業務場景問題。詳細參考官方文檔:https://spark.apache.org/docs/3.1.2/rdd-programming-guide.html#shared-variables
1)廣播變量
簡而言之,就是在每個集群節點中緩存一份driver端定義的公共變量,且該被廣播的變量在executor中只讀。
當不使用廣播變量的時候,spark任務中需要用到的公共變量會copy到每個task中,這種方式弊端一是重復存儲占用內存資源,二是增加了IO操作。而使用廣播變量,driver端定義的公共變量只會往每個集群中的worker節點中copy一份,由executor中的所有task共享。且該方法的底層實現涉及到了序列化與反序列化以及高效的廣播算法,所以效率比較高。
demo:
from pyspark.sql import SparkSession"""
需求:從rdd中過濾掉singer中歌手的歌曲
"""
spark = SparkSession.builder \.master("local[*]") \.appName("broadcast_demo") \.config("spark.executor.instances", "4") \.config("spark.executor.cores", "2") \.config("spark.executor.memory", "1g") \.getOrCreate()
sc = spark.sparkContextrdd = sc.parallelize([("梁靜茹", "向左轉向右轉"), ("梁靜茹", "親親"), ("王詩安", "Home"), ("李宗盛", "山丘"), ("邵夷貝", "未來俱樂部")], 2)
print(f"過濾前:{rdd.collect()}")singer = ["梁靜茹", "王詩安"]
# 設置廣播變量并將singer廣播到executor
bc = sc.broadcast(singer)# 根據廣播變量過濾并輸出過濾結果
rdd_filter = rdd.filter(lambda x: x[0] not in bc.value)
print(f"過濾后:{rdd_filter.collect()}")sc.stop()
spark.stop()
2)累加器
累加器,簡要的概括,是一種分布式共享只寫變量。在driver端定義,并被序列化到每個executor中,在使用時被反序列化。所有executor中的task持有一個累加器的副本進行累加操作。并將結果回傳給driver進行匯總。spark原生支持數值型累加器,也支持開發人員自定義累計器類型。
demo:
from pyspark.sql import SparkSession"""
需求:統計rdd中屬于singer中歌手的歌曲數量
"""
spark = SparkSession.builder \.master("local[*]") \.appName("accumulator_demo") \.config("spark.executor.instances", "4") \.config("spark.executor.cores", "2") \.config("spark.executor.memory", "1g") \.getOrCreate()
sc = spark.sparkContextrdd = sc.parallelize([("梁靜茹", "向左轉向右轉"), ("梁靜茹", "親親"), ("王詩安", "Home"), ("李宗盛", "山丘"), ("邵夷貝", "未來俱樂部")], 2)
singer = ["梁靜茹", "王詩安"]# 初始化一個初值為0的累加器
acc = sc.accumulator(0)# 定義map函數,統計屬于singer的歌曲數量
def map_fun(x, s):if x[0] in s:acc.add(1)# 使用collect算子觸發執行map函數并輸出結果
rdd.map(lambda x: map_fun(x, singer)).collect()
print(f"屬于singer的歌曲數量:{acc.value}")sc.stop()
spark.stop()
8.reduceByKey與groupByKey的區別?
https://blog.csdn.net/atwdy/article/details/133155108
9.spark數據傾斜及通用調優。
10.map與flatMap區別?
map:對RDD中的每個元素應用規則,并返回一個新的元素。也就是結果RDD的元素數量與原始RDD元素數量相等。
flatMap:對RDD中每個元素應用規則,并返回一個集合,集合中的元素可以為0個或多個。在此基礎之上,再對所有的集合進行flat平鋪操作,可以理解為將各個集合元素合并到一起。
demo:
from pyspark.sql import SparkSessionspark = SparkSession.builder \.master("local[*]") \.appName("demo") \.config("spark.executor.instances", "4") \.config("spark.executor.cores", "2") \.config("spark.executor.memory", "1g") \.getOrCreate()
sc = spark.sparkContextrdd = sc.parallelize([2, 3, 4], 2)
rdd1 = rdd.map(lambda x: range(1, x))
rdd2 = rdd.flatMap(lambda x: range(1, x))print(f"map: {rdd1.collect()}")
print(f"flatMap: {rdd2.collect()}")sc.stop()
spark.stop()
11.spark中的shuffle有哪幾種方式?
兩種。早期的HashShuffle,和后期的SortShuffle。
HashShuffle(后續高版本已被SortShuffle取代):
- 未優化:基于對下游分區個數hash取模實現,下游有多少個分區,上游每個task都會產生多少個小文件,帶來的問題是小文件過多,增大磁盤和網絡IO,拖慢執行效率。同時上游每個task維護了多個小文件緩沖區,增加內存壓力。理論上的小文件個數 = map task數量 x 下游分區數量。
- 優化后:HashShuffle的優化其實就是針對上游task產生的小文件的合并優化。未優化前,每個task維護各自的緩沖區并生成和下游分區數量相等的小文件,優化后,每個executor中屬于同一個的core的task,會產生和下游分區數量相等的小文件并復用同一組小文件。所以理論上的小文件個數 = 上游core個數 x 下游分區數量。
SortShuffle:
- 普通SortShuffle:上游的每個map task會不斷地往磁盤溢寫小文件(溢寫前會進行排序),每次溢寫產生一個小文件,最終將所有屬于同一個task溢寫的小文件merge為一個大文件,并且產生一個索引文件,下游的reduce task根據索引文件去讀取屬于自己分區的數據。即產生的小文件個數 = map task數量 x 2。
- bypass機制:這種機制,可以理解為,在未優化的HashShuffle機制基礎上,對同一個task產生的小文件進行了一個合并的功能,產生一個大文件,同時生成一個索引文件。這種機制相比普通SortShuffle省略了排序的過程。產生的文件個數 = map task數量 x 2。觸發該機制的兩個閾值條件:1)reduce task數量 <
spark.shuffle.sort.bypassMergeThreshold
參數的值,默認為200。2)不是聚合類的shuffle算子。準確來說,不是map端預聚合的算子(eg:reduceByKey,因為為了聚合的高效,通常要求數據有序,而bypass機制并不對數據排序)。
12.spark為什么比MR快?