原文連接:pengtuo.tech/大數據研發/2018/…java
本篇文章將會介紹 Hadoop
重要的計算框架 MapReduce
。算法
完整的 MapReduce
框架包含兩部分:數據庫
map
、shuffle
以及 reduce
三個重要算法組成部分,本篇文章將會介紹這個層面;MapReduce version2
之後,做業都是提交給 YARN
進行管理,因此本文將不會介紹此部分。系列其餘文章有:apache
MapReduce
是一個基於 java 的並行分佈式計算框架,使用它來編寫的數據處理應用能夠運行在大型的商用硬件集羣上來處理大型數據集中的可並行化問題,數據處理能夠發生在存儲在文件系統(非結構化)或數據庫(結構化)中的數據上。MapReduce
能夠利用數據的位置,在存儲的位置附近處理數據,以最大限度地減小通訊開銷。編程
MapReduce
框架經過編組分佈式服務器,並行運行各類任務,管理系統各部分之間的全部通訊和數據傳輸;其還能自動完成計算任務的並行化處理,自動劃分計算數據和計算任務,在集羣節點上自動分配和執行任務以及收集計算結果,將數據分佈存儲、數據通訊、容錯處理等並行計算涉及到的不少系統底層的複雜細節交由系統負責處理,減小開發人員的負擔。bash
MapReduce
仍是一個並行程序設計模型與方法(Programming Model & Methodology)。它藉助於函數式程序設計語言Lisp的設計思想,提供了一種簡便的並行程序設計方法,將複雜的、運行於大規模集羣上的並行計算過程高度地抽象到了兩個函數:Map和Reduce,用Map和Reduce兩個函數編程實現基本的並行計算任務,提供了抽象的操做和並行編程接口,以簡單方便地完成大規模數據的編程和計算處理。服務器
MapReduce框架一般由三個操做(或步驟)組成:網絡
Map
:每一個工做節點將 map
函數應用於本地數據,並將輸出寫入臨時存儲。主節點確保僅處理冗餘輸入數據的一個副本。Shuffle
:工做節點根據輸出鍵(由 map
函數生成)從新分配數據,對數據映射排序、分組、拷貝,目的是屬於一個鍵的全部數據都位於同一個工做節點上。Reduce
:工做節點如今並行處理每一個鍵的每組輸出數據。MapReduce 流程圖: 數據結構
MapReduce
容許分佈式運行 Map
操做,只要每一個 Map
操做獨立於其餘 Map
操做就能夠並行執行。app
另外一種更詳細的,將 MapReduce
分爲5個步驟的理解是:
MapReduce
框架先指定 Map
處理器,而後給其分配將要處理的輸入數據 -- 鍵值對 K1
,併爲該處理器提供與該鍵值相關的全部輸入數據;Map()
在 K1
鍵值對上運行一次,生成由 K2
指定的鍵值對的輸出;K2
鍵值對,根據『鍵』是否相同移至相同的工做節點;K2
鍵值對進行 Reduce()
操做;MapReduce
框架收集全部 Reduce
輸出,並按 K2
對其進行排序以產生最終結果進行輸出。實際生產環境中,數據頗有多是分散在各個服務器上,對於原先的大數據處理方法,則是將數據發送至代碼所在的地方進行處理,這樣很是低效且佔用了大量的帶寬,爲應對這種狀況,MapReduce
框架的處理方法是,將 Map()
操做或者 Reduce()
發送至數據所在的服務器上,以『移動計算替代移動數據』,來加速整個框架的運行速度,大多數計算都發生在具備本地磁盤上數據的節點上,從而減小了網絡流量。
一個 Map
函數就是對一些獨立元素組成的概念上的列表的每個元素進行指定的操做,因此每一個元素都是被獨立操做的,而原始列表沒有被更改,由於這裏建立了一個新的列表來保存新的答案。這就是說,Map
操做是能夠高度並行的
MapReduce
框架的 Map
和 Reduce
函數都是根據 (key, value)
形式的數據結構定義的。 Map
在一個數據域(Data Domain)中獲取一個鍵值對,而後返回一個鍵值對的列表:
Map(k1,v1) → list(k2,v2)
複製代碼
Map
函數會被並行調用,應用於輸入數據集中的每一個鍵值對(keyed by K1)。而後每一個調用返回一個鍵值對(keyed by K2)列表。以後,MapReduce
框架從全部列表中收集具備相同 key
(這裏是 k2)的全部鍵值對,並將它們組合在一塊兒,爲每一個 key
建立一個組。
而 Reduce
是對一個列表的元素進行適當的合併。雖然不如 Map
函數那麼並行,可是由於化簡老是有一個簡單的答案,大規模的運算相對獨立,因此化簡函數在高度並行環境下也頗有用。Reduce
函數並行應用於每一個組,從而在同一個數據域中生成一組值:
Reduce(k2, list (v2)) → list(v3)
複製代碼
Reduce
端接收到不一樣任務傳來的有序數據組。此時 Reduce()
會根據程序猿編寫的代碼邏輯進行相應的 reduce
操做,例如根據同一個鍵值對進行計數加和等。若是Reduce
端接受的數據量至關小,則直接存儲在內存中,若是數據量超過了該緩衝區大小的必定比例,則對數據合併後溢寫到磁盤中。
前面提到過,Map
階段有一個分割成組的操做,這個劃分數據的過程就是 Partition
,而負責分區的 java 類就是 Partitioner
。
Partitioner
組件可讓 Map
對 Key
進行分區,從而將不一樣分區的 Key
交由不一樣的 Reduce
處理,由此,Partitioner
數量等同於 Reducer
的數量,一個 Partitioner
對應一個 Reduce
做業,可認爲其就是 Reduce
的輸入分片,可根據實際業務狀況編程控制,提升 Reduce
效率或進行負載均衡。MapReduce
的內置分區是HashPartition
。
具備多個分割老是有好處的,由於與處理整個輸入所花費的時間相比,處理分割所花費的時間很短。當分割較小時,能夠更好的處理負載平衡,可是分割也不宜過小,若是太小,則會使得管理拆分和任務加載的時間在總運行時間中佔太高的比重。
下圖是 map
任務和 reduce
任務的示意圖:
這裏給出一個統計詞頻案例的 Java 代碼:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
// 繼承 Mapper 類,實現本身的 map 功能
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// map 功能必須實現的函數
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// 繼承 Reducer 類,實現本身的 reduce 功能
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
// 初始化Configuration,讀取mapreduce系統配置信息
Configuration conf = new Configuration();
// 構建 Job 而且加載計算程序 WordCount.class
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
//指定 Mapper、Combiner、Reducer,也就是咱們本身繼承實現的類
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 設置輸入輸出數據
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
複製代碼
上述代碼會發如今指定 Mapper
以及 Reducer
時,還指定了 Combiner
類,Combiner
是一個本地化的 reduce
操做(所以咱們看見 WordCount
類裏是用 reduce
進行加載的),它是 map
運算的後續操做,與 map
在同一個主機上進行,主要是在 map
計算出中間文件前作一個簡單的合併重複key值的操做,減小中間文件的大小,這樣在後續進行到 Shuffle
時,能夠下降網絡傳輸成本,提升網絡傳輸效率。
提交 MR
做業的命令:
hadoop jar {程序的 jar 包} {任務名稱} {數據輸入路徑} {數據輸出路徑}
複製代碼
例如:
hadoop jar hadoop-mapreduce-wordcount.jar WordCount /sample/input /sample/output
複製代碼
上述代碼示意圖:
Map -> Shuffle -> Reduce 的中間結果,包括最後的輸出都是存儲在本地磁盤上。
MapReduce
的兩大優點是:
1 ) 並行處理:
在 MapReduce
中,咱們將做業劃分爲多個節點,每一個節點同時處理做業的一部分。所以,MapReduce
基於Divide and Conquer範例,它幫助咱們使用不一樣的機器處理數據。因爲數據由多臺機器而不是單臺機器並行處理,所以處理數據所需的時間會減小不少。
2 ) 數據位置:
咱們將計算移動到 MapReduce
框架中的數據,而不是將數據移動到計算部分。數據分佈在多個節點中,其中每一個節點處理駐留在其上的數據部分。
這使得具備如下優點:
可是,MapReduce 也有其限制:
wordcount
功能就須要不少的設置和代碼量,而 Spark
將會很是簡單。