Hadoop解決大規模數據分佈式計算的方案是MapReduce。MapReduce既是一個編程模型,又是一個計算框架。也就是說,開發人員必須基於MapReduce編程模型進行編程開發,而後將程序經過MapReduce計算框架分發到Hadoop集羣中運行。咱們先看一下做爲編程模型的MapReduce。數據庫
MapReduce是一種很是簡單又很是強大的編程模型。編程
簡單在於其編程模型只包含map和reduce兩個過程,map的主要輸入是一對<key , value>值,通過map計算後輸出一對<key , value>值;而後將相同key合併,造成<key , value集合>;再將這個<key , value集合>輸入reduce,通過計算輸出零個或多個<key , value>對。bash
可是MapReduce同時又是很是強大的,不論是關係代數運算(SQL計算),仍是矩陣運算(圖計算),大數據領域幾乎全部的計算需求均可以經過MapReduce編程來實現。服務器
咱們以WordCount程序爲例。WordCount主要解決文本處理中的詞頻統計問題,就是統計文本中每個單詞出現的次數。若是隻是統計一篇文章的詞頻,幾十K到幾M的數據,那麼寫一個程序,將數據讀入內存,建一個Hash表記錄每一個詞出現的次數就能夠了,以下圖。app
可是若是想統計全世界互聯網全部網頁(數萬億計)的詞頻數(這正是google這樣的搜索引擎典型需求),你不可能寫一個程序把全世界的網頁都讀入內存,這時候就須要用MapReduce編程來解決。框架
WordCount的MapReduce程序以下。分佈式
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}複製代碼
其核心是一個map函數,一個reduce函數。函數
map函數的輸入主要是一個<key , value>對,在這個例子裏,value是要統計的全部文本中的一行數據,key在這裏不重要,咱們忽略。oop
public void map(Object key, Text value, Context context)複製代碼
map函數的計算過程就是,將這行文本中的單詞提取出來,針對每一個單詞輸出一個<word , 1>這樣的<key , value>對。性能
MapReduce計算框架會將這些<word , 1>收集起來,將相同的word放在一塊兒,造成<word , <1,1,1,1,1,1,1.....>>這樣的<key , value集合>數據,而後將其輸入給reduce函數。
public void reduce(Text key, Iterable<IntWritable> values,Context context)複製代碼
這裏的reduce的輸入參數values就是由不少個1組成的集合,而key就是具體的單詞word。
reduce函數的計算過程就是,將這個集合裏的1求和,再將單詞(word)和這個和(sum)組成一個<key , value>(<word , sum>)輸出。每個輸出就是一個單詞和它的詞頻統計總和。
假設有兩個block的文本數據須要進行詞頻統計,MapReduce計算過程以下圖。
一個map函數能夠針對一部分數據進行運算,這樣就能夠將一個大數據切分紅不少塊(這也正是HDFS所作的),MapReduce計算框架爲每一個塊分配一個map函數去計算,從而實現大數據的分佈式計算。
上面提到MapReduce編程模型將大數據計算過程切分爲map和reduce兩個階段,在map階段爲每一個數據塊分配一個map計算任務,而後將全部map輸出的key進行合併,相同的key及其對應的value發送給同一個reduce任務去處理。
這個過程有兩個關鍵問題須要處理
如何爲每一個數據塊分配一個map計算任務,代碼是如何發送數據塊所在服務器的,發送過去是如何啓動的,啓動之後又如何知道本身須要計算的數據在文件什麼位置(數據塊id是什麼)
處於不一樣服務器的map輸出的<key , value> ,如何把相同的key聚合在一塊兒發送給reduce任務
這兩個關鍵問題正好對應文章中「MapReduce計算過程」一圖中兩處「MapReduce框架處理」。
咱們先看下MapReduce是如何啓動處理一個大數據計算應用做業的。
咱們以Hadoop1爲例,MapReduce運行過程涉及如下幾類關鍵進程:
大數據應用進程:啓動用戶MapReduce程序的主入口,主要指定Map和Reduce類、輸入輸出文件路徑等,並提交做業給Hadoop集羣。
JobTracker進程:根據要處理的輸入數據量啓動相應數量的map和reduce進程任務,並管理整個做業生命週期的任務調度和監控。JobTracker進程在整個Hadoop集羣全局惟一。
TaskTracker進程:負責啓動和管理map進程以及reduce進程。由於須要每一個數據塊都有對應的map函數,TaskTracker進程一般和HDFS的DataNode進程啓動在同一個服務器,也就是說,Hadoop集羣中絕大多數服務器同時運行DataNode進程和TaskTacker進程。
以下圖所示。
具體做業啓動和計算過程以下:
應用進程將用戶做業jar包存儲在HDFS中,未來這些jar包會分發給Hadoop集羣中的服務器執行MapReduce計算。
應用程序提交job做業給JobTracker。
JobTacker根據做業調度策略建立JobInProcess樹,每一個做業都會有一個本身的JobInProcess樹。
JobInProcess根據輸入數據分片數目(一般狀況就是數據塊的數目)和設置的reduce數目建立相應數量的TaskInProcess。
TaskTracker進程和JobTracker進程進行定時通訊。
若是TaskTracker有空閒的計算資源(空閒CPU核),JobTracker就會給他分配任務。分配任務的時候會根據TaskTracker的服務器名字匹配在同一臺機器上的數據塊計算任務給它,使啓動的計算任務正好處理本機上的數據。
TaskRunner收到任務後根據任務類型(map仍是reduce),任務參數(做業jar包路徑,輸入數據文件路徑,要處理的數據在文件中的起始位置和偏移量,數據塊多個備份的DataNode主機名等)啓動相應的map或者reduce進程。
map或者reduce程序啓動後,檢查本地是否有要執行任務的jar包文件,若是沒有,就去HDFS上下載,而後加載map或者reduce代碼開始執行。
若是是map進程,從HDFS讀取數據(一般要讀取的數據塊正好存儲在本機)。若是是reduce進程,將結果數據寫出到HDFS。
經過以上過程,MapReduce能夠將大數據做業計算任務分佈在整個Hadoop集羣中運行,每一個map計算任務要處理的數據一般都能從本地磁盤上讀取到。而用戶要作的僅僅是編寫一個map函數和一個reduce函數就能夠了,根本不用關心這兩個函數是如何被分佈啓動到集羣上的,數據塊又是如何分配給計算任務的。這一切都由MapReduce計算框架完成。
在WordCount例子中,要統計相同單詞在全部輸入數據中出現的次數,而一個map只能處理一部分數據,一個熱門單詞幾乎會出如今全部的map中,這些單詞必需要合併到一塊兒進行統計才能獲得正確的結果。
事實上,幾乎全部的大數據計算場景都須要處理數據關聯的問題,簡單如WordCount只要對key進行合併就能夠了,複雜如數據庫的join操做,須要對兩種類型(或者更多類型)的數據根據key進行鏈接。
MapReduce計算框架處理數據合併與鏈接的操做就在map輸出與reduce輸入之間,這個過程有個專門的詞彙來描述,叫作shuffle。
每一個map任務的計算結果都會寫入到本地文件系統,等map任務快要計算完成的時候,MapReduce計算框架會啓動shuffle過程,在map端調用一個Partitioner接口,對map產生的每一個<key , value>進行reduce分區選擇,而後經過http通訊發送給對應的reduce進程。這樣無論map位於哪一個服務器節點,相同的key必定會被髮送給相同的reduce進程。reduce端對收到的<key , value>進行排序和合並,相同的key放在一塊兒,組成一個<key , value集合>傳遞給reduce執行。
MapReduce框架缺省的Partitioner用key的哈希值對reduce任務數量取模,相同的key必定會落在相同的reduce任務id上,實現上,這樣的Partitioner代碼只須要一行,以下所示。
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}複製代碼
shuffle是大數據計算過程當中發生奇蹟的地方,不論是MapReduce仍是Spark,只要是大數據批處理計算,必定會有shuffle過程,讓數據關聯起來,數據的內在關係和價值纔會呈現出來。不理解shuffle,就會在map和reduce編程中產生困惑,不知道該如何正確設計map的輸出和reduce的輸入。shuffle也是整個MapReduce過程當中最難最消耗性能的地方,在MapReduce早期代碼中,一半代碼都是關於shuffle處理的。