目錄
🌹前言
🦅2? Programming Model
🌼2.1? Example
🌼2.2? Types
🌼2.3? More Examples
🦅3? Implementation(實現)
🌼3.1 ~ 3.3
🌼3.4 ~ 3.6
🦅4? Refinements(改進)
🌼4.1 ~ 4.5
🌼4.6 ~ 4.9
🦅5 Performance
🦅6 Experience
🌹前言
介紹:nil.csail.mit.edu/6.824/2021/notes/l01.txt
論文:rfeet.qrk (mit.edu)
視頻:Lecture 1: Introduction (youtube.com)
中文視頻:Lecture 1- Introduction_嗶哩嗶哩_bilibili
付費視頻:simviso-開源分享,傳播知識 (simtoco.com)
作業提交:6.824 Lab 1: MapReduce (mit.edu)
先看論文,再看視頻,最后做實驗
🦅2? Programming Model
🌼2.1? Example
0)偽代碼
1)map
// MapReduce 庫中的 Map 函數
void map(const string& key, const string& value) {// key: document name// value: document content// ("document1", "hello world hello")istringstream iss(value); // 字符串流, 從文本提取單詞string word;while (iss >> word)// 輸出鍵值對cout << "EmitIntermediate(" << word << ", \"1\")" << endl;
}
EmitIntermediate(hello, "1")
EmitIntermediate(world, "1")
EmitIntermediate(hello, "1")
2)reduce
// MapReduce 庫中的 Reduce 函數
void reduce(const string& key, const vector<string>& values) {// key: a word// value: a list of counts// ("hello", {"1", "1"})int result = 0; // 單詞出現次數for (size_t i = 0; i < values.size(); ++i)// atoi("..."): "+211" --> 211// 字符串轉 int// c_str 轉 C風格字符串result += atoi(values[i].c_str());// 模擬發出最終結果cout << "Emit(" << key << ", " << result << ")" << endl;
}
Emit(hello, 2)
🌼2.2? Types
輸入的鍵 / 值(原始數據集)?&& 輸出的鍵 / 值?———— 不同域
中間的鍵 / 值(Map處理后生成) && 輸出的鍵 / 值(Reduce處理后) ———— 相同域
"document1", "hello world hello" // 輸入的鍵/值"hello", "1" // 中間的鍵/值
"world", "1"
"hello", "1""hello", "2" // 輸出的鍵/值
Map函數:處理輸入數據,生成中間鍵值對。中間鍵值對被 MapReduce框架收集并排序,傳遞給 Reduce 函數
Reduce函數:接受中間鍵值對,以及與之關聯的的中間值列表,然后對這些值進行聚合操作,生成輸出鍵值對
上述結構,允許 MapReduce 框架處理大規模數據集,通過分布式并行處理數據,提高數據處理效率和可擴展性
總而言之,字符串傳遞給用戶所定義的函數,用戶所定義的函數負責將字符串轉化成合適的類型
🌼2.3? More Examples
1)用戶程序 User Program:啟動 MapReduce 的程序,定義了 Map 和 Reduce 函數。
2)主節點 Master:將輸入數據拆分成多個片段(splits),并將這些 splits 分配給不同的工作節點(workers)。
3)工作節點 workers:實際數據處理的節點。分兩種,執行 Map 函數 || 執行 Reduce 函數。每個工作節點從主節點接受任務,并在本地執行。
4)輸入文件 Input files:存儲在分布式系統 GFS。
5)Map 階段 Map phase:workers 執行 Map 函數,根據輸入數據生成中間鍵值對(Intermediate key/value pairs)。中間鍵值對會被寫入本地磁盤,并按鍵(key)分區。
6)本地寫入 Local write:Map 工作節點將中間數據寫入本地磁盤。
7)遠程讀取 Remote read:Reduce 工作節點通過網絡從 Map 工作節點的本地磁盤中讀取中間數據。
8)Reduce 階段:Reduce 工作節點執行 Reduce 函數,處理相同鍵的所有值,并將結果寫入輸出文件。
🦅3? Implementation(實現)
🌼3.1 ~ 3.3
3.1?Execution Overview?
1,對中間數據排序,是為了讓所有具有相同鍵的值都被聚合到一起。
2,workers 和 Master 都是用戶程序啟動的多個副本。
3,Map函數產生的中間鍵/值對在內存中緩沖,這些 緩沖對 在本地磁盤上的位置被傳回給主節點,主節點負責將這些位置轉發給Reduce工作者
4,當所有的Map任務和Reduce任務都完成后,主節點喚醒用戶程序
3.2?Master Data Structures?
1,主節點維護著多個數據結構。對于每個Map任務和Reduce任務,它存儲著狀態(空閑、進行中或已完成)?
2,主節點是中間文件區域位置從 Map 任務傳播到 Reduce 任務的渠道
3,因此,對于每個完成的 Map 任務,主節點存儲著該 Map 任務產生的 R 個中間文件的位置和大小。
隨著 Map 任務的完成,這些位置和大小信息會更新。這些信息會逐漸推送給正在進行Reduce任務的工作者。
3.3 Fault Tolerance
工作者故障:
(1)主節點定期對每個工作者進行?ping?操作。如果在一定時間內沒有收到工作者的響應,主節點將該工作者標記為失敗。
(2)由該工作者完成的任何 Map 任務都會重置回它們的初始空閑狀態,因此有資格在其他工作者上調度。
(3)類似地,任何在失敗的工作者上進行中的Map任務或Reduce任務也會被重置為空閑,并有資格重新調度。
(4)由于完成的Map任務的輸出存儲在失敗機器的本地磁盤上,因此無法訪問,所以失敗時需要重新執行。由于完成的Reduce任務的輸出存儲在全局文件系統中,因此不需要重新執行。?
(5)主節點需要定期寫入 checkpoints,以便它 dead 后,可以恢復到最后一個 checkpoint 的狀態
Semantics in the Presence of Failures
故障下的語義👇
1,確定性函數和輸出一致性
MapReduce 中 Map函數 和 Reduce函數是確定的話,意味著,相同的輸入,總是產生相同的輸出
那么,無論是否發生故障,最終的輸出都一樣
2,原子提交和臨時文件
(1)“原子”:操作要么完全完成,要么不發生,不會處于中間狀態。
(2)MapRedeuce中,以原子提交的方式來保存任務的輸出👇
任務的輸出數據,首先被寫入一個臨時文件,然后一次性將該臨時文件重命名為最終輸出文件
這個重命名操作是原子的(確保的輸出文件要么完全不可見,要么完全可見,不會出現輸出文件部分更新的情況)
(3) MapReduce 中,每個任務的輸出受限被寫入一個臨時文件(類似草稿紙),在任務完成得最終輸出之前,現在 “草稿紙” 打草稿,可以防止數據丟失。
🌼3.4 ~ 3.6
3.4 本地性(Locality):
1)MapReduce 利用 GFS(Goole File System)系統存儲數據。
2)GFS系統將大文件切分成一小塊一小塊,每塊約 64 MB 大小,并且會在不同機器上保存這些數據塊的副本。
3)任務開始時,主節點優先將任務分配給?已經存儲了數據塊?的機器,如果沒有已經存儲的數據塊的機器,主節點就會將任務優先分配給,?離數據塊存儲位置比較近?的機器。
4)👆得益于上述策略,數據一般在本地機器讀取,所以不需要通過網絡來?傳輸數據?,節省了網絡帶寬。
5)本地性,即,盡可能利用本地資源來處理數據,以減少對網絡帶寬的需求,提高數據處理效率。
3.5 任務粒度(Task Granularity)
MapReduce 中任務粒度,由兩個參數決定
任務粒度過大 -- 某些機器很快完成任務變空閑,而其他機器還在忙碌,造成資源浪費
任務粒度過小 --
a. 任務分配更均勻,提高整體效率
b. 某臺機器失敗后,任務粒度較小,處理的任務量較小,所以可以迅速在其他機器重啟
c. 可能導致任務調度開銷增加
3.6 備份任務(Backup Tasks):
1)分布式計算環境中,由于 機器故障,資源競爭等原因,某些任務執行非常慢,這些執行緩慢的任務稱為 “落后者”。
2)所以 MapReduce 引入了備份任務機制。
3)功能:
a. 識別落后者:執行時間超出平均時間,確定為 “落后者”
b. 調度備份任務:一旦識別出 “落后者”,主節點為 “落后者” 進行備份(除了原本正在執行的任務,相同的任務會在另一臺機器執行)
c. 資源利用:開銷不大,1000臺機器,只需額外的幾臺機器執行備份
d. 任務完成:原始 OR 備份任務,一個完成 == 完成
e. 👆以上機制,顯著減少了 “落后者” 導致的整個任務的拖延,使得大型 MapReduce 操作完成時間減少到 50%
f. 容錯性:備份任務的存在,避免了單點故障導致的完全失敗
🦅4? Refinements(改進)
🌼4.1 ~ 4.5
4.1? Partitioning Function(分區函數)
1)分區函數:
用來決定如何將 Map 階段產生的 中間鍵值對(Intermediate key/value pairs) 分配給不同的 Reduce 任務。
這個過程稱為 分區(Partitioning)(將大量數據切分成更小的塊,以便并行處理)
2)作用:
a. 負載均衡:使得 Reduce 任務獲得大致相同數量的數據,從而避免某些任務過載而其他任務空閑的情況。
b. 數據局部性:合理的分區策略,使得數據在 物理上 更接近于處理它的任務,減少數據在網絡中的傳輸。
3)默認分區函數:
默認分區函數基于哈希。
比如,使用 hash(key) mod R 策略,hash(key) 是鍵的哈希值,R 是Reduce任務的數量
4.3 Combiner Function
reduce函數的輸出被寫入最終的輸出文件
combiner函數的輸出被寫入一個中間文件,該文件將被發送到reduce任務
4.4?Input and Output Types
- 支持多種格式
MapReduce庫支持多種輸入數據格式,例如“文本”模式,其中每行作為一個鍵/值對,鍵是文件中的偏移量,值是行的內容- 輸入分割
每種輸入類型知道如何將自己分割成有意義的范圍,以便作為單獨的Map任務處理,例如文本模式確保僅在行邊界分割
4.5 Side-effects
1)確定性:任務的輸出只和它的輸入有關,和執行順序和外部環境無關
2)原子性:先將所有數據寫入臨時文件,成功后,原子性的將臨時文件命名為最終輸出文件,保證任務過程不會留下損壞的文件
3)冪等性:任務重新執行(多次執行),輸出文件一致
🌼4.6 ~ 4.9
4.6 Skipping Bad Records
1)MapReduce庫檢測哪些記錄導致確定性崩潰,并跳過這些記錄以取得進展
2)每個工作進程安裝了一個信號處理程序,用于捕獲段錯誤和總線錯誤
3)Map或Reduce操作之前,MapReduce庫將參數的序列號存儲在全局變量
4)用戶代碼生成信號,信號處理程序,發送一個包含序列號的“臨終”UDP數據包,到MapReduce 主節點5)主節點記錄多次失敗后,下次 Map 或 Reduce 任務就會跳過該節點
4.7 Local Execution?
1)Map 或 Reduce 函數很難調試,因為分布式通常有幾千臺機器
2)所以我們通過一個特殊標志,可以直接使用 gdb 等調試/測試工具
4.8 Status Information?
1)主節點運行一個HTTP服務器,導出一組頁面讓用戶使用
2)頁面顯示:已完成任務,進行中任務,輸入字節數,中間數據字節數,輸出字節數,處理速率等信息(以便增加資源 或 優化代碼)
3)還顯示了:失敗工作節點,失敗時執行的任務(以便調試代碼)
4.9?Counters
1)創建計數器對象,用于統計各種事件的發生次數
2)各個工作節點上的計數器值會定期發送回主節點,并由主節點聚合這些值,避免重復計數,并在作業完成后返回給用戶代碼
// 聲明一個計數器對象指向uppercase
Counter* uppercase;
// 通過GetCounter函數獲取名為"uppercase"的計數器
uppercase = GetCounter("uppercase");
// 定義Map函數處理輸入
map(String name, String contents): // 遍歷輸入內容中的每個單詞for each word w in contents: // 如果單詞是大寫開頭的if (IsCapitalized(w)): // 對應的計數器increment操作,增加計數uppercase->Increment(); // 調用EmitIntermediate函數,發出中間鍵值對EmitIntermediate(w, "1");
🦅5 Performance
?(1)
排序?sort 任務需要將中間輸出寫入本地磁盤是因為Map任務生成的數據量很大,需要在本地磁盤上進行聚合和排序,以便于后續的Reduce任務處理
(2)
而 grep 任務由于其輸出數據量小,可以直接在內存中處理或通過網絡傳輸給Reduce任務,無需額外的磁盤I/O操作
(3)
備份是為了解決“落后者”(主節點識別“落后者”,并為它備份--相同任務在另一臺機器執行,避免了單點故障導致的完全失敗)
(4)
備份后,即使在有意引入機器故障的情況下,MapReduce程序也能夠有效地恢復并完成執行,Figure 3(c)就是,即使 kill 了 200 個進程,重新執行這些 Map 任務也很快,只比正常執行時間多了 5%
🦅6 Experience
(1)應用領域
- 大規模機器學習問題
- Google新聞和Froogle產品的數據聚類問題
- 提取用于生成熱門查詢報告(例如Google Zeitgeist)的數據
- 提取新實驗和產品使用的網頁屬性(例如從大量網頁語料庫中提取地理位置以進行本地化搜索)
- 大規模圖計算
(2)成果
MapReduce之所以如此成功,是因為它允許開發人員在半小時內在一千臺機器上簡單編寫并高效運行程序,大大加快了開發和原型設計周期
(3)大規模索引(Large-Scale Indexing)
????????a. 因為處理容錯、分布和并行化的代碼隱藏在 MapReduce 庫中,當使用MapReduce表達時,從?3800 行的 C++ 代碼減少到 700 行
? ? ? ? b.?大多數由機器故障、慢機器和網絡問題引起的問題都由MapReduce庫自動處理,無需操作員干預,極大提高了索引過程的性能