MapReduce is a programming model for large-scale data processing on distributed clusters, introduced by Google in the 2004 paper "MapReduce: Simplified Data Processing on Large Clusters".
MapReduce splits data processing into two steps, Map and Reduce. Map turns the input dataset into a batch of key/value pairs: for each input pair <k1, v1>, Map emits a batch of intermediate pairs <k2, v2>. Reduce then aggregates the intermediate results produced by Map: for each <k2, list(v2)> (where list(v2) is the list of all values whose key is k2), Reduce emits the result <k3, v3>.
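Written in Go, the two user-supplied functions have the shapes below. This is a minimal sketch: MapFunc and ReduceFunc are illustrative names of my own, while KeyValue and the two signatures match what the 6.824-based code later in this post expects.

// KeyValue is one intermediate <k2, v2> pair produced by Map.
type KeyValue struct {
	Key   string
	Value string
}

// MapFunc takes an input pair <k1, v1> (here: file name and file contents)
// and returns a batch of intermediate <k2, v2> pairs.
type MapFunc func(key string, value string) []KeyValue

// ReduceFunc takes one intermediate key k2 together with list(v2), the list
// of all values emitted for that key, and returns the final value v3.
type ReduceFunc func(key string, values []string) string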
Take the word count program as an example: map emits <word, 1> for every word in the document, and reduce counts the length of the list for each word and outputs <word, n>:
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        EmitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));
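The same word count can be expressed against the Go signatures above. This is only a sketch modeled on the word-count application shipped with the 6.824 lab, reusing the KeyValue type from the earlier sketch; it needs just strings, unicode, and strconv from the standard library.

import (
	"strconv"
	"strings"
	"unicode"
)

// Map emits <word, "1"> for every word in the document.
func Map(filename string, contents string) []KeyValue {
	// Split the contents into words on any non-letter character.
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives one word and all of its "1" values and returns the count.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}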
The MapReduce flow works as follows: the input data is split into shards and handed to map tasks; the intermediate pairs produced by map are sorted and grouped by key into <k2, list(v2)> (if they cannot be sorted in memory, an external sort is needed); each <k2, list(v2)> is then passed to the user-provided Reduce function, and the function's return value is appended to the output file. The whole flow is not complicated: shard the data, run map on each shard, process the intermediate results, and hand them to reduce to produce the final result.
When a worker fails, the failure can be detected with heartbeats or similar mechanisms; once detected, its tasks can be reassigned to other workers and re-executed.
When the master fails, it can be recovered from checkpoints. However, since there is only one master, recovery is harder, so an alternative is to let the user detect the failure and re-run the job.
For output files, we must guarantee that a file still being written is never read, i.e. that the operation is atomic. This can be achieved through the atomicity of the file system's rename operation: write the result to a temporary file first, and rename it to its final name once execution completes. This turns the side-effecting write into an idempotent rename (an idempotent operation always produces the same result, e.g. a = 2 is idempotent while a += 2 is not).
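A minimal Go sketch of this pattern, using only the standard os package. writeAtomically and the file names are placeholders of my own; the doMap/doReduce code later in this post does the same thing with ioutil.TempFile and os.Rename.

// writeAtomically writes data to a temporary file first and then renames it
// to its final name, so readers never observe a half-written output file and
// re-executing the task just repeats the same rename.
func writeAtomically(finalName string, data []byte) error {
	tmp, err := os.CreateTemp(".", "mr-tmp-*")
	if err != nil {
		return err
	}
	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	// Rename is atomic on POSIX file systems: the final name either does not
	// exist yet or points at the complete file.
	return os.Rename(tmp.Name(), finalName)
}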
An important factor in total job execution time is stragglers: a machine that takes an unusually long time to finish the last few tasks and so lengthens the whole run. To handle this, when the job is close to completion the remaining in-progress tasks can also be handed to backup workers; whether the original worker or the backup finishes first, the task is marked as completed.
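The implementation below only relies on a 10-second timeout to re-issue lost tasks rather than true backup tasks. Purely as a hypothetical sketch, one way the coordinator could pick a job so that stragglers get backup executions might be:

// pickMapJob prefers an unassigned map job, but near the end of the phase,
// when every job is already RUNNING, it hands a running job out again so a
// backup worker can race the straggler. The commit path shown below already
// tolerates this: only the first transition from RUNNING to FINISHED counts.
func (c *Coordinator) pickMapJob() *Job {
	for idx := range c.mapJobs {
		if c.mapJobs[idx].Status == WAITTING {
			return &c.mapJobs[idx]
		}
	}
	for idx := range c.mapJobs {
		if c.mapJobs[idx].Status == RUNNING {
			return &c.mapJobs[idx] // backup execution of a possible straggler
		}
	}
	return nil
}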
Map output is assigned to reduce tasks by a partition function, which sends KV pairs with the same key to the same reduce task. The default partition function is hash(key) % R, but other partition functions can be chosen in some situations. For example, when the keys are URLs and we want results for the same host to end up in the same output file, hash(hostname(key)) % R can be used as the partition function.
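As a sketch, a host-based partitioner could look like this. hostPartition is a hypothetical helper of my own; the FNV hash mirrors the ihash helper used by the lab skeleton, and parse errors simply fall back to partition 0.

import (
	"hash/fnv"
	"net/url"
)

// hostPartition assigns a URL key to one of nReduce partitions based on its
// host name, so all URLs from the same host go to the same reduce task.
func hostPartition(key string, nReduce int) int {
	u, err := url.Parse(key)
	if err != nil {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(u.Hostname()))
	return int(h.Sum32()&0x7fffffff) % nReduce
}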
The implementation below is based on the MIT 6.824 lab.
type Coordinator struct {
	mapJobs      []Job
	reduceJobs   []Job
	status       int
	nMap         int
	remainMap    int
	nReduce      int
	remainReduce int
	lock         sync.Mutex
}

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	c.status = MAP
	c.nMap = len(files)
	c.remainMap = c.nMap
	c.nReduce = nReduce
	c.remainReduce = c.nReduce
	c.mapJobs = make([]Job, len(files))
	c.reduceJobs = make([]Job, nReduce)
	for idx, file := range files {
		c.mapJobs[idx] = Job{[]string{file}, WAITTING, idx}
	}
	for idx := range c.reduceJobs {
		c.reduceJobs[idx] = Job{[]string{}, WAITTING, idx}
	}
	c.server()
	return &c
}

func (c *Coordinator) timer(status *int) {
	time.Sleep(time.Second * 10)
	c.lock.Lock()
	if *status == RUNNING {
		log.Printf("timeout\n")
		*status = WAITTING
	}
	c.lock.Unlock()
}

func (c *Coordinator) AcquireJob(args *AcquireJobArgs, reply *AcquireJobReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	fmt.Printf("Acquire: %+v\n", args)
	if args.CommitJob.Index >= 0 {
		if args.Status == MAP {
			if c.mapJobs[args.CommitJob.Index].Status == RUNNING {
				c.mapJobs[args.CommitJob.Index].Status = FINISHED
				for idx, file := range args.CommitJob.Files {
					c.reduceJobs[idx].Files = append(c.reduceJobs[idx].Files, file)
				}
				c.remainMap--
			}
			if c.remainMap == 0 {
				c.status = REDUCE
			}
		} else {
			if c.reduceJobs[args.CommitJob.Index].Status == RUNNING {
				c.reduceJobs[args.CommitJob.Index].Status = FINISHED
				c.remainReduce--
			}
			if c.remainReduce == 0 {
				c.status = FINISH
			}
		}
	}
	if c.status == MAP {
		for idx := range c.mapJobs {
			if c.mapJobs[idx].Status == WAITTING {
				reply.NOther = c.nReduce
				reply.Status = MAP
				reply.Job = c.mapJobs[idx]
				c.mapJobs[idx].Status = RUNNING
				go c.timer(&c.mapJobs[idx].Status)
				return nil
			}
		}
		reply.NOther = c.nReduce
		reply.Status = MAP
		reply.Job = Job{Files: make([]string, 0), Index: -1}
	} else if c.status == REDUCE {
		for idx := range c.reduceJobs {
			if c.reduceJobs[idx].Status == WAITTING {
				reply.NOther = c.nMap
				reply.Status = REDUCE
				reply.Job = c.reduceJobs[idx]
				c.reduceJobs[idx].Status = RUNNING
				go c.timer(&c.reduceJobs[idx].Status)
				return nil
			}
		}
		reply.NOther = c.nMap
		reply.Status = REDUCE
		reply.Job = Job{Files: make([]string, 0), Index: -1}
	} else {
		reply.Status = FINISH
	}
	return nil
}
The Coordinator keeps all task information and execution state. Workers submit finished tasks and request new ones through AcquireJob, and reduce tasks are only handed out after all map tasks have completed. Here each input file is simply treated as one map task.
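The job and RPC types the Coordinator relies on are not shown in this post. Based on how the code above uses them, they could look roughly like this; the field order and constant values are my guesses.

// Phase of the coordinator as a whole.
const (
	MAP = iota
	REDUCE
	FINISH
)

// State of an individual job (spelling of WAITTING kept as in the code).
const (
	WAITTING = iota
	RUNNING
	FINISHED
)

// Job describes one map or reduce task: its input files, state, and index.
type Job struct {
	Files  []string
	Status int
	Index  int
}

// AcquireJobArgs commits the previously finished job (Index == -1 means
// nothing to commit) and asks for a new one.
type AcquireJobArgs struct {
	CommitJob Job
	Status    int
}

// AcquireJobReply carries the next job; NOther is nReduce for map tasks and
// nMap for reduce tasks.
type AcquireJobReply struct {
	Job    Job
	Status int
	NOther int
}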
func doMap(mapf func(string, string) []KeyValue, job *Job, nReduce int) (files []string) {
	outFiles := make([]*os.File, nReduce)
	for idx := range outFiles {
		outFile, err := ioutil.TempFile("./", "mr-tmp-*")
		if err != nil {
			log.Fatalf("create tmp file failed: %v", err)
		}
		defer outFile.Close()
		outFiles[idx] = outFile
	}
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		content, err := ioutil.ReadAll(file)
		if err != nil {
			log.Fatalf("cannot read %v", filename)
		}
		file.Close()
		kva := mapf(filename, string(content))
		for _, kv := range kva {
			hash := ihash(kv.Key) % nReduce
			js, _ := json.Marshal(kv)
			outFiles[hash].Write(js)
			outFiles[hash].WriteString("\n")
		}
	}
	for idx := range outFiles {
		filename := fmt.Sprintf("mr-%d-%d", job.Index, idx)
		os.Rename(outFiles[idx].Name(), filename)
		files = append(files, filename)
	}
	return
}

func doReduce(reducef func(string, []string) string, job *Job, nMap int) {
	log.Printf("Start reduce %d", job.Index)
	outFile, err := ioutil.TempFile("./", "mr-out-tmp-*")
	defer outFile.Close()
	if err != nil {
		log.Fatalf("create tmp file failed: %v", err)
	}
	m := make(map[string][]string)
	for _, filename := range job.Files {
		file, err := os.Open(filename)
		if err != nil {
			log.Fatalf("cannot open %v", filename)
		}
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			kv := KeyValue{}
			if err := json.Unmarshal(scanner.Bytes(), &kv); err != nil {
				log.Fatalf("read kv failed: %v", err)
			}
			m[kv.Key] = append(m[kv.Key], kv.Value)
		}
		if err := scanner.Err(); err != nil {
			log.Fatal(err)
		}
		file.Close()
	}
	for key, value := range m {
		output := reducef(key, value)
		fmt.Fprintf(outFile, "%v %v\n", key, output)
	}
	os.Rename(outFile.Name(), fmt.Sprintf("mr-out-%d", job.Index))
	log.Printf("End reduce %d", job.Index)
}

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	CallExample()

	var status int = MAP
	args := AcquireJobArgs{Job{Index: -1}, MAP}
	for {
		args.Status = status
		reply := AcquireJobReply{}
		call("Coordinator.AcquireJob", &args, &reply)
		fmt.Printf("AcReply: %+v\n", reply)
		if reply.Status == FINISH {
			break
		}
		status = reply.Status
		if reply.Job.Index >= 0 {
			// get a job, do it
			commitJob := reply.Job
			if status == MAP {
				commitJob.Files = doMap(mapf, &reply.Job, reply.NOther)
			} else {
				doReduce(reducef, &reply.Job, reply.NOther)
				commitJob.Files = make([]string, 0)
			}
			// job finished
			args = AcquireJobArgs{commitJob, status}
		} else {
			// no job, sleep to wait
			time.Sleep(time.Second)
			args = AcquireJobArgs{Job{Index: -1}, status}
		}
	}
}
The worker requests and commits tasks through RPC calls to Coordinator.AcquireJob, then runs doMap or doReduce depending on the task type.
doMap reads the target file and passes <filename, content> to the map function, then writes the returned pairs into the intermediate files chosen by hash(key) % R.
doReduce reads the KV pairs from the target files into memory and merges the values that share the same key (I did this with a Go map, but later noticed that the paper does it by sorting, which also keeps the keys in each output file in order). The merged <key, list(value)> pairs are handed to the reduce function, and its return values are written to the result file.
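A sort-based version of that merge step might look like the following sketch. It is not part of my implementation: sortAndReduce is a hypothetical helper that sorts the intermediate pairs and then walks consecutive runs of equal keys, so keys appear in sorted order in the output; it needs only fmt, io, and sort.

import (
	"fmt"
	"io"
	"sort"
)

// sortAndReduce groups intermediate pairs by sorting instead of using a map,
// as the paper describes, so the keys are emitted in sorted order.
func sortAndReduce(reducef func(string, []string) string, intermediate []KeyValue, out io.Writer) {
	sort.Slice(intermediate, func(i, j int) bool {
		return intermediate[i].Key < intermediate[j].Key
	})
	for i := 0; i < len(intermediate); {
		// Find the run [i, j) of pairs that share the same key.
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		fmt.Fprintf(out, "%v %v\n", intermediate[i].Key, reducef(intermediate[i].Key, values))
		i = j
	}
}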