用go實現"古老的"mapreduce

最近抽空開始作6.824 Schedule: Spring 2018的相關練習,一個國外的分佈式系統的相關課題任務,而後從新看了一下古老的 "mapreduce"相關論文論文 mapreduce Simplified Data Processing on Large Clusters,結合以前一些hadoop的相關知識,這裏使用Go實現了mapreduce中的關鍵部分。我想簡單的介紹一下mapreduce,而後解析實現過程,同時結合本身的想法,對mapreduce分佈式離線計算框架有一個簡單的整理,思考。html

簡要介紹

mapreduce最開始是2004(全部說"古老")年被Google提出的,大多數人應該是在hadoop普遍普及纔開始接觸。git

簡單來講mapreduce就是離線處理大規模數據的分佈式計算框架,用戶只須要定義一個處理k,v的map函數(該map函數會自動將計算結果保存到臨時文件中)和reduce(處理合並臨時文件中全部key中記錄)函數便可。因此從用戶角度來說是很是簡單的,使用類mapreduce框架就能夠擁有處理海量數據的計算能力,他不須要去關心數據分區分片,任務的多機合理調度,任務失敗重試,計算集羣資源管理等等分佈式系統中會遇到的問題。github

這裏先簡單梳理一下mapreduce的邏輯。golang


上圖能夠簡單看一下官方的流程圖,有幾個流程,從左到右的邏輯是,首先是將數據文件切分,而後分佈到每一個worker執行(至關於分佈式執行map函數),再而後產生中間結果集文件,最後reduce函數聚合全部中間文件的結果集,而後其中須要Master節點去作居中調度,分配map Worker和reduce Worker。apache

下面簡要介紹map, reduce, schedule的實現流程編程


編程模型json

對於開發者的編程模型是 map和reduce,處理一組k,v數據,輸出一組k,v數據, 類mapreduce框架會提供map和reduce函數接口。bash

關鍵實現邏輯

具體代碼提交放在連接中github-mapreduceapp

Master 任務分配框架

master將任務調度到合適的worker,這裏實際上是整個計算系統的核心模塊,至關因而整個系統的大腦,在這裏其實須要作不少事,有不少細節,也有不少選擇,好比首先很是重要的實現一個master-worker的服務發現過程,讓worker註冊到master, 能讓master調度worker, 而後是須要實現worker-master之間的通訊協議,能夠經過使用dubbo, grpc, thrift,再而後worker的任務調度,這裏能夠增長一個像hadoop yarn, apache mesos的這樣的資源管理器去合理管理調度資源,將覈實的任務放在合適的計算資源上,而後像worker作心跳保活,處理worker的異常調度,最後很是重要的是作master的高可用,這裏不管是在google仍是hadoop感受都不是作得特別好,主要緣由應該一方面master源數據沒有自動存儲,而後也沒有像master controller這樣的管理工具去管理master容錯的軟件吧。

這裏沒有那麼複雜,全部的worker和master都在一個節點,經過socket進行通訊,調用,單進程多協程,至關因而僞分佈式的場景,而後這裏只實現了任務的調度,將每一個doMap, doReduce 任務調度到合適的worker(這裏的worker已經註冊到了registerChan中),而後這裏會有個局部變量allTask知道須要處理的總任務數,而後利用sync.WaitGroup控制攜程函數的執行結束,而後將allTask中的任務經過使用Go func啓動單獨的協程去執行,當出現任務調度失敗,將task從新放入到allTask,最後當等待全部的worker func執行完畢後,退出函數。

func schedule(
            jobName string, 
            mapFiles []string, 
            nReduce int, 
            phase jobPhase, 
            registerChan chan string) {

    var ntasks int
    var n_other int 

    //判斷map reduce函數類型

    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }


    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    fmt.Printf("Schedule: %v done\n", phase)

    var wg sync.WaitGroup
    var allTask = make(chan int)

     // 加載task, 等待全部task完成
    go func() {

        for i := 0; i < ntasks; i++ {
            wg.Add(1)
            allTask <- i
        }

        wg.Wait()
        close(allTask)

    }()

     // 調度全部的task

     for i := range allTask {

        // 獲取合適的worker
        worker := <- registerChan

        go func(worker string, i int) {

            file := ""

            if phase == mapPhase {

                file = mapFiles[i]
            }
            doTaskArgs := DoTaskArgs{

                JobName: jobName,
                File :  file,
                Phase : phase,
                TaskNumber:  i,
                NumOtherPhase: n_other,
            }

            // 執行worker
            if call(worker, "Worker.DoTask", &doTaskArgs, nil ) {
                wg.Done()

                // 執行成功從新放回worker

                registerChan <- worker

            } else {

                    // 放回執行失敗的task

                allTask <- i
            }

        }(worker, i)

    }


    fmt.Printf("Schedule: %v done\n", phase)

}
複製代碼

Domap實現:

doMap實現邏輯其實比較簡單,就是使用ioutil.ReadFile將就本worker的數據讀出來(會有一個文件切片功能,這裏沒有介紹,就是將須要計算的數據經過分片的方式傳遞給worker, 在hadoop和gfs中默認是64M,這和分佈式文件系統hdfs, gfs的具體內部實現邏輯有關係),而後執行用戶定義的map func函數,將執行後的結果保存到中間文件中tmpFiles中, 這裏保存的格式是json, 注意這裏的環境假設是,全部worker都一個文件系統中,實際處理狀況會在gfs,hdfs中。

