Hadoop MapReduce執行過程詳解(帶hadoop例子)

分析MapReduce執行過程

    MapReduce運行的時候,會經過Mapper運行的任務讀取HDFS中的數據文件,而後調用本身的方法,處理數據,最後輸出。Reducer任務會接收Mapper任務輸出的數據,做爲本身的輸入數據,調用本身的方法,最後輸出到HDFS的文件中。整個流程如圖:java

image

Mapper任務的執行過程詳解

每一個Mapper任務是一個java進程,它會讀取HDFS中的文件,解析成不少的鍵值對,通過咱們覆蓋的map方法處理後,轉換爲不少的鍵值對再輸出。整個Mapper任務的處理過程又能夠分爲如下幾個階段,如圖所示。linux

image

在上圖中,把Mapper任務的運行過程分爲六個階段。apache

  1. 第一階段是把輸入文件按照必定的標準分片(InputSplit),每一個輸入片的大小是固定的。默認狀況下,輸入片(InputSplit)的大小與數據塊(Block)的大小是相同的。若是數據塊(Block)的大小是默認值64MB,輸入文件有兩個,一個是32MB,一個是72MB。那麼小的文件是一個輸入片,大文件會分爲兩個數據塊,那麼是兩個輸入片。一共產生三個輸入片。每個輸入片由一個Mapper進程處理。這裏的三個輸入片,會有三個Mapper進程處理。app

  2. 第二階段是對輸入片中的記錄按照必定的規則解析成鍵值對。有個默認規則是把每一行文本內容解析成鍵值對。「鍵」是每一行的起始位置(單位是字節),「值」是本行的文本內容。maven

  3. 第三階段是調用Mapper類中的map方法。第二階段中解析出來的每個鍵值對,調用一次map方法。若是有1000個鍵值對,就會調用1000次map方法。每一次調用map方法會輸出零個或者多個鍵值對。ide

  4. 第四階段是按照必定的規則對第三階段輸出的鍵值對進行分區。比較是基於鍵進行的。好比咱們的鍵表示省份(如北京、上海、山東等),那麼就能夠按照不一樣省份進行分區,同一個省份的鍵值對劃分到一個區中。默認是隻有一個區分區的數量就是Reducer任務運行的數量。默認只有一個Reducer任務。函數

  5. 第五階段是對每一個分區中的鍵值對進行排序。首先,按照鍵進行排序,對於鍵相同的鍵值對,按照值進行排序。好比三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分別是整數。那麼排序後的結果是<1,3>、<2,1>、<2,2>。若是有第六階段,那麼進入第六階段;若是沒有,直接輸出到本地的linux文件中。oop

  6. 第六階段是對數據進行歸約處理,也就是reduce處理。鍵相等的鍵值對會調用一次reduce方法。通過這一階段,數據量會減小。歸約後的數據輸出到本地的linxu文件中。本階段默認是沒有的,須要用戶本身增長這一階段的代碼spa

Reducer任務的執行過程詳解

每一個Reducer任務是一個java進程。Reducer任務接收Mapper任務的輸出,歸約處理後寫入到HDFS中,能夠分爲以下圖所示的幾個階段。設計

image

  1. 第一階段是Reducer任務會主動從Mapper任務複製其輸出的鍵值對。Mapper任務可能會有不少,所以Reducer會複製多個Mapper的輸出。

  2. 第二階段是把複製到Reducer本地數據,所有進行合併,即把分散的數據合併成一個大的數據。再對合並後的數據排序。

  3. 第三階段是對排序後的鍵值對調用reduce方法。鍵相等的鍵值對調用一次reduce方法,每次調用會產生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到HDFS文件中。

在整個MapReduce程序的開發過程當中,咱們最大的工做量是覆蓋map函數和覆蓋reduce函數。

鍵值對的編號

在對Mapper任務、Reducer任務的分析過程當中,會看到不少階段都出現了鍵值對,讀者容易混淆,因此這裏對鍵值對進行編號,方便你們理解鍵值對的變化狀況,以下圖所示。

image

在上圖中,對於Mapper任務輸入的鍵值對,定義爲key1和value1。在map方法中處理後,輸出的鍵值對,定義爲key2和value2。reduce方法接收key2和value2,處理後,輸出key3和value3。在下文討論鍵值對時,可能把key1和value1簡寫爲<k1,v1>,key2和value2簡寫爲<k2,v2>,key3和value3簡寫爲<k3,v3>。

以上內容來自:http://www.superwu.cn/2013/08/21/530/


-----------------------分------------------割----------------線-------------------------


例子:求每一年最高氣溫

