文章來自于:http://www.cnblogs.com/geekma/p/3139823.html
MapReduce:大型集群上的簡單數據處理
摘要
MapReduce是一個設計模型,也是一個處理和產生海量數據的一個相關實現。用戶指定一個用于處理一個鍵值(key-value)對生成一組key/value對形式的中間結果的map函數,以及一個將中間結果鍵相同的鍵值對合并到一起的reduce函數。許多現實世界的任務都能滿足這個模型,如這篇文章所示。
使用這個功能形式實現的程序能夠在大量的普通機器上并行執行。這個運行程序的系統關心下面的這些細節:輸入數據的分區、一組機器上調度程序執行、處理機器失敗問題,以及管理所需的機器內部的通信。這使沒有任何并行處理和分布式系統經驗的程序員能夠利用這個大型分布式系統的資源。
我們的MapReduce實現運行在一個由普通機器組成的大規模集群上,具有很高的可擴展性:一個典型的MapReduce計算會在幾千臺機器上處理許多TB的數據。程序員們發現這個系統很容易使用:目前已經實現了幾百個MapReduce程序,在Google的集群上,每天有超過一千個的MapReduce工作在運行。
一、??????? 介紹
在過去的5年中,本文作者和許多Google的程序員已經實現了數百個特定用途的計算程序,處理了海量的原始數據,包括抓取到的文檔、網頁請求日志等,計算各種衍生出來的數據,如反向索引、網頁文檔的圖形結構的各種表示、每個host下抓取到的頁面數量的總計、一個給定日期內的最頻繁查詢的集合等。大多數這種計算概念明確。然而,輸入數據通常都很大,并且計算必須分布到數百或數千臺機器上以確保在一個合理的時間內完成。如何并行計算、分布數據、處理錯誤等問題使這個起初很簡單的計算,由于增加了處理這些問題的很多代碼而變得十分復雜。
為了解決這個復雜問題,我們設計了一個新的抽象模型,它允許我們將想要執行的計算簡單的表示出來,而隱藏其中并行計算、容錯、數據分布和負載均衡等很麻煩的細節。我們的抽象概念是受最早出現在lisp和其它結構性語言中的map和reduce啟發的。我們認識到,大多數的計算包含對每個在輸入數據中的邏輯記錄執行一個map操作以獲取一組中間key/value對,然后對含有相同key的所有中間值執行一個reduce操作,以此適當的合并之前的衍生數據。由用戶指定map和reduce操作的功能模型允許我們能夠簡單的進行并行海量計算,并使用re-execution作為主要的容錯機制。
這項工作的最大貢獻是提供了一個簡單的、強大的接口,使我們能夠自動的進行并行和分布式的大規模計算,通過在由普通PC組成的大規模集群上實現高性能的接口來進行合并。
第二章描述了基本的編程模型,并給出了幾個例子。第三章描述了一個為我們的聚類計算環境定制的MapReduce接口實現。第四章描述了我們發現對程序模型很有用的幾個優化。第六章探索了MapReduce在Google內部的使用,包括我們在將它作為生產索引系統重寫的基礎的一些經驗。第七章討論了相關的和未來的工作。
二、??????? 編程模型
這個計算輸入一個key/value對集合,產生一組輸出key/value對。MapReduce庫的用戶通過兩個函數來標識這個計算:Map和Reduce。
Map,由用戶編寫,接收一個輸入對,產生一組中間key/value對。MapReduce庫將具有相同中間key I的聚合到一起,然后將它們發送給Reduce函數。
Reduce,也是由用戶編寫的,接收中間key I和這個key的值的集合,將這些值合并起來,形成一個盡可能小的集合。通常,每個Reduce調用只產生0或1個輸出值。這些中間值經過一個迭代器(iterator)提供給用戶的reduce函數。這允許我們可以處理由于數據量過大而無法載入內存的值的鏈表。
2.1 例子
考慮一個海量文件集中的每個單詞出現次數的問題,用戶會寫出類似于下面的偽碼:
?
Map函數對每個單詞增加一個相應的出現次數(在這個例子中僅僅為“1”)。Reduce函數將一個指定單詞所有的計數加到一起。
此外,用戶使用輸入和輸出文件的名字、可選的調節參數編寫代碼,來填充一個mapreduce規格對象,然后調用MapReduce函數,并把這個對象傳給它。用戶的代碼與MapReduce庫(C++實現)連接到一起。。附錄A包含了這個例子的整個程序。
2.2 類型
盡管之前的偽代碼中使用了字符串格式的輸入和輸出,但是在概念上,用戶定義的map和reduce函數需要相關聯的類型:
map?????? (k1, v1) ? ? ? ? ? ? ? ? ? ? ?--> ? ? ? ? list(k2, v2)
reduce?? (k2, list(v2)) ? ? ? ? ? ? ? ?--> ? ? ? ? ?list(v2)
也就是說,輸入的鍵和值和輸出的鍵和值來自不同的域。此外,中間結果的鍵和值與輸出的鍵和值有相同的域。
MapReduce的C++實現與用戶定義的函數使用字符串類型進行參數傳遞,將類型轉換的工作留給用戶的代碼來處理。
2.3 更多的例子
這里有幾個簡單有趣的程序,能夠使用MapReduce計算簡單的表示出來。
分布式字符串查找(Distributed Grep):map函數將匹配一個模式的行找出來。Reduce函數是一個恒等函數,只是將中間值拷貝到輸出上。
URL訪問頻率計數(Count of URL Access Frequency):map函數處理web頁面請求的日志,并輸出<URL, 1>。Reduce函數將相同URL的值累加到一起,生成一個<URL, total count>對。
翻轉網頁連接圖(Reverse Web-Link Graph):map函數為在一個名為source的頁面中指向目標(target)URL的每個鏈接輸出<target, source>對。Reduce函數將一個給定目標URL相關的所有源(source)URLs連接成一個鏈表,并生成對:<target, list(source)>。
主機關鍵向量指標(Term-Vector per Host):一個檢索詞向量將出現在一個文檔或是一組文檔中最重要的單詞概述為一個<word, frequency>對鏈表。Map函數為每個輸入文檔產生一個<hostname, term vector>(hostname來自文檔中的URL)。Reduce函數接收一個給定hostname的所有文檔檢索詞向量,它將這些向量累加到一起,將罕見的向量丟掉,然后生成一個最終的<hostname, term vector>對。
倒排索引(Inverted Index):map函數解析每個文檔,并生成一個<word, document ID>序列。Reduce函數接收一個給定單詞的所有鍵值對,所有的輸出對形成一個簡單的倒排索引。可以通過對計算的修改來保持對單詞位置的追蹤。
分布式排序(Distributed Sort):map函數將每個記錄的key抽取出來,并生成一個<key, record>對。Reduce函數不會改變任何的鍵值對。這個計算依賴了在4.1節提到的分區功能和4.2節提到的排序屬性。
三、??????? 實現
MapReduce接口有很多不同的實現,需要根據環境來做出合適的選擇。比如,一個實現可能適用于一個小的共享內存機器,而另一個實現則適合一個大的NUMA多處理器機器,再另一個可能適合一個更大的網絡機器集合。
這一章主要描述了針對在Google內部廣泛使用的計算環境的一個實現:通過交換以太網將大量的普通PC連接到一起的集群。在我們的環境中:
(1)??? 機器通常是雙核x86處理器、運行Linux操作系統、有2-4G的內存。
(2)??? 使用普通的網絡硬件—通常是100Mb/s或者是1Gb/s的機器帶寬,但是平均值遠小于帶寬的一半。
(3)??? 由數百臺或者數千臺機器組成的集群,因此機器故障是很平常的事
(4)??? 存儲是由直接裝在不同機器上的便宜的IDE磁盤提供。一個內部的分布式文件系統用來管理存儲這些磁盤上的數據。文件系統在不可靠的硬件上使用副本機制提供了可用性和可靠性。
(5)??? 用戶將工作提交給一個調度系統,每個工作由一個任務集組成,通過調度者映射到集群中可用機器的集合上。
3.1 執行概述
通過自動的將輸入數據分區成M個分片,Map調用被分配到多臺機器上運行。數據的分片能夠在不同的機器上并行處理。使用分區函數(如,hash(key) mod R)將中間結果的key進行分區成R個分片,Reduce調用也被分配到多臺機器上運行。分區的數量(R)和分區函數是由用戶指定的。
?
圖1:執行概述
圖1中顯示了我們實現的一個MapReduce操作的整個流程。當用戶程序調用MapReduce函數時,下面一系列的行為將會發生(圖1中所使用的數字標識將與下面列表中的相對應):
1. 用戶程序中的MapReduce庫會先將輸入文件分割成M個通常為16MB-64MB大小的片(用戶可以通過可選參數進行控制)。然后它將在一個集群的機器上啟動許多程序的拷貝。
2. 這些程序拷貝中的一個是比較特殊的——master。其它的拷貝都是工作進程,是由master來分配工作的。有M個map任務和R個reduce任務被分配。Master挑選出空閑的工作進程,并把一個map任務或reduce任務分配到這個進程上。
3. 一個分配了map任務的工作進程讀取相關輸入分片的內容,它將從輸入數據中解析出key/value對,并將其傳遞給用戶定義的Map函數。Map函數生成的中間key/value對緩存在內存中。
4. 緩存中的鍵值對周期性的寫入到本地磁盤,并通過分區函數分割為R個區域。將這些緩存在磁盤上的鍵值對的位置信息傳回給master,master負責將這些位置信息傳輸給reduce工作進程。
5. 當一個reduce工作進程接收到master關于位置信息的通知時,它將使用遠程調用函數(RPC)從map工作進程的磁盤上讀取緩存的數據。當reduce工作進程讀取完所有的中間數據后,它將所有的中間數據按中間key進行排序,以保證相同key的數據聚合在一起。這個排序是需要的,因為通常許多不同的key映射到相同的reduce任務上。如果中間數據的總量太大而無法載入到內存中,則需要進行外部排序。
6. reduce工作進程迭代的訪問已排序的中間數據,并且對遇到的每個不同的中間key,它會將key和相關的中間values傳遞給用戶的Reduce函數。Reduce函數的輸出追加到當前reduce分區一個最終的輸出文件上。
7. 當所有的map任務和reduce任務完成后,master會喚醒用戶程序。這時候,用戶程序中的MapReduce調用會返回到用戶代碼上。
在成功完成后,MapReduce操作輸出到R個輸出文件(每個reduce任務生成一個,文件名是由用戶指定的)中的結果是有效的。通常,用戶不需要合并這R個輸出文件,它們經常會將這些文件作為輸入傳遞給另一個MapReduce調用,或者在另一個處理這些輸入分區成多個文件的分布式應用中使用。
3.2 Master數據結構
Master保留了幾個數據結構。對于每個Map和Reduce任務,它存儲了它們的狀態(idle、in-progress或者completed),以及工作進程機器的特性(對于非空閑任務)。
Master是中間文件區域的位置信息從map任務傳送到reduce任務的一個通道。因此,對于每個完成的map任務來說,master存儲了map任務產生的R個中間文件區域的位置信息和大小。在map任務完成時,master會接收到更新這個含有位置信息和大小信息的消息。信息被增量的傳輸到運行in-progress的reduce任務的工作進程上。
3.3 容錯
因為MapReduce庫是被設計成運行在數百或數千臺機器上幫助處理海量數據的,所以這個庫必須能夠優雅的處理機器故障。
工作進程故障
Master周期性的ping每個工作進程,如果在一個特定的時間內沒有收到響應,則master會將這個工作進程標記為失效。任何由失效的工作進程完成的map任務都被標記為初始idle狀態,因此這個map任務會被重新分配給其它的工作進程。同樣的,任何正在處理的map任務或reduce任務也會被置為idle狀態,進而可以被重新調度。
在一個失效的節點上完成的map任務會被重新執行,因為它們的輸出被存放在失效機器的本地磁盤上,而磁盤不可訪問。完成的reduce任務不需要重新執行,因為它們的輸出被存儲在全局文件系統上。
當一個map任務先被工作進程A執行,然后再被工作進程B執行(因為A失效了),所有執行reduce任務的工作進程都會接收到重新執行的通知,任何沒有從工作進程A上讀取數據的reduce任務將會從工作進程B上讀取數據。
MapReduce對于大規模工作進程失效有足夠的彈性。比如,在一個MapReduce操作處理過程中,網絡維護造成了80臺機器組成的集群幾分鐘內不可達。MapReduce的master會重新執行那些在不可達機器上完成的工作,并持續推進,最終完成MapReduce操作。
Master故障
將上面提到的master數據結構周期性的進行寫檢查點操作(checkpoint)是比較容易的。如果master任務死掉,一個新的拷貝會從最近的檢查點狀態上啟動。然而,假定只有一個單獨的master,它的故障是不大可能的。因此,如果master故障,我們當前的實現是中止MapReduce計算。
當前故障的語義
當用戶提供的map和reduce操作是輸入確定性函數,我們的分布式實現與無故障序列執行整個程序所生成的結果相同。
我們依靠map和reduce任務輸出的原子性提交來實現這個屬性。每個in-progress任務將它們的輸出寫入到一個私有的臨時文件中。一個reduce任務產生一個這樣的文件,一個map任務產生R個這樣的文件(每個reduce任務一個)。當一個map任務完成時,它將發送給master一個消息,其中包括R個臨時文件的名字。如果master收到一個已經完成的map任務的完成消息,則忽略這個消息。否則,它將這R個文件名記錄在master的數據結構中。
當一個reduce任務完成后,reduce的工作進程自動的將臨時文件更名為最終的輸出文件,如果相同的reduce任務運行在多臺機器上,會調用多個重命名操作將這些文件更名為最終的輸出文件。
絕大部分的map和reduce操作是確定性的,事實上,在這種情況下我們的語義與一個序列化的執行是相同的,這使程序開發者能夠簡單的推出他們程序的行為。當map和/或reduce操作是不確定性的時,我們提供較弱但依然合理的語義。在不確定性的操作面前,一個特定的reduce任務R1的輸出與一個序列執行的不確定性程序生成的輸出相同。然而,一個不同的reduce任務R2的輸出可能與一個不同的序列執行的不確定性程序生成的輸出可能一致。
考慮map任務M和reduce任務R1和R2。假定e(Ri)是提交的Ri的執行過程(有且僅有這樣一個過程)。e(R1)可能從M的一個執行生成的輸出中讀取數據,e(R2)可能從M的一個不同執行生成的輸出中讀取數據,則會產生較弱的語義。
3.4 位置
在我們的計算環境中,網絡帶寬是一個相對不足的資源。我們通過將輸入數據存放在組成集群的機器的本地磁盤來節省網絡帶寬。GFS將每個文件分割成64MB大小的塊,每個塊會在不同的機器上存儲幾個拷貝(通常為3個)。MapReduce master會考慮文件的位置信息,并試圖將一個map任務分配到包含相關輸入數據副本的機器上。如果這樣做失敗,它會試圖將map任務調度到一個包含任務輸入數據的臨近的機器上(例如,與包含輸入數據機器在同一個網絡下進行交互的一個工作進程)。當在集群的一個有效部分上運行大規模的MapReduce操作時,大多數輸入數據都從本地讀取,不消耗任何網絡帶寬。
3.5 任務粒度
根據上面所提到的,我們將map階段細分為M個片,將reduce階段細分為R個片。理想情況下,M和R應該比工作機器的數量大得多,每個工作進程執行很多不同的任務來促使負載均衡,在一個工作進程失效時也能夠快速的恢復:許多完成的map任務可以傳播到其它所有的工作機器上。
在我們的實現中,對于取多大的M和R有一個實際的界限,因為如上面提到的那樣,master必須進行O(M+R)次調度,在內存中保持O(M*R)個狀態。(對內存使用的恒定因素影響較小,然而:對由每個map任務/reduce任務對占用大約一個字節所組成的O(M*R)片的狀態影響較大。)
此外,R經常是由用戶約束的,因為每個reduce任務的輸出最終放在一個分開的輸出文件中。實際中,我們傾向選擇M值,以使每一個獨立的任務能夠處理大約16MB到64MB的輸入數據(可以使上面提到的位置優化有更好的效果),把R值設置為我們想使用的工作機器的一個小的倍數。我們經常使用2000個工作機器,設置M=200000和R=5000,來執行MapReduce計算。
3.6 備用任務
影響一個MapReduce操作整體執行時間的一個通常因素是“落后者”:一個使用了異常的時間完成了計算中最后幾個map任務或reduce任務中的一個的機器。可能有很多因素導致落后者的出現,例如,一個含有損壞磁盤的機器頻繁的處理可校正的錯誤,使它的讀取速度從30MB/s下降到了1MB/s。集群調度者可能將其它的任務分配到這個機器上,由于CPU、內存、磁盤或網絡帶寬的競爭會導致MapReduce代碼執行的更慢。我們遇到的最近一個問題是機器初始化代碼中的一個bug,它會使處理器的緩存不可用:受到這個問題影響的機器會慢上百倍。
我們使用一個普通的機制來緩解落后者問題。當一個MapReduce操作接近完成時,master調度備用(backup)任務執行剩下的、處于in-process狀態的任務。一旦主任務或是備用任務完成,則將這個任務標識為已經完成。我們優化了這個機制,使它通常能夠僅僅增加少量的操作所使用的計算資源。我們發現這能有效的減少完成大規模MapReduce操作所需要的時間。作為一個例子,5.3節所描述的那種程序在禁用備用任務機制的情況下,會需要多消耗44%的時間。
四、??????? 細化
盡管簡單的編寫Map和Reduce函數提供的基本功能足夠滿足大多數需要,但是,我們發現一些擴展是很有用的。這會在本章進行描述。
4.1 分區函數
MapReduce的用戶指定所希望的reduce任務/輸出文件的數量(R)。使用分區函數在中間鍵上將數據分區到這些任務上。一個默認的分區函數使用hash方法(如“hash(key) mod R”),它能產生相當平衡的分區。然而,在一些情況下,需要使用其它的在key上的分區函數對數據進行分區。為了支持這種情況,MapReduce庫的用戶能夠提供指定的分區函數。例如,使用“hash(Hostname(urlkey)) mod R”作為分區函數,使所有來自同一個host的URL最終放到同一個輸出文件中。
4.2 順序保證
我們保證在一個給定的分區內,中間key/value對是根據key值順序增量處理的。順序保證可以使它易于生成一個有序的輸出文件,這對于輸出文件需要支持有效的隨機訪問,或者輸出的用戶方便的查找排序的數據很有幫助。
4.3 組合(Combiner)函數
在一些情況下,每個map任務產生的中間key會有很多重復,并且用戶指定的reduce函數滿足結合律和交換律。2.1節中提到的單詞技術的例子就是一個很好的例子。因為單詞頻率傾向于zifp分布,每個map任務都會產生數百或數千個<the, 1>形式的記錄。所有這些計數都會通過網絡發送給一個單獨的reduce任務,然后通過Reduce函數進行累加并產生一個數字。我們允許用戶指定一個可選的Combiner函數,它能在數據通過網絡發送前先對這些數據進行局部合并。
Combiner函數在每臺執行map任務的機器上執行。通常情況下,combiner函數和reduce函數的代碼是相同的,兩者唯一不同的是MapReduce庫如何處理函數的輸出。Reduce函數的輸出被寫入到一個最終的輸出文件中,而combiner函數會寫入到一個將被發送給reduce函數的中間文件中。
局部合并可以有效的對某類MapReduce操作進行加速。附錄A包含了一個使用combiner函數的例子。
4.4 輸入和輸出類型
MapReduce庫支持幾種不同格式的輸入數據。比如,“text”模式的輸入可以講每一行看出一個key/value對:key是該行在文件中的偏移量,value是該行的內容。另一中常見的支持格式是根據key進行排序存儲一個key/value對的序列。每種輸入類型的實現知道如何將自己分割成對map任務處理有意義的區間(例如,text模式區間分割確保區間分割只在行的邊界進行)。用戶能夠通過實現一個簡單的讀取(reader)接口來增加支持一種新的輸入類型,盡管大多數用戶僅僅使用了預定義輸入類型中的一小部分。
Reader并不是必須從文件中讀取數據,比如,我們可以容易的定義一個從數據庫中讀取記錄,或者從內存的數據結構中讀取數據的Reader。
類似的,我們提供一組輸出類型來產生不同格式的數據,用戶也可以簡單的通過代碼增加對新輸出類型的支持。
4.5 副作用
在一些情況下,MapReduce的用戶發現為它們的map和/或reduce操作的輸出生成輔助的文件很方便。我們依靠應用的writer將這個副作用變成原子的和冪等的。通常,應用會將結果寫入到一個臨時文件,然后在數據完全生成后,原子的重命名這個文件。
如果一個單獨任務產生的多個輸出文件,我們沒有提供兩階段提交的原子操作。因此,產生多個輸出文件且對交叉文件有一致性需求的任務應該是確定性的操作。但是在實際工作中,這個限制并不是一個問題。
4.6 跳過損壞的記錄
有時,在我們的代碼中會存在一些bug,它們會導致Map或Reduce函數在處理特定的記錄上一定會Crash。這樣的bug會阻止MapReduce操作順利完成。通常的做法是解決這個bug,但有時,這是不可行的;可能是由于第三方的庫中的bug,而我們沒有這個庫的源碼。有時,忽略一些記錄也是可以接受的,例如,當在海量的數據集上做數據統計時。我們提供了一個可選的運行模式,MapReduce庫探測出哪些記錄會導致確定性的Crash,并跳過這些記錄以繼續執行這個程序。
每個工作進程都安裝了一個信號處理器,它能捕獲段錯誤和總線錯誤。在調用用戶的Map或Reduce操作之前,MapReduce庫將記錄的序號存儲到全局變量中。如果用戶代碼產生一個信號,這個信號處理器會向MapReudce master發送一個“臨死前”的UDP包,其中包含了這個序號。當master看到對于一個特定的記錄有多個失敗信號時,在相應的Map或Reduce任務下一次重新執行時,master會通知它跳過這個記錄。
4.7 本地執行
在Map或Reduce函數中調試問題是很棘手的,因為實際的計算是發生在一個分布式系統上的,通常有幾千臺機器,并且是由master動態分配的。為了有助于調試、性能分析和小規模測試,我們開發了一個MapReduce庫可供選擇的實現,它將在本地機器上序列化的執行一個MapReduce的所有工作。這為用戶提供了對MapReduce操作的控制,使計算能被限制在一個特定的map任務上。用戶使用標記調用他們的程序,并能夠簡單的使用它們找到的任何調試或測試工具(如,gdb)。
4.8 狀態信息
Master運行了一個內部的HTTP服務,并顯示出狀態集頁面供人們查看,如,有多少任務已經完成、有多少正在處理、輸入的字節數、中間數據的字節數、輸出的字節數、處理速率等。這些頁面也包含了指向每個任務生成的標準錯誤和標準輸出文件的鏈接。用戶能使用這些數據預測這個計算將要持續多長時間,以及是否應該向這個計算添加更多的資源。這些頁面也有助于找出計算比預期執行慢的多的原因。
此外,頂層的狀態頁顯示了哪些工作進程失效,哪些map和reduce任務在處理時失敗。這個信息對試圖診斷出用戶代碼中的bug很有用。
4.9 計數器
MapReduce庫提供了一個計數器,用于統計不同事件的發生次數。比如,用戶代碼想要統計已經處理了多少單詞,或者已經對多少德國的文檔建立了索引等。
用戶代碼可以使用這個計數器創建一個命名的計數器對象,然后在Map和/或Reduce函數中適當的增加這個計數器的計數。例如:
?
獨立的工作機器的計數器值周期性的傳送到master(附在ping的響應上)master將從成功的map和reduce任務上獲取的計數器值進行匯總,當MapReduce操作完成時,將它們返回給用戶的代碼。當前的計數器值也被顯示在了master的狀態頁面上,使人們能夠看到當前計算的進度。當匯總計數器值時,master通過去掉同一個map或reduce任務的多次執行所造成的影響來防止重復計數。(重復執行可能會在我們使用備用任務和重新執行失敗的任務時出現。)
一些計數器的值是由MapReduce庫自動維護的,如已處理的輸入key/value對的數量和已生成的輸出key/value對的數量。
用戶發現計數器對檢查MapReduce操作的行為很有用處。例如,在一些MapReduce操作中,用戶代碼可能想要確保生成的輸出對的數量是否精確的等于已處理的輸入對的數量,或者已處理的德國的文檔數量在已處理的所有文檔數量中是否被容忍。
五、??????? 性能
在這章中,我們測試兩個運行在一個大規模集群上的MapReduce計算的性能。一個計算在大約1TB的數據中進行特定的模式匹配,另一個計算對大約1TB的數據進行排序。
這兩個程序能夠代表實際中大量的由用戶編寫的MapReduce程序,一類程序將數據從一種表示方式轉換成另一種形式;另一類程序是從海里的數據集中抽取一小部分感興趣的數據。
5.1 集群配置
所有的程序運行在一個由將近1800臺機器組成的集群上。每個機器有兩個2GHz、支持超線程的Intel Xeon處理器、4GB的內存、兩個160GB的IDE磁盤和一個1Gbps的以太網鏈路,這些機器部署在一個兩層的樹狀交換網絡中,在根節點處有大約100-200Gbps的帶寬。所有的機器都采用相同的部署,因此任意兩個機器間的RTT都小于1ms。
在4GB內存里,有接近1-1.5GB用于運行在集群上的其它任務。程序在一個周末的下午開始執行,這時主機的CPU、磁盤和網絡基本都是空閑的。
5.2 字符串查找(Grep)
這個grep程序掃描了大概1010個100字節大小的記錄,查找出現概率相對較小的3個字符的模式(這個模式出現在92337個記錄中)。輸入被分割成接近64MB的片(M=15000),整個輸出被放到一個文件中(R=1)。
?
圖2:數據傳輸速率
圖2顯示了計算隨時間的進展情況。Y軸顯示了輸入數據的掃描速率,這個速率會隨著MapReduce計算的機器數量的增長而增長,當1764個工作進程參與計算時,總的速率超過30GB/s。隨著map任務的完成,速率開始下降,并在計算的大約第80秒變為0,整個計算從開始到結束大約持續了150秒,這包含了大約1分鐘的啟動時間開銷,這個開銷是由將程序傳播到所有工作機器的時間、等待GFS文件系統打開1000個輸入文件集的時間和獲取位置優化所需信息的時間造成的。
5.3 排序
排序程序對1010個100字節大小的記錄(接近1TB的數據)進行排序,這個程序模仿了TeraSort benchmark。
排序程序由不到50行的用戶代碼組成,一個三行的Map函數從一個文本行中抽取出一個10字節的key,并將這個key和原始的文本行作為中間的key/value對進行輸出。我們使用內置的Identity函數作為Reduce操作。這個函數將中間key/value對不做任何修改的輸出,最終排序結果輸出到兩路復制的GFS文件中(如,該程序輸出了2TB的數據)。
如前所述,輸入數據被分割為64MB大小的片(M=15000),將輸出結果分成4000個文件(R=4000)。分區函數使用了key的開頭字符將數據分隔到R片中的一個。
這個基準測試的分區函數內置了key的分區信息。在一個普通的排序程序中,我們將增加一個預處理MapReduce操作,它能夠對key進行抽樣,通過key的抽樣分布來計算最終排序處理的分割點。
?
圖3:對于排序程序的不同執行過程隨時間的數據傳輸速率
圖3(a)顯示了排序程序的正常執行過程。左上方的圖顯示了輸入讀取的速率,這個速率峰值大約為13GB/s,因為所有的map任務執行完成,速率也在200秒前下降到了0。注意,這里的輸入速率比字符串查找的要小,這是因為排序程序的map任務花費了大約一半的處理時間和I/O帶寬將終結結果輸出到它們的本地磁盤上,字符串查找相應的中間結果輸出幾乎可以忽略。
左邊中間的圖顯示了數據通過網絡從map任務發往reduce任務的速率。這個緩慢的數據移動在第一個map任務完成時會盡快開始。圖中的第一個峰值是啟動了第一批大概1700個reduce任務(整個MapReduce被分配到大約1700臺機器上,每個機器每次最多只執行一個reduce任務)。這個計算執行大概300秒后,第一批reduce任務中的一些執行完成,我們開始執行剩下的reduce任務進行數據處理。所有的處理在計算開始后的大約600秒后完成。
左邊下方的圖顯示了reduce任務就愛那個排序后的數據寫到最終的輸出文件的速率。在第一個處理周期完成到寫入周期開始間有一個延遲,因為機器正在忙于對中間數據進行排序。寫入的速率會在2-4GB/s上持續一段時間。所有的寫操作會在計算開始后的大約850秒后完成。包括啟動的開銷,整個計算耗時891秒,這與TeraSort benchmark中的最好記錄1057秒相似。
一些事情需要注意:因為我們的位置優化策略,大多數數據從本地磁盤中讀取,繞開了網絡帶寬的顯示,所以輸入速率比處理速率和輸出速率要高。處理速率要高于輸出速率,因為輸出過程要將排序后的數據寫入到兩個拷貝中(為了可靠性和可用性,我們將數據寫入到兩個副本中)。我們將數據寫入兩個副本,因為我們的底層文件系統為了可靠性和可用性提供了相應的機制。如果底層文件系統使用容錯編碼(erasure coding)而不是復制,寫數據的網絡帶寬需求會降低。
5.4 備用任務的作用
在圖3(b)中,我們顯示了一個禁用備用任務的排序程序的執行過程。執行的流程與如3(a)中所顯示的相似,除了有一個很長的尾巴,在這期間幾乎沒有寫入行為發生。在960秒后,除了5個reduce任務的所有任務都執行完成。然而,這些落后者只到300秒后才執行完成。整個計算任務耗時1283秒,增加了大約44%的時間。
5.5 機器故障
在圖3(c)中,我們顯示了一個排序程序的執行過程,在計算過程開始都的幾分鐘后,我們故意kill掉了1746個工作進程中的200個。底層的調度者會迅速在這些機器上重啟新的工作進程(因為只有進程被殺掉,機器本身運行正常)。
工作進程死掉會出現負的輸入速率,因為一些之前已經完成的map工作消失了(因為香港的map工作進程被kill掉了),并且需要重新執行。這個map任務會相當快的重新執行。整個計算過程在933秒后完成,包括了啟動開銷(僅僅比普通情況多花費了5%的時間)。
六、??????? 經驗
我們在2003年2月完成了MapReduce庫的第一個版本,并在2003年8月做了重大的改進,包括位置優化、任務在工作機器上的動態負載均衡執行等。從那時起,我們驚喜的發現,MapReduce庫能夠廣泛的用于我們工作中的各種問題。它已經被用于Google內部廣泛的領域,包括:
- 大規模機器學習問題
- Google新聞和Froogle產品的集群問題
- 抽取數據用于公眾查詢的產品報告
- 從大量新應用和新產品的網頁中抽取特性(如,從大量的位置查詢頁面中抽取地理位置信息)
- 大規模圖形計算
?
圖4:隨時間變化的MapReduce實例
圖4中顯示了在我們的源碼管理系統中,隨著時間的推移,MapReduce程序的數量有明顯的增加,從2003年早期的0增加到2004年9月時的900個獨立的實例。MapReduce如此的成功,因為它使利用半個小時編寫的一個簡單程序能夠高效的運行在一千臺機器上成為可能,這極大的加快了開發和原型設計的周期。此外,它允許沒有分布式和/或并行系統經驗的開發者能夠利用這些資源開發出分布式應用。
?
表1: 2004年8月運行的MapReduce任務
在每個工作的最后,MapReduce庫統計了工作使用的計算資源。在表1中,我們看到一些2004年8月在Google內部運行的MapReduce工作的一些統計數據。
6.1 大規模索引
目前為止,MapReduce最重要的應用之一就是完成了對生產索引系統的重寫,它生成了用于Google網頁搜索服務的數據結構。索引系統的輸入數據是通過我們的爬取系統檢索到的海量文檔,存儲為就一個GFS文件集合。這些文件的原始內容還有超過20TB的數據。索引程序是一個包含了5-10個MapReduce操作的序列。使用MapReduce(代替了之前版本的索引系統中的adhoc分布式處理)有幾個優點:
- 索引程序代碼是一個簡單、短小、易于理解的代碼,因為容錯、分布式和并行處理都隱藏在了MapReduce庫中。比如,一個計算程序的大小由接近3800行的C++代碼減少到使用MapReduce的大約700行的代碼。
- MapReduce庫性能非常好,以至于能夠將概念上不相關的計算分開,來代替將這些計算混合在一起進行,避免額外的數據處理。這會使索引程序易于改變。比如,對之前的索引系統做一個改動大概需要幾個月時間,而對新的系統則只需要幾天時間。
- 索引程序變得更易于操作,因為大多數由于機器故障、機器處理速度慢和網絡的瞬間阻塞等引起的問題都被MapReduce庫自動的處理掉,而無需人為的介入。
七、??????? 相關工作
許多系統都提供了有限的程序模型,并且對自動的并行計算使用了限制。比如,一個結合函數可以在logN時間內在N個處理器上對一個包含N個元素的數組使用并行前綴計算,來獲取所有的前綴[6,9,13]。MapReduce被認為是這些模型中基于我們對大規模工作計算的經驗的簡化和精華。更為重要的是,我們提供了一個在數千個處理器上的容錯實現。相反的,大多數并行處理系統只在較小規模下實現,并將機器故障的處理細節交給了程序開發者。
Bulk Synchronous Programming和一些MPI源于提供了更高層次的抽象使它更易于讓開發者編寫并行程序。這些系統和MapReduce的一個關鍵不同點是MapReduce開發了一個有限的程序模型來自動的并行執行用戶的程序,并提供了透明的容錯機制。
我們的位置優化機制的靈感來自于移動磁盤技術,計算用于處理靠近本地磁盤的數據,減少數據在I/O子系統或網絡上傳輸的次數。我們的系統運行在掛載幾個磁盤的普通機器上,而不是在磁盤處理器上運行,但是一般方法是類似的。
我們的備用任務機制與Charlotte系統中采用的eager調度機制類似。簡單的Eager調度機制有一個缺點,如果一個給定的任務造成反復的失敗,整個計算將以失敗告終。我們通過跳過損壞計算路的機制,解決了這個問題的一些情況。
MapReduce實現依賴了內部集群管理系統,它負責在一個大規模的共享機器集合中分發和運行用戶的任務。盡管不是本篇文章的焦點,但是集群管理系統在本質上與像Condor的其它系統類似。
排序功能是MapReduce庫的一部分,與NOW-Sort中的操作類似。源機器(map工作進程)將將要排序的數據分區,并將其發送給R個Reduce工作進程中的一個。每個reduce工作進程在本地對這些數據進行排序(如果可能的話就在內存中進行)。當然NOW-Sort沒有使MapReduce庫能夠廣泛使用的用戶定義的Map和Reduce函數。
River提供了一個編程模型,處理進程通過在分布式隊列上發送數據來進行通信。像MapReduce一樣,即使在不均勻的硬件或系統顛簸的情況下,River系統依然試圖提供較好的平均性能。River系統通過小心的磁盤和網絡傳輸調度來平衡完成時間。通過限制編程模型,MapReduce框架能夠將問題分解成很多細顆粒的任務,這些任務在可用的工作進程上動態的調度,以至于越快的工作進程處理越多的任務。這個受限制的編程模型也允許我們在工作將要結束時調度冗余的任務進行處理,這樣可以減少不均勻情況下的完成時間。
BAD-FS與MapReduce有完全不同的編程模型,不像MapReduce,它是用于在廣域網下執行工作的。然而,它們有兩個基本相似點。(1)兩個系統都使用了重新執行的方式來處理因故障而丟失的數據。(2)兩個系統都本地有限調度原則來減少網絡鏈路上發送數據的次數。
TASCC是一個用于簡化結構的高可用性的網絡服務。像MapReduce一樣,它依靠重新執行作為一個容錯機制。
八、??????? 總結
MapReduce編程模型已經成功的應用在Google內部的許多不同的產品上。我們將這個成功歸功于幾個原因。第一,模型很易用,即使對那些沒有并行計算和分布式系統經驗的開發者,因為它隱藏了并行處理、容錯、本地優化和負載均衡這些處理過程。第二,各種各樣的問題都能用MapReduce計算簡單的表示出來,例如,MapReduce被Google網頁搜索服務用于生成數據、排序、數據挖掘、機器學習和許多其它系統。第三,我們已經實現了擴展到由數千臺機器組成的大規模集群上使用的MapReduce。這個實現能夠有效的利用這些機器自由,因此適合在Google內部遇到的很多海量計算問題。
我們從這項工作中學到了幾樣東西。第一,限制程序模型使得并行計算和分布式計算變得容易,也容易實現這樣的計算容錯。第二,網絡帶寬是一個稀有的資源,因此我們系統中的很多優化的目標都是為了減少數據在網絡上的傳輸次數:位置優化允許我們從本地磁盤讀取數據,并將中間數據的一個拷貝寫入到本地磁盤,以此來節省網絡帶寬的使用。第三,冗余執行能夠用于減少允許速度慢的機器所造成的影響,并且能夠處理機器故障和數據丟失。