MapReduce 實現的簡單實現

最近有幸拜讀 Google 分佈式的三大論文,本着好記性不如爛筆頭的原則,談談樓主對分佈式系統開發的一點小小的心得~算法

相信用過 Hadoop 的同窗在等待結果輸出的時候會出現相似於這樣的 INFO : 2020-01-17 11:44:14,132 Stage-11 map = 0%, reduce = 0% 的日誌,它展現了 MapReduce 的執行過程,下面咱們也將就 MapReduce 進行展開,闡述 MapReduce 的執行原理以及根據 Google 的論文實現了 mini 版的 MapReduce。編程

什麼是 MapReduce

MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.json

就像 Google 的 MapReduce 論文中所說的, MapReduce 是一個編程模型,也是一個處理和生成超大數據集的算法模型的相關實現。用戶首先建立一個 Map 函數處理一個基於 key/value pair 的數據集合,輸出中間的基於 key/value pair 的數據集合,而後再建立一個 Reduce 函數用來合併全部的具備相同中間 key 值的中間 value 值。緩存

MapReduce 的例子

Map 與 Reduce 的原理

MapReduce編程模型的原理是:利用一個輸入 key/value pair 集合來產生一個輸出的 key/value pair 集合。MapReduce 庫的用戶用兩個函數表達這個計算:Map和Reduce。app

Map : 用戶自定義的 Map 函數接受一個輸入的 key/value pair 值,而後產生一箇中間 key/value pair 值的集合。MapReduce 庫把全部具備相同中間 key 值的中間 value 值集合在一塊兒後傳遞給 Reduce 函數。框架

Reduce : 用戶自定義的 Reduce 函數接受一箇中間 key 的值和相關的一個 value 值的集合。Reduce 函數合併這些 value 值,造成一個較小的 value 值的集合。通常的,每次 Reduce 函數調用只產生 0 或 1 個輸出 value 值。一般咱們經過一個迭代器把中間 value 值提供給 Reduce 函數,這樣咱們就能夠處理沒法所有放入內存中的大量的 value 值的集合。分佈式

Map 與 Reduce 的應用例子

  • 計算文檔中每一個單詞出現的次數:Map 函數處理文檔,對文檔中的單詞進行拆分,而後輸出(單詞,1)。Reduce 函數把相同單詞的 value 值都累加起來,產生(單詞,記錄總數)結果。
  • 倒排索引:Map 函數分析每一個文檔輸出一個(詞,文檔號)的列表,Reduce函數的輸入是一個給定詞的全部(詞,文檔號),排序全部的文檔號,輸出(詞,list(文檔號))。全部的輸出集合造成一個簡單的倒排索引,它以一種簡單的算法跟蹤詞在文檔中的位置。

MapReduce 執行過程圖解

Execution overview
上圖中展現了咱們的 MapReduce 實現中執行的所有流程。當用戶調用MapReduce函數時,將發生下面的一系列動做(下面的序號和上圖中的序號一一對應):

  1. 用戶程序首先調用的 MapReduce 庫將輸入文件分紅 M 個數據片度,每一個數據片斷的大小通常從 16MB 到 64MB (能夠經過可選的參數來控制每一個數據片斷的大小)。而後用戶程序在機羣中建立大量的程序副本。
  2. 這些程序副本中的有一個特殊的程序 master 。副本中其它的程序都是 worker 程序,由 master 分配任務。有 M 個 Map 任務和 R 個 Reduce 任務將被分配,master 將一個 Map 任務或 Reduce 任務分配給一個空閒的 worker
  3. 被分配了 Map 任務的 worker 程序讀取相關的輸入數據片斷,從輸入的數據片斷中解析出 key/value pair ,而後把 key/value pair 傳遞給用戶自定義的 Map 函數,由 Map 函數生成並輸出的中間 key/value pair,並緩存在內存中。
  4. 緩存中的key/value pair 經過分區函數分紅 R 個區域,以後週期性的寫入到本地磁盤上。緩存的 key/value pair 在本地磁盤上的存儲位置將被回傳給 master,由 master 負責把這些存儲位置再傳送給 Reduce worker
  5. Reduce worker 程序接收到 master 程序發來的數據存儲位置信息後,使用 RPC 從 Map worker 所在主機的磁盤上讀取這些緩存數據。當 Reduce worker 讀取了全部的中間數據後,經過對 key 進行排序後使得具備相同 key 值的數據聚合在一塊兒。因爲許多不一樣的 key 值會映射到相同的 Reduce 任務上,所以必須進行排序。若是中間數據太大沒法在內存中完成排序,那麼就要在外部進行排序。
  6. Reduce worker 程序遍歷排序後的中間數據,對於每個惟一的中間 key 值,Reduce worker 程序將這個key值和它相關的中間 value 值的集合傳遞給用戶自定義的 Reduce 函數。Reduce 函數的輸出被追加到所屬分區的輸出文件。
  7. 當全部的 Map 和 Reduce 任務都完成以後,master 喚醒用戶程序。在這個時候,在用戶程序裏的對 MapReduce 調用才返回。

