Hadoop 學習系列(四)之 MapReduce 原理講解

原文連接:pengtuo.tech/大數據研發/2018/…java

本篇文章將會介紹 Hadoop 重要的計算框架 MapReduce算法

完整的 MapReduce 框架包含兩部分:數據庫

  1. 算法邏輯層面,即 mapshuffle 以及 reduce 三個重要算法組成部分,本篇文章將會介紹這個層面;
  2. 實際運行層面,即算法邏輯做業在分佈式主機中是以什麼形式和什麼流程運行的,由於自 MapReduce version2 之後,做業都是提交給 YARN 進行管理,因此本文將不會介紹此部分。

系列其餘文章有:apache

1、What is MapReduce?

MapReduce是一個基於 java 的並行分佈式計算框架,使用它來編寫的數據處理應用能夠運行在大型的商用硬件集羣上來處理大型數據集中的可並行化問題,數據處理能夠發生在存儲在文件系統(非結構化)或數據庫(結構化)中的數據上。MapReduce 能夠利用數據的位置,在存儲的位置附近處理數據,以最大限度地減小通訊開銷。編程

MapReduce 框架經過編組分佈式服務器,並行運行各類任務,管理系統各部分之間的全部通訊和數據傳輸;其還能自動完成計算任務的並行化處理,自動劃分計算數據和計算任務,在集羣節點上自動分配和執行任務以及收集計算結果,將數據分佈存儲、數據通訊、容錯處理等並行計算涉及到的不少系統底層的複雜細節交由系統負責處理,減小開發人員的負擔。bash

MapReduce 仍是一個並行程序設計模型與方法(Programming Model & Methodology)。它藉助於函數式程序設計語言Lisp的設計思想,提供了一種簡便的並行程序設計方法,將複雜的、運行於大規模集羣上的並行計算過程高度地抽象到了兩個函數:Map和Reduce,用Map和Reduce兩個函數編程實現基本的並行計算任務,提供了抽象的操做和並行編程接口,以簡單方便地完成大規模數據的編程和計算處理。服務器

2、The Algorithm

MapReduce框架一般由三個操做(或步驟)組成:網絡

  1. Map:每一個工做節點將 map 函數應用於本地數據,並將輸出寫入臨時存儲。主節點確保僅處理冗餘輸入數據的一個副本。
  2. Shuffle:工做節點根據輸出鍵(由 map 函數生成)從新分配數據,對數據映射排序、分組、拷貝,目的是屬於一個鍵的全部數據都位於同一個工做節點上。
  3. Reduce:工做節點如今並行處理每一個鍵的每組輸出數據。

MapReduce 流程圖: 數據結構

MapReduce 容許分佈式運行 Map 操做,只要每一個 Map 操做獨立於其餘 Map 操做就能夠並行執行。app

另外一種更詳細的,將 MapReduce 分爲5個步驟的理解是:

  1. Prepare the Map() inputMapReduce 框架先指定 Map 處理器,而後給其分配將要處理的輸入數據 -- 鍵值對 K1,併爲該處理器提供與該鍵值相關的全部輸入數據;
  2. Run the user-provided Map() codeMap()K1 鍵值對上運行一次,生成由 K2 指定的鍵值對的輸出;
  3. Shuffle the Map output to the Reduce processors:將先前生成的 K2 鍵值對,根據『鍵』是否相同移至相同的工做節點;
  4. Run the user-provided Reduce() code:對於每一個工做節點上的 K2 鍵值對進行 Reduce() 操做;
  5. Produce the final outputMapReduce 框架收集全部 Reduce 輸出,並按 K2 對其進行排序以產生最終結果進行輸出。

實際生產環境中,數據頗有多是分散在各個服務器上,對於原先的大數據處理方法,則是將數據發送至代碼所在的地方進行處理,這樣很是低效且佔用了大量的帶寬,爲應對這種狀況,MapReduce 框架的處理方法是,將 Map() 操做或者 Reduce() 發送至數據所在的服務器上,以『移動計算替代移動數據』,來加速整個框架的運行速度,大多數計算都發生在具備本地磁盤上數據的節點上,從而減小了網絡流量。

Mapper

一個 Map 函數就是對一些獨立元素組成的概念上的列表的每個元素進行指定的操做,因此每一個元素都是被獨立操做的,而原始列表沒有被更改,由於這裏建立了一個新的列表來保存新的答案。這就是說,Map 操做是能夠高度並行的

MapReduce 框架的 MapReduce 函數都是根據 (key, value) 形式的數據結構定義的。 Map 在一個數據域(Data Domain)中獲取一個鍵值對,而後返回一個鍵值對的列表:

Map(k1,v1) → list(k2,v2)
複製代碼

