最近抽空開始作6.824 Schedule: Spring 2018的相關練習,一個國外的分佈式系統的相關課題任務,而後從新看了一下古老的 "mapreduce"相關論文論文 mapreduce Simplified Data Processing on Large Clusters,結合以前一些hadoop的相關知識,這裏使用Go實現了mapreduce中的關鍵部分。我想簡單的介紹一下mapreduce,而後解析實現過程,同時結合本身的想法,對mapreduce分佈式離線計算框架有一個簡單的整理,思考。html
mapreduce最開始是2004(全部說"古老")年被Google提出的,大多數人應該是在hadoop普遍普及纔開始接觸。git
簡單來講mapreduce就是離線處理大規模數據的分佈式計算框架,用戶只須要定義一個處理k,v的map函數(該map函數會自動將計算結果保存到臨時文件中)和reduce(處理合並臨時文件中全部key中記錄)函數便可。因此從用戶角度來說是很是簡單的,使用類mapreduce框架就能夠擁有處理海量數據的計算能力,他不須要去關心數據分區分片,任務的多機合理調度,任務失敗重試,計算集羣資源管理等等分佈式系統中會遇到的問題。github
這裏先簡單梳理一下mapreduce的邏輯。golang
上圖能夠簡單看一下官方的流程圖,有幾個流程,從左到右的邏輯是,首先是將數據文件切分,而後分佈到每一個worker執行(至關於分佈式執行map函數),再而後產生中間結果集文件,最後reduce函數聚合全部中間文件的結果集,而後其中須要Master節點去作居中調度,分配map Worker和reduce Worker。apache
下面簡要介紹map, reduce, schedule的實現流程編程
編程模型json
對於開發者的編程模型是 map和reduce,處理一組k,v數據,輸出一組k,v數據, 類mapreduce框架會提供map和reduce函數接口。bash
具體代碼提交放在連接中github-mapreduceapp
Master 任務分配框架
master將任務調度到合適的worker,這裏實際上是整個計算系統的核心模塊,至關因而整個系統的大腦,在這裏其實須要作不少事,有不少細節,也有不少選擇,好比首先很是重要的實現一個master-worker的服務發現過程,讓worker註冊到master, 能讓master調度worker, 而後是須要實現worker-master之間的通訊協議,能夠經過使用dubbo, grpc, thrift,再而後worker的任務調度,這裏能夠增長一個像hadoop yarn, apache mesos的這樣的資源管理器去合理管理調度資源,將覈實的任務放在合適的計算資源上,而後像worker作心跳保活,處理worker的異常調度,最後很是重要的是作master的高可用,這裏不管是在google仍是hadoop感受都不是作得特別好,主要緣由應該一方面master源數據沒有自動存儲,而後也沒有像master controller這樣的管理工具去管理master容錯的軟件吧。
這裏沒有那麼複雜,全部的worker和master都在一個節點,經過socket進行通訊,調用,單進程多協程,至關因而僞分佈式的場景,而後這裏只實現了任務的調度,將每一個doMap, doReduce 任務調度到合適的worker(這裏的worker已經註冊到了registerChan中),而後這裏會有個局部變量allTask知道須要處理的總任務數,而後利用sync.WaitGroup控制攜程函數的執行結束,而後將allTask中的任務經過使用Go func啓動單獨的協程去執行,當出現任務調度失敗,將task從新放入到allTask,最後當等待全部的worker func執行完畢後,退出函數。
func schedule(
jobName string,
mapFiles []string,
nReduce int,
phase jobPhase,
registerChan chan string) {
var ntasks int
var n_other int
//判斷map reduce函數類型
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
fmt.Printf("Schedule: %v done\n", phase)
var wg sync.WaitGroup
var allTask = make(chan int)
// 加載task, 等待全部task完成
go func() {
for i := 0; i < ntasks; i++ {
wg.Add(1)
allTask <- i
}
wg.Wait()
close(allTask)
}()
// 調度全部的task
for i := range allTask {
// 獲取合適的worker
worker := <- registerChan
go func(worker string, i int) {
file := ""
if phase == mapPhase {
file = mapFiles[i]
}
doTaskArgs := DoTaskArgs{
JobName: jobName,
File : file,
Phase : phase,
TaskNumber: i,
NumOtherPhase: n_other,
}
// 執行worker
if call(worker, "Worker.DoTask", &doTaskArgs, nil ) {
wg.Done()
// 執行成功從新放回worker
registerChan <- worker
} else {
// 放回執行失敗的task
allTask <- i
}
}(worker, i)
}
fmt.Printf("Schedule: %v done\n", phase)
}
複製代碼
doMap實現邏輯其實比較簡單,就是使用ioutil.ReadFile將就本worker的數據讀出來(會有一個文件切片功能,這裏沒有介紹,就是將須要計算的數據經過分片的方式傳遞給worker, 在hadoop和gfs中默認是64M,這和分佈式文件系統hdfs, gfs的具體內部實現邏輯有關係),而後執行用戶定義的map func函數,將執行後的結果保存到中間文件中tmpFiles中, 這裏保存的格式是json, 注意這裏的環境假設是,全部worker都一個文件系統中,實際處理狀況會在gfs,hdfs中。
func doMap(jobName string,
mapTask int,
inFile string,
nReduce int,
mapF func(filename string, contents string) []KeyValue,
)
// 1 讀取合適的文件,執行用戶定義的map函數
body, err := ioutil.ReadFile(inFile)
if err != nil {
fmt.Errorf("doMap read file err %s", err)
return
}
// 2 執行用戶定義的map函數
resultKv := mapF(inFile ,string(body))
if len(resultKv) == 0 {
fmt.Println("doMap not need to create tmp file")
return
}
tmpFiles := make([]*os.File, nReduce)
tmpFileEcoder := make([]*json.Encoder, nReduce)
// 3 預生成須要的中間結果文件,獲取文件指針
for i:= 0; i < nReduce; i++ {
reduceTmpFileName := reduceName(jobName, mapTask, i)
fmt.Println(reduceTmpFileName)
stat, err := os.Stat(reduceTmpFileName)
if err != nil {
if os.IsExist(err) {
os.Remove(reduceTmpFileName)
}
}
if stat != nil {
if stat.IsDir() {
os.RemoveAll(reduceTmpFileName)
}
}
reduceFile, err := os.Create(reduceTmpFileName)
if err != nil {
fmt.Errorf("doMap create tmp %s file err %s ", reduceTmpFileName, err)
return
}
//tmpFiles := append(tmpFiles, reduceFile)
tmpFiles[i] = reduceFile
enc := json.NewEncoder(reduceFile)
//tmpFileEcoder = append(tmpFileEcoder, enc)
tmpFileEcoder[i] = enc
}
// 4 將map函數執行的resultKv,放入到中間文件中
for _, kv := range resultKv {
// 經過hash定位到 k 和對應的v 保存的合適的數據文件中
hashIndex := ihash(kv.Key) % nReduce
if tmpFileEcoder[hashIndex] == nil {
fmt.Errorf("doMap tmpFile ecoder index err")
continue
}
err := tmpFileEcoder[hashIndex].Encode(&kv)
if err != nil {
fmt.Errorf("doMap write tmp file %s err %s", tmpFiles[hashIndex].Name(), err)
}
}
// 5 關閉文件描述符
for _, tmpFile := range tmpFiles {
tmpFile.Close()
}
}
複製代碼
doReduce邏輯也不是太難,就是將當全部map執行結束後,執行reduce函數(這裏會用map函數所生成的中間文件),至關因而合併排序結果集,而後輸出到輸出文件。
func doReduce(
jobName string,
MapReduce job,
reduceTask int,
outFile string,
nMap int,
reduceF func(key string, values []string) string,
) {
resultMap := make(map[string][]string)
var keys []string
// 1 讀取全部的中間文件
for i:= 0; i < nMap; i++{
reduceTmpFileName := reduceName(jobName, i, reduceTask)
reduceTmpfile, err := os.Open(reduceTmpFileName)
if err != nil {
fmt.Errorf("doReduce read tmp file error %s", reduceTmpFileName)
continue
}
var kv KeyValue
// 2 解析每一個臨時文件
decode := json.NewDecoder(reduceTmpfile)
err = decode.Decode(&kv)
for err == nil {
if _, ok := resultMap[kv.Key]; !ok {
keys = append(keys, kv.Key)
}
resultMap[kv.Key] = append(resultMap[kv.Key], kv.Value)
err = decode.Decode(&kv)
}
// 3 排序 key
sort.Strings(keys)
out, err := os.Create(outFile)
if err != nil {
fmt.Errorf("doReduce create output file %s failed", err)
return
}
enc := json.NewEncoder(out)
// 4 輸出全部的結果到reduce文件中
for _, key := range keys {
if err = enc.Encode(KeyValue{key, reduceF(key, resultMap[key])}); err != nil {
fmt.Errorf("write key: %s to file %s failed", key, outFile)
}
}
out.Close()
}
}
複製代碼
感受其實幾年前就進入雲計算大數據的時代,而後進入移動互諒網,而後最近兩年新冒出來的,物聯網,新零售的概念,這意味着終端設備愈來愈多,同時也會產生愈來愈多的數據,咱們在數據中找到有價值的信息會愈來愈難,因此感受數據處理,數據分析的比重會是在互聯網中會愈來愈高。
那麼對應到技術上,可能之前最開始想到數據處理框架就是mapreduce, 如今可能不少人會以爲mapreduce這種框架略顯老式, 我其實以爲仍是使用場景的問題,像帶有離線計算,批處理,離線存儲這樣需求,hadoop體系仍是能夠繼續勝任,而後對於實時性要求更高的應用場景,會採用流式計算技術,像storm, yahoo s4等方案,如今開源社區也對於數據處理已經也孵化出了更多程序的框架,spark, flink的程序體系等,對於咱們來說在技術選項上也能有更多方案。
其實只實現了mapreduce比較簡單的功能,有一些邏輯沒有作介紹,具體能夠看看上邊鏈接中github的代碼,也能夠嘗試作作6.824 Schedule: Spring 2018相關練習,會對理解分佈式系統有更多的幫助, 有了一個基礎的框架結構,能夠往上邊增長不少功能,實現一個標準的mapreduce庫吧。