在成功完成任務以後,MapReduce 的輸出存放在 R 個輸出文件中(對應每一個 Reduce 任務產生一個輸出文件,文件名由用戶指定)。通常狀況下,用戶不須要將這 R 個輸出文件合併成一個文件,咱們常常把這些文件做爲另一個 MapReduce 的輸入,或者在另一個能夠處理多個分割文件的分佈式應用中使用。函數

MapReduce 程序的實現

MapReduce 的核心就是實現其 Map 與 Reduce 的邏輯代碼,顯示樓主將就在上面描述的 Map 與 Reduce 的執行過程完成對 Map 與 Reduce 的實現。oop

實現 Map

1,下面的 doMap 函數管理一項 map 任務:它讀取輸入文件(inFile),爲該文件的內容調用用戶定義的 map 函數(mapF),而後將 mapF 的輸出分區爲 nReduce 中間文件。大數據

2,每一個 reduce 任務對應一箇中間文件。文件名包括 map 任務編號和 reduce 任務編號。使用由reduceName 函數生成的文件名做爲 reduce 任務的中間文件。在每一個key mod nReduce 上調用 ihash()來選擇對應的 reduce 任務。

3,mapF 是應用程序提供的 map 函數。第一個參數應該是輸入文件名。第二個參數應該是整個輸入文件的內容。 mapF()返回包含用於 reduce 的鍵/值對的切片。

4,下面程序中使用 json 格式將 mapF 處理好的數據寫入文件中,爲了數據處理方便,下面程序中處理好的每條數據都採用換行符進行分割。

func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

func doMap( jobName string, // MapReduce 的任務名稱 mapTask int, // 當前執行的 mapTask inFile string, // 輸入的的文件 nReduce int, // reduceTask 的數量 mapF func(filename string, contents string) []KeyValue, // 用戶自定義的 map 函數 ) {
	f, err := os.Open(inFile)
	if err != nil {
		debug("open file err %v", err)
	}

	defer f.Close()

	dat, err := ioutil.ReadAll(f)
	if err != nil {
		debug("open map file err %v", err)
	}

	res := mapF(inFile, string(dat))

	for _, kv := range res {

		hash := ihash(kv.Key)
		r := hash % nReduce
		// mrtmp.xxx-0-0
		fd, err := os.OpenFile(reduceName(jobName, mapTask, r), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
		if err != nil {
			debug("open mrtmp.xxx file err %v", err)
			continue
		}

		enc := json.NewEncoder(fd)
		if err := enc.Encode(&kv); err != nil {
			debug("encode json err %v", err)
			continue
		}
		fd.Close()
	}
}

func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

複製代碼

實現 Reduce

doReduce 管理一個 reduce 任務:它讀取任務的中間文件,按 key 對中間文件中的數據對進行排序,爲每一個 key 調用用戶定義的 reduceF 函數,並將 reduceF 的輸出的寫入磁盤。

func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

func doReduce( jobName string, // MapReduce 的任務名稱 reduceTask int, // 當前運行的 reduce 任務的任務號 outFile string, // 結果輸出的文件路徑 nMap int, // map 任務的個數 reduceF func(key string, values []string) string, // 用戶的自定義 reduce 函數 ) {
	kvMap := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		func() {
			inFileName := reduceName(jobName, i, reduceTask)
			inFile, err := os.Open(inFileName)
			if err != nil {
				panic("can't open file:" + inFileName)
			}
			defer inFile.Close()

			// Read and Decoder the file
			var kv KeyValue
			for decoder := json.NewDecoder(inFile); decoder.Decode(&kv) != io.EOF; {
				kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
			}

		}()
	}
	var keys []string

	// sort by key
	for k := range kvMap {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// reduce
	outfd, err := os.Create(outFile)
	if err != nil {
		panic("can't create file:" + outFile)
	}
	defer outfd.Close()
	enc := json.NewEncoder(outfd)
	for _, k := range keys {
		reducedValue := reduceF(k, kvMap[k])
		enc.Encode(KeyValue{Key: k, Value: reducedValue})
	}
}
複製代碼

對 doMap 與 doReduce 的封裝

下面的函數是對 doMap 與 doReduce 進行順序調用,生成 MapReduce 任務的結果輸出到結果文件中。

func Sequential(jobName string, files []string, nreduce int, mapF func(string, string) []KeyValue, reduceF func(string, []string) string, ) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF)
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}
複製代碼

