MIT6.824-Lab1-Part III: Distributing MapReduce tasks

1.概述

1.1 schedule()

在這部分實驗要將以前串行版本的MapReduce tasks改爲併發模式,只須要實現 mapreduce/schedule.go中的 schedule()函數,其餘文件不作更改。 主機在MapReduce做業期間調用schedule()兩次,一次用於Map階段,一次用於Reduce階段。schedule()的工做是將tasks分發給可用的 workers。一般tasks會比 workers多,所以schedule()必須爲每一個 worker提供一系列task,但每一個worker一次只能執行一個task。 schedule()應該等到全部tasks都完成後再返回。bash

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //
    fmt.Printf("Schedule: %v done\n", phase)
}
複製代碼

schedule()經過讀取registerChan參數來獲取 workers 信息。該通道爲每一個worker生成一個字符串,包含worker的RPC address。有些workers可能在調度schedule()以前已經存在,而某些workers可能在schedule()運行時啓動,但全部workers都會出如今 registerChan上。schedule()應該使用全部worker,包括啓動後出現的worker。併發

1.2 DoTaskArgs

schedule()經過向worker 發送Worker.DoTask RPC來告訴worker執行task,每次只能向一個給定的worker發送一個RPC。該RPC的參數被定義在MapReduce / common_rpc.go中的DoTaskArgs。其中File參數只在Map tasks中使用,表示要讀取的輸入文件名稱。 schedule()能夠在mapFiles中找到這些文件名。app

type DoTaskArgs struct {
    JobName    string
    File       string   // 只在Map tasks中使用,表示要讀取的輸入文件名稱
    Phase      jobPhase // 標誌當前是map仍是reduce階段
    TaskNumber int      // 此task在當前階段的索引

    // NumOtherPhase是其餘階段的task總數
    // mappers須要這個來計算output bins, reducers 須要這個來知道要收集多少輸入文件
    NumOtherPhase int
}
複製代碼

1.3 call()

使用mapreduce / common_rpc.go中的call()函數 將RPC發送給worker。第一個參數是worker的 address,從registerChan讀取。第二個參數應該是「Worker.DoTask」,表示經過rpc調用worker的DoTask方法,第三個參數應該是DoTaskArgs結構,最後一個參數應該是nil。函數

func call(srv string, rpcname string,
    args interface{}, reply interface{}) bool {
    c, errx := rpc.Dial("unix", srv)
    if errx != nil {
        return false
    }
    defer c.Close()

    err := c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }

    fmt.Println(err)
    return false
}
複製代碼

1.4 Worker、 Worker.DoTask

再來看下Worker的結構和方法,定義在worker.go中測試

type Worker struct {
    sync.Mutex

    name        string
    Map         func(string, string) []KeyValue
    Reduce      func(string, []string) string
    nRPC        int // quit after this many RPCs; protected by mutex
    nTasks      int // total tasks executed; protected by mutex
    concurrent  int // number of parallel DoTasks in this worker; mutex
    l           net.Listener
    parallelism *Parallelism
}
複製代碼

他有下列幾個方法:ui

  • register:worker的register方法中使用RPC遠程調用了Master.Register,將該worker註冊到master中。master維護了一個workers []string,記錄worker的address,master還負責把worker address發送到通道中(上文提到的registerChan),供schedule()讀取。
  • RunWorker:和master創建鏈接,註冊它的address到master中(調用上方的register函數),等待schedul()安排tasks。
  • DoTask:在schedul()函數中會調用此方法給worker安排task。根據傳入的phase參數,他會去執行doMap或doReduce方法。在worker中還維護了一個Parallelism結構,跟蹤worker是否併發執行。
type Parallelism struct {
    mu  sync.Mutex
    now int32
    max int32
}
複製代碼

其中max字段記錄該worker運行的最大task數量,經過鎖機制保證了併發。因爲在各個函數間傳遞的是&Parallelism(地址),因此你們修改的是同一個Parallelism。this

  • Shutdown:當全部tasks都完成後,master會調用worker的Shutdown,worker應該返回所處理過的tasks數量。

2.調用流程

