MapReduce

MapReduce

@author qcliu

@time 2015/07/29html

Abstract

MIT6.824 Lab1
總結。java

Part I: Word count

第一個任務是單機版詞頻統計。入口爲wc.gomain()。 統計的過程分爲4個部分:golang

  • Split
  • Map
  • Reduce
  • Merge

Split

將一個大文件切分紅若干個小文件。 採用逐行掃描的方式。
涉及到segmentfault

Map

通過Spilt過程,kjv12.txt(file size:4834757)已經被切分紅了5個文件。分佈式

mrtmp.kjv12.txt-0 966967
mrtmp.kjv12.txt-1 966941
mrtmp.kjv12.txt-2 966974
mrtmp.kjv12.txt-3 966970
mrtmp.kjv12.txt-4 966905

以後執行DoMap()操做。spa

for i:=0; i<nMap; i++ {
    DoMap(i, mr.file, mr.NReduce, Map)
}

對每個子文件進行DoMap()。
在DoMap()中,將子文件的內容讀到內存中的bype[]。對bype[]進行Map()操做。

Map()是對傳入的string進行單詞切分,而後創建一個HashMap,key是單詞,value是單詞出現的頻率。這裏能夠採用bit方式記錄詞頻,即1111表明4次。

這裏Go提供了一個很好用的分割字符串的庫。
strings.FieldsFuncunix

獲得子文件中每一個單詞及其出現的頻率後,將這份hashMap以JSON格式存入nreduce個文件中,這裏nreduce爲3。

對於文件mrtmp.kjv12.txt-0 966967,會被分爲3個文件code

mrtmp.kjv12.txt-0-0
mrtmp.kjv12.txt-0-1
mrtmp.kjv12.txt-0-2

GoJsonhtm

Reduce

在進入Reduce階段時,已經存在了nmap*nreduce個文件。內存

mrtmp.kjv12.txt-0-0
mrtmp.kjv12.txt-0-1
mrtmp.kjv12.txt-0-2
mrtmp.kjv12.txt-1-0
mrtmp.kjv12.txt-1-1
mrtmp.kjv12.txt-1-2
mrtmp.kjv12.txt-2-0
mrtmp.kjv12.txt-2-1
mrtmp.kjv12.txt-2-2
mrtmp.kjv12.txt-3-0
mrtmp.kjv12.txt-3-1
mrtmp.kjv12.txt-3-2
mrtmp.kjv12.txt-4-0
mrtmp.kjv12.txt-4-1
mrtmp.kjv12.txt-4-2

在Reduce時,進行nreduce次DoReduce(), 每一次裏面都打開nmap個文件。

在一次DoReduce()中,對nmap個文件創建一個map[string]*list.List,用於存放nmap個文件中不一樣單詞以及在不一樣文件中出現的頻率,這裏的頻率是用字符串長度表示的。將nmap個文件中單詞出現的頻率進行合計,用JSON格式寫入文件中。nreduce次DoReduce()總共產生nreduce個merge文件。

Merge

Merge()時已經存在nreduce個文件,此處nreduce爲3.每一個文件中存放着若干單詞和對應的頻率。

Merge()操做對3個文件的內容再一次進行合計。用一個hashMap存放3個文件中的單詞和對應的頻率。將最終的結果寫入文件

Part II: MapReduce

figure1
partII是真正的分佈式的MapReduce。對應於上圖,test_test.go是咱們的UserProgram,在setup()中開啓的master,以後開啓了2個Worker。

javafunc TestBasic(t *testing.T) {
    InitLog()
    Logger.Printf("Test: Basic mapreduce ...\n")
    mr := setup()
    for i := 0; i < 2; i++ {
        go RunWorker(mr.MasterAddress, port("worker"+strconv.
        (i)),MapFunc, ReduceFunc, -1)
    }
    // Wait until MR is done
    <-mr.DoneChannel
    check(t, mr.file)
    checkWorker(t, mr.stats)
    cleanup(mr)
    fmt.Printf("  ... Basic Passed\n")
}

Master

master是負責切分文件,給Worker分配任務,合併結果。在開啓Master時,將MapReduce經過rpc暴露給Worker。而後等待Worker註冊。此時Worker能夠調用MapReduce的Register(),從而通知Master有Worker可用。

Worker

開啓Worker時,也經過RPC將Worker暴露給Master,以後經過RPC調用Register(),通知Master有Worker可用。等待Master分配任務。

Example

在mapreduce.go中,StarRegistrationServer()負責Master監聽是否有Worker進行RPC調用。

javafunc (mr *MapReduce) StartRegistrationServer() {
    rpcs := rpc.NewServer()
    rpcs.Register(mr)//take mr expose to client
    os.Remove(mr.MasterAddress) // only needed for "unix"
    l, e := net.Listen("unix", mr.MasterAddress)
    if e != nil {
        log.Fatal("RegstrationServer", mr.MasterAddress, " error: ", e)
    }
    mr.l = l
    // now that we are listening on the master address, can fork off
    // accepting connections to another thread.
    go func() {
        for mr.alive {
            conn, err := mr.l.Accept()
            Logger.Printf("accepted %T from worker\n", conn)
            if err == nil {
                go func() {
                    rpcs.ServeConn(conn)
                    conn.Close()
                }()
            } else {
                DPrintf("RegistrationServer: accept error", err)
                break
            }
        }
        DPrintf("RegistrationServer: done\n")
    }()
}

其中,mr.l.Accept()會阻塞。若是Worker此時經過RPC調用Register()

javafunc call(srv string, rpcname string, args interface{}, reply interface{}) bool {
    c, errx := rpc.Dial("unix", srv)
    if errx != nil {
        return false
    }
    defer c.Close()
    err := c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }
    fmt.Println(err)
    return false
}
  1. srv爲Master的地址。當Worker執行rpc.Dial("unix", srv)時,Master會收到這個鏈接請求。
  2. 因而conn, err := mr.l.Accept()的阻塞結束,到rpcs.ServeConn(conn)時繼續阻塞,等待Worker的調用。
  3. 當Worker執行c.Call(rpcname, args, reply),其中rpcname爲Master的一個方法名,此處是Register。
  4. 此時Master的rpcs.ServeConn(conn)阻塞結束,調用成功。

Part III: Handling worker failures

worker failures 可能發生在RPC調用的時候。

javafunc doMap(mr *MapReduce, worker chan string) {
    done := make(chan int)
    for i := 0; i<mr.nMap; i++ {
        go func(i int) {
            for avil := range worker {
                args := DoJobArgs_new(mr.file, Map, i, mr.nReduce)
                var reply DoJobReply
                ok := call(avil, "Worker.DoJob", args, &reply)
                if ok {
                    done<-1
                    worker<-avil
                    break// break the for range
                }else {
                    fmt.Printf("No aviliable worker for Map %d\n", args.JobNumber)
                }
            }
        }(i)
    }
    for i :=0; i<mr.nMap; i++ {
        <-done
    }
}

因此用for avil := range worker不停地從channel中取可用的worker來完成響應的工做,只有完成後才break

若是call不會發生錯誤,那麼for avil := range worker徹底能夠改成avil:=<-worker

相關文章
相關標籤/搜索