一、Hadoop的shuffle
前置知識:
Map任務的數量由Hadoop框架自動計算,等于分片數量,等于輸入文件總大小 / 分片大小,分片大小為HDFS默認值128M,可調
Reduce任務數由用戶在作業提交時通過Job.setNumReduceTasks(int)
設置
數據分配到Reduce任務的時間點,在Map任務執行期間,通過Partitioner(分區器)確定每個鍵值對的目標Reduce分區。默認采取partition=hash(key)?%?numReduceTasks策略
Shuffle過程:
hadoop會先將map數據寫入緩沖區,緩沖區達到某個閾值后,會把數據溢寫至磁盤,溢寫磁盤時會根據先將數據寫入相應分區文件,進行排序
溢寫完畢后,會將多個分區文件合并,再進行歸并排序
Reduce任務主動從所有Map任務的磁盤中拉取(Pull)屬于自己分區的數據,拉取到數據后,還會進行一次歸并排序
可以看到一共進行了三次排序。這一設計是后來所有分布式計算框架混洗任務的基石。
QA:為什么Hadoop需要三次排序?
第一次排序是為了第二次歸并排序方便
第二次歸并排序是為了給reduce任務時,reduce任務可以順序讀
第三次排序是因為hadoop要保證同一個reduce的輸出是有序的,同時如果輸入的key是有序的,reduce處理完輸出即可,如果是無序的,那么reduce需要保存再重排序,對于數據量大的場景容易oom
二、Spark的shuffle
前置知識:
map個數由Saprk分區數計算或者自定義,reduce個數由用戶指定,如果沒指定,通常是機器核數
map和reduce數據的交互方式依舊是,map后把數據寫入文件中,reduce從文件中讀取數據
分區ID是數據在Shuffle過程中被分配到的目標Reduce任務的編號,決定了數據最終由哪個Reduce任務處理。
計算方式:
默認使用HashPartitioner
,根據Key的哈希值對Reduce任務數取模:
分區ID=hash(key)?%?numReduceTasks分區ID=hash(key)?%?numReduceTasks
2.1 哈希混洗
Spark 1.2 之前默認的Shuffle機制
map輸出的數據不再排序,若有M
個map任務和R
個reduce任務,每個map任務生成R個文件,每個reduce任務拉取屬于自己的文件
這樣導致文件句柄數太多了,若M=1000
、R=1000
,則生成?1,000,000個文件,同時內存壓力也比較大,如果需要排序要在reduce端把一個key的所有數據全部加載,所以后面使用了sort混洗
2.2 sort 混洗
Spark 1.2 引入,逐步成為默認機制
1. Map任務處理輸入數據,生成<Key, Value>
對,并按分區ID暫存到內存緩沖區
2. 當緩沖區達到閾值(如spark.shuffle.spill.numElementsForceSpillThreshold
默認值)時,開始排序。
-
排序規則:
-
僅按分區ID排序(默認):將數據按分區ID排序,同一分區內的數據無序。
-
按分區ID + Key排序(需配置):
若設置spark.shuffle.sort.byKey=true
,則按(分區ID, Key)
排序,同一分區內的數據按鍵有序。
-
3. 排序后的數據按分區ID順序寫入磁盤,生成一個臨時溢寫文件
4. Map任務結束時,將所有臨時溢寫文件合并為單個數據文件(data
)和一個索引文件(index
)
-
合并方式:
-
多路歸并排序:將多個已按分區ID(或Key)排序的溢寫文件合并,保持全局有序性。
-
索引文件生成:記錄每個分區ID在數據文件中的起始和結束偏移量。
-
5.?Reduce任務向Driver查詢所有Map任務生成的數據文件和索引文件的位置
6.?若Map端已按Key排序,Reduce任務直接對多個有序數據塊進行歸并,生成全局有序數據集。
-
內存與磁盤結合:
-
數據量較小時,直接在內存中歸并。
-
數據量較大時,使用外排序(溢出到磁盤,分批次歸并
-
感覺這樣下來,跟hadoop的shuffle就有點像了,這樣有個好處是,map生成的文件就只有兩個了,最終的文件就是 2 * R個
2.3 Spark和Hadoop shuffle的內存使用上的不同之處
Hadoop寫文件時,是設置了一個內存閾值,到達了該閾值就會把內存內容寫入文件中,比如閾值是80M,一個200M文件就要溢寫三次,且緩沖區大小不可動態調整,無法根據任務需求擴展或收縮。
Spark 將內存劃分為?存儲內存(Storage Memory)?和?執行內存(Execution Memory),兩者可動態借用,
-
Map 任務將數據按分區ID(或 Key)緩存在內存中。
-
溢出到磁盤:若內存不足,部分數據排序后寫入磁盤臨時文件。
-
合并最終文件:Map 結束時合并內存和磁盤數據,生成一個數據文件和一個索引文件。
舉個spark處理數據的例子,假設有200MB數據:
(1) 內存排序
-
Map 任務處理數據后,先將鍵值對緩存在內存中,并按?分區ID(和可選的 Key)排序。
-
假設可用執行內存為 150MB,前 150MB 數據在內存中完成排序,生成一個?有序的內存塊。
(2) 溢出到磁盤
-
當內存不足時,Spark 將內存中已排序的 150MB 數據?溢寫到磁盤,生成一個臨時文件(如?
spill1
),該文件內部保持有序。 -
剩余 50MB 數據繼續在內存中排序,直到 Map 任務結束。
在 Map 任務結束時,所有內存和磁盤上的數據會被合并為一個全局有序的輸出文件。具體流程如下:
假設 Map 任務生成以下兩個有序片段:
-
內存塊(150MB):
[A, B, D, F]
-
溢寫文件(50MB):
[C, E, G]
歸并過程:
-
初始化指針:內存塊指向?
A
,溢寫文件指向?C
。 -
比較當前元素,選擇最小者:
-
第一輪:
A
(內存塊) → 寫入最終文件。 -
第二輪:
B
(內存塊) → 寫入最終文件。 -
第三輪:
C
(溢寫文件) → 寫入最終文件。 -
...
-
-
最終合并結果:
[A, B, C, D, E, F, G]
。
reduce階段拉取數據的時候,會優先從內存中獲取,內存中沒有才去文件中獲取
三、Flink的shuffle
雖然Flink是批流一體的,因為Flink現在主要是作為流處理,所以我們分析Flink在流處理場景下的shuffle
因為Flink處理的是流數據,自然不會有上面介紹的批處理的那些從文件中拉取數據,文件歸并排序之類的操作
如果硬要說的話,Flink是哈希混洗,用戶定義上游算子和下游算子的并發度,上游算子的數據默認會采用?Round-Robin 輪詢算法,通過rpc(netty)發給下游的算子,在Flink UI圖中我們會看到圖中的線是 Rebalance
如果有key by,那么會對key做hash,然后對并發度取模,根據取模結果發送給下游算子