在HDFS中的根目錄下有如下文件格式: /input.txt

2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023

    好比:2010012325表示在2010年01月23日的氣溫爲25度。如今要求使用MapReduce,計算每年出現過的最大氣溫。

    在寫代碼以前,先確保正確的導入了相關的jar包。我使用的是maven,能夠到http://mvnrepository.com去搜索這幾個artifactId。

    此程序須要以Hadoop文件做爲輸入文件,以Hadoop文件做爲輸出文件,所以須要用到文件系統,因而須要引入hadoop-hdfs包;咱們須要向Map-Reduce集羣提交任務,須要用到Map-Reduce的客戶端,因而須要導入hadoop-mapreduce-client-jobclient包;另外,在處理數據的時候會用到一些hadoop的數據類型例如IntWritable和Text等,所以須要導入hadoop-common包。因而運行此程序所須要的相關依賴有如下幾個:

<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-hdfs</artifactId>
	<version>2.4.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
	<version>2.4.0</version>
</dependency>
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-common</artifactId>
	<version>2.4.0</version>
</dependency>

    包導好了後, 設計代碼以下:

package com.abc.yarn;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Temperature {
    /**
     * 四個泛型類型分別表明:
     * KeyIn        Mapper的輸入數據的Key,這裏是每行文字的起始位置(0,11,...)
     * ValueIn      Mapper的輸入數據的Value,這裏是每行文字
     * KeyOut       Mapper的輸出數據的Key,這裏是每行文字中的「年份」
     * ValueOut     Mapper的輸出數據的Value,這裏是每行文字中的「氣溫」
     */
    static class TempMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 打印樣本: Before Mapper: 0, 2000010115
            System.out.print("Before Mapper: " + key + ", " + value);
            String line = value.toString();
            String year = line.substring(0, 4);
            int temperature = Integer.parseInt(line.substring(8));
            context.write(new Text(year), new IntWritable(temperature));
            // 打印樣本: After Mapper:2000, 15
            System.out.println(
                    "======" +
                    "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature));
        }
    }

    /**
     * 四個泛型類型分別表明:
     * KeyIn        Reducer的輸入數據的Key,這裏是每行文字中的「年份」
     * ValueIn      Reducer的輸入數據的Value,這裏是每行文字中的「氣溫」
     * KeyOut       Reducer的輸出數據的Key,這裏是不重複的「年份」
     * ValueOut     Reducer的輸出數據的Value,這裏是這一年中的「最高氣溫」
     */
    static class TempReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            StringBuffer sb = new StringBuffer();
            //取values的最大值
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
                sb.append(value).append(", ");
            }
            // 打印樣本: Before Reduce: 2000, 15, 23, 99, 12, 22, 
            System.out.print("Before Reduce: " + key + ", " + sb.toString());
            context.write(key, new IntWritable(maxValue));
            // 打印樣本: After Reduce: 2000, 99
            System.out.println(
                    "======" +
                    "After Reduce: " + key + ", " + maxValue);
        }
    }

    public static void main(String[] args) throws Exception {
        //輸入路徑
        String dst = "hdfs://localhost:9000/intput.txt";
        //輸出路徑,必須是不存在的,空文件加也不行。
        String dstOut = "hdfs://localhost:9000/output";
        Configuration hadoopConfig = new Configuration();
        
        hadoopConfig.set("fs.hdfs.impl", 
            org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        hadoopConfig.set("fs.file.impl",
            org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
        Job job = new Job(hadoopConfig);
        
        //若是須要打成jar運行,須要下面這句
        //job.setJarByClass(NewMaxTemperature.class);

        //job執行做業時輸入和輸出文件的路徑
        FileInputFormat.addInputPath(job, new Path(dst));
        FileOutputFormat.setOutputPath(job, new Path(dstOut));

        //指定自定義的Mapper和Reducer做爲兩個階段的任務處理類
        job.setMapperClass(TempMapper.class);
        job.setReducerClass(TempReducer.class);
        
        //設置最後輸出結果的Key和Value的類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //執行job,直到完成
        job.waitForCompletion(true);
        System.out.println("Finished");
    }
}

上面代碼中,注意Mapper類的泛型不是java的基本類型,而是Hadoop的數據類型Text、IntWritable。咱們能夠簡單的等價爲java的類String、int。

代碼中Mapper類的泛型依次是<k1,v1,k2,v2>。map方法的第二個形參是行文本內容,是咱們關心的。核心代碼是把行文本內容按照空格拆分,把每行數據中「年」和「氣溫」提取出來,其中「年」做爲新的鍵,「溫度」做爲新的值,寫入到上下文context中。在這裏,由於每年有多行數據,所以每一行都會輸出一個<年份, 氣溫>鍵值對。

下面是控制檯打印結果:

Before Mapper: 0, 2014010114======After Mapper:2014, 14
Before Mapper: 11, 2014010216======After Mapper:2014, 16
Before Mapper: 22, 2014010317======After Mapper:2014, 17
Before Mapper: 33, 2014010410======After Mapper:2014, 10
Before Mapper: 44, 2014010506======After Mapper:2014, 6
Before Mapper: 55, 2012010609======After Mapper:2012, 9
Before Mapper: 66, 2012010732======After Mapper:2012, 32
Before Mapper: 77, 2012010812======After Mapper:2012, 12
Before Mapper: 88, 2012010919======After Mapper:2012, 19
Before Mapper: 99, 2012011023======After Mapper:2012, 23
Before Mapper: 110, 2001010116======After Mapper:2001, 16
Before Mapper: 121, 2001010212======After Mapper:2001, 12
Before Mapper: 132, 2001010310======After Mapper:2001, 10
Before Mapper: 143, 2001010411======After Mapper:2001, 11
Before Mapper: 154, 2001010529======After Mapper:2001, 29
Before Mapper: 165, 2013010619======After Mapper:2013, 19
Before Mapper: 176, 2013010722======After Mapper:2013, 22
Before Mapper: 187, 2013010812======After Mapper:2013, 12
Before Mapper: 198, 2013010929======After Mapper:2013, 29
Before Mapper: 209, 2013011023======After Mapper:2013, 23
Before Mapper: 220, 2008010105======After Mapper:2008, 5
Before Mapper: 231, 2008010216======After Mapper:2008, 16
Before Mapper: 242, 2008010337======After Mapper:2008, 37
Before Mapper: 253, 2008010414======After Mapper:2008, 14
Before Mapper: 264, 2008010516======After Mapper:2008, 16
Before Mapper: 275, 2007010619======After Mapper:2007, 19
Before Mapper: 286, 2007010712======After Mapper:2007, 12
Before Mapper: 297, 2007010812======After Mapper:2007, 12
Before Mapper: 308, 2007010999======After Mapper:2007, 99
Before Mapper: 319, 2007011023======After Mapper:2007, 23
Before Mapper: 330, 2010010114======After Mapper:2010, 14
Before Mapper: 341, 2010010216======After Mapper:2010, 16
Before Mapper: 352, 2010010317======After Mapper:2010, 17
Before Mapper: 363, 2010010410======After Mapper:2010, 10
Before Mapper: 374, 2010010506======After Mapper:2010, 6
Before Mapper: 385, 2015010649======After Mapper:2015, 49
Before Mapper: 396, 2015010722======After Mapper:2015, 22
Before Mapper: 407, 2015010812======After Mapper:2015, 12
Before Mapper: 418, 2015010999======After Mapper:2015, 99
Before Mapper: 429, 2015011023======After Mapper:2015, 23
Before Reduce: 2001, 12, 10, 11, 29, 16, ======After Reduce: 2001, 29
Before Reduce: 2007, 23, 19, 12, 12, 99, ======After Reduce: 2007, 99
Before Reduce: 2008, 16, 14, 37, 16, 5, ======After Reduce: 2008, 37
Before Reduce: 2010, 10, 6, 14, 16, 17, ======After Reduce: 2010, 17
Before Reduce: 2012, 19, 12, 32, 9, 23, ======After Reduce: 2012, 32
Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
Before Reduce: 2014, 14, 6, 10, 17, 16, ======After Reduce: 2014, 17
Before Reduce: 2015, 23, 49, 22, 12, 99, ======After Reduce: 2015, 99
Finished

    執行結果:

對分析的驗證

    從打印的日誌中能夠看出:

  • Mapper的輸入數據(k1,v1)格式是:默認的按行分的鍵值對<0, 2010012325>,<11, 2012010123>...

  • Reducer的輸入數據格式是:把相同的鍵合併後的鍵值對:<2001, [12, 32, 25...]>,<2007, [20, 34, 30...]>...

  • Reducer的輸出數(k3,v3)據格式是:經本身在Reducer中寫出的格式:<2001, 32>,<2007, 34>...

    其中,因爲輸入數據過小,Map過程的第1階段這裏不能證實。但事實上是這樣的。

    結論中第一點驗證了Map過程的第2階段「鍵」是每一行的起始位置(單位是字節),「值」是本行的文本內容。

    另外,經過Reduce的幾行

Before Reduce: 2001, 12, 10, 11, 29, 16, ======After Reduce: 2001, 29
Before Reduce: 2007, 23, 19, 12, 12, 99, ======After Reduce: 2007, 99
Before Reduce: 2008, 16, 14, 37, 16, 5, ======After Reduce: 2008, 37
Before Reduce: 2010, 10, 6, 14, 16, 17, ======After Reduce: 2010, 17
Before Reduce: 2012, 19, 12, 32, 9, 23, ======After Reduce: 2012, 32
Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
Before Reduce: 2014, 14, 6, 10, 17, 16, ======After Reduce: 2014, 17
Before Reduce: 2015, 23, 49, 22, 12, 99, ======After Reduce: 2015, 99

    能夠證明Map過程的第4階段:先分區,而後對每一個分區都執行一次Reduce(Map過程第6階段)。

    對於Mapper的輸出,前文中提到:若是沒有Reduce過程,Mapper的輸出會直接寫入文件。因而咱們把Reduce方法去掉(註釋掉第95行便可)。

    再執行,下面是控制檯打印結果: 

Before Mapper: 0, 2014010114======After Mapper:2014, 14
Before Mapper: 11, 2014010216======After Mapper:2014, 16
Before Mapper: 22, 2014010317======After Mapper:2014, 17
Before Mapper: 33, 2014010410======After Mapper:2014, 10
Before Mapper: 44, 2014010506======After Mapper:2014, 6
Before Mapper: 55, 2012010609======After Mapper:2012, 9
Before Mapper: 66, 2012010732======After Mapper:2012, 32
Before Mapper: 77, 2012010812======After Mapper:2012, 12
Before Mapper: 88, 2012010919======After Mapper:2012, 19
Before Mapper: 99, 2012011023======After Mapper:2012, 23
Before Mapper: 110, 2001010116======After Mapper:2001, 16
Before Mapper: 121, 2001010212======After Mapper:2001, 12
Before Mapper: 132, 2001010310======After Mapper:2001, 10
Before Mapper: 143, 2001010411======After Mapper:2001, 11
Before Mapper: 154, 2001010529======After Mapper:2001, 29
Before Mapper: 165, 2013010619======After Mapper:2013, 19
Before Mapper: 176, 2013010722======After Mapper:2013, 22
Before Mapper: 187, 2013010812======After Mapper:2013, 12
Before Mapper: 198, 2013010929======After Mapper:2013, 29
Before Mapper: 209, 2013011023======After Mapper:2013, 23
Before Mapper: 220, 2008010105======After Mapper:2008, 5
Before Mapper: 231, 2008010216======After Mapper:2008, 16
Before Mapper: 242, 2008010337======After Mapper:2008, 37
Before Mapper: 253, 2008010414======After Mapper:2008, 14
Before Mapper: 264, 2008010516======After Mapper:2008, 16
Before Mapper: 275, 2007010619======After Mapper:2007, 19
Before Mapper: 286, 2007010712======After Mapper:2007, 12
Before Mapper: 297, 2007010812======After Mapper:2007, 12
Before Mapper: 308, 2007010999======After Mapper:2007, 99
Before Mapper: 319, 2007011023======After Mapper:2007, 23
Before Mapper: 330, 2010010114======After Mapper:2010, 14
Before Mapper: 341, 2010010216======After Mapper:2010, 16
Before Mapper: 352, 2010010317======After Mapper:2010, 17
Before Mapper: 363, 2010010410======After Mapper:2010, 10
Before Mapper: 374, 2010010506======After Mapper:2010, 6
Before Mapper: 385, 2015010649======After Mapper:2015, 49
Before Mapper: 396, 2015010722======After Mapper:2015, 22
Before Mapper: 407, 2015010812======After Mapper:2015, 12
Before Mapper: 418, 2015010999======After Mapper:2015, 99
Before Mapper: 429, 2015011023======After Mapper:2015, 23
Finished

    再來看看執行結果:

    結果還有不少行,沒有截圖了。

    因爲沒有執行Reduce操做,所以這個就是Mapper輸出的中間文件的內容了。

    從打印的日誌能夠看出:

  • Mapper的輸出數據(k2, v2)格式是:經本身在Mapper中寫出的格式:<2010, 25>,<2012, 23>...

    從這個結果中能夠看出,原數據文件中的每一行確實都有一行輸出,那麼Map過程的第3階段就證明了。

    從這個結果中還能夠看出,「年份」已經不是輸入給Mapper的順序了,這也說明了在Map過程當中也按照Key執行了排序操做,即Map過程的第5階段

相關文章
相關標籤/搜索