func doMap(jobName string,  
    mapTask int, 
    inFile string,
    nReduce int, 
    mapF func(filename string, contents string) []KeyValue,
)

    // 1 讀取合適的文件,執行用戶定義的map函數

    body, err := ioutil.ReadFile(inFile)
    if err != nil {
        fmt.Errorf("doMap read file err %s", err)
        return
    }

    // 2 執行用戶定義的map函數 

    resultKv := mapF(inFile ,string(body))

    if len(resultKv) == 0 {
        fmt.Println("doMap not need to create tmp file")
        return
    }
    tmpFiles := make([]*os.File, nReduce)
    tmpFileEcoder := make([]*json.Encoder, nReduce)

    // 3 預生成須要的中間結果文件,獲取文件指針

    for i:= 0; i < nReduce; i++ {
        reduceTmpFileName := reduceName(jobName, mapTask, i)
        fmt.Println(reduceTmpFileName)
        stat, err := os.Stat(reduceTmpFileName)
        if err != nil {
            if os.IsExist(err) {
                os.Remove(reduceTmpFileName)
            }
        }
        if stat != nil {
            if stat.IsDir() {
                os.RemoveAll(reduceTmpFileName)
            }
        }
        reduceFile, err := os.Create(reduceTmpFileName)
        if err != nil {
            fmt.Errorf("doMap create tmp %s file err %s ", reduceTmpFileName, err)
            return
        }
        //tmpFiles := append(tmpFiles, reduceFile)
        tmpFiles[i] = reduceFile
        enc := json.NewEncoder(reduceFile)
        //tmpFileEcoder = append(tmpFileEcoder, enc)
        tmpFileEcoder[i] = enc

    }


    // 4 將map函數執行的resultKv,放入到中間文件中

    for _, kv := range resultKv {


        // 經過hash定位到 k 和對應的v 保存的合適的數據文件中

        hashIndex := ihash(kv.Key) % nReduce
        if tmpFileEcoder[hashIndex] == nil {
            fmt.Errorf("doMap tmpFile ecoder index err")
            continue
        }

        err := tmpFileEcoder[hashIndex].Encode(&kv)
        if err != nil {
            fmt.Errorf("doMap write tmp file %s err %s", tmpFiles[hashIndex].Name(), err)
        }
    }

    // 5 關閉文件描述符

    for _, tmpFile := range tmpFiles {
        tmpFile.Close()
    }
}
複製代碼

DoReduce實現:

doReduce邏輯也不是太難,就是將當全部map執行結束後,執行reduce函數(這裏會用map函數所生成的中間文件),至關因而合併排序結果集,而後輸出到輸出文件。

func doReduce(
    jobName string, 
    MapReduce job,
    reduceTask int, 
    outFile string, 
    nMap int, 
    reduceF func(key string, values []string) string,
) {


    resultMap := make(map[string][]string)

    var keys []string

    // 1 讀取全部的中間文件

    for i:= 0; i < nMap; i++{

        reduceTmpFileName := reduceName(jobName, i, reduceTask)
        reduceTmpfile, err := os.Open(reduceTmpFileName)
        if err != nil {
            fmt.Errorf("doReduce read tmp file error %s", reduceTmpFileName)
            continue
        }

        var kv KeyValue

        // 2 解析每一個臨時文件
        decode := json.NewDecoder(reduceTmpfile)
        err = decode.Decode(&kv)

        for err == nil {
            if _, ok := resultMap[kv.Key]; !ok {
                keys = append(keys, kv.Key)
            }
            resultMap[kv.Key] = append(resultMap[kv.Key], kv.Value)
            err = decode.Decode(&kv)
        }

        // 3 排序 key

        sort.Strings(keys)

        out, err := os.Create(outFile)
        if err != nil {
            fmt.Errorf("doReduce create output file %s failed", err)
            return
        }
        enc := json.NewEncoder(out)

        // 4 輸出全部的結果到reduce文件中

        for _, key := range keys {
            if err = enc.Encode(KeyValue{key, reduceF(key, resultMap[key])}); err != nil {
                fmt.Errorf("write key: %s to file %s failed", key, outFile)
            }
        }
        out.Close()
    }


}
複製代碼

想法:

感受其實幾年前就進入雲計算大數據的時代,而後進入移動互諒網,而後最近兩年新冒出來的,物聯網,新零售的概念,這意味着終端設備愈來愈多,同時也會產生愈來愈多的數據,咱們在數據中找到有價值的信息會愈來愈難,因此感受數據處理,數據分析的比重會是在互聯網中會愈來愈高。

那麼對應到技術上,可能之前最開始想到數據處理框架就是mapreduce, 如今可能不少人會以爲mapreduce這種框架略顯老式, 我其實以爲仍是使用場景的問題,像帶有離線計算,批處理,離線存儲這樣需求,hadoop體系仍是能夠繼續勝任,而後對於實時性要求更高的應用場景,會採用流式計算技術,像storm, yahoo s4等方案,如今開源社區也對於數據處理已經也孵化出了更多程序的框架,spark, flink的程序體系等,對於咱們來說在技術選項上也能有更多方案。


最後:

其實只實現了mapreduce比較簡單的功能,有一些邏輯沒有作介紹,具體能夠看看上邊鏈接中github的代碼,也能夠嘗試作作6.824 Schedule: Spring 2018相關練習,會對理解分佈式系統有更多的幫助, 有了一個基礎的框架結構,能夠往上邊增長不少功能,實現一個標準的mapreduce庫吧。



相關文章
相關標籤/搜索