(一)Shuffle
MapReduce中的Shuffle過程指的是在Map方法執行后、Reduce方法執行前對數據進行分區排序的階段
(二)處理流程
1. 首先MapReduce會將處理的數據集劃分成多個split,split劃分是邏輯上進行劃分,而非物理上的切分,每個split默認與Block塊大小相同,每個split由1個map task進行處理。
2. map task以行為單位讀取split中的數據,將數據轉換成K,V格式數據,調用一次map方法執行處理邏輯。Map Task處理完的數據首先寫入到默認100M的環形緩沖區,當環形緩沖區中的空間被使用到80%時數據會發生溢寫。
溢寫的數據會經過分區、快速排序形成小文件數據。(根據Key計算出本條數據應該寫出的分區號,最終在內部得到(K,V,P)格式數據?寫入到當前map task 所在的物理節點磁盤,便于后續reduce task的處理)
3. 為了避免每條數據都產生一次IO,根據split大小不同,可能會發生多次溢寫磁盤過程。
4. 每次溢寫磁盤時會對數據進行二次排序:按照數據(K,V,P)中的P(分區)進行排序并在每個P(分區)中按照K進行排序,這樣能保證相同的分區數據放在一起并能保證每個分區內的數據按照key有序。
5. 最終多次溢寫的磁盤文件(多個小文件)?數據會根據歸并排序算法合并成一個完整的磁盤文件,此刻,該磁盤文件特點是分區有序且分區內部數據按照key有序。
6. Reduce端每個Reduce task會從每個map task所在的節點上拷貝落地的磁盤文件對應的分區數據,對于每個Reduce task來說,從各個節點上拉取到多個分區數據后,每個分區內的數據按照key分組有序,但是總體來看這些分區文件中key數據不是全局有序狀態(分區數據內部有序,外部無序)。
7. 每個Reduce task需要再通過一次歸并排序,將拷貝過來的所有同一分區數據進行merge,這樣每個分區內的數據變成分區內按照key有序狀態,然后通過Reduce task處理將結果寫出。
(三)HASH分區算法
MapReduce處理數據過程中,map端將數據轉換成K,V格式數據并寫入對應的分區,根據key進行hashcode取值然后與Reduce Task個數取模得到該條數據寫出的分區號。
public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}
- hashCode值可能是負數,為了保證key的hashCode非負,所以使用key.hashCode() & Integer.MAX_VALUE 按位與操作
- ?Map端寫入的分區數默認與Reduce task個數相等
(四)壓縮
在MapReduce中,壓縮是一項常見的優化技術,用于減少數據在存儲和傳輸過程中所占用的空間。通過對輸入、中間和輸出數據進行壓縮,可以有效降低存儲成本、減少網絡傳輸開銷。
?? 壓縮比率對比: bzip2 > gzip > snappy > lzo > lz4,bzip2壓縮比可以達到8:1;gzip壓縮比可以達到5比1;lzo可以達到3:1。
? 壓縮性能對比:lz4 > lzo > snappy > gzip>bzip2 ,lzo壓縮速度可達約50M/s,解壓速度可達約70M/s;gzip速度約為20M/s,解壓速度約為60M/s;bzip2壓縮速度約為2.5M/s,解壓速度約為9.5M/s。