在這部分實驗要將以前串行版本的MapReduce tasks改爲併發模式,只須要實現 mapreduce/schedule.go中的 schedule()函數,其餘文件不作更改。 主機在MapReduce做業期間調用schedule()兩次,一次用於Map階段,一次用於Reduce階段。schedule()的工做是將tasks分發給可用的 workers。一般tasks會比 workers多,所以schedule()必須爲每一個 worker提供一系列task,但每一個worker一次只能執行一個task。 schedule()應該等到全部tasks都完成後再返回。bash
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
//
// Your code here (Part III, Part IV).
//
fmt.Printf("Schedule: %v done\n", phase)
}
複製代碼
schedule()經過讀取registerChan參數來獲取 workers 信息。該通道爲每一個worker生成一個字符串,包含worker的RPC address。有些workers可能在調度schedule()以前已經存在,而某些workers可能在schedule()運行時啓動,但全部workers都會出如今 registerChan上。schedule()應該使用全部worker,包括啓動後出現的worker。併發
schedule()經過向worker 發送Worker.DoTask RPC來告訴worker執行task,每次只能向一個給定的worker發送一個RPC。該RPC的參數被定義在MapReduce / common_rpc.go中的DoTaskArgs。其中File參數只在Map tasks中使用,表示要讀取的輸入文件名稱。 schedule()能夠在mapFiles中找到這些文件名。app
type DoTaskArgs struct {
JobName string
File string // 只在Map tasks中使用,表示要讀取的輸入文件名稱
Phase jobPhase // 標誌當前是map仍是reduce階段
TaskNumber int // 此task在當前階段的索引
// NumOtherPhase是其餘階段的task總數
// mappers須要這個來計算output bins, reducers 須要這個來知道要收集多少輸入文件
NumOtherPhase int
}
複製代碼
使用mapreduce / common_rpc.go中的call()函數 將RPC發送給worker。第一個參數是worker的 address,從registerChan讀取。第二個參數應該是「Worker.DoTask」,表示經過rpc調用worker的DoTask方法,第三個參數應該是DoTaskArgs結構,最後一個參數應該是nil。函數
func call(srv string, rpcname string,
args interface{}, reply interface{}) bool {
c, errx := rpc.Dial("unix", srv)
if errx != nil {
return false
}
defer c.Close()
err := c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
複製代碼
再來看下Worker的結構和方法,定義在worker.go中測試
type Worker struct {
sync.Mutex
name string
Map func(string, string) []KeyValue
Reduce func(string, []string) string
nRPC int // quit after this many RPCs; protected by mutex
nTasks int // total tasks executed; protected by mutex
concurrent int // number of parallel DoTasks in this worker; mutex
l net.Listener
parallelism *Parallelism
}
複製代碼
他有下列幾個方法:ui
type Parallelism struct {
mu sync.Mutex
now int32
max int32
}
複製代碼
其中max字段記錄該worker運行的最大task數量,經過鎖機制保證了併發。因爲在各個函數間傳遞的是&Parallelism(地址),因此你們修改的是同一個Parallelism。this
這個實驗將執行兩個測試,TestParallelBasic(驗證運行是否正確)和TestParallelCheck(驗證是否併發執行) 。在TestParallelBasic中首先會生成20個824-mrinput-xx.txt的輸入文件,接着調用Distributed()函數,啓動schedule(),運行map tasks和reduce tasks,而後新開兩個協程去啓動兩個workers。最後檢查生成的輸出文件是否正確,每一個worker至少執行了一個task。所以map tasks數量爲20,reduce tasks數量爲10,workers數量爲2。spa
func TestParallelBasic(t *testing.T) {
mr := setup()//該函數中會調用Distributed()
for i := 0; i < 2; i++ {
go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
MapFunc, ReduceFunc, -1, nil)
}
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
cleanup(mr)
}
複製代碼
//先運行 map tasks,而後運行reduce tasks
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
mr = newMaster(master)
mr.startRPCServer()
//會先運行 map tasks(mapPhase),而後運行reduce tasks(reducePhase),具體邏輯在Master.run函數中
go mr.run(jobName, files, nreduce,
func(phase jobPhase) {
//建立一個無緩衝的通道
ch := make(chan string)
//將全部現有的和新註冊的workers信息發送到ch通道中,schedule經過通道ch獲取workers信息
go mr.forwardRegistrations(ch)
//本次要實現內容,將tasks分發給可用的 workers
schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
},
func() {
mr.stats = mr.killWorkers()
mr.stopRPCServer()
})
return
}
複製代碼
TestParallelCheck 則是驗證所編寫的調度程序是否並行地執行task,一樣開啓了兩個worker,檢驗worker中的parallelism.max(運行的worker數量最大值)是否小於2,若小於則失敗。線程
func TestParallelCheck(t *testing.T) {
mr := setup()
parallelism := &Parallelism{}
for i := 0; i < 2; i++ {
go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
MapFunc, ReduceFunc, -1, parallelism)
}
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
parallelism.mu.Lock()
if parallelism.max < 2 {
t.Fatalf("workers did not execute in parallel")
}
parallelism.mu.Unlock()
cleanup(mr)
}
複製代碼
schedule()的參數中mapFiles是輸入文件名稱列表,每一個maptask處理一個。nReduce是reduce tasks的數量,registerChan通道傳遞全部worker的RPC address。局部變量ntasks表示當前階段的tasks數量,如果map階段則爲輸入文件數量,如果reduce階段則爲nReduce參數。
瞭解了大概的運行流程,schedule()中要作的就是開啓多個線程,讀取通道registerChan獲取worker的address。構造DoTaskArgs參數。調用call()方法向worker 發送Worker.DoTask RPC,告訴worker執行task。在測試程序中開啓了兩個worker:worker0和worker1,而registerChan是一個無緩衝的通道,每次通道上只有一個worker address,所以能夠將處理完task的worker的 address再放回registerChan,供下一個線程讀取,以此實現兩個worker併發運行。
一旦全部該階段的tasks成功完成,schedule()就返回,這個功能可使用sync.WaitGroup來實現。 個人實現以下:3d
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
var wg sync.WaitGroup
wg.Add(ntasks)
for i:=0;i<ntasks;i++{
//開啓線程併發調用
go func(taskNum int) {
//從chan獲取可用的worker
for w := range registerChan {
//構造DoTaskArgs參數
var arg DoTaskArgs
switch phase {
case mapPhase:
arg = DoTaskArgs{JobName:jobName,File:mapFiles[taskNum],Phase:mapPhase,TaskNumber:taskNum,NumOtherPhase:n_other}
case reducePhase:
arg = DoTaskArgs{JobName:jobName,File:"",Phase:reducePhase,TaskNumber:taskNum,NumOtherPhase:n_other}
}
call(w,"Worker.DoTask",arg,nil)
wg.Done()
registerChan <- w//將worker address放回registerChan
break
}
}(i)
}
wg.Wait()
return
}
複製代碼
運行下面命令來測試所編寫的實驗代碼。這將依次執行兩個測試,TestParallelBasic和TestParallelCheck 。
go test -run TestParallel
獲得相似下面結果程序運行成功