文章目錄
- 1.什么是Shuffle?
- 2.Shuffle解決什么問題?
- 3.Shuffle Write與Shuffle Read
- 4.Shuffle的計算需求
- 4.1 計算需求表
- 4.2 partitionby
- 4.3 groupByKey
- 4.4 reduceByKey
- 4.5 sortByKey
- 5.Shuffle Write框架設計與實現
- 5.1 Shuffle Write框架實現的功能
- 5.2 Shuffle Write的多種情況
- 5.2.1 不需要combine和sort
- 5.2.1.1 操作流程
- 5.2.1.2 優缺點
- 5.2.1.3 適用性
- 5.2.2 不需要combine,需要sort
- 5.2.2.1 操作流程
- 5.2.2.2 優缺點
- 5.2.2.3 適用性
- 5.2.3 需要combile,需要/不需要sort
- 5.2.3.1 操作流程
- 5.2.3.2 優缺點
- 5.2.3.3 適用性
- 6.Shuffle Read框架設計與實現
- 6.1 Shuffle Read框架實現的功能
- 6.2 Shuffle Read的不同情況
- 6.2.1 不需要combine和sort
- 6.2.1.1 操作流程
- 6.2.1.2 優缺點
- 6.2.1.3 適用性
- 6.2.2 不需要combine,需要sort
- 6.2.2.1 操作流程
- 6.2.2.2 優缺點
- 6.2.2.3 適用性
- 6.2.3 需要combine,需要/不需要sort
- 6.2.3.1 操作流程
- 6.2.3.2 優缺點
- 6.2.3.3 適用性
閱讀本篇文章前,需要閱讀 Spark執行計劃與UI分析
1.什么是Shuffle?
運行在不同stage、不同節點上的task間如何進行數據傳遞。這個數據傳遞過程通常被稱為Shuffle機制。
2.Shuffle解決什么問題?
如果是單純的數據傳遞,則只需要將數據進行分區、通過網絡傳輸即可,沒有太大難度,但Shuffle機制還需要進行各種類型的計算(如聚合、排序),而且數據量一般會很大。如何支持這些不同類型的計算,如何提高Shuffle的性能都是Shuffle機制設計的難點問題。
3.Shuffle Write與Shuffle Read
- Shuffle Write:上游stage預先將輸出數據進行劃分,按照分區存放,分區個數與下游task個數一致,這個過程被稱為"Shuffle Write"。
- Shuffle Read:上游數據按照分區存放完成后,下游的task將屬于自己分區的數據通過網絡傳輸獲取,然后將來自上游不同分區的數據聚合再一起處理,這個過程稱為"Shuffle Read"。
4.Shuffle的計算需求
4.1 計算需求表
所謂計算需求,也就是Shuffle要解決具體算子的哪些計算需求:
這里我來分析幾個例子:
4.2 partitionby
可以看到partitionby操作只進行了數據分區操作,并沒有涉及到數據的聚合和排序操作。
4.3 groupByKey
可以看到groupByKey的操作既需要分區,又需要做聚合,并且在Shuffle Read階段做的聚合。
4.4 reduceByKey
可以看到reduceByKey做了兩步聚合,在Shuffle Write中先執行func聚合一次(由spark內部執行,不生成新的rdd),然后進行分區數據傳輸,最后再在每個分區聚合一次,執行相同的func函數。同時func需要滿足交換律和結合律。兩次聚合(多了Shuffle Write端聚合)的優點是優化Shuffle的性能,一是傳輸的數據量大大減少,二是降低Shuffle Read端的內存消耗。
4.5 sortByKey
分區后,在ShuffleRead端進行排序。sortByKey() 為了保證生成的RDD中的數據是全局有序(按照Key排序) 的, 采用Range劃分來分發數據。 Range劃分可以保證在生成的RDD中, partition 1中的所有record的Key小于(或大于) partition 2中所有的record的Key。
可以看到當前并沒有算子需要在Shuffle Write端進行排序的,但不能保證用戶實現的算子不會在Shuffle Write端進行排序,因此在spark實現Shuffle框架的時候保留了在Shuffle Write端進行排序的功能。
5.Shuffle Write框架設計與實現
5.1 Shuffle Write框架實現的功能
如第四節中的圖所示,每個數據操作只需要其中的一個或兩個功能。Spark為了支持所有的情況,設計了一個通用的Shuffle Write框架,框架的計算順序為“map()輸出→數據聚合→排序→分區”輸出。
map task每計算出一個record及其partitionId,就將record放入類似HashMap的數據結構中進行聚合;聚合完成后,再將HashMap中的數據放入類似Array的數據結構中進行排序,既可按照partitionId,也可以按照partitionId+Key進行排序;最后根據partitionId將數據寫入不同的數據分區中,存放到本地磁盤上。partitionId=Hash(Key)% 下游分區數。
5.2 Shuffle Write的多種情況
5.2.1 不需要combine和sort
這種Shuffle Write方式稱為:BypassMergeSortShuffleWriter
這種情況最簡單,只需要實現分區功能:
5.2.1.1 操作流程
map()依次輸出KV record,并計算其partitionId(PID),Spark根據 partitionId,將record依次輸出到不同的buffer中,每當buffer填滿就將record溢寫到磁盤上的分區文件中。分配buffer的原因是map()輸出record的速度很快,需要進行緩沖來減少磁盤I/O。
5.2.1.2 優缺點
該模式的優點是速度快,直接將record輸出到不同文件中。缺點是資源消耗過高,每個分區都需要有一個buffer(默認大小為32KB,由spark.Shuffle.file.buffer進行控制),當分區數過大時,內存消耗會很高。
5.2.1.3 適用性
適用于Shuffle Write端不需要聚合和排序且分區個數較少(小于spark.Shuffle.sort.bypassMergeThreshold,默認值為200),例如groupBy(100),partitionBy(100),sortByKey(100)。
5.2.2 不需要combine,需要sort
這種Shuffle模式被命名為:SortShuffleWriter(KeyOrdering=true),使用的Array被命名為PartitionedPairBuffer。
5.2.2.1 操作流程
- 這種情況需要使用partitionId+key進行排序,Spark采用的實現方法是建立一個Array:PartitionedPairBuffer,來存放map()輸出的record,并將每個<K,V>record轉化為<(PartitionId,K),V>record,然后按照PartitionId+Key對record進行排序,最后將所有record寫入寫入一個文件中,通過建立索引來標示每個分區。
- 如果Array存放不下,則會先擴容,如果還存放不下,就將Array中的record排序后spill到磁盤上,等待map()輸出完以后,再將Array中的record與磁盤上已排序的record進行全局排序,得到最終有序的record,并寫入文件中。
5.2.2.2 優缺點
- 優點是只需要一個Array結構就可以支持按照partitionId+Key進行排序,Array大小可控,而且具有擴容和spill到磁盤上的功能,支持從小規模到大規模數據的排序。同時,輸出的數據已經按照partitionId進行排序,因此只需要一個分區文件存儲,即可標示不同的分區數據,克服了BypassMergeSortShuffleWriter中建立文件數過多的問題,適用于分區個數很大的情況。缺點是排序增加計算時延。
5.2.2.3 適用性
- map()端不需要聚合(combine)、Key需要排序、分區個數無限制。目前,Spark本身沒有提供這種排序類型的數據操作,但不排除用戶會自定義,或者系統未來會提供這種類型的操作。sortByKey()操作雖然需要按Key進行排序,但這個排序過程在Shuffle Read端完成即可,不需要在Shuffle Write端進行排序。
最后,使用這種Shuffle如何解決BypassMergeSortShuffleWriter存在的buffer分配過多的問題?我們只需要將“按PartitionId+Key排序”改為“只按PartitionId排序”,就可以支持“不需要map()端combine、不需要按照Key進行排序,分區個數過大”的操作。例如,groupByKey(300)、partitionBy(300)、sortByKey(300)。
5.2.3 需要combile,需要/不需要sort
這種Shuffle模式被稱為:sort-based Shuffle Write,哈希表為:PartitionedAppendOnlyMap
5.2.3.1 操作流程
- 需要實現按Key進行聚合(combine)的功能,Spark采用的實現方法是建立一個類似HashMap的數據結構對map()輸出的record進行聚合。HashMap中的Key是“partitionId+Key”,HashMap中的Value是經過相同combine的聚合結果。在圖中,combine()是sum()函數,那么Value中存放的是多個record對應的Value相加的結果。
- 聚合完成后,Spark對HashMap中的record進行排序。如果不需要按Key進行排序,如上圖所示,那么只按partitionId進行排序;如果需要按Key進行排序,如圖6.7的下圖所示,那么按partitionId+Key進行排序。最后,將排序后的record寫入一個分區文件中。其中使用的hash表既可以實現聚合功能,也可以實現排序功能。
- 如果HashMap存放不下,則會先擴容為兩倍大小,如果還存放不下,就將HashMap中的record排序后spill到磁盤上。此時,HashMap被清空,可以繼續對map()輸出的record進行聚合,如果內存再次不夠用,那么繼續spill到磁盤上,此過程可以重復多次。當map()輸出完成以后,將此時HashMap中的reocrd與磁盤上已排序的record進行再次聚合(merge),得到最終的record,輸出到分區文件中。
5.2.3.2 優缺點
- 優缺點同5.4.2
5.2.3.3 適用性
- 適合map()端聚合(combine)、需要或者不需要按Key進行排序、分區個數無限制的應用,如reduceByKey()、aggregateByKey()等。
6.Shuffle Read框架設計與實現
6.1 Shuffle Read框架實現的功能
reduce task不斷從各個map task的分區文件中獲取數據(Fetch records),然后使用類似HashMap的結構來對數據進行聚(aggregate),該過程是邊獲取數據邊聚合。聚合完成后,將HashMap中的數據放入類似Array的數據結構中按照Key進行排序(sort byKey),最后將排序結果輸出或者傳遞給下一個操作。
6.2 Shuffle Read的不同情況
6.2.1 不需要combine和sort
6.2.1.1 操作流程
- 這種情況最簡單,只需要實現數據獲取功能即可。等待所有的map task結束后,reduce task開始不斷從各個map task獲取<K,V>record,并將record輸出到一個buffer中(大小為spark.reducer.maxSizeInFlight=48MB),下一個操作直接從buffer中獲取數據即可。
6.2.1.2 優缺點
- 優點是邏輯和實現簡單,內存消耗很小。缺點是不支持聚合、排序等復雜功能。
6.2.1.3 適用性
- 適合既不需要聚合也不需要排序的應用,如partitionBy()等。
6.2.2 不需要combine,需要sort
使用的Array結構:PartitionedPairBuffer
6.2.2.1 操作流程
- 獲取數據后,將buffer中的record依次輸出到一個Array結構(PartitionedPairBuffer)中。由于這里采用了本來用于Shuffle Write端的PartitionedPairBuffer結構,所以還保留了每個record的partitionId。然后,對Array中的record按照Key進行排序,并將排序結果輸出或者傳遞給下一步操作。
- 當內存無法存下所有的record時,PartitionedPairBuffer將record排序后spill到磁盤上,最后將內存中和磁盤上的record進行全局排序,得到最終排序后的record。
6.2.2.2 優缺點
- 優點是只需要一個Array結構就可以支持按照Key進行排序,Array大小可控,而且具有擴容和spill到磁盤上的功能,不受數據規模限制。缺點是排序增加計算時延。
6.2.2.3 適用性
- 適合reduce端不需要聚合,但需要按Key進行排序的操作,如sortByKey()、sortBy()等。
6.2.3 需要combine,需要/不需要sort
哈希表:ExternalAppendOnlyMap
6.2.3.1 操作流程
- 獲取record后,Spark建立一個類似HashMap的數據結構(ExternalAppendOnlyMap)對buffer中的record進行聚合,HashMap中的Key是record中的Key,HashMap中的Value是經過相同聚合函數(func())計算后的結果。
- 聚合函數是sum()函數,那么Value中存放的是多個record對應Value相加后的結果。之后,如果需要按照Key進行排序,如下圖所示,則建立一個Array結構,讀取HashMap中的record,并對record按Key進行排序,排序完成后,將結果輸出或者傳遞給下一步操作。
- 如果HashMap存放不下,則會先擴容為兩倍大小,如果還存放不下,就將HashMap中的record排序后spill到磁盤上。此時,HashMap被清空,可以繼續對buffer中的record進行聚合。如果內存再次不夠用,那么繼續spill到磁盤上,此過程可以重復多次。當聚合完成以后,將此時HashMap中的reocrd與磁盤上已排序的record進行再次聚合,得到最終的record,輸出到分區文件中。
注意,這里的排序和聚合依然使用的同一個數據結構。
6.2.3.2 優缺點
- 同上一節。
6.2.3.3 適用性
- 適合reduce端需要聚合、不需要或需要按Key進行排序的操作,如reduceByKey()、aggregateByKey()等。