這個實驗將執行兩個測試,TestParallelBasic(驗證運行是否正確)和TestParallelCheck(驗證是否併發執行) 。在TestParallelBasic中首先會生成20個824-mrinput-xx.txt的輸入文件,接着調用Distributed()函數,啓動schedule(),運行map tasks和reduce tasks,而後新開兩個協程去啓動兩個workers。最後檢查生成的輸出文件是否正確,每一個worker至少執行了一個task。所以map tasks數量爲20,reduce tasks數量爲10,workers數量爲2。spa

func TestParallelBasic(t *testing.T) {
    mr := setup()//該函數中會調用Distributed()
    for i := 0; i < 2; i++ {
        go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
            MapFunc, ReduceFunc, -1, nil)
    }
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}
複製代碼
//先運行 map tasks,而後運行reduce tasks
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
    mr = newMaster(master)
    mr.startRPCServer()
    //會先運行 map tasks(mapPhase),而後運行reduce tasks(reducePhase),具體邏輯在Master.run函數中
    go mr.run(jobName, files, nreduce,
        func(phase jobPhase) {
            //建立一個無緩衝的通道
            ch := make(chan string)
            //將全部現有的和新註冊的workers信息發送到ch通道中,schedule經過通道ch獲取workers信息
            go mr.forwardRegistrations(ch)
            //本次要實現內容,將tasks分發給可用的 workers
            schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
        },
        func() {
            mr.stats = mr.killWorkers()
            mr.stopRPCServer()
        })
    return
}
複製代碼

TestParallelCheck 則是驗證所編寫的調度程序是否並行地執行task,一樣開啓了兩個worker,檢驗worker中的parallelism.max(運行的worker數量最大值)是否小於2,若小於則失敗。線程

func TestParallelCheck(t *testing.T) {
    mr := setup()
    parallelism := &Parallelism{}
    for i := 0; i < 2; i++ {
        go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
            MapFunc, ReduceFunc, -1, parallelism)
    }
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)

    parallelism.mu.Lock()
    if parallelism.max < 2 {
        t.Fatalf("workers did not execute in parallel")
    }
    parallelism.mu.Unlock()

    cleanup(mr)
}
複製代碼

3.schedule()

schedule()的參數中mapFiles是輸入文件名稱列表,每一個maptask處理一個。nReduce是reduce tasks的數量,registerChan通道傳遞全部worker的RPC address。局部變量ntasks表示當前階段的tasks數量,如果map階段則爲輸入文件數量,如果reduce階段則爲nReduce參數。
瞭解了大概的運行流程,schedule()中要作的就是開啓多個線程,讀取通道registerChan獲取worker的address。構造DoTaskArgs參數。調用call()方法向worker 發送Worker.DoTask RPC,告訴worker執行task。在測試程序中開啓了兩個worker:worker0和worker1,而registerChan是一個無緩衝的通道,每次通道上只有一個worker address,所以能夠將處理完task的worker的 address再放回registerChan,供下一個線程讀取,以此實現兩個worker併發運行。
一旦全部該階段的tasks成功完成,schedule()就返回,這個功能可使用sync.WaitGroup來實現。 個人實現以下:3d

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    var wg sync.WaitGroup
    wg.Add(ntasks)
    for i:=0;i<ntasks;i++{
        //開啓線程併發調用
        go func(taskNum int) {
            //從chan獲取可用的worker
            for w := range registerChan {
                //構造DoTaskArgs參數
                var arg DoTaskArgs
                switch phase {
                case mapPhase:
                    arg = DoTaskArgs{JobName:jobName,File:mapFiles[taskNum],Phase:mapPhase,TaskNumber:taskNum,NumOtherPhase:n_other}
                case reducePhase:
                    arg = DoTaskArgs{JobName:jobName,File:"",Phase:reducePhase,TaskNumber:taskNum,NumOtherPhase:n_other}
                }
                call(w,"Worker.DoTask",arg,nil)
                wg.Done()
                registerChan <- w//將worker address放回registerChan
                break
            }
        }(i)
    }
    wg.Wait()
    return
}
複製代碼

4.測試運行

運行下面命令來測試所編寫的實驗代碼。這將依次執行兩個測試,TestParallelBasic和TestParallelCheck 。

go test -run TestParallel

獲得相似下面結果程序運行成功

相關文章
相關標籤/搜索