使用 MapReduce 實現詞頻統計和倒排索引

在上面咱們提到了 MapReduce 在實際應用中的例子,下面咱們將對這兩個例子作一下簡單的實現。

實現詞頻統計

爲了實現詞頻統計這一功能,咱們使用 MapReduce 框架的思路就是實現自定義的 map 與 reduce 函數: 1,map:讀取文檔,將文檔中的單詞逐個提取出來,生成(單詞,1)這樣的鍵值對,而後把數據羅盤,寫入到中間文件中。 2,reduce:讀取中間文件,按照鍵值對進行排序,將 key 相同的數據聚合到一塊兒,統計每一個單詞出現的次數,而後將結果寫入到文件中落盤。

package main

import (
	"6.824/src/mapreduce"
	"fmt"
	"os"
	"strconv"
	"strings"
	"unicode"
)

func mapF(filename string, contents string) (res []mapreduce.KeyValue) {
	// Your code here (Part II).
	f := func(c rune) bool {
		return !unicode.IsLetter(c)
	}

	words := strings.FieldsFunc(contents, f)
	for _, w := range words {
		kv := mapreduce.KeyValue{Key: w, Value: "1"}
		res = append(res, kv)
	}
	return res
}

func reduceF(key string, values []string) string {
	// Your code here (Part II).
	sum := 0
	for _, e := range values {
		data, err := strconv.Atoi(e)
		if err != nil {
			fmt.Printf("Reduce err %s%v\n", key, err)
			continue
		}
		sum += data
	}
	return strconv.Itoa(sum)
}

func main() {
	if len(os.Args) < 4 {
		fmt.Printf("%s: see usage comments in file\n", os.Args[0])
	} else {
		var mr *mapreduce.Master
		mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
		mr.Wait()
	}
}
複製代碼

實現倒排索引

一樣,在理解了倒排索引的基礎上設計咱們本身的 map 與 reduce 方法, 1,map:將讀取文檔,將文檔中的單詞做爲 key,單詞所在的文檔做爲 value,寫入到中間文件中。 2,reduce:讀取中間文件,按照鍵值對進行排序,將 key 相同的數據聚合到一塊兒,將單詞出現的文件名拼接在一塊兒,寫入到結果文件中。

package main

import (
	"bytes"
	"os"
	"strconv"
	"strings"
	"unicode"
)
import "fmt"
import "6.824/src/mapreduce"

func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// Your code here (Part V).
	words := strings.FieldsFunc(value, func(c rune) bool {
		return !unicode.IsLetter(c)
	})
	for _, w := range words {
		res = append(res, mapreduce.KeyValue{Key: w, Value: document})
	}
	return res
}

func reduceF(key string, values []string) string {
	// Your code here (Part V).
	sum := 0
	var buffer bytes.Buffer
	if key == "www" {
		fmt.Println(values)
	}
	isExist := make(map[string]string)
	for _, e := range values {
		if _, ok := isExist[e]; !ok {
			buffer.WriteString(e)
			buffer.WriteString(",")
			sum += 1
			isExist[e] = e
		}
	}
	iiRes := strconv.Itoa(sum) + " " + strings.TrimRight(buffer.String(), ",")
	return iiRes
}

func main() {
	if len(os.Args) < 4 {
		fmt.Printf("%s: see usage comments in file\n", os.Args[0])
	} else {
		var mr *mapreduce.Master
		mr = mapreduce.Sequential("iiseq", os.Args[3:], 3, mapF, reduceF)
		mr.Wait()
	}
}
複製代碼

小結

本文參考 Google 的論文,實現了一個單機版的 MapReduce 框架,並實現了兩個簡單的 MapReduce 實例,文中的代碼能夠在樓主的 GitHub 下載查看。

參考

MapReduce: Simplified Data Processing on Large Clusters

關注咱們

關注咱們
相關文章
相關標籤/搜索