Map 函數會被並行調用,應用於輸入數據集中的每一個鍵值對(keyed by K1)。而後每一個調用返回一個鍵值對(keyed by K2)列表。以後,MapReduce 框架從全部列表中收集具備相同 key(這裏是 k2)的全部鍵值對,並將它們組合在一塊兒,爲每一個 key 建立一個組。

Reducer

Reduce 是對一個列表的元素進行適當的合併。雖然不如 Map 函數那麼並行,可是由於化簡老是有一個簡單的答案,大規模的運算相對獨立,因此化簡函數在高度並行環境下也頗有用。Reduce 函數並行應用於每一個組,從而在同一個數據域中生成一組值:

Reduce(k2, list (v2)) → list(v3)
複製代碼

Reduce 端接收到不一樣任務傳來的有序數據組。此時 Reduce() 會根據程序猿編寫的代碼邏輯進行相應的 reduce 操做,例如根據同一個鍵值對進行計數加和等。若是Reduce 端接受的數據量至關小,則直接存儲在內存中,若是數據量超過了該緩衝區大小的必定比例,則對數據合併後溢寫到磁盤中。

Partitioner

前面提到過,Map 階段有一個分割成組的操做,這個劃分數據的過程就是 Partition,而負責分區的 java 類就是 Partitioner

Partitioner 組件可讓 MapKey 進行分區,從而將不一樣分區的 Key 交由不一樣的 Reduce 處理,由此,Partitioner 數量等同於 Reducer 的數量,一個 Partitioner 對應一個 Reduce 做業,可認爲其就是 Reduce 的輸入分片,可根據實際業務狀況編程控制,提升 Reduce 效率或進行負載均衡。MapReduce 的內置分區是HashPartition

具備多個分割老是有好處的,由於與處理整個輸入所花費的時間相比,處理分割所花費的時間很短。當分割較小時,能夠更好的處理負載平衡,可是分割也不宜過小,若是太小,則會使得管理拆分和任務加載的時間在總運行時間中佔太高的比重。

下圖是 map 任務和 reduce 任務的示意圖:

3、WordCount Example

這裏給出一個統計詞頻案例的 Java 代碼:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // 繼承 Mapper 類,實現本身的 map 功能
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // map 功能必須實現的函數
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // 繼承 Reducer 類,實現本身的 reduce 功能
  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    // 初始化Configuration,讀取mapreduce系統配置信息
    Configuration conf = new Configuration();

    // 構建 Job 而且加載計算程序 WordCount.class
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);

    //指定 Mapper、Combiner、Reducer,也就是咱們本身繼承實現的類
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);

    // 設置輸入輸出數據
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
複製代碼

上述代碼會發如今指定 Mapper 以及 Reducer 時,還指定了 Combiner 類,Combiner 是一個本地化的 reduce 操做(所以咱們看見 WordCount 類裏是用 reduce 進行加載的),它是 map 運算的後續操做,與 map 在同一個主機上進行,主要是在 map 計算出中間文件前作一個簡單的合併重複key值的操做,減小中間文件的大小,這樣在後續進行到 Shuffle 時,能夠下降網絡傳輸成本,提升網絡傳輸效率。

提交 MR 做業的命令:

hadoop jar {程序的 jar 包} {任務名稱} {數據輸入路徑} {數據輸出路徑}
複製代碼

例如:

hadoop jar hadoop-mapreduce-wordcount.jar WordCount /sample/input /sample/output
複製代碼

上述代碼示意圖:

Map -> Shuffle -> Reduce 的中間結果,包括最後的輸出都是存儲在本地磁盤上。

4、Advantage & Shortcoming of MapReduce

MapReduce 的兩大優點是:

1 ) 並行處理:

MapReduce 中,咱們將做業劃分爲多個節點,每一個節點同時處理做業的一部分。所以,MapReduce 基於Divide and Conquer範例,它幫助咱們使用不一樣的機器處理數據。因爲數據由多臺機器而不是單臺機器並行處理,所以處理數據所需的時間會減小不少。

2 ) 數據位置:

咱們將計算移動到 MapReduce 框架中的數據,而不是將數據移動到計算部分。數據分佈在多個節點中,其中每一個節點處理駐留在其上的數據部分。

這使得具備如下優點:

  • 將處理單元移動到數據所在位置能夠下降網絡成本;
  • 因爲全部節點並行處理其部分數據,所以處理時間縮短;
  • 每一個節點都會獲取要處理的數據的一部分,所以節點不會出現負擔太重的可能性。

可是,MapReduce 也有其限制:

  1. 不能進行流式計算和實時計算,只能計算離線數據;
  2. 中間結果存儲在磁盤上,加大了磁盤的 I/O 負載,且讀取速度比較慢;
  3. 開發麻煩,例如 wordcount 功能就須要不少的設置和代碼量,而 Spark 將會很是簡單。
相關文章
相關標籤/搜索