本lab將用go完成一個MapReduce框架,完成後將大大加深對MapReduce的理解。git
這部分須要咱們實現common_map.go中的doMap()和common_reduce.go中的doReduce()兩個函數。
能夠先從測試用例下手:github
func TestSequentialSingle(t *testing.T) { mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc) mr.Wait() check(t, mr.files) checkWorker(t, mr.stats) cleanup(mr) }
從Sequential()開始調用鏈以下:
如今要作的是完成doMap()和doReduce()。json
doMap():數組
func doMap( jobName string, // the name of the MapReduce job mapTask int, // which map task this is inFile string, nReduce int, // the number of reduce task that will be run ("R" in the paper) mapF func(filename string, contents string) []KeyValue, ) { //打開inFile文件,讀取所有內容 //調用mapF,將內容轉換爲鍵值對 //根據reduceName()返回的文件名,打開nReduce箇中間文件,而後將鍵值對以json的格式保存到中間文件 inputContent, err := ioutil.ReadFile(inFile) if err != nil { panic(err) } keyValues := mapF(inFile, string(inputContent)) var intermediateFileEncoders []*json.Encoder for reduceTaskNumber := 0; reduceTaskNumber < nReduce; reduceTaskNumber++ { intermediateFile, err := os.Create(reduceName(jobName, mapTask, reduceTaskNumber)) if err != nil { panic(err) } defer intermediateFile.Close() enc := json.NewEncoder(intermediateFile) intermediateFileEncoders = append(intermediateFileEncoders, enc) } for _, kv := range keyValues { err := intermediateFileEncoders[ihash(kv.Key) % nReduce].Encode(kv) if err != nil { panic(err) } } }
總結來講就是:多線程
doReduce:app
func doReduce( jobName string, // the name of the whole MapReduce job reduceTask int, // which reduce task this is outFile string, // write the output here nMap int, // the number of map tasks that were run ("M" in the paper) reduceF func(key string, values []string) string, ) { //讀取當前reduceTaskNumber對應的中間文件中的鍵值對,將相同的key的value進行併合 //調用reduceF //將reduceF的結果以json形式保存到mergeName()返回的文件中 kvs := make(map[string][]string) for mapTaskNumber := 0; mapTaskNumber < nMap; mapTaskNumber++ { midDatafileName := reduceName(jobName, mapTaskNumber, reduceTask) file, err := os.Open(midDatafileName) if err != nil { panic(err) } defer file.Close() dec := json.NewDecoder(file) for { var kv KeyValue err = dec.Decode(&kv) if err != nil { break } values, ok := kvs[kv.Key] if ok { kvs[kv.Key] = append(values, kv.Value) } else { kvs[kv.Key] = []string{kv.Value} } } } outputFile, err := os.Create(outFile) if err != nil { panic(err) } defer outputFile.Close() enc := json.NewEncoder(outputFile) for key, values := range kvs { enc.Encode(KeyValue{key, reduceF(key, values)}) } }
總結:框架
文件轉換的過程大體以下:
函數
這部分將用一個簡單的實例展現如何使用MR框架。須要咱們實現main/wc.go中的mapF()和reduceF()來統計單詞的詞頻。測試
mapF:this
func mapF(filename string, contents string) []mapreduce.KeyValue { // Your code here (Part II). words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) }) var kvs []mapreduce.KeyValue for _, word := range words { kvs = append(kvs, mapreduce.KeyValue{word, "1"}) } return kvs }
將文本內容分割成單詞,每一個單詞對應一個<word, "1">鍵值對。
reduceF:
func reduceF(key string, values []string) string { // Your code here (Part II). return strconv.Itoa(len(values)) }
value中有多少個"1",就說明這個word出現了幾回。
目前實現的版本都是執行完一個map而後在執行下一個map,也就是說沒有並行,這偏偏是MapReduce最大的買點。這部分須要實現schedule(),該函數將任務分配給Worker去執行。固然這裏並無真正的多機部署,而是使用多線程進行模擬。
master和worker的關係大體以下:
在建立worker對象的時候會調用Register() RPC,master收到RPC後,將該worker的id保存在數組中,執行shedule()是能夠根據該id,經過DoTask() RPC調用該worker的DoTask()執行map或reduce任務。
schedule.go
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) { var ntasks int var n_other int // number of inputs (for reduce) or outputs (for map) switch phase { case mapPhase: ntasks = len(mapFiles) n_other = nReduce case reducePhase: ntasks = nReduce n_other = len(mapFiles) } fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other) //總共有ntasks個任務,registerChan中保存着空閒的workers taskChan := make(chan int) var wg sync.WaitGroup go func() { for taskNumber := 0; taskNumber < ntasks; taskNumber++ { taskChan <- taskNumber fmt.Printf("taskChan <- %d in %s\n", taskNumber, phase) wg.Add(1) } wg.Wait() //ntasks個任務執行完畢後才能經過 close(taskChan) }() for task := range taskChan { //全部任務都處理完後跳出循環 worker := <- registerChan //消費worker fmt.Printf("given task %d to %s in %s\n", task, worker, phase) var arg DoTaskArgs arg.JobName = jobName arg.Phase = phase arg.TaskNumber = task arg.NumOtherPhase = n_other if phase == mapPhase { arg.File = mapFiles[task] } go func(worker string, arg DoTaskArgs) { if call(worker, "Worker.DoTask", arg, nil) { //執行成功後,worker須要執行其它任務 //注意:須要先掉wg.Done(),而後調register<-worker,不然會出現死鎖 //fmt.Printf("worker %s run task %d success in phase %s\n", worker, task, phase) wg.Done() registerChan <- worker //回收worker } else { //若是失敗了,該任務須要被從新執行 //注意:這裏不能用taskChan <- task,由於task這個變量在別的地方可能會被修改。好比task 0執行失敗了,咱們這裏但願 //將task 0從新加入到taskChan中,可是由於執行for循環的那個goroutine,可能已經修改task這個變量爲1了,咱們錯誤地 //把task 1從新執行了一遍,而且task 0沒有獲得執行。 taskChan <- arg.TaskNumber } }(worker, arg) } fmt.Printf("Schedule: %v done\n", phase) }
這裏用到了兩個channel,分別是registerChan和taskChan。
registerChan中保存了可用的worker id。
生產:
消費:
taskChan中保存了任務號。任務執行失敗須要從新加入taskChan。
以前的代碼已經體現了,對於失敗的任務從新執行。
這是MapReduce的一個應用,生成倒排索引,好比想查某個單詞出如今哪些文本中,就能夠創建倒排索引來解決。
func mapF(document string, value string) (res []mapreduce.KeyValue) { // Your code here (Part V). words := strings.FieldsFunc(value, func(r rune) bool { return !unicode.IsLetter(r) }) var kvs []mapreduce.KeyValue for _, word := range words { kvs = append(kvs, mapreduce.KeyValue{word, document}) } return kvs } func reduceF(key string, values []string) string { // Your code here (Part V). values = removeDuplicationAndSort(values) return strconv.Itoa(len(values)) + " " + strings.Join(values, ",") } func removeDuplicationAndSort(values []string) []string { kvs := make(map[string]struct{}) for _, value := range values { _, ok := kvs[value] if !ok { kvs[value] = struct{}{} } } var ret []string for k := range kvs { ret = append(ret, k) } sort.Strings(ret) return ret }
mapF()生成<word, document>的鍵值對,reduceF()處理word對應的全部document,去重而且排序,而後拼接到一塊兒。
具體代碼在:https://github.com/gatsbyd/mit_6.824_2018 若有錯誤,歡迎指正: 15313676365