最近有幸拜讀 Google 分佈式的三大論文,本着好記性不如爛筆頭的原則,談談樓主對分佈式系統開發的一點小小的心得~算法
相信用過 Hadoop 的同窗在等待結果輸出的時候會出現相似於這樣的 INFO : 2020-01-17 11:44:14,132 Stage-11 map = 0%, reduce = 0%
的日誌,它展現了 MapReduce 的執行過程,下面咱們也將就 MapReduce 進行展開,闡述 MapReduce 的執行原理以及根據 Google 的論文實現了 mini 版的 MapReduce。編程
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.json
就像 Google 的 MapReduce 論文中所說的, MapReduce 是一個編程模型,也是一個處理和生成超大數據集的算法模型的相關實現。用戶首先建立一個 Map 函數處理一個基於 key/value pair
的數據集合,輸出中間的基於 key/value pair
的數據集合,而後再建立一個 Reduce 函數用來合併全部的具備相同中間 key 值的中間 value 值。緩存
MapReduce編程模型的原理是:利用一個輸入 key/value pair
集合來產生一個輸出的 key/value pair
集合。MapReduce 庫的用戶用兩個函數表達這個計算:Map和Reduce。app
Map : 用戶自定義的 Map 函數接受一個輸入的 key/value pair
值,而後產生一箇中間 key/value pair
值的集合。MapReduce 庫把全部具備相同中間 key 值的中間 value 值集合在一塊兒後傳遞給 Reduce 函數。框架
Reduce : 用戶自定義的 Reduce 函數接受一箇中間 key 的值和相關的一個 value 值的集合。Reduce 函數合併這些 value 值,造成一個較小的 value 值的集合。通常的,每次 Reduce 函數調用只產生 0 或 1 個輸出 value 值。一般咱們經過一個迭代器把中間 value 值提供給 Reduce 函數,這樣咱們就能夠處理沒法所有放入內存中的大量的 value 值的集合。分佈式
master
。副本中其它的程序都是 worker
程序,由 master
分配任務。有 M 個 Map 任務和 R 個 Reduce 任務將被分配,master
將一個 Map 任務或 Reduce 任務分配給一個空閒的 worker
。worker
程序讀取相關的輸入數據片斷,從輸入的數據片斷中解析出 key/value pair
,而後把 key/value pair
傳遞給用戶自定義的 Map 函數,由 Map 函數生成並輸出的中間 key/value pair
,並緩存在內存中。key/value pair
經過分區函數分紅 R 個區域,以後週期性的寫入到本地磁盤上。緩存的 key/value pair
在本地磁盤上的存儲位置將被回傳給 master
,由 master
負責把這些存儲位置再傳送給 Reduce worker
。Reduce worker
程序接收到 master
程序發來的數據存儲位置信息後,使用 RPC 從 Map worker
所在主機的磁盤上讀取這些緩存數據。當 Reduce worker
讀取了全部的中間數據後,經過對 key 進行排序後使得具備相同 key 值的數據聚合在一塊兒。因爲許多不一樣的 key 值會映射到相同的 Reduce 任務上,所以必須進行排序。若是中間數據太大沒法在內存中完成排序,那麼就要在外部進行排序。Reduce worker
程序遍歷排序後的中間數據,對於每個惟一的中間 key 值,Reduce worker
程序將這個key值和它相關的中間 value 值的集合傳遞給用戶自定義的 Reduce 函數。Reduce 函數的輸出被追加到所屬分區的輸出文件。master
喚醒用戶程序。在這個時候,在用戶程序裏的對 MapReduce 調用才返回。在成功完成任務以後,MapReduce 的輸出存放在 R 個輸出文件中(對應每一個 Reduce 任務產生一個輸出文件,文件名由用戶指定)。通常狀況下,用戶不須要將這 R 個輸出文件合併成一個文件,咱們常常把這些文件做爲另一個 MapReduce 的輸入,或者在另一個能夠處理多個分割文件的分佈式應用中使用。函數
MapReduce 的核心就是實現其 Map 與 Reduce 的邏輯代碼,顯示樓主將就在上面描述的 Map 與 Reduce 的執行過程完成對 Map 與 Reduce 的實現。oop
1,下面的 doMap 函數管理一項 map 任務:它讀取輸入文件(inFile),爲該文件的內容調用用戶定義的 map 函數(mapF),而後將 mapF 的輸出分區爲 nReduce 中間文件。大數據
2,每一個 reduce 任務對應一箇中間文件。文件名包括 map 任務編號和 reduce 任務編號。使用由reduceName 函數生成的文件名做爲 reduce 任務的中間文件。在每一個key mod nReduce 上調用 ihash()來選擇對應的 reduce 任務。
3,mapF 是應用程序提供的 map 函數。第一個參數應該是輸入文件名。第二個參數應該是整個輸入文件的內容。 mapF()返回包含用於 reduce 的鍵/值對的切片。
4,下面程序中使用 json 格式將 mapF 處理好的數據寫入文件中,爲了數據處理方便,下面程序中處理好的每條數據都採用換行符進行分割。
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
func doMap( jobName string, // MapReduce 的任務名稱 mapTask int, // 當前執行的 mapTask inFile string, // 輸入的的文件 nReduce int, // reduceTask 的數量 mapF func(filename string, contents string) []KeyValue, // 用戶自定義的 map 函數 ) {
f, err := os.Open(inFile)
if err != nil {
debug("open file err %v", err)
}
defer f.Close()
dat, err := ioutil.ReadAll(f)
if err != nil {
debug("open map file err %v", err)
}
res := mapF(inFile, string(dat))
for _, kv := range res {
hash := ihash(kv.Key)
r := hash % nReduce
// mrtmp.xxx-0-0
fd, err := os.OpenFile(reduceName(jobName, mapTask, r), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
debug("open mrtmp.xxx file err %v", err)
continue
}
enc := json.NewEncoder(fd)
if err := enc.Encode(&kv); err != nil {
debug("encode json err %v", err)
continue
}
fd.Close()
}
}
func ihash(s string) int {
h := fnv.New32a()
h.Write([]byte(s))
return int(h.Sum32() & 0x7fffffff)
}
複製代碼
doReduce 管理一個 reduce 任務:它讀取任務的中間文件,按 key 對中間文件中的數據對進行排序,爲每一個 key 調用用戶定義的 reduceF 函數,並將 reduceF 的輸出的寫入磁盤。
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
func doReduce( jobName string, // MapReduce 的任務名稱 reduceTask int, // 當前運行的 reduce 任務的任務號 outFile string, // 結果輸出的文件路徑 nMap int, // map 任務的個數 reduceF func(key string, values []string) string, // 用戶的自定義 reduce 函數 ) {
kvMap := make(map[string][]string)
for i := 0; i < nMap; i++ {
func() {
inFileName := reduceName(jobName, i, reduceTask)
inFile, err := os.Open(inFileName)
if err != nil {
panic("can't open file:" + inFileName)
}
defer inFile.Close()
// Read and Decoder the file
var kv KeyValue
for decoder := json.NewDecoder(inFile); decoder.Decode(&kv) != io.EOF; {
kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
}
}()
}
var keys []string
// sort by key
for k := range kvMap {
keys = append(keys, k)
}
sort.Strings(keys)
// reduce
outfd, err := os.Create(outFile)
if err != nil {
panic("can't create file:" + outFile)
}
defer outfd.Close()
enc := json.NewEncoder(outfd)
for _, k := range keys {
reducedValue := reduceF(k, kvMap[k])
enc.Encode(KeyValue{Key: k, Value: reducedValue})
}
}
複製代碼
下面的函數是對 doMap 與 doReduce 進行順序調用,生成 MapReduce 任務的結果輸出到結果文件中。
func Sequential(jobName string, files []string, nreduce int, mapF func(string, string) []KeyValue, reduceF func(string, []string) string, ) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}
複製代碼
在上面咱們提到了 MapReduce 在實際應用中的例子,下面咱們將對這兩個例子作一下簡單的實現。
爲了實現詞頻統計這一功能,咱們使用 MapReduce 框架的思路就是實現自定義的 map 與 reduce 函數: 1,map:讀取文檔,將文檔中的單詞逐個提取出來,生成(單詞,1)這樣的鍵值對,而後把數據羅盤,寫入到中間文件中。 2,reduce:讀取中間文件,按照鍵值對進行排序,將 key 相同的數據聚合到一塊兒,統計每一個單詞出現的次數,而後將結果寫入到文件中落盤。
package main
import (
"6.824/src/mapreduce"
"fmt"
"os"
"strconv"
"strings"
"unicode"
)
func mapF(filename string, contents string) (res []mapreduce.KeyValue) {
// Your code here (Part II).
f := func(c rune) bool {
return !unicode.IsLetter(c)
}
words := strings.FieldsFunc(contents, f)
for _, w := range words {
kv := mapreduce.KeyValue{Key: w, Value: "1"}
res = append(res, kv)
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part II).
sum := 0
for _, e := range values {
data, err := strconv.Atoi(e)
if err != nil {
fmt.Printf("Reduce err %s%v\n", key, err)
continue
}
sum += data
}
return strconv.Itoa(sum)
}
func main() {
if len(os.Args) < 4 {
fmt.Printf("%s: see usage comments in file\n", os.Args[0])
} else {
var mr *mapreduce.Master
mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
mr.Wait()
}
}
複製代碼
一樣,在理解了倒排索引的基礎上設計咱們本身的 map 與 reduce 方法, 1,map:將讀取文檔,將文檔中的單詞做爲 key,單詞所在的文檔做爲 value,寫入到中間文件中。 2,reduce:讀取中間文件,按照鍵值對進行排序,將 key 相同的數據聚合到一塊兒,將單詞出現的文件名拼接在一塊兒,寫入到結果文件中。
package main
import (
"bytes"
"os"
"strconv"
"strings"
"unicode"
)
import "fmt"
import "6.824/src/mapreduce"
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
words := strings.FieldsFunc(value, func(c rune) bool {
return !unicode.IsLetter(c)
})
for _, w := range words {
res = append(res, mapreduce.KeyValue{Key: w, Value: document})
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part V).
sum := 0
var buffer bytes.Buffer
if key == "www" {
fmt.Println(values)
}
isExist := make(map[string]string)
for _, e := range values {
if _, ok := isExist[e]; !ok {
buffer.WriteString(e)
buffer.WriteString(",")
sum += 1
isExist[e] = e
}
}
iiRes := strconv.Itoa(sum) + " " + strings.TrimRight(buffer.String(), ",")
return iiRes
}
func main() {
if len(os.Args) < 4 {
fmt.Printf("%s: see usage comments in file\n", os.Args[0])
} else {
var mr *mapreduce.Master
mr = mapreduce.Sequential("iiseq", os.Args[3:], 3, mapF, reduceF)
mr.Wait()
}
}
複製代碼
本文參考 Google 的論文,實現了一個單機版的 MapReduce 框架,並實現了兩個簡單的 MapReduce 實例,文中的代碼能夠在樓主的 GitHub 下載查看。
MapReduce: Simplified Data Processing on Large Clusters