MapReduce 基礎介紹
- 起源與發展:是 2004 年 10 月谷歌發表的 MAPREDUCE 論文的開源實現,最初用于大規模網頁數據并行處理,現成為 Hadoop 核心子項目之一,是面向批處理的分布式計算框架。
- 基本原理:分為 map 和 reduce 兩個階段。map 階段將計算任務分發到數據節點并行運算,各節點得出部分結果;reduce 階段匯總部分結果得到最終結果,體現分而治之與并行運算思想,遵循計算跟著數據走、移動計算而非數據的原則。
MapReduce 特點
- 計算與數據關系:計算任務移動到數據所在節點,數據不動,降低分布式編程門檻。
- 擴展性:具有良好擴展性,隨著節點增加,存儲和計算能力近乎線性遞增。
MapReduce 適用場景
- 離線批處理任務:適合海量數據離線批處理,如數據統計(PVUV 統計)、搜索引擎索引構建、海量數據查詢、復雜數據分析算法實現等。
MapReduce 不適用場景
- 實時性要求高的場景:不適合毫秒或秒級返回結果的場景,如 OLAP、流計算,因其計算效率達不到實時性要求,且無法處理無界數據集和支持實時計算模式。
- DAG 運算場景:不能進行有向無環圖(DAG)運算,由于中間結果需落盤、讀盤和網絡傳輸,導致延遲高、效率低。
MapReduce 與 Spark 在 DAG 運算上的對比
- MapReduce 的劣勢:做 DAG 運算慢,中間結果落盤、讀盤和網絡傳輸過程繁瑣,效率低。
- Spark 的優勢:支持 DAG 運算,數據存于內存,可直接將結果給到下一個任務計算,速度快,但存在內存不足問題。
MAPREDUCE 作業運行原理與詞頻統計示例
- 以詞頻統計展示 MAPREDUCE 作業運行原理,即統計英文文本中單詞出現頻率。
- 若文本存于 HDFS,其自動進行 split 操作;若未存于 HDFS,則按 128 兆一塊進行數據塊拆分,每個數據塊啟動一個 map task。
map 任務處理過程
- map task 將每行文本按空格拆分單詞,把單詞作為 K,給每個單詞標 value 值為 1,形成 K-V 形式中間結果。
reduce 節點聚合操作
- 把相同 K 的數據分發到同一個 reduce 節點進行聚合,將相同 K 的 value 值累加得到最終詞頻結果,其中難點在于如何把相同 K 分發到同一 reduce,此過程需經過 shuffle(洗牌)階段。
哈希取模分發機制
- shuffle 階段通過哈希取模實現分發,先將字符串形式的 K 進行數字編碼,再對 reducer 個數進行哈希取模(即轉換后的數字除以 reducer 個數取余數),余數對應相應的 reduce 節點,以此保證相同 K 能聚合到同一 reducer。
生產中 reduce 個數設置
- 生產中 reduce 個數可手動指定,實際應用中可能不像示例中有較多 reduce,如可能只有兩個 reduce,此時單詞會按哈希取模結果分發到這兩個 reduce 中進行處理。
map reduce 執行流程
- 文件拆分與 map 任務啟動:文件上傳至 HDFS 后會自動進行 split,拆分成多個 block,每個 block 啟動一個 map 任務。
- map 任務處理與分組:map 任務處理數據得到 key-value 結果,并依據 key 對 reduce 個數進行哈希取模分組。例如有三個 reduce,則按對三取模結果分為三組。
- reduce 任務拉取與處理:reduce 任務啟動 fetch 線程,從各 map 拉取對應組數據,將來自多個地方的同組數據合并為一個大文件后,對文件按 key 進行 reduce 處理(如詞頻統計中對相同 key 的 value 累加求和),每個 reducer 會輸出一個結果文件存于同一目錄下,這些文件總和即為最終結果。
執行階段劃分
- 按任務劃分:分為 map task 和 reduce task。
- 按運行階段劃分:包括 split 階段、shuffle 階段、reduce 階段及輸出階段。其中 shuffle 階段由 map task 和 reduce task 共同完成,map task 負責對內存緩存區(100 兆,達 80%即 80 兆時觸發)的數據進行分組排序并落盤,可能產生多個小文件后再合并成大文件;reduce task 從 map 拉取數據到緩存(有閾值,超閾值也會落盤),同樣要對數據合并、分組排序后再進行 reduce 處理。
shuffle 階段詳細分析
- map 端操作:map 運算結果存放在 100 兆內存緩存區,達到 80 兆時觸發溢寫到磁盤,同時進行分組排序,根據 reduce 個數哈希取模分組并在組內排序,多次溢寫會生成多個分組有序小文件,最后需合并成大的分組有序文件。
- reduce 端操作:reduce 啟動 fetch 線程從 map 拉取數據到緩存,緩存達到閾值后溢寫生成小文件,再合并成大文件并進行分組排序(按 K 值分組排序),最后對分組有序文件進行 reduce 處理。
- 效率問題:shuffle 階段是 map reduce 執行慢的關鍵。其在 map 和 reduce 過程中大量數據落盤,且 reduce 拉取數據時存在大量網絡傳輸,內存緩存使用量小(僅幾百兆),頻繁與磁盤交互及網絡傳輸導致整體效率低下。
作業提交與運行
- 提交方式:使用
hadoop -jar
命令提交作業,需指定 jar 包名稱及要運行的主類名,并可添加參數。例如,官方示例包中運行 MAREDUCE 作業統計π值時,需按此方式提交。 - 作業管理:通過
yarn application - list
查看作業運行情況,用yarn application -kill
取消作業。
作業運行監控
- 運行狀態顯示:作業提交后會生成作業 id,運行時控制臺會實時顯示 map 和 reduce 的進度信息。需注意,按
CTRL + C
只能中斷控制臺輸出,無法終止后臺作業。 - 可視化監控:可訪問yarn集群主節點的 8088 端口進入可視化監控頁面,在
applications
中找到正在運行的作業,點擊作業 id 查看詳細運行情況。
日志查看與排錯
- 查看途徑:除可視化界面外,可在作業運行節點查找日志。運維人員可登錄節點,依據
yarn node manager
相關配置找到日志存儲目錄(通常在log
目錄下),查看作業輸出日志以分析運行狀況。普通用戶一般通過可視化界面查看日志。 - 排錯方法:從日志信息中排查和解決作業運行問題。