@author qcliu
@time 2015/07/29html
MIT6.824 Lab1
總結。java
第一個任務是單機版詞頻統計。入口爲wc.go
的main()
。 統計的過程分爲4個部分:golang
將一個大文件切分紅若干個小文件。 採用逐行掃描的方式。
涉及到segmentfault
通過Spilt過程,kjv12.txt(file size:4834757)
已經被切分紅了5個文件。分佈式
mrtmp.kjv12.txt-0 966967 mrtmp.kjv12.txt-1 966941 mrtmp.kjv12.txt-2 966974 mrtmp.kjv12.txt-3 966970 mrtmp.kjv12.txt-4 966905
以後執行DoMap()操做。spa
for i:=0; i<nMap; i++ { DoMap(i, mr.file, mr.NReduce, Map) }
對每個子文件進行DoMap()。
在DoMap()中,將子文件的內容讀到內存中的bype[]。對bype[]進行Map()操做。
Map()是對傳入的string進行單詞切分,而後創建一個HashMap,key是單詞,value是單詞出現的頻率。這裏能夠採用bit方式記錄詞頻,即1111表明4次。
這裏Go提供了一個很好用的分割字符串的庫。
strings.FieldsFuncunix
獲得子文件中每一個單詞及其出現的頻率後,將這份hashMap以JSON
格式存入nreduce個文件中,這裏nreduce爲3。
對於文件mrtmp.kjv12.txt-0 966967
,會被分爲3個文件code
mrtmp.kjv12.txt-0-0 mrtmp.kjv12.txt-0-1 mrtmp.kjv12.txt-0-2
GoJsonhtm
在進入Reduce階段時,已經存在了nmap*nreduce個文件。內存
mrtmp.kjv12.txt-0-0 mrtmp.kjv12.txt-0-1 mrtmp.kjv12.txt-0-2 mrtmp.kjv12.txt-1-0 mrtmp.kjv12.txt-1-1 mrtmp.kjv12.txt-1-2 mrtmp.kjv12.txt-2-0 mrtmp.kjv12.txt-2-1 mrtmp.kjv12.txt-2-2 mrtmp.kjv12.txt-3-0 mrtmp.kjv12.txt-3-1 mrtmp.kjv12.txt-3-2 mrtmp.kjv12.txt-4-0 mrtmp.kjv12.txt-4-1 mrtmp.kjv12.txt-4-2
在Reduce時,進行nreduce次DoReduce(), 每一次裏面都打開nmap個文件。
在一次DoReduce()中,對nmap個文件創建一個map[string]*list.List
,用於存放nmap個文件中不一樣單詞以及在不一樣文件中出現的頻率,這裏的頻率是用字符串長度表示的。將nmap個文件中單詞出現的頻率進行合計,用JSON格式寫入文件中。nreduce次DoReduce()總共產生nreduce個merge文件。
Merge()時已經存在nreduce個文件,此處nreduce爲3.每一個文件中存放着若干單詞和對應的頻率。
Merge()操做對3個文件的內容再一次進行合計。用一個hashMap存放3個文件中的單詞和對應的頻率。將最終的結果寫入文件
partII是真正的分佈式的MapReduce。對應於上圖,test_test.go是咱們的UserProgram,在setup()中開啓的master,以後開啓了2個Worker。
javafunc TestBasic(t *testing.T) { InitLog() Logger.Printf("Test: Basic mapreduce ...\n") mr := setup() for i := 0; i < 2; i++ { go RunWorker(mr.MasterAddress, port("worker"+strconv. (i)),MapFunc, ReduceFunc, -1) } // Wait until MR is done <-mr.DoneChannel check(t, mr.file) checkWorker(t, mr.stats) cleanup(mr) fmt.Printf(" ... Basic Passed\n") }
master是負責切分文件,給Worker分配任務,合併結果。在開啓Master時,將MapReduce經過rpc暴露給Worker。而後等待Worker註冊。此時Worker能夠調用MapReduce的Register(),從而通知Master有Worker可用。
開啓Worker時,也經過RPC將Worker暴露給Master,以後經過RPC調用Register(),通知Master有Worker可用。等待Master分配任務。
在mapreduce.go中,StarRegistrationServer()負責Master監聽是否有Worker進行RPC調用。
javafunc (mr *MapReduce) StartRegistrationServer() { rpcs := rpc.NewServer() rpcs.Register(mr)//take mr expose to client os.Remove(mr.MasterAddress) // only needed for "unix" l, e := net.Listen("unix", mr.MasterAddress) if e != nil { log.Fatal("RegstrationServer", mr.MasterAddress, " error: ", e) } mr.l = l // now that we are listening on the master address, can fork off // accepting connections to another thread. go func() { for mr.alive { conn, err := mr.l.Accept() Logger.Printf("accepted %T from worker\n", conn) if err == nil { go func() { rpcs.ServeConn(conn) conn.Close() }() } else { DPrintf("RegistrationServer: accept error", err) break } } DPrintf("RegistrationServer: done\n") }() }
其中,mr.l.Accept()會阻塞。若是Worker此時經過RPC調用Register()
javafunc call(srv string, rpcname string, args interface{}, reply interface{}) bool { c, errx := rpc.Dial("unix", srv) if errx != nil { return false } defer c.Close() err := c.Call(rpcname, args, reply) if err == nil { return true } fmt.Println(err) return false }
rpc.Dial("unix", srv)
時,Master會收到這個鏈接請求。conn, err := mr.l.Accept()
的阻塞結束,到rpcs.ServeConn(conn)
時繼續阻塞,等待Worker的調用。c.Call(rpcname, args, reply)
,其中rpcname爲Master的一個方法名,此處是Register。rpcs.ServeConn(conn)
阻塞結束,調用成功。worker failures 可能發生在RPC調用的時候。
javafunc doMap(mr *MapReduce, worker chan string) { done := make(chan int) for i := 0; i<mr.nMap; i++ { go func(i int) { for avil := range worker { args := DoJobArgs_new(mr.file, Map, i, mr.nReduce) var reply DoJobReply ok := call(avil, "Worker.DoJob", args, &reply) if ok { done<-1 worker<-avil break// break the for range }else { fmt.Printf("No aviliable worker for Map %d\n", args.JobNumber) } } }(i) } for i :=0; i<mr.nMap; i++ { <-done } }
因此用for avil := range worker
不停地從channel中取可用的worker來完成響應的工做,只有完成後才break
。
若是call不會發生錯誤,那麼for avil := range worker
徹底能夠改成avil:=<-worker