MIT6.824-Lab1-Part I: Map/Reduce input and output

1.實驗內容

在PartI主要是實現 common_map.go的 doMap()方法(分割map任務輸出的函數)以及 common_reduce.go的 doReduce()方法(收集reduce任務的全部輸入的函數),此時map和reduce階段的task仍是串行運行的。json

2.實現思路

首先了解一下整個程序的運行流程,執行下列命令l便可運行試驗第一部分的代碼。數組

go test -run Sequentialbash

執行命令後會運行test_test.go中的TestSequentialSingle函數app

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}
複製代碼

Sequential()函數中會串行地執行map和reduce tasks。其中makeInputs會生成一個輸入文件824-mrinput-0.txt,即本次要處理的文件,裏面是遞增的數字(0~99999),一個數字爲一行。函數

func Sequential(jobName string, files []string, nreduce int,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string,
) (mr *Master) {
    mr = newMaster("master")
    go mr.run(jobName, files, nreduce, func(phase jobPhase) {
        switch phase {
        case mapPhase:
            for i, f := range mr.files {
                doMap(mr.jobName, i, f, mr.nReduce, mapF)
            }
        case reducePhase:
            for i := 0; i < mr.nReduce; i++ {
                doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
            }
        }
    }, func() {
        mr.stats = []int{len(files) + nreduce}
    })
    return
}
複製代碼

根據參數,本次只會生成一個文件824-mrinput-0.txt,以及一個reduce task處理。在Sequential函數中首先調用 doMap()方法實現Map功能,生成中間鍵值對,而後調用doReduce()方法實現Reduce功能。因爲只有一個文件和一個reduce task,因此doMap()和doReduce()會依次串行地各執行一次。測試

3.doMap()

doMap()函數主要是實現這樣的功能:讀取一個輸入文件(inFile),調用咱們實現的Map功能的函數mapF,並將mapF的輸出內容分配到給nReduce的中間文件中。ui

  1. 每一個reduce task有一箇中間文件,首先須要生成中間文件的名稱,調用common.go中的reduceName方法便可 reduceName(jobName, mapTask, r)生成中間文件名稱
func reduceName(jobName string, mapTask int, reduceTask int) string {
    return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
複製代碼
  1. 對每一個鍵值key調用ihash()方法,而後mod nReduce,來選擇該鍵值對放在哪一個中間文件中
  2. 在此部分實驗中,mapF()其實已經在原代碼中幫咱們實現了,他就是test_test.go中的MapFunc()函數。他在TestSequentialSingle中調用Sequential時已經傳入。其中第一個參數是要處理的文件名,第二個參數是文件的所有內容,最後函數會返回一個[]KeyValue。具體實現以下
// 將讀取文件中的全部單詞分割,返回[]KeyValue,形如[(「0」,」」),(「1」,」」)...]    ps.可能存在重複的KeyValue
func MapFunc(file string, value string) (res []KeyValue) {
    debug("Map %v\n", value)

    // Fields 以連續的空白字符爲分隔符,將 s 切分紅多個子串,結果中不包含空白字符自己
    // 空白字符有:\t, \n, \v, \f, \r, ' ', U+0085 (NEL), U+00A0 (NBSP)
    words := strings.Fields(value)
    for _, w := range words {
        kv := KeyValue{w, ""}
        res = append(res, kv)
    }
    return
}
複製代碼

至於寫入中間文件,建議使用json的方式,可使用如下方法this

enc := json.NewEncoder(file)
for _, kv := ... {
    err := enc.Encode(&kv)
}
複製代碼

綜上分析,獲得實現doMap()函數的大體思路(本部分 只有一個輸入文件和一個reduce task,nReduce=1):
首先根據n個輸入文件和設定的m個reduce tasks,生成n*m箇中間文件,調用reduceName方法進行命名。因爲這部分實驗只有一個輸入文件和一個reduce task,所以只會生成一箇中間文件。 對於每個輸入文件fileX,讀取文件內容,調用mapF()方法進行處理,最終返回鍵值對[]KeyValue。此處的mapF()是指上文的MapFunc方法 對上一步生成的[]KeyValue,每個Key調用ihash()方法而後mod nReduce,選擇該 KeyValue寫入哪一個中間文件中。
處理完[]KeyValue所有內容,關閉文件。
個人實現代碼以下:spa

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //建立中間文件
    var interFiles []*os.File
    for r:=0;r<nReduce;r++ {
        interName := reduceName(jobName, mapTask, r)
        interFile,err := os.Create(interName)
        if err != nil {
            fmt.Println(err)
        }
        interFiles = append(interFiles,interFile)
        defer interFile.Close()
    }

    //調用mapF,獲得map程序處理後的鍵值對
    inBody,err:=ioutil.ReadFile(inFile)//讀取文件內容
    if err!=nil {
        fmt.Println(err)
    }
    keyValue := mapF(inFile,string(inBody))

    //寫入中間文件
    for _,v := range keyValue {
        r:= ihash(v.Key)%nReduce
        enc := json.NewEncoder(interFiles[r])
        err := enc.Encode(&v)
        if err!=nil {
            fmt.Println(err)
        }
    }
}
複製代碼

