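In this part, the master must handle failed workers. If a worker fails while handling an RPC from the master, the master's call() eventually times out and returns false. In that case, the master should reassign the failed task to another worker.
An RPC failure does not necessarily mean that the worker did not execute the task: the worker may have executed it but the reply was lost, or the worker may still be executing it when the master's RPC times out. Reassigning a task can therefore lead to two workers receiving the same task and computing it. That is not a problem, because the same task produces the same result. All we need to implement is reassigning a failed task to another worker.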
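To make the point about duplicate execution concrete, here is a minimal sketch, not the lab's code. The fakeWorker type and the doTask and runWithRetry functions are hypothetical names introduced only for illustration. The first worker runs the task but its reply is "lost", so the task is handed to a second worker; because the result depends only on the task number, running it twice leaves the same output.

```go
package main

import "fmt"

// fakeWorker is a hypothetical stand-in for a real worker process.
type fakeWorker struct {
	name      string
	dropReply bool // if true, the task runs but the reply never reaches the master
}

// doTask writes a deterministic result for the task and reports whether the
// master received the reply. The result depends only on the task number.
func (w fakeWorker) doTask(task int, output map[int]string) bool {
	output[task] = fmt.Sprintf("result-of-task-%d", task)
	return !w.dropReply
}

// runWithRetry hands the task to each worker in turn until one replies.
// It returns the number of attempts made, or -1 if no worker replied.
func runWithRetry(task int, workers []fakeWorker, output map[int]string) int {
	for i, w := range workers {
		if w.doTask(task, output) {
			return i + 1
		}
	}
	return -1
}

func main() {
	output := make(map[int]string)
	workers := []fakeWorker{
		{name: "w0", dropReply: true},  // executes the task, reply is lost
		{name: "w1", dropReply: false}, // executes the same task again
	}
	attempts := runWithRetry(7, workers, output)
	fmt.Println(attempts, output[7]) // prints "2 result-of-task-7"
}
```

Both workers wrote the same value for task 7, which is why the master does not need to find out whether the first attempt actually ran.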
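As before, only schedule.go needs to be modified in this part.
All that is needed is a check on the result of call(). On success, decrement the WaitGroup, put the worker's address back on the registerChan channel, and break out of the loop that reads from registerChan. On failure, keep reading workers from registerChan and run the task on the next one.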
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var n_other int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		n_other = nReduce
	case reducePhase:
		ntasks = nReduce
		n_other = len(mapFiles)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
	var wg sync.WaitGroup
	wg.Add(ntasks)
	for i := 0; i < ntasks; i++ {
		// Start one goroutine per task so the tasks run concurrently.
		go func(taskNum int) {
			// Take an available worker from the channel.
			for w := range registerChan {
				// Build the DoTaskArgs argument.
				var arg DoTaskArgs
				switch phase {
				case mapPhase:
					arg = DoTaskArgs{JobName: jobName, File: mapFiles[taskNum], Phase: mapPhase, TaskNumber: taskNum, NumOtherPhase: n_other}
				case reducePhase:
					arg = DoTaskArgs{JobName: jobName, File: "", Phase: reducePhase, TaskNumber: taskNum, NumOtherPhase: n_other}
				}
				result := call(w, "Worker.DoTask", arg, nil)
				if result {
					// Success: mark the task done first, then return the worker.
					wg.Done()
					registerChan <- w
					break
				}
				// Failure: loop again and retry the task on another worker.
			}
		}(i)
	}
	wg.Wait()
	return
}
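The retry loop can be tried outside the lab framework with a small simulation. This is a sketch under stated assumptions: fakeCall and scheduleSim are hypothetical stand-ins for call() and schedule(), and the worker names are made up. It keeps the same ordering as above, calling wg.Done() before handing the worker back, so that a send on the unbuffered channel that nobody receives cannot hold up wg.Wait().

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fakeCall is a hypothetical stand-in for call(): it fails for any worker
// listed in broken and succeeds otherwise.
func fakeCall(worker string, broken map[string]bool) bool {
	return !broken[worker]
}

// scheduleSim runs ntasks tasks on workers taken from registerChan, retrying
// each task on another worker when fakeCall fails. It returns the number of
// tasks that completed successfully.
func scheduleSim(ntasks int, registerChan chan string, broken map[string]bool) int {
	var wg sync.WaitGroup
	var done int64
	wg.Add(ntasks)
	for i := 0; i < ntasks; i++ {
		go func(taskNum int) {
			for w := range registerChan {
				if fakeCall(w, broken) {
					atomic.AddInt64(&done, 1)
					wg.Done()
					// Return the worker only after Done, so a blocked
					// send here does not delay wg.Wait().
					registerChan <- w
					return
				}
				// Failed: drop this worker and wait for another one.
			}
		}(i)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&done))
}

func main() {
	registerChan := make(chan string)
	// Simulate worker registration on an unbuffered channel.
	go func() {
		for _, w := range []string{"w-bad", "w-good-1", "w-good-2"} {
			registerChan <- w
		}
	}()
	broken := map[string]bool{"w-bad": true}
	fmt.Println(scheduleSim(5, registerChan, broken)) // prints 5
}
```

Note that a failed worker is simply not put back on the channel, so later tasks only ever see workers that have not yet failed. The goroutine that finishes last stays blocked on its final send, which is harmless here because the program exits after wg.Wait() returns.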
Run the following command to test the code:
go test -run Failure
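The earlier parts counted the total number of occurrences of each word across a set of documents. This part instead builds an inverted index: for each word, record which documents it appears in and how many such documents there are. A word that occurs several times in the same document is counted only once.
You need to implement the mapF and reduceF functions in main/ii.go. The final output file should have the following format, one word per line:
word: <number of documents containing the word> <documents containing the word, separated by ','>
The implementation is much like the earlier one, except that the Value in each returned KeyValue is now the name of the input file rather than the word's frequency.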
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// Replace punctuation such as , . ? and every other non-letter with a space.
	re := regexp.MustCompile("[^a-zA-Z]")
	value = re.ReplaceAllString(value, " ")
	// Keying by word keeps each word only once per document.
	kv := make(map[string]string)
	words := strings.Fields(value)
	for _, w := range words {
		kv[w] = document
	}
	// Convert to []mapreduce.KeyValue.
	for k, v := range kv {
		res = append(res, mapreduce.KeyValue{Key: k, Value: v})
	}
	return res
}
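reduceF is handled in much the same way. Note that values []string may contain duplicate file names, so they should be de-duplicated. The value finally returned for the word should have this format: number of documents containing the word + " " + the documents containing the word, separated by ','.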
func reduceF(key string, values []string) string {
	var tmp []string
	// Remove duplicate file names from values; the names of the files
	// that contain the word end up in tmp.
	set := make(map[string]int)
	for _, str := range values {
		_, ok := set[str]
		if !ok {
			set[str] = 1
			tmp = append(tmp, str)
		}
	}
	sort.Strings(tmp) // sort the file names
	// Build the returned value, which should look like this:
	// 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
	num := len(tmp)
	return strconv.Itoa(num) + " " + strings.Join(tmp, ",")
}
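The map and reduce steps can be checked together on a tiny in-memory input, without the mapreduce package. The following is a sketch under stated assumptions: the local KeyValue type only mirrors the shape of mapreduce.KeyValue, and mapWords, reduceDocs, invertedIndex, and the two sample documents are hypothetical names and data introduced for illustration. The grouping loop stands in for the shuffle that the framework normally performs between map and reduce.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
	"strconv"
	"strings"
)

// KeyValue mirrors the shape of mapreduce.KeyValue; it is redefined here
// only so that the sketch compiles on its own.
type KeyValue struct {
	Key   string
	Value string
}

var nonLetters = regexp.MustCompile("[^a-zA-Z]+")

// mapWords emits one (word, document) pair per distinct word in contents.
func mapWords(document, contents string) []KeyValue {
	seen := make(map[string]bool)
	var res []KeyValue
	for _, w := range strings.Fields(nonLetters.ReplaceAllString(contents, " ")) {
		if !seen[w] {
			seen[w] = true
			res = append(res, KeyValue{Key: w, Value: document})
		}
	}
	return res
}

// reduceDocs formats the sorted, de-duplicated document list for one word
// as "<count> <doc1>,<doc2>,...".
func reduceDocs(values []string) string {
	seen := make(map[string]bool)
	var docs []string
	for _, v := range values {
		if !seen[v] {
			seen[v] = true
			docs = append(docs, v)
		}
	}
	sort.Strings(docs)
	return strconv.Itoa(len(docs)) + " " + strings.Join(docs, ",")
}

// invertedIndex groups the map output by word and reduces each group.
func invertedIndex(docs map[string]string) map[string]string {
	grouped := make(map[string][]string)
	for name, contents := range docs {
		for _, kv := range mapWords(name, contents) {
			grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
		}
	}
	index := make(map[string]string)
	for word, values := range grouped {
		index[word] = reduceDocs(values)
	}
	return index
}

func main() {
	index := invertedIndex(map[string]string{
		"a.txt": "the cat saw the dog",
		"b.txt": "the dog, again!",
	})
	fmt.Println("the: " + index["the"]) // prints "the: 2 a.txt,b.txt"
	fmt.Println("cat: " + index["cat"]) // prints "cat: 1 a.txt"
}
```

The word "the" occurs twice in a.txt but is counted once for that document, which is the difference from the word-count task.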
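Run the test script and check its final output to confirm that the test passes:
bash ./test-ii.sh
Then inspect the final mrtmp.iiseq file; if it contains the following, the program is correct.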
LC_ALL=C sort -k1,1 mrtmp.iiseq | sort -snk2,2 | grep -v '16' | tail -10
www: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
year: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
years: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
yesterday: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
yet: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
you: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
young: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
your: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
yourself: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
zip: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt