MapReduce簡介
MapReduce是一種編程模型,用于大規模數據集(大于1TB)的并行運算。概念"Map(映射)"和"Reduce(歸約)",是它們的主要思想。
MapReduce極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統上。
WordCount單詞計數
單詞計數是最簡單也是最能體現MapReduce思想的程序之一,可以稱為MapReduce版"Hello World"。
詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數。
以下是WordCount過程圖解,可以先大致瀏覽下,然后結合下文的Mapper和Reduce任務詳解進行理解。
分析MapReduce執行過程
MapReduce運行的時候,會通過Mapper運行的任務讀取HDFS中的數據文件,然后調用自己的方法,處理數據,最后輸出。Reducer任務會接收Mapper任務輸出的數據,作為自己的輸入數據,調用自己的方法,最后輸出到HDFS的文件中。整個流程如圖:
Mapper任務詳解
每個Mapper任務是一個java進程,它會讀取HDFS中的文件,解析成很多的鍵值對,經過我們覆蓋的map方法處理后,轉換為很多的鍵值對再輸出。整個Mapper任務的處理過程又可以分為以下幾個階段,如圖所示。
在上圖中,把Mapper任務的運行過程分為六個階段。
第一階段是把輸入文件按照一定的標準進行分片(InputSplit),每個輸入片的大小是固定的。默認情況下,輸入片(InputSplit)的大小與數據塊(Block)的大小是相同的。如果數據塊(Block)的大小是默認值64MB,輸入文件有兩個,一個是32MB,一個是72MB。那么小的文件是一個輸入片,大文件會分為兩個數據塊,那么是兩個輸入片。一共產生三個輸入片。每一個輸入片由一個Mapper進程處理。這里的三個輸入片,會有三個Mapper進程處理。
第二階段是對輸入片中的記錄按照一定的規則解析成鍵值對。有個默認規則是把每一行文本內容解析成鍵值對。“鍵”是每一行的起始位置(單位是字節),“值”是本行的文本內容。
第三階段是調用Mapper類中的map方法。第二階段中解析出來的每一個鍵值對,調用一次map方法。如果有1000個鍵值對,就會調用1000次map方法。每一次調用map方法會輸出零個或者多個鍵值對。
第四階段是按照一定的規則對第三階段輸出的鍵值對進行分區。比較是基于鍵進行的。比如我們的鍵表示省份(如北京、上海、山東等),那么就可以按照不同省份進行分區,同一個省份的鍵值對劃分到一個區中。默認是只有一個區。分區的數量就是Reducer任務運行的數量。默認只有一個Reducer任務。
第五階段是對每個分區中的鍵值對進行排序。首先,按照鍵進行排序,對于鍵相同的鍵值對,按照值進行排序。比如三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分別是整數。那么排序后的結果是<1,3>、<2,1>、<2,2>。如果有第六階段,那么進入第六階段;如果沒有,直接輸出到本地的linux文件中。
第六階段是對數據進行歸約處理,也就是reduce處理。鍵相等的鍵值對會調用一次reduce方法。經過這一階段,數據量會減少。歸約后的數據輸出到本地的linxu文件中。本階段默認是沒有的,需要用戶自己增加這一階段的代碼。
Reducer任務詳解
每個Reducer任務是一個java進程。Reducer任務接收Mapper任務的輸出,歸約處理后寫入到HDFS中,可以分為如下圖所示的幾個階段。
第一階段是Reducer任務會主動從Mapper任務復制其輸出的鍵值對。Mapper任務可能會有很多,因此Reducer會復制多個Mapper的輸出。
第二階段是把復制到Reducer本地數據,全部進行合并,即把分散的數據合并成一個大的數據。再對合并后的數據排序。
第三階段是對排序后的鍵值對調用reduce方法。鍵相等的鍵值對調用一次reduce方法,每次調用會產生零個或者多個鍵值對。最后把這些輸出的鍵值對寫入到HDFS文件中。
Shuffle--MapReduce心臟
Shuffle過程是MapReduce的核心,也被稱為奇跡發生的地方。
上面這張圖是官方對Shuffle過程的描述,可以肯定的是,單從這張圖基本不可能明白Shuffle的過程,因為它與事實相差挺多,細節也是錯亂的。Shuffle可以大致理解成怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, Shuffle描述著數據從map task輸出到reduce task輸入的這段過程。
在Hadoop這樣的集群環境中,大部分map task與reduce task的執行是在不同的節點上。很多時候Reduce執行時需要跨節點去拉取其它節點上的map task結果【注意:Map輸出總是寫到本地磁盤,但是Reduce輸出不是,一般是寫到HDFS】。
如果集群正在運行的job有很多,那么task的正常執行對集群內部的網絡資源消耗會很嚴重。這種網絡消耗是正常的,我們不能限制,能做的 就是最大化地減少不必要的消耗。還有在節點內,相比于內存,磁盤IO對job完成時間的影響也是可觀的。從最基本的要求來說,我們對Shuffle過程的 期望可以有:
- 完整地從map task端拉取數據到reduce 端。
- 在跨節點拉取數據時,盡可能地減少對帶寬的不必要消耗。
- 減少磁盤IO對task執行的影響。
比如為了減少磁盤IO的消耗,我們可以調節io.sort.mb的屬性。每個Map任務都有一個用來寫入輸出數據的循環內存緩沖區,這個緩沖區默認大小是100M,可以通過io.sort.mb設置,當緩沖區中的數據量達到一個特定的閥值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默認是0.80)時,系統將會啟動一個后臺線程把緩沖區中的內容spill 到磁盤。在spill過程中,Map的輸出將會繼續寫入到緩沖區,但如果緩沖區已經滿了,Map就會被阻塞直道spill完成。
spill線程在把緩沖區的數據寫到磁盤前,會對他進行一個二次排序,首先根據數據所屬的partition排序,然后每個partition中再按Key排序。輸出包括一個索引文件和數據文件,如果設定了Combiner,將在排序輸出的基礎上進行。Combiner就是一個Mini Reducer,它在執行Map任務的節點本身運行,先對Map的輸出作一次簡單的Reduce,使得Map的輸出更緊湊,更少的數據會被寫入磁盤和傳送到Reducer。Spill文件保存在由mapred.local.dir指定的目錄中,Map任務結束后刪除。
Shuffle其他細節這里不再詳述,下面這些文章可能對大家有所幫助:
http://my.oschina.net/u/2003855/blog/310301
http://blog.csdn.net/thomas0yang/article/details/8562910