在PartI主要是實現 common_map.go的 doMap()方法(分割map任務輸出的函數)以及 common_reduce.go的 doReduce()方法(收集reduce任務的全部輸入的函數),此時map和reduce階段的task仍是串行運行的。json
首先了解一下整個程序的運行流程,執行下列命令l便可運行試驗第一部分的代碼。數組
go test -run Sequentialbash
執行命令後會運行test_test.go中的TestSequentialSingle函數app
func TestSequentialSingle(t *testing.T) {
mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
cleanup(mr)
}
複製代碼
Sequential()函數中會串行地執行map和reduce tasks。其中makeInputs會生成一個輸入文件824-mrinput-0.txt,即本次要處理的文件,裏面是遞增的數字(0~99999),一個數字爲一行。函數
func Sequential(jobName string, files []string, nreduce int,
mapF func(string, string) []KeyValue,
reduceF func(string, []string) string,
) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}
複製代碼
根據參數,本次只會生成一個文件824-mrinput-0.txt,以及一個reduce task處理。在Sequential函數中首先調用 doMap()方法實現Map功能,生成中間鍵值對,而後調用doReduce()方法實現Reduce功能。因爲只有一個文件和一個reduce task,因此doMap()和doReduce()會依次串行地各執行一次。測試
doMap()函數主要是實現這樣的功能:讀取一個輸入文件(inFile),調用咱們實現的Map功能的函數mapF,並將mapF的輸出內容分配到給nReduce的中間文件中。ui
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
複製代碼
// 將讀取文件中的全部單詞分割,返回[]KeyValue,形如[(「0」,」」),(「1」,」」)...] ps.可能存在重複的KeyValue
func MapFunc(file string, value string) (res []KeyValue) {
debug("Map %v\n", value)
// Fields 以連續的空白字符爲分隔符,將 s 切分紅多個子串,結果中不包含空白字符自己
// 空白字符有:\t, \n, \v, \f, \r, ' ', U+0085 (NEL), U+00A0 (NBSP)
words := strings.Fields(value)
for _, w := range words {
kv := KeyValue{w, ""}
res = append(res, kv)
}
return
}
複製代碼
至於寫入中間文件,建議使用json的方式,可使用如下方法this
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)
}
複製代碼
綜上分析,獲得實現doMap()函數的大體思路(本部分 只有一個輸入文件和一個reduce task,nReduce=1):
首先根據n個輸入文件和設定的m個reduce tasks,生成n*m箇中間文件,調用reduceName方法進行命名。因爲這部分實驗只有一個輸入文件和一個reduce task,所以只會生成一箇中間文件。 對於每個輸入文件fileX,讀取文件內容,調用mapF()方法進行處理,最終返回鍵值對[]KeyValue。此處的mapF()是指上文的MapFunc方法 對上一步生成的[]KeyValue,每個Key調用ihash()方法而後mod nReduce,選擇該 KeyValue寫入哪一個中間文件中。
處理完[]KeyValue所有內容,關閉文件。
個人實現代碼以下:spa
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
//建立中間文件
var interFiles []*os.File
for r:=0;r<nReduce;r++ {
interName := reduceName(jobName, mapTask, r)
interFile,err := os.Create(interName)
if err != nil {
fmt.Println(err)
}
interFiles = append(interFiles,interFile)
defer interFile.Close()
}
//調用mapF,獲得map程序處理後的鍵值對
inBody,err:=ioutil.ReadFile(inFile)//讀取文件內容
if err!=nil {
fmt.Println(err)
}
keyValue := mapF(inFile,string(inBody))
//寫入中間文件
for _,v := range keyValue {
r:= ihash(v.Key)%nReduce
enc := json.NewEncoder(interFiles[r])
err := enc.Encode(&v)
if err!=nil {
fmt.Println(err)
}
}
}
複製代碼
doReduce()主要是實現這樣的功能:讀取這個reduce task對應的中間文件,按key對中間鍵/值對進行排序及合併,爲每一個key調用用戶定義的reduce函數(reduceF),最後將reduceF的輸出寫入磁盤。
若在doMap()中使用了enc.Encode(&kv)將中間鍵值對寫入中間文件,在doReduce()中可使用Decode(&kv)來讀取。最後的輸出文件也推薦使用json的方式寫入。
同理,reduceF對應的是ReduceFunc,也已經在原代碼中實現了。第一個參數是鍵key,第二個參數是值value的數組,若各個中間文件中某個key有多個value。在ReduceFunc中只是打印了key值,沒作什麼處理。debug
// Just return key
func ReduceFunc(key string, values []string) string {
for _, e := range values {
debug("Reduce %s %v\n", key, e)
}
return ""
}
複製代碼
個人實現以下
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
//讀取中間文件
var keyValue []KeyValue
for i:=0;i<nMap;i++ {
interName := reduceName(jobName, i, reduceTask)
interBody,err:=ioutil.ReadFile(interName)
if err != nil {
fmt.Println(err)
}
dec := json.NewDecoder(strings. NewReader(string(interBody)))
for {
var m KeyValue
if err := dec.Decode(&m) ; err == io. EOF {
break
} else if err != nil {
fmt.Println(err)
}
keyValue = append(keyValue,m)
}
}
//排序及合併,處理後應該相似["0":[""],"1":[""]...]
var keyValuesMap map[string][]string
keyValuesMap = make(map[string][]string)
for _,v := range keyValue {
if _,ok:= keyValuesMap[v.Key];ok {//若key值已存在,將value添加到[]string中
keyValuesMap[v.Key] = append(keyValuesMap[v.Key],v.Value)
}else{//若key值不存在,在map中新建key
var values []string
values = append(values,v.Value)
keyValuesMap[v.Key] = values
}
}
//對每一個key調用reduceF,並寫入最後的文件
outputFile,err := os.Create(outFile)
if err!=nil {
fmt.Println(err)
}
enc := json.NewEncoder(outputFile)
for k,v := range keyValuesMap {
err := enc.Encode(KeyValue{k, reduceF(k,v)})
if err!=nil {
fmt.Println(err)
}
}
}
複製代碼
在6.824\src\mapreduce目錄下執行下列命令進行PartI實驗的測試
go test -run Sequential
若運行經過會在結果中輸出ok,相似
ps.運行go命令須要設置好GOPATH
在common.go中設置 debugEnabled = true,go test時增長-v參數能夠得到更多調試信息
env "GOPATH=$PWD/../../" go test -v -run Sequential
=== RUN TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok mapreduce 2.672s