MIT 6.5840 (Spring, 2024) – Lab 1: MapReduce
👨?💻 Charles
🔗 實驗手冊: 6.5840 Lab 1: MapReduce
📃 MapReduce 論文原文: mapreduce-osdi04.pdf
?? 本系列前文: MIT 6.5840 (Spring, 2024) 通關指南——入門篇
文章目錄
- MIT 6.5840 (Spring, 2024) -- Lab 1: MapReduce
- 代碼理解
- baseline:串行實現
- todo:并行實現
- `coordinator.go`
- `worker.go`
- 代碼實現
- `coordinator.go`
- 初始化
- 處理 Map 任務
- 處理 Reduce 任務
- 監控任務情況(防超時)
- `rpc.go`
- worker 獲取 Map 任務
- worker 提交 Map 結果
- worker 獲取 Reduce 任務
- worker 提交 Reduce 結果
- `worker.go`
- 初始化
- 處理 Map 任務
- 處理 Reduce 任務
- 實驗結果
- 踩坑記錄/建議
代碼理解
baseline:串行實現
首先,看看 Lab 中已給出的一個串行版 MapReduce —— src/main/mrsequential.go
,這是我們后續自己實現并行版本的重要參考。在 mrsequential.go
中,有 mapf
和 reducef
兩個組件,分別對應 Map 任務和 Reduce 任務:
mapf, reducef := loadPlugin(os.Args[1])
可以看到,它們是通過插件的形式導入的, loadPlugin
的實現在 mrsequential.go
中,利用了golang的 plugin
庫,所以我們可以看到實驗手冊運行 mrsequential.go
之前先運行了:
go build -buildmode=plugin ../mrapps/wc.go
即將 wc.go
編譯為 wc.so
(動態加載共享庫),之后運行 mrsequential.go
的時候就可以這樣使用 wc.go
中的各種方法:
go run mrsequential.go wc.so pg*.txt
Anyway,其實就是說,Map 和 Reduce 的實現要到 src/mrapps/wc.go
中去找。源代碼也挺簡單的,實現方法為:
-
Split:以非字母符號為分隔符,將輸入文件拆分為若干個單詞,存到切片
words
中 -
Map:順序處理
words
的單詞,對于每個單詞w
,構建一個鍵值對{w, 1}
,將這個鍵值對存到一個切片kva
中 -
Reduce:統計
kva
中,每個單詞的個數(在kva
排序后,相同的單詞挨在一起,把它們放到新切片values := []string{}
中,Reduce其實就是返回len(values)
)
todo:并行實現
本實驗主要需要在已提供的代碼基礎上,完善 mr/coordinator.go
、 mr/worker.go
、 mr/rpc.go
。為了實現單 coordinator、多 worker 的并行架構,coordinator 需要負責給各 worker 分配 Map 任務和 Reduce 任務,并監控 worker 的工作情況、在發生超時的時候將其任務重新分配給其他 worker;同時,每個 worker 需要通過 RPC 調用 coordinator 的 Map 方法和 Reduce 方法,并保存相關結果、告知 coordinator 完成情況。
coordinator.go
我們需要實現的并行版 MapReduce 的主程序在 src/main/mrcoordinator.go
中,它負責調用 MakeCoordinator
構建 coordinator
(任務分發者,相當于server)——這是在 src/mr/coordinator.go
中實現的,這個文件中已經聲明/提示了我們 需要補全 的若干方法(見注釋)。
coordinator
啟動后,會通過 server()
方法創建一個 goroutine 來監聽 src/mr/worker.go
的 RPC 調用請求:
//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {rpc.Register(c)rpc.HandleHTTP()//l, e := net.Listen("tcp", ":1234")sockname := coordinatorSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {log.Fatal("listen error:", e)}go http.Serve(l, nil)
}
關于RPC的使用方法, worker.go
和 coordinator.go
中都有示例函數,所用的相關參數/方法定義在 src/mr/rpc.go
中。
建議先在現有代碼上嘗試 RPC 調用示例函數(
worker.go
里面有個CallExample()
基本可以直接用),從而熟悉代碼框架。關于 RPC 以及 golang 中如何使用 RPC,建議逢山開路,遇到不懂的就問 AI 🤖
worker.go
worker.go
即 map 和 reduce 任務的執行者,需要補全 Worker
方法:
// main/mrworker.go calls this function.
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {...}
關鍵在于處理和 coordinator 的通信(需要通過 RPC 調用,獲取任務、執行任務)。
代碼實現
完整代碼: MIT-6.5840/src/mr at lab1 · Charles-T-T/MIT-6.5840
coordinator.go
Coordinator
結構體定義如下:
type Coordinator struct {mu sync.RWMutexnMap intnReduce inttoMapTasks chan MapTasktoReduceTasks chan ReduceTaskremainMapTask map[string]string // filename -> workerIDremainReduceTask map[string]string // reduceID -> workerIDworkerRegistry map[string]string // workerID -> workerAddrallMapDone boolallReduceDone bool
}
-
mu
:讀寫鎖,用于防止多個worker訪問同一個coordinator成員出現沖突 -
workerRegistry
:記錄已經注冊了的worker——只有已注冊的worker提交的Map或Reduce結果才會被接受(防止收到超時worker的任務結果——已被重新分配了)原本設計的是workerID ?? workerAddr(worker 的 sock 地址)的一個哈希表,但是后續實現中發現維護 coordinator 和 worker 的雙向通信似乎沒必要,故這里僅當作一個集合使用。
-
nMap
和nReduce
:需要執行的Map和Reduce任務總數 -
其余成員變量作用易從其名稱得出
初始化
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
func MakeCoordinator(files []string, nReduce int) *Coordinator {c := Coordinator{nMap: len(files),nReduce: nReduce,toMapTasks: make(chan MapTask, len(files)),toReduceTasks: make(chan ReduceTask, nReduce),remainMapTask: make(map[string]string),remainReduceTask: make(map[string]string),workerRegistry: make(map[string]string),allMapDone: false,allReduceDone: false,}...
處理 Map 任務
初始化后,啟動一個 goroutine 來處理 Map 任務:
// Manage Map tasks
go func() {// Init todo Map tasksfor i, file := range files {mapTask := MapTask{Filename: file,MapID: strconv.Itoa(i),NMap: c.nMap,NReduce: c.nReduce,}c.toMapTasks <- mapTaskDPrintf("Get todo-file: %s\n", file)c.remainMapTask[file] = "init"}// Wait all Map tasks to be donefor len(c.remainMapTask) > 0 {time.Sleep(time.Second)}close(c.toMapTasks)c.allMapDone = trueDPrintf("All map tasks done.\n")
}()
處理 Reduce 任務
啟動另一個 goroutine 來處理 Reduce 任務:
// Manage Reduce tasks
go func() {// output files for reduce resultsfor i := 0; i < nReduce; i++ {c.toReduceTasks <- ReduceTask{ReduceID: strconv.Itoa(i)}c.remainReduceTask[strconv.Itoa(i)] = "init"}// Wait all Map tasks to be donefor !c.allMapDone {time.Sleep(time.Second)}// Wait all Reduce tasks to be donefor len(c.remainReduceTask) > 0 {time.Sleep(time.Second)}close(c.toReduceTasks)c.allReduceDone = trueDPrintf("All reduce tasks done.\n")
}()
監控任務情況(防超時)
每次 worker 開始一個任務后,coordinator 就會啟動一個 goroutine ——如果 10s(實驗手冊建議的超時時間)后任務仍未完成則視為超時,需要將該任務放回 todo-channel 中,等待其他 worker 認領:
// Monitor a Map task, reassign it if time out.
func (c *Coordinator) monitorMapTask(file string, mapID string) {time.Sleep(time.Second * 10) // wait for 10sworkerID, exist := c.remainMapTask[file]if exist {c.mu.Lock()delete(c.workerRegistry, workerID)DPrintf("Map job by %s time out!\n", workerID)c.mu.Unlock()c.toMapTasks <- MapTask{Filename: file, MapID: mapID, NMap: c.nMap, NReduce: c.nReduce}}
}// Monitor a Reduce task, reassign it if time out.
func (c *Coordinator) monitorReduceTask(reduceID string) {time.Sleep(time.Second * 10) // wait for 10sworkerID, exist := c.remainReduceTask[reduceID]if exist {c.mu.Lock()delete(c.workerRegistry, workerID)DPrintf("Reduce job by %s time out!\n", workerID)c.mu.Unlock()c.toReduceTasks <- ReduceTask{ReduceID: reduceID}}
}
rpc.go
worker 需要 RPC 調用 coordinator 的各方法均寫在 rpc.go
中。
worker 獲取 Map 任務
每個worker啟動后,會首先嘗試從coordinator的 toMapTasks
channel 中獲取一個Map任務,如果所有Map任務已完成、channel已關閉,則返回任務的 AllMapDone
字段為 true
;如果獲取任務成功,則worker在 workerRegistry
注冊,同時coordinator啟動監視( c.monitorMapTask
),以在任務超時后重新分配任務。
func (c *Coordinator) WorkerGetMapTask(workerID string, mapTask *MapTask) error {toMapTask, ok := <-c.toMapTasksif ok {mapTask.Filename = toMapTask.FilenamemapTask.MapID = toMapTask.MapIDmapTask.NReduce = toMapTask.NReduce} else {mapTask.AllMapDone = true // all Map tasks already done.mapTask.AllReduceDone = c.allReduceDonereturn nil}// worker registersc.mu.Lock()c.workerRegistry[workerID] = workerSock(workerID)c.remainMapTask[toMapTask.Filename] = workerIDgo c.monitorMapTask(toMapTask.Filename, toMapTask.MapID)c.mu.Unlock()return nil
}
worker 提交 Map 結果
worker 完成其 Map 任務后,需要告知 coordinator,隨后 coordinator 會從 remainMapTask
中移除該任務,視為任務完成。coordinator 只接受注冊了的 worker 的結果。
worker 具體處理 Map 任務的過程在
worker.go
中,此處只是“通知任務完成”。
func (c *Coordinator) WorkerGiveMapRes(mapTask MapTask, reply *string) error {// Coordinator only accepts results from worker IN workerRegistryworkerID := mapTask.WorkerIDfilename := mapTask.Filename_, exist := c.workerRegistry[workerID]if !exist {DPrintf("Illegal map result: get from unknown worker: %s\n", workerID)return nil}c.mu.Lock()DPrintf("Successfully get map result from: %s\n", workerID)delete(c.remainMapTask, filename)c.mu.Unlock()return nil
}
worker 獲取 Reduce 任務
實現思路和獲取 Map 任務的一致:
func (c *Coordinator) WorkerGetReduceTask(workerID string, reduceTask *ReduceTask) error {toReduceTask, ok := <-c.toReduceTasksif ok {*reduceTask = toReduceTaskreduceTask.WorkerID = workerIDreduceTask.TempResFile = fmt.Sprintf("mr-tmp-%s", workerID)} else {reduceTask.AllReduceDone = true // all Reduce tasks already done.return nil}// worker registersc.mu.Lock()c.workerRegistry[workerID] = workerSock(workerID)c.remainReduceTask[toReduceTask.ReduceID] = workerIDgo c.monitorReduceTask(toReduceTask.ReduceID)c.mu.Unlock()return nil
}
worker 提交 Reduce 結果
實現思路和提交 Map 結果的一致:
func (c *Coordinator) WorkerGiveReduceRes(reduceTask ReduceTask, reply *string) error {// Coordinator only accepts results from worker in workerRegistryworkerID := reduceTask.WorkerID_, exist := c.workerRegistry[workerID]if !exist {DPrintf("Illegal reduce result: get from unknown worker: %s\n", workerID)return nil}newname := fmt.Sprintf("mr-out-%s", reduceTask.ReduceID)*reply = newnameerr := os.Rename(reduceTask.TempResFile, newname)if err != nil {DPrintf("Error when rename temp file: %v\n", err)}c.mu.Lock()DPrintf("Successfully get reduce result from: %s\n", workerID)delete(c.remainReduceTask, reduceTask.ReduceID)c.mu.Unlock()return nil
}
worker.go
worker 采用的 Map 和 Reduce 方法是通過不同插件載入的,我們不需要關心其實現,直接用就行了。
初始化
workerID := strconv.Itoa(os.Getpid())
mapDone := false // flag whether all Map tasks have been finished
reduceDone := false // flag whether all Reduce tasks have been finished
處理 Map 任務
worker 啟動后,周期性嘗試從 coordinator 那里獲取一個 Map 任務,獲取任務后處理、向 coordinator 提交結果,直到收到所有 Map 任務已完成的通知,則將 mapDone
置為 true
:
// Do the map task
for !mapDone {mapTask := MapTask{WorkerID: workerID}DPrintf("<%s> ask for a map task...\n", workerID)call("Coordinator.WorkerGetMapTask", workerID, &mapTask)DPrintf("<%s> get task: %s\n", workerID, mapTask.Filename)if !mapTask.AllMapDone {file, err := os.Open(mapTask.Filename)if err != nil {DPrintf("cannot open %v\n", mapTask.Filename)return}content, err := io.ReadAll(file)if err != nil {DPrintf("cannot read %v\n", mapTask.Filename)return}file.Close()kva := mapf(mapTask.Filename, string(content))saveMapRes(kva, mapTask.MapID, mapTask.NReduce)mapTask.Result = kvavar reply stringcall("Coordinator.WorkerGiveMapRes", mapTask, &reply)} else {mapDone = truereduceDone = mapTask.AllReduceDoneDPrintf("All map tasks done.\n")}time.Sleep(500 * time.Millisecond)
}
其中,Map 任務產生的中間結果需要保存到文件中,參考實驗手冊的 hint:

實現如下:
func saveMapRes(kva []KeyValue, mapID string, nReduce int) {reduceChunks := make(map[string][]KeyValue) // reduceID -> kvsfor _, kv := range kva {reduceID := strconv.Itoa(ihash(kv.Key) % nReduce)reduceChunks[reduceID] = append(reduceChunks[reduceID], kv)}for reduceID, kvs := range reduceChunks {oname := fmt.Sprintf("mr-%s-%s.json", mapID, reduceID)ofile, _ := os.Create(oname)defer ofile.Close()enc := json.NewEncoder(ofile)err := enc.Encode(&kvs)if err != nil {DPrintf("Error when encoding kv: %v\n", err)}}DPrintf("Finish saving map result.\n")
}
處理 Reduce 任務
和處理 Map 任務的思路一致——周期性嘗試獲取一個 Reduce 任務 ?? 處理 Reduce 任務 ?? 保存 Reduce 結果 ?? 向 coordinator 提交結果:
// Do the Reduce task
for !reduceDone {reduceTask := ReduceTask{WorkerID: workerID}DPrintf("<%s> ask for a reduce task...\n", workerID)call("Coordinator.WorkerGetReduceTask", workerID, &reduceTask)DPrintf("<%s> get reduceID: %s\n", workerID, reduceTask.ReduceID)if !reduceTask.AllReduceDone {// Get Map result files to be Reducedpattern := fmt.Sprintf(`^mr-.*-%s.json$`, regexp.QuoteMeta(reduceTask.ReduceID))re := regexp.MustCompile(pattern)files, err := os.ReadDir(".")if err != nil {fmt.Println("Error reading directory:", err)return}var toReduceFiles []stringfor _, file := range files {if !file.IsDir() && re.MatchString(file.Name()) {toReduceFiles = append(toReduceFiles, file.Name())}}// Do the reduce jobdoReduce(toReduceFiles, reducef, reduceTask.TempResFile)DPrintf("<%s> finish reduce job, res to %s.\n", workerID, reduceTask.TempResFile)var reply stringcall("Coordinator.WorkerGiveReduceRes", reduceTask, &reply)DPrintf("<%s> reduce res save to %s.\n", workerID, reply)} else {reduceDone = trueDPrintf("All reduce done.\n")}time.Sleep(100 * time.Millisecond)
}
其中負責執行 Reduce 的方法 doReduce
主要參考 mrsequential.go
實現:
func doReduce(toReduceFiles []string, reducef func(string, []string) string, oname string) {ofile, _ := os.Create(oname)defer ofile.Close()intermediate := []KeyValue{}for _, toReduceFile := range toReduceFiles {file, _ := os.Open(toReduceFile)dec := json.NewDecoder(file)kva := []KeyValue{}if err := dec.Decode(&kva); err != nil {DPrintf("Error when json decode: %v\n", err)return}intermediate = append(intermediate, kva...)file.Close()}sort.Sort(ByKey(intermediate))i := 0for i < len(intermediate) {j := i + 1for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {j++}values := []string{}for k := i; k < j; k++ {values = append(values, intermediate[k].Value)}output := reducef(intermediate[i].Key, values)fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)i = j}
}
實驗結果
手動測試并打印中間過程(在 worker.go
中將 Debug
設置為 true
):
運行測試腳本 mr-test.sh
:
測試通過。
踩坑記錄/建議
-
用
DPrintf
打印日志能發現大部分 bug,但是可能有些細節需要用打斷點調試,如果是 vscode 的話需要配置一下:-
例如,對于 worker,要斷點調試
wc
任務,需要在.vscode/launch.json
中添加配置:{"name": "mrworker-wc","type": "go","request": "launch","mode": "exec","program": "${workspaceFolder}/6.5840/src/main/mrworker","args": ["wc.so"],"cwd": "${workspaceFolder}/6.5840/src/main" },
-
對于 coordinator,可以配置:
{"name": "debug mrcoordinator","type": "go","request": "launch","mode": "auto","program": "${workspaceFolder}/6.5840/src/main/mrcoordinator.go","args": ["pg-being_ernest.txt","pg-dorian_gray.txt","pg-frankenstein.txt","pg-grimm.txt","pg-huckleberry_finn.txt","pg-metamorphosis.txt","pg-sherlock_holmes.txt","pg-tom_sawyer.txt"] }
具體參數可以根據任務調整,不懂的多問 AI。
-
-
本 lab 實現的是一個 MapReduce 框架 ,也就是說具體的 Map 任務和 Reduce 任務 不是一定的 ——我一開始以為只有單詞計數(
src/mrapps/wc.go
),所以傻了吧唧地搬運mrsequential.go
的代碼,但實際上最后測試的任務有很多,都在src/mrapps/
下。最后跑test-mr.sh
的時候,也可以根據出錯任務到src/mrapps/
中看看對應任務代碼,可能有所啟發。 -
RPC 函數,不僅 函數名 要 首字母大寫 ,如果參數是結構體,則該結構體中的 成員變量也要首字母大寫
否則你可能會像我一樣,發現
reply
中有些成員被更新了、有些沒有,非常詭異 🤷?♂ ? -
仔細閱讀官方實驗手冊的
Hints
,很有用。比如前一條,其實
Hints
中就有提到:“Go RPC sends only struct fields whose names start with capital letters. Sub-structures must also have capitalized field names.”
-
注意采用合理方法保存 Map 任務的中間結果,便于之后 Reduce 任務讀取。
Hints
中的建議是:“A reasonable naming convention for intermediate files is
mr-X-Y
, where X is the Map task number, and Y is the reduce task number.” -
注意給 coordinator 上鎖,防止多 worker 的讀寫沖突。
如果你覺得有幫助,歡迎去 我的代碼倉庫 點個 star ?? : )