文章目錄
- 1 概念
- 2 編程模型
- 3 實現
- 3.1 MapReduce執行流程
- 3.2 master數據結構
- 3.3 容錯機制
- 3.3.1 worker故障
- 3.3.2 master故障
- 3.3.3 出現故障時的語義
- 3.4 存儲位置
- 3.5 任務粒度
- 3.6 備用任務
- 4 擴展技巧
- 4.1 分區函數
- 4.2 順序保證
- 4.3 Combiner函數
- 4.4 輸入和輸出的類型
- 4.5 副作用
- 4.6 跳過損壞的記錄
- 4.7 本地執行
- 4.8 狀態信息
- 4.9 計數器
- 5 應用場景
- 5.1 論文中提出的應用場景
- 5.2 其他應用場景
- 6 參考
1 概念
MapReduce 是一種用于在大型集群上進行簡化數據處理的編程模型和計算框架。它最初由 Google 公司設計用于解決大規模數據處理問題,后來被廣泛應用于各種大數據處理場景。
MapReduce 模型的核心思想是將大規模的數據集分解成多個小的數據塊,然后分配給集群中的多個計算節點進行并行處理,最終將結果合并成最終的輸出。
2 編程模型
MapReduce 編程模型由兩個主要階段組成:map 階段和 reduce 階段。
- map 階段:在 map 階段,輸入數據被分割成若干個數據塊,并由不同的計算節點進行并行處理。每個計算節點都會執行用戶定義的 map 函數,將輸入數據轉換為鍵值對的形式,并發出中間結果。
- reduce 階段:在 reduce 階段,會將中間結果按照鍵進行分組,并由不同的計算節點進行并行處理。每個計算節點都會執行用戶定義的 reduce 函數,對相同鍵的數據進行合并和處理,最終生成最終的輸出結果。
對于用戶(MapReduce的使用者)而言:MapReduce是一種抽象化的編程模型,它隱藏了分布式數據處理的細節,僅對外暴露map
和reduce
的抽象,用戶來實現具體的map
和reduce
功能。MapReduce自身關注的是并行計算、容錯、分布式數據、負載均衡等一系列問題,并且保證分布計算的結果和無錯誤的串形計算的結果一致。
形式化地說,由用戶提供的 map
函數和 reduce
函數應有如下類型:
map ( k 1 , v 1 ) → list ( k 2 , v 2 ) reduce ( k 2 , list ( v 2 ) ) → list ( v 2 ) \begin{align*} \text{map} &\quad (k_1, v_1)\quad\quad\quad\rightarrow\quad\text{list}(k_2, v_2)\\ \text{reduce} &\quad (k_2,\text{list}(v_2))\quad\rightarrow\quad\text{list}(v_2) \end{align*} mapreduce?(k1?,v1?)→list(k2?,v2?)(k2?,list(v2?))→list(v2?)?
其中,輸入的 key
和 value
值與輸出的 key
和 value
值在類型上推導的域不同。此外,中間結果 key
和 value
值與輸出 key
和 value
值在類型上推導的域相同。
例如,計算一個大的文檔集合中每個單詞出現的次數,下面是偽代碼段:
map(String key, String value):// key: document name// value: document contentsfor each word w in value:EmitIntermediate(w, “1″);
reduce(String key, Iterator values):// key: a word// values: a list of countsint result = 0;for each v in values:result += ParseInt(v);Emit(AsString(result));
map
函數輸出文檔中的每個詞、以及這個詞的出現次數(在這個簡單的例子里就是 1)。reduce
函數把 map
函數產生的每一個特定的詞的計數累加起來。
值得注意的是,在實際的實現中 MapReduce 框架使用 Iterator
來代表作為輸入的集合,主要是為了避免集合過大,無法被完整地放入到內存中。
3 實現
3.1 MapReduce執行流程
下圖展示了MapReduce操作的全部流程。當用戶調用 MapReduce 函數時,將發生下面的一 系列動作(下面的序號和圖中的序號一一對應):
- 用戶程序首先調用的
MapReduce
庫將輸入文件分成 M M M 個數據片度,每個數據片段的大小一般從 16 MB 16\text{ MB} 16?MB 到 64 MB 64\text{ MB} 64?MB(可以通過可選的參數來控制每個數據片段的大小)。然后用戶程序在集群中創建大量的程序副本。 - 這些程序副本中的有一個特殊的程序—
master
。副本中其它的程序都是worker
程序,由master
分配 任務。有 M M M 個map
任務和 R R R 個reduce
任務將被分配,master
將一個map
任務或reduce
任務分配給一個空閑的worker
。 - 被分配了
map
任務的 worker 程序讀取相關的輸入數據片段,從輸入的數據片段中解析出鍵值對,然后把鍵值對傳遞給用戶自定義的 Map 函數,由 Map 函數生成并輸出的中間結果鍵值對,并緩存在內存中。 - 緩存中的鍵值對通過分區函數(可由用戶指定,默認為
hasy(key) mod R
)分成 R R R 個區域,之后周期性的寫入到本地磁盤上。緩存的鍵值對在本地磁盤上的存儲位置將被回傳給master
,由master
負責把這些存儲位置再傳送給reduce worker
。 - 當
reduce worker
程序接收到master
程序發來的數據存儲位置信息后,使用RPC
從Map worker
所在主機的磁盤上讀取這些緩存數據。當reduce worker
讀取了所有的中間數據后,通過對key
進行排序后使得具有相同key
值的數據聚合在一起。由于許多不同的key
值會映射到相同的reduce
任務上, 因此必須進行排序。如果中間數據太大無法在內存中完成排序,那么就要在外部進行排序。 reduce worker
程序遍歷排序后的中間數據,對于每一個唯一的中間key
值,reduce worker
程序將這 個key
值和它相關的中間結果value
值的集合傳遞給用戶自定義的reduce
函數。reduce
函數的輸出被追加到所屬分區的輸出文件。- 當所有的
map
和reduce
任務都完成之后,master
喚醒用戶程序。在這個時候,在用戶程序里的對MapReduce
調用才返回。
在成功完成任務之后,MapReduce
的輸出存放在 R R R 個輸出文件中(對應每個 reduce
任務產生一個輸出文件,文件名由用戶指定)。一般情況下,用戶不需要將這 R R R 個輸出文件合并成一個文件—他們經常把這些文件作為另外一個 MapReduce
的輸入,或者在另外一個可以處理多個分割文件的分布式應用中使用。
3.2 master數據結構
master
持有一些數據結構,它存儲每一個 map
和 reduce
任務的狀態(空閑、工作中或完成),以及 worker
機器(非空閑任務的機器)的標識。
master
就像一個數據管道,中間文件存儲區域的位置信息通過這個管道從 map
傳遞到 reduce
。因此, 對于每個已經完成的 map
任務,master
存儲了 map
任務產生的 R R R 個中間文件存儲區域的大小和位置。當 map
任務完成時,master
接收到位置和大小的更新信息,這些信息被逐步遞增的推送給那些正在工作的 reduce
任務。
3.3 容錯機制
3.3.1 worker故障
-
故障判定
master
周期性的ping
每個worker
。如果在一個約定的時間范圍內沒有收到worker
返回的信息,master
將 把這個worker
標記為失效。 -
故障處理
- 正在運行:正在運行的
map
或reduce
任務將被重新置為空閑狀態,等待重新調度。 - 已完成:所有由這個故障的
worker
完成的map
任務也會被重設為初始的空閑狀態,等待重新調度,因為該worker
不可用也意味著存儲在該worker
本地磁盤上的中間結果也不可用了;已經完成的reduce
任務的輸出存儲在全局文件系統(eg. Google File System)上,因此不需要重新執行。
- 正在運行:正在運行的
當一個 map
任務首先被 worker A
執行,之后由于 worker A
故障了又被調度到 worker B
執行,這個“重新執行”的動作會被通知給所有執行 reduce
任務的 worker
。任何還沒有從 worker A
讀取數據的 reduce
任務 將從 worker B
讀取數據。
3.3.2 master故障
一個簡單的解決辦法是讓 master
周期性的將上面描述的master數據結構的寫入磁盤,即檢查點(checkpoint)。如果這個 master 任務失敗了,可以從最后一個檢查點(checkpoint)開始啟動另一個 master
進程。
然而,由于只有一個 master
進程,master
失效后再恢復是比較麻煩的,因此現在的實現是如果 master
故障,就中止MapReduce
運算。用戶可以檢查到這個狀態,并且可以根據需要重新執行 MapReduce
操作。
3.3.3 出現故障時的語義
當用戶提供的 map
和 reduce
操作是輸入確定性函數(即相同的輸入產生相同的輸出)時,MapReduce保證任何情況下的輸出都和所有程序沒有出現任何錯誤、順序的執行產生的輸出是一樣的。 這依賴對 map
和 reduce
任務的輸出是原子提交的來完成這個特性。
- 每個工作中的任務把它的輸出寫到私有的臨時文件中。
- 每個
reduce
任務生成一個這樣的文件,而每個map
任務則生成 R R R 個這樣的文件(一 個reduce
任務對應一個文件)。 - 當一個
map
任務完成的時,worker
發送一個包含R
個臨時文件名的完成消息給master
。如果master
從一個已經完成的map
任務再次接收到到一個完成消息,master
將忽略這個消息;否 則,master
將這 R R R 個文件的名字記錄在數據結構里。 - 當
reduce
任務完成時,reduce worker
進程以原子的方式把臨時文件重命名為最終的輸出文件。如果同一個reduce
任務在多臺機器上執行,針對同一個最終的輸出文件將有多個重命名操作執行。這就依賴底層文件系統提供的重命名操作的原子性來保證最終的文件系統狀態僅僅包含一個reduce
任務產生的數據。
使用 MapReduce 模型的程序員可以很容易的理解他們程序的行為,因為我們絕大多數的 map
和 reduce
操作是確定性的,而且存在這樣的一個事實:我們的語義(也可以理解為處理機制)等價于一個順序的執行的操作。
當 map
and/or reduce
操作是不確定性的時候,MapReduce提供雖然較弱但是依然合理的語義。當使用非確定操作的時候, 一個 reduce
任務 R 1 R_1 R1? 的輸出等價于一個非確定性程序順序執行產生時的輸出。但是,另一個 reduce
任務 R 2 R_2 R2?的輸出也許符合一個不同的非確定程序順序執行產生的 R 2 R_2 R2? 的輸出。
考慮 map
任務 M M M 和 reduce
任務 R 1 R_1 R1?、 R 2 R_2 R2? 的情況。我們設定 e ( R i ) e(R_i) e(Ri?)是 R i R_i Ri? 已經提交的執行過程(有且僅有一個這樣的執行過程)。出現較弱語義是因為 e ( R 1 ) e(R_1) e(R1?)可能讀取了 M M M 一次執行產生的輸出,而 e ( R 2 ) e(R_2) e(R2?)可能讀取了 M M M 的另一次執行產生的輸出。
3.4 存儲位置
核心思想:盡量把輸入數據(由 GFS 管理)存儲在集群中機器的本地磁盤上來節省網絡帶寬。
GFS 把每個文件按 64MB 一個 Block 分隔,每個 Block 保存 在多臺機器上,環境中就存放了多份拷貝(一般是 3 個拷貝)。MapReduce 的 master
在調度 map
任務時會考慮輸入文件的位置信息,盡量將一個 map
任務調度在包含相關輸入數據拷貝的機器上執行;
如果上述努力失敗 了,master
將嘗試在保存有輸入數據拷貝的機器附近的機器上執行 map
任務(例如,分配到一個和包含輸入數據的機器在一個 switch 里的 worker 機器上執行)。當在一個足夠大的 cluster 集群上運行大型 MapReduce 操作的時候,大部分的輸入數據都能從本地機器讀取,因此消耗非常少的網絡帶寬。
3.5 任務粒度
理想情況下, M M M 和 R R R 應當比集群中 worker
的機器數量要多得多。在每臺 worker
機器都執行大量的不同任務能夠提高集群的動態的負載均衡能力,并且能夠加快故障恢復的速度:失效機器上執行的大量 map
任務都可以分布到所有其他的 worker
機器上去執行。
但是實際上,在具體實現中對 M M M 和 R R R 的取值都有一定的客觀限制,因為 master
必須執行 $O(M+R) $次調度,并且在內存中保存 O ( M × R ) O(M\times R) O(M×R)個狀態(對影響內存使用的因素還是比較小的: O ( M × R ) O(M\times R) O(M×R)塊狀態,大概每對 map
任務/reduce
任務 1 個字節就可以了)。
更進一步, R R R 值通常是由用戶指定的,因為每個 reduce
任務最終都會生成一個獨立的輸出文件。實際使用時我們也傾向于選擇合適的 M M M 值,以使得每一個獨立任務都是處理大約 16M 到 64M 的輸入數據(這樣, 上面描寫的輸入數據本地存儲優化策略才最有效),另外,我們把 R R R 值設置為我們想使用的 worker
機器數量的小的倍數。
所以我們通常會用這樣的比例來執行 MapReduce: M = 200000 M=200000 M=200000, R = 5000 R=5000 R=5000,使用 2000 2000 2000 臺 worker
機器。
3.6 備用任務
如果集群中有某個 worker
花了特別長的時間來完成最后的幾個 map
或 reduce
任務,整個 MapReduce 計算任務的耗時就會因此被拖長,這樣的 worker
也就成了落后者(Straggler)。
因此,論文提出一個通用的機制來減少“落伍者”出現的情況。當一個 MapReduce 操作接近完成的時候,master
會調度備用(backup)任務進程來執行剩下的、處于處理中狀態(in-progress)的任務。無論是最初的執行、還是備用(backup)任務進程完成了任務,我們都把這個任務標記成為已經完成。
4 擴展技巧
4.1 分區函數
MapReduce 的使用者通常會指定 reduce
任務和 reduce
任務輸出文件的數量( R R R)。我們在中間結果key
上使用分區函數來對數據進行分區,之后再輸入到后續任務執行進程。
一個缺省的分區函數是使用 hash
方法(比如, hash(key) mod R
)進行分區。hash
方法能產生非常平衡的分區。然而,有的時候,其它的一些分區函數對 key
值進行的分區將非常有用。
比如,輸出的 key
值是 URLs,我們希望每個主機的所有條目保持在同一個輸出文件中。為了支持類似的情況,MapReduce庫的用戶需要提供專門的分區函數。例如,使用hash(Hostname(urlkey)) mod R
作為分區函數就可以把所有來自同一個主機的 URLs 保存在同一個輸出文件中。
4.2 順序保證
在給定的分區 R R R中,MapReduce保證所有中間鍵值對數據的處理順序是按照 key
值增量順序處理的。
4.3 Combiner函數
在某些情況下,map
函數產生的中間 key
值的重復數據會占很大的比重,并且,用戶自定義的 reduce
函數滿足結合律和交換律。詞數統計程序是個很好的例子。由于詞頻率傾向于一個 zipf 分布,每個 map
任務將產生成千上萬個這樣的記錄。所有的這些記錄將通過網絡被發送到一個單獨的 reduce
任務,然后由這個reduce
任務把所有這些記錄累加起來產生一個數字。
MapReduce允許用戶指定一個可選的 combiner
函數,combiner
函數首先在本地將這些記錄進行一次合并,然后將合并的結果再通過網絡發送出 去。 combiner
函數在每臺執行 Map 任務的機器上都會被執行一次。一般情況下,combiner
和 reduce
函數是 一樣的。combiner
函數和 Reduce 函數之間唯一的區別是 MapReduce 庫怎樣控制函數的輸出。
reduce
函數的輸出被保存在最終的輸出文件里,而 combiner
函數的輸出被寫到中間文件里,然后被發送給 reduce
任務。 部分的合并中間結果可以顯著的提高一些 MapReduce 操作的速度。
4.4 輸入和輸出的類型
MapReduce庫支持幾種不同的格式的輸入數據。比如文本模式中,key
是文件的偏移量,value
是該行內容。
程序員可以定義Reader
接口來適應不同的輸入類型,程序員需要保證必須能把輸入數據切分成數據片段,且這些數據片段能夠由單獨的Map任務來處理就行了。Reader
的數據源可能是數據庫,可能是文本文件,甚至是內存等。
同樣,用戶采用類似添加新的輸入數據類型的方式增加新的輸出類型(定義Writer
接口)。
4.5 副作用
程序員在寫map
和/或reduce
操作的時候,可能會因為方便,定義很多額外功能,比如增加輔助的輸出文件等。但應當時刻記住,map
和reduce
操作應當保證原子性和冪等性。
比如,一個任務生成了多個輸出文件,但是我們沒有原子化多段commit的操作。這就需要程序員自己保證生成多個輸出的任務是確定性任務。
4.6 跳過損壞的記錄
有時候,用戶程序中的 bug 導致 map
或者 reduce
函數在處理某些記錄的時候 crash 掉,MapReduce 操作 無法順利完成。相較于修復無法執行的 Bug,跳過引發 Bug 的記錄可能更為明智。因此,我們希望 MapReduce 檢測哪些記錄導致確定性的crash, 并且跳過這些記錄不處理。
MapReduce 如何自動檢測這種情況呢?首先,每個worker
進程都設置了信號處理函數捕獲內存段異常(segmentation violation)和總線錯誤(bus error)。 在執行 map
或者 reduce
操作之前,MapReduce 庫通過全局變量保存記錄序號。如果用戶程序觸發了一個系統信號,信號處理函數將用“最后一口氣”通過 UDP 包向 master
發送處理的最后一條記錄的序號。當 master
看到在處理某條特定記錄不止失敗一次時,master
就標志這條記錄需要被跳過,并且在下次重新執行相關的map
或者 reduce
任務的時候跳過這條記錄。
4.7 本地執行
調試 map
和 reduce
函數的 bug 非常困難,因為它們在分布式系統中執行,并且通常跨多臺計算機執行,由 master
動態調度。為了簡化調試、性能分析和小規模測試,Google開發了本地版本的 MapReduce 庫。這個本地版本可以讓 MapReduce 操作在單臺計算機上順序執行。用戶可以控制操作的執行,并且可以將其限制在特定的 map
任務上。通過設置特殊標志,用戶可以在本地執行他們的程序,并且輕松使用本地調試和測試工具(如 gdb
)。
4.8 狀態信息
在 master
內部,設有一個內置的 HTTP 服務器,用于展示一系列狀態信息頁面。這些頁面會顯示計算進度,例如已完成的任務數量、正在執行的任務數量、輸入、中間數據和輸出的字節數,以及處理速率等。
這些頁面還包含了指向每個任務的stderr
和stdout
文件的鏈接。用戶可以利用這些數據來預測計算完成所需的時間,以及是否需要增加更多資源。當計算花費的時間超過預期時,這些頁面還可以幫助用戶找出執行速度緩慢的原因。
另外,頂層狀態頁面還會顯示出現故障的worker
及其故障時正在執行的 map
和 reduce
任務。這些信息對于調試用戶代碼中的 bug 非常有幫助。
很多分布式系統架構都會提供可視化監控界面,這是提升分布式系統的可維護性的重要手段。
4.9 計數器
MapReduce 庫提供計數器機制,用來統計不同操作發生次數。比如,用戶可能想統計已經處理了多少個單詞、已經索引的多少篇 German 文檔等等。
要想使用這個特性,用戶需要創建Counter對象,然后在map
和reduce
函數中以正確的方式增加counter
。這些計數器的值周期性的從各個單獨的worker
機器上傳遞給master
(附加在ping的應答包中傳遞)。master
把執行成功的 map
和 reduce
任務的計數器值進行累計,當 MapReduce 操作完成之后,返回給用戶代碼。 計數器當前的值也會顯示在 master
的狀態頁面上,這樣用戶就可以看到當前計算的進度。
當累加這些counter
的值時,master
會去掉那些重復執行的相同map
或者reduce
操作的次數,以此避免重復計數(之前提到的備用任務和故障后重新執行任務,這兩種情況會導致相同的任務被多次執行)。
有些counter
值是由MapReduce庫自動維護的,例如已經處理過的輸入鍵值對的數量以及生成的輸出鍵值對的數量等等。
5 應用場景
5.1 論文中提出的應用場景
- 分布式的 Grep:
map
函數輸出匹配某個模式的一行,reduce
函數是一個恒等函數,即把中間數據復制到輸出。 - 計算 URL 訪問頻率:
map
函數處理日志中 web 頁面請求的記錄,然后輸出 (URL,1)。reduce
函數把相同 URL 的 value 值都累加起來,產生 (URL, 記錄總數)結果。 - 倒轉網絡鏈接圖:
map
函數在源頁面(source)中搜索所有的鏈接目標(target)并輸出為(target,source)。reduce
函數把給定鏈接目標(target)的鏈接組合成一個列表,輸出(target,list(source))。 - 每個主機的檢索詞向量:檢索詞向量用一個(詞,頻率)列表來概述出現在文檔或文檔集中的最重要的一些詞。
map
函數為每一個輸入文檔輸出(主機名,檢索詞向量),其中主機名來自文檔的 URL。reduce
函數接收給定主機的所有文檔的檢索詞向量,并把這些檢索詞向量加在一起,丟棄掉低頻的檢索詞,輸出一個最終的(主機名,檢索詞向量)。 - 倒排索引:
map
函數分析每個文檔輸出一個(詞,文檔號)的列表,reduce
函數的輸入是一個給定詞的所有 (詞,文檔號),排序所有的文檔號,輸出(詞,list(文檔號))。所有的輸出集合形成一個簡單的倒排索引,它以一種簡單的算法跟蹤詞在文檔中的位置。 - 分布式排序:
map
函數從每個記錄提取key
,輸出(key,record)。reduce
函數不改變任何的值。這個運算依賴分區機制和排序屬性。 - 重建索引系統:重寫了 Google 網絡搜索服務所使用的索引系統。這個索引系統的輸入數據是網絡爬蟲抓取回來的大量文檔,這些文檔數據保存在 GFS 文件系統中,其原始內容超過了 20TB。通過一系列的 MapReduce 操作(大約 5 到 10 次),來建立索引。使用 MapReduce(替換上一個特別設計的、分布式處理的索引程序)帶來這些好處:
- 簡化的代碼:索引部分的代碼變得簡單、小巧、易于理解;
- 靈活性:MapReduce 庫的性能已經足夠好,因此可以將概念上不相關的計算步驟分開處理,減少數據傳遞的額外開銷;
- 操作管理的簡化:因為由機器失效、機器處理速度緩慢、以及網絡的瞬間阻塞等引起的絕大部分問題都已經由 MapReduce 庫解決了,不再需要操作人員的介入了。另外,我們可以通過在索引系統集群中增加機器的簡單方法提高整體處理性能。
5.2 其他應用場景
- 數據清洗和預處理:MapReduce 可以用于處理大規模數據集的清洗和預處理,包括數據去重、數據過濾、數據格式轉換等操作;
- 日志分析和異常檢測:MapReduce 可以用于分析大規模日志數據,檢測異常行為、故障事件和系統性能問題;
- 圖算法和社交網絡分析:MapReduce 可以應用于圖算法和社交網絡分析,包括圖的遍歷、最短路徑計算、社區發現等操作;
- 文本挖掘和信息抽取:MapReduce 可以用于處理文本數據,進行信息抽取、實體識別、主題建模等自然語言處理任務。
6 參考
paper:MapReduce
知乎 【分布式】MapReduce論文筆記
知乎 Google MapReduce 論文詳解