引入
通過引入篇,我們可以總結,MapReduce針對海量數據計算核心痛點的解法如下:
- 統一編程模型,降低用戶使用門檻
- 分而治之,利用了并行處理提高計算效率
- 移動計算,減少硬件瓶頸的限制
優秀的設計,總會借鑒使用到前人總結的精華。
在MapReduce設計中,就有很多經典的設計模式的影子:
- 責任鏈模式,讓每個組件發揮自己的作用,串聯起來完成一個完整的分布式應用程序的執行。
- 模板方法模式,在責任鏈的基礎上,又用了模板的形式來定義數據處理的基本流程架構。
- 策略模式,在模板方法的基礎上,提供靈活的具體業務實現方式。
下面我們就深入了解一下,MapReduce這個所謂的通用計算模型,到底是如何設計落地的。
MapReduce計算模型設計
首先,我們要知道,任何通用的計算模型,本質都可以劃分為輸入->計算->輸出三個模塊。既然說MapReduce是一個通用的計算模型,那我們就來看看它是怎么設計實現的。
核心設計思路
我們先從核心設計思路方面入手,MapReduce的編程模型中的核心計算模塊設計很簡單,正如其名,分為Map和Reduce兩個部分:
- Map負責“分”,即把復雜的任務分解為若干個“簡單的任務”來并行處理。可以進行拆分的前提是這些小任務可以并行計算,彼此間幾乎沒有依賴關系。
- Reduce負責“合”,即對map階段的結果進行全局匯總。
可以看到,這個計算模塊的設計非常簡單,下面我們看下在代碼層面,它是如何基于這個核心思路,去提供輸入,計算,輸出的能力給用戶的。
編程組件設計
在代碼層面,MapReduce結合了分布式場景的特殊性,針對這三個模塊對外提供了5個可編程組件,分別是InputFormat、Mapper、Partitioner、Reducer和OutputFormat。
下面我們分別介紹一下:
InputFormat
- 數據讀取與分片:因為MapReduce是構建在HDFS上的,那要計算的數據肯定是以一個個Block塊的形式,分散存儲在不同的DataNode里。InputFormat 組件負責從各種數據源讀取數據,并將數據切分成合適的分片(split),從而實現在多個計算節點上并行處理。例如,在處理大規模的文本數據時,InputFormat 可以按行或按固定大小對數據進行分片,使得每個 Mapper 任務可以獨立處理一個數據分片,實現數據的并行讀取和處理。
- 數據格式適配:不同的數據源可能有不同的數據格式,如文本格式、二進制格式、數據庫記錄格式等。InputFormat 能夠將各種不同格式的數據轉換為 MapReduce 可以處理的鍵值對形式,為后續的處理提供統一的輸入格式。
InputFormat主要用于描述輸入數據的格式,核心就是以下兩件事:
- 數據切分:按照某個策略將輸入數據切分成若干個split,以便確定Map Task個數以及對應的split。
- 為Mapper提供輸入數據:給定某個split,通過創建讀取數據的工具(RecordReader)來將其解析成一個個 key-value 對。
??
這種設計有點類似工廠方法,主要有以下好處:
- 解耦數據劃分和讀取過程:
通過這種設計,將輸入數據的劃分(InputFormat的職責)和具體的數據讀取(RecordReader的職責)兩個過程分開。這樣可以讓開發者獨立地修改和擴展這兩個部分。
例如,如果要支持一種新的數據格式,只需要創建一個新的InputFormat子類和對應的RecordReader,而不會影響到其他部分的代碼。- 提高可維護性和可擴展性:
這種設計使得MapReduce框架能夠方便地支持多種輸入數據格式。對于不同的數據來源和格式,只需要實現相應的InputFormat和RecordReader組合。
比如,對于數據庫數據、日志文件、二進制文件等不同類型的數據,都可以通過自定義的InputFormat和RecordReader來實現數據的有效處理。- 支持數據局部性優化:
InputFormat在劃分數據分片時,可以考慮數據的存儲位置等因素,使得RecordReader讀取數據時能夠更好地利用數據局部性。
例如,將在同一物理存儲位置的數據劃分到一個分片,這樣可以減少數據傳輸開銷,提高MapReduce的整體性能。其中文件切分算法在v1和v2版本有所區別:
- v1:splitSize = max{minSize, min{goalSize, blockSize}}
- v2:splitSize = max{minSize, min{maxSize, blockSize}}
新版本用 maxSize 替換了 goalSize ,可以更直接地對 splitSize 的上限進行嚴格控制。
例如,在處理一些對單個Map任務處理數據量上限有嚴格要求的場景(如資源有限的小型集群或者對任務響應時間敏感的場景),能夠明確設置 maxSize ,避免出現因 goalSize 計算復雜而導致輸入分片過大的情況。
在InputSplit切分方案確定后,會確定每個InputSplit的元數據信息。這通常由四部分組成:<file, start, length, hosts>,分別表示InputSplit所在的文件、起始位置、長度以及所在的host(節點)列表。
其中,前三項很容易確定,難點在于host列表的選擇方法。
FileInputFormat設計了一個簡單有效的啟發式算法,核心就是盡量選擇本地節點。
其實現主要考慮以下幾點:
- 性能提升。通過盡量選擇本地和機架本地的節點,可以盡可能減少網絡帶寬帶來的瓶頸,如果能走本地,更是可以完全利用本地磁盤IO,避免網絡傳輸帶來的延遲。
- 資源優化利用。考慮節點的資源狀況進行host選擇,可以盡可能平衡各個節點的負載,并有效提升集群的吞吐能力。
- 增強容錯。盡可能的選擇本地,避免網絡傳輸,能很好的降低數據丟失風險,并提高故障恢復效率。
Mapper
- 并行數據處理:Mapper 是 MapReduce 中實現并行計算的核心組件。對于大規模的數據處理任務,將數據分片后,每個 Mapper 任務在不同的計算節點上獨立地對數據分片進行處理,實現了數據的并行處理,大大提高了處理效率。例如,在進行文本數據的詞頻統計時,每個 Mapper 可以對自己負責的數據分片中的文本進行單詞拆分和初步計數。
- 數據轉換與過濾:Mapper 可以對輸入數據進行各種轉換和過濾操作,將原始數據轉換為更適合后續處理的中間表示形式。比如,可以在 Mapper 中對數據進行清洗、格式轉換、提取關鍵信息等操作,為后續的聚合和分析做準備。
Partitioner
- 數據分區與分發:在分布式計算中,Mapper 任務的輸出需要按照一定的規則分配到不同的 Reducer 任務中進行處理。Partitioner 組件負責根據鍵的特征將 Mapper 的輸出數據劃分到不同的分區,確保具有相同或相關鍵的數據能夠被發送到同一個 Reducer 任務中,以便進行有效的聚合和處理。例如,在對大規模用戶數據按用戶 ID 進行統計分析時,Partitioner 可以根據用戶 ID 的哈希值將數據分配到不同的 Reducer,使得同一用戶的數據能夠在同一個 Reducer 中進行處理。
- 負載均衡:通過合理的分區策略,Partitioner 可以實現數據在 Reducer 任務之間的均衡分配,避免某些 Reducer 任務處理的數據量過大,而其他 Reducer 任務閑置的情況,從而充分利用集群資源,提高整個系統的性能和效率。
Partitioner的作用是對Mapper產生的中間結果進行分區,以便將同一分組的數據交給同一個Reducer處理,它直接影響Reduce階段的負載均衡。
MapReduce提供了兩個Partitioner實現:HashPartitioner和TotalOrderPartitioner。
- HashPartitioner是默認實現,它是基于哈希值的分片方法實現的。
- TotalOrderPartitioner提供了一種基于區間的分片方法,通常用在數據全排序中。
關于全排序,通常容易想到的是歸并排序,主要是利用二分去提升效率,其與一些簡單的排序算法如插入,冒泡,選擇相比,核心就在于沒有浪費比較行為。
但由于作業只能有一個ReduceTask,因而Reduce階段會成為作業的瓶頸。為了提高全局排序的性能和擴展性, MapReduce提供了TotalOrderPartitioner。它能夠按照大小將數據分成若干個區間(分片),并保證后一個區間的所有數據均大于前一個區間數據。
TotalOrderPartitioner的全排序的步驟如下:
- 數據采樣。
在Client端通過采樣獲取分片的分割點。(Hadoop自帶了幾個采樣算法,IntercalSampler、RandomSampler、SplitSampler等。)- Map階段。
本階段涉及兩個組件,分別是Mapper和Partitioner。其中,Mapper可選用不同的Mapper實現類,如IdentityMapper,直接將輸入數據輸出,但Partitioner必須選TotalOrderPartitioner,它將步驟1中獲取的分割點保存到trie樹(前綴樹,字典樹)中以便快速定位任意一個記錄所在的區間,這樣,每個MapTask產生R(Reduce Task個數)個區間,且區間之間有序。- Reduce階段。
每個Reducer對分配到的區間數據進行局部排序,最終得到全排序數據。從以上步驟可以看出,基于TotalOrderPartitioner全排序的效率跟key分布規律和采樣算法有直接關系;key值分布越均勻且采樣越具有代表性,則Reduce Task負載越均衡,全排序效率越高。
TotalOrderPartitioner有兩個典型的應用實例:TeraSort和HBase批量數據導入。
- TeraSort是Hadoop自帶的一個應用程序實例。它曾在TB級數據排序基準評估中贏得第一名,而TotalOrderPartitioner正是從該實例中提煉出來的。
- HBase是一個構建在Hadoop之上的NoSQL數據倉庫。它以Region為單位劃分數據,Region內部數據有序(按key排序),Region之間也有序。一個MapReduce全排序作業的R個輸出文件正好可對應HBase的R個Region。
Reducer
- 數據聚合與合并:Reducer 主要用于對 Mapper 輸出的經過分區和排序的數據進行聚合和合并操作。在許多分布式計算場景中,需要對數據進行匯總、統計、合并等操作,Reducer 能夠將具有相同鍵的值進行合并和計算,得到最終的結果。如在詞頻統計中,Reducer 將各個 Mapper 輸出的相同單詞的計數進行累加,得到最終的單詞出現頻率。
- 復雜數據分析:對于一些需要全局視角或多輪處理的復雜數據分析任務,Reducer 可以在收到所有相關數據后進行綜合處理。例如,在計算數據的平均值、中位數,或者進行數據的關聯和整合等操作時,Reducer 可以根據具體的業務邏輯對數據進行進一步的分析和處理,得到最終的分析結果。
OutputFormat
- 數據存儲與持久化:在 MapReduce 任務完成后,需要將最終的計算結果存儲到合適的位置,以便后續的查詢和使用。OutputFormat 組件負責將 Reducer 的輸出數據按照指定的格式和存儲方式進行存儲,如將結果存儲為文本文件、二進制文件、數據庫表等。
- 結果格式定制:不同的應用場景可能對結果的輸出格式有不同的要求,OutputFormat 允許用戶根據實際需求定制輸出結果的格式和內容,或者按照特定的文件結構和數據組織方式進行存儲,方便與其他系統或工具進行集成和交互。
任務架構設計
用戶通過借助前面MapReduce提供的編程組件,實現了業務邏輯以后,會將程序打包提交到Hadoop集群中,這里就涉及如何去調度執行任務。
如下圖所示,是MRv1的架構設計(MRv2,也就是Yarn,可以看后面深入Yarn篇的內容)
我們來介紹一下里面的涉及的核心模塊:
Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端;同時,用戶可通過Client提供的一些接口查看作業運行狀態。
在Hadoop內部用Job(任務)表示MapReduce程序。一個MapReduce程序可對應若干個Job,而每個作業會被分解成若干個 Map/Reduce Task?。
JobTracker
JobTracker主要負責資源監控和作業調度。
JobTracker監控所有TaskTracker與作業的健康狀況,一旦發現失敗情況后,其會將相應的任務轉移到其他節點;同時,JobTracker會跟蹤任務的執行進度、資源使用量等信息,并將這些信息告訴任務調度器,而調度器會在資源出現空閑時,選擇合適的任務使用這些資源。
TaskTracker
TaskTracker會周期性地通過Heartbeat(心跳),將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker發送過來的命令并執行相應的操作(如啟動新任務、殺死任務等)?。
TaskTracker使用slot來等量劃分本節點上的資源量。
slot是MapReduce針對CPU、內存等計算資源的一個抽象,它代表集群中計算節點上的一個基本資源分配單位。
其設計的核心目的,是為了控制同時運行的任務數量,并有效地管理和分配集群的計算資源,避免資源過度使用或閑置。
一個Task獲取到一個slot后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot分為Map slot和Reduce slot兩種,分別供Map Task和Reduce Task使用。TaskTracker通過slot數目(可配置參數)限定Task的并發度。
Task
Task分為Map Task和Reduce Task兩種,均由TaskTracker啟動。
從深入HDFS篇章,我們知道HDFS會以固定大小的block為基本單位存儲數據,而對于MapReduce而言,其處理單位是Split。這是一個邏輯概念,它只包含一些元數據信息,比如數據起始位置、數據長度、數據所在節點等。它的劃分方法完全由用戶自己決定。但需要注意的是,split的多少決定了Map Task的數目,因為每個split會交由一個Map Task處理。
任務調度流程
MapReduce任務的調度流程如下:
Job提交
- 客戶端配置與提交:用戶編寫實現了?
Mapper
?和?Reducer
?接口的 Java 程序,設置作業的各項參數,如輸入輸出路徑、Mapper 和 Reducer 類等。接著,客戶端調用?JobClient
?類將作業提交給 JobTracker。 - 作業檢查:在提交作業前,客戶端會檢查作業的輸入輸出路徑是否合法等,同時計算輸入數據的分片信息。
Job初始化
- JobTracker 接收作業:JobTracker 接收到客戶端提交的作業后,為作業分配一個唯一的作業 ID,創建一個?
JobInProgress
?對象來跟蹤該作業的執行進度。 - 資源和任務初始化:JobTracker 會將作業相關信息(如作業配置、輸入分片信息等)存儲在 HDFS 上,同時為作業的 Map 和 Reduce 任務分配資源。
Job分配
- Map 任務分配:JobTracker 根據輸入數據的分片情況,將 Map 任務分配給 TaskTracker。一般會盡量將 Map 任務分配到存儲有對應輸入分片數據的節點上,以實現數據的本地化處理,減少數據傳輸開銷。
- Reduce 任務分配:JobTracker 會根據作業配置中指定的 Reduce 任務數量,將 Reduce 任務分配給合適的 TaskTracker。Reduce 任務的分配沒有數據本地化的要求。
Map 階段
- TaskTracker 接收任務:TaskTracker 從 JobTracker 接收分配的 Map 任務后,為該任務啟動一個新的 Java 進程。
- 數據讀取:該 Java 進程從 HDFS 讀取對應的輸入分片數據,將其解析成鍵值對形式,作為?
Mapper
?的輸入。 - Map 函數執行:
Mapper
?對輸入的鍵值對執行用戶自定義的?map
?方法,生成一系列中間鍵值對。這些中間鍵值對會先被寫入到內存緩沖區。 - 溢寫磁盤:當內存緩沖區達到一定閾值(默認 80%)時,會觸發溢寫操作。在溢寫過程中,數據會按照鍵進行分區和排序(默認使用哈希分區),并將排序后的結果寫入本地磁盤。如果配置了 Combiner,還會在溢寫前對相同鍵的值進行局部合并。
- 多次溢寫合并:如果在 Map 處理過程中發生了多次溢寫,最終會將這些溢寫文件合并成一個大的分區且排序好的文件。
Shuffle 階段
- 數據復制:Reduce 任務啟動后,會從各個 Map 任務所在的 TaskTracker 上復制屬于自己分區的數據。
- 歸并排序:Reduce 任務將復制過來的數據進行歸并排序,確保相同鍵的值相鄰排列。這個過程會將來自不同 Map 任務的相同分區的數據合并在一起。
Reduce 階段
- TaskTracker 接收并執行:TaskTracker 從 JobTracker 接收分配的 Reduce 任務后,為其啟動一個新的 Java 進程。
- Reduce 函數執行:
Reducer
?對排序好的數據執行用戶自定義的?reduce
?方法,對相同鍵的值進行聚合處理,生成最終的輸出結果。 - 結果輸出:
Reducer
?將處理后的結果寫入到 HDFS 等指定的輸出存儲系統中。
Job完成清理
- 狀態更新:當所有的 Map 任務和 Reduce 任務都成功完成后,JobTracker 將作業的狀態標記為成功完成。
- 資源清理:JobTracker 會清理作業運行過程中產生的臨時文件和其他相關資源。同時,TaskTracker 也會清理本地磁盤上的中間數據文件。
總結
今天梳理了MapReduce這個通用計算模型的總體設計落地思路,后面我們基于源碼去進一步深入它是如何實現的。