4.doReduce()

doReduce()主要是實現這樣的功能:讀取這個reduce task對應的中間文件,按key對中間鍵/值對進行排序及合併,爲每一個key調用用戶定義的reduce函數(reduceF),最後將reduceF的輸出寫入磁盤。
若在doMap()中使用了enc.Encode(&kv)將中間鍵值對寫入中間文件,在doReduce()中可使用Decode(&kv)來讀取。最後的輸出文件也推薦使用json的方式寫入。
同理,reduceF對應的是ReduceFunc,也已經在原代碼中實現了。第一個參數是鍵key,第二個參數是值value的數組,若各個中間文件中某個key有多個value。在ReduceFunc中只是打印了key值,沒作什麼處理。debug

// Just return key
func ReduceFunc(key string, values []string) string {
    for _, e := range values {
        debug("Reduce %s %v\n", key, e)
    }
    return ""
}
複製代碼

個人實現以下

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //讀取中間文件
    var keyValue []KeyValue
    for i:=0;i<nMap;i++ {
        interName := reduceName(jobName, i, reduceTask)
        interBody,err:=ioutil.ReadFile(interName)
        if err != nil {
                fmt.Println(err)
        }
        dec := json.NewDecoder(strings. NewReader(string(interBody)))
        for {
        var m KeyValue
        if err := dec.Decode(&m) ; err == io. EOF {
            break
        } else if err != nil {
            fmt.Println(err)
        }
                keyValue = append(keyValue,m)
    }
    }

    //排序及合併,處理後應該相似["0":[""],"1":[""]...]
    var keyValuesMap map[string][]string
    keyValuesMap = make(map[string][]string)
    for _,v := range keyValue {
        if _,ok:= keyValuesMap[v.Key];ok {//若key值已存在,將value添加到[]string中
             keyValuesMap[v.Key] = append(keyValuesMap[v.Key],v.Value)
        }else{//若key值不存在,在map中新建key
            var values []string
            values = append(values,v.Value)
            keyValuesMap[v.Key] = values
        }
    }

    //對每一個key調用reduceF,並寫入最後的文件
    outputFile,err := os.Create(outFile)
    if err!=nil {
        fmt.Println(err)
    }
    enc := json.NewEncoder(outputFile)
    for k,v := range keyValuesMap {
        err := enc.Encode(KeyValue{k, reduceF(k,v)})
        if err!=nil {
            fmt.Println(err)
        }
    }
}
複製代碼

5.測試運行

在6.824\src\mapreduce目錄下執行下列命令進行PartI實驗的測試

go test -run Sequential

若運行經過會在結果中輸出ok,相似

ps.運行go命令須要設置好GOPATH

在common.go中設置 debugEnabled = true,go test時增長-v參數能夠得到更多調試信息

env "GOPATH=$PWD/../../" go test -v -run Sequential
=== RUN TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok mapreduce 2.672s

6.問題記錄

  1. 運行報錯:./master_rpc.go:48: debug call has arguments but no formatting directives
    解決方法master_rpc.go的48行debug("RegistrationServer: accept error ", err) 改成debug("RegistrationServer: accept error %v", err)

相關文章
相關標籤/搜索