hadoop學習二:MapReduce源碼分析總結

一、Map-Reduce的邏輯過程

假設咱們須要處理一批有關天氣的數據,其格式以下: html

  • 按照ASCII碼存儲,每行一條記錄
  • 每一行字符從0開始計數,第15個到第18個字符爲年
  • 第25個到第29個字符爲溫度,其中第25位是符號+/-

0067011990999991950051507+0000+ java

0043011990999991950051512+0022+ 緩存

0043011990999991950051518-0011+ app

0043012650999991949032412+0111+ 分佈式

0043012650999991949032418+0078+ ide

0067011990999991937051507+0001+ 函數

0043011990999991937051512-0002+ oop

0043011990999991945051518+0001+ spa

0043012650999991945032412+0002+ 命令行

0043012650999991945032418+0078+

如今須要統計出每一年的最高溫度。

Map-Reduce主要包括兩個步驟:Map和Reduce

每一步都有key-value對做爲輸入和輸出:

  • map階段的key-value對的格式是由輸入的格式所決定的,若是是默認的TextInputFormat,則每行做爲一個記錄進程處理,其中key爲此行的開頭相對於文件的起始位置,value就是此行的字符文本
  • map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

對於上面的例子,在map過程,輸入的key-value對以下:

(0, 0067011990999991950051507+0000+)

(33, 0043011990999991950051512+0022+)

(66, 0043011990999991950051518-0011+)

(99, 0043012650999991949032412+0111+)

(132, 0043012650999991949032418+0078+)

(165, 0067011990999991937051507+0001+)

(198, 0043011990999991937051512-0002+)

(231, 0043011990999991945051518+0001+)

(264, 0043012650999991945032412+0002+)

(297, 0043012650999991945032418+0078+)

在map過程當中,經過對每一行字符串的解析,獲得年-溫度的key-value對做爲輸出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce過程,將map過程當中的輸出,按照相同的key將value放到同一個列表中做爲reduce的輸入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce過程當中,在列表中選擇出最大的溫度,將年-最大溫度的key-value做爲輸出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其邏輯過程可用以下圖表示:

image

二、編寫Map-Reduce程序

編寫Map-Reduce程序,通常須要實現兩個函數:mapper中的map函數和Reducer中的reduce函數

通常遵循如下格式

map: (K1, V1)  ->  list(K2, V2)

public interface Mapper<K1,V1,K2,V2> extends JobConfigurable, Closeable{
    void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)throw IOException;
}

reduce: (K2,list(v)) -> list(K3,V3)


public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

  void reduce(K2 key, Iterator<V2> values,

              OutputCollector<K3, V3> output, Reporter reporter)

    throws IOException;

}
對於上面的例子,則實現的mapper以下:



public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    @Override

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        String line = value.toString();

        String year = line.substring(15, 19);

        int airTemperature;

        if (line.charAt(25) == '+') {

            airTemperature = Integer.parseInt(line.substring(26, 30));

        } else {

            airTemperature = Integer.parseInt(line.substring(25, 30));

        }

        output.collect(new Text(year), new IntWritable(airTemperature));

    }

}
實現的reducer以下:



public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

        int maxValue = Integer.MIN_VALUE;

        while (values.hasNext()) {

            maxValue = Math.max(maxValue, values.next().get());

        }

        output.collect(key, new IntWritable(maxValue));

    }

}


欲運行上面實現的Mapper和Reduce,則須要生成一個Map-Reduce的任務(Job),基本包括如下三部分:‘


  • 輸入的數據,即須要處理的數據
  • Map-Reduce程序,即上面實現的Mapper和Reducer
  • 此任務的配置項JobConf


欲配置JobConf,須要大體瞭解Hadoop運行Job的基本原理:


  • Hadoop將Job分紅task進行處理,共有兩種task:map task 和 reduce task;
  • Hadoop有兩類的節點控制Job的運行:JobTracker和TaskTracker
  •                    JobTracker協調整個job的運行,將task分配到不一樣的TaskTracker上,
  •                    TaskTracker負責運行task,並將結果返回給JobTracker
  • Hadoop將輸入數據分紅固定大小的塊,咱們稱之爲input split
  • Hadoop爲每一個input split建立一個task,在此task中依次處理此split中的一個個記錄(record)
  • Hadoop會盡可能讓輸入數據塊所在的DataNode和task所執行的DataNode(每一個DataNode上都有一個TaskTracker)爲同一個,能夠提升運行效率,因此input slipt的大小也通常是HDFS的block的大小。
  • Reduce Task的輸入通常爲Map Task的輸出,Reduce Task的輸出爲整個Job的輸出。
  • 在reduce中,相同的key的全部的記錄必定會到同一個TaskTracker上面運行,然而不一樣的key能夠在不一樣的TaskTracker上面運行,咱們稱爲partition。
  •                      partition的規則爲:(K2,V2)-> Integer,也即根據K2,生成一個partition的id,具備相同id的K2則進入同一個partition,在同一個TaskTracker上被同一個Reducer處理。
  • public interface Partitioner<K2, V2> extends JobConfigurable {
    
      int getPartition(K2 key, V2 value, int numPartitions);
    
    }
    下圖大概描述了Map-Reduce的Job運行的基本原理:

image

下面咱們討論JobConf,其有不少的項能夠進行配置:

  • setInputFormat:設置map的輸入格式,默認爲TextInputFormat,key爲LongWritable,value爲Text
  • setNumMapTasks:設置map的任務的個數,此設置一般不起做用,map任務的個數取決於輸入的數據所能分紅的input slipt的一個個record,依次調用Mapper的map函數。
  • setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式。
  • setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式
  • setPartitionerClass和setNumReduceTasks:設置Partitioner,默認爲HashPartitioner,其根據key的hash值來決定進入哪一個partition,每一個partition被一個reduce task處理,因此partition的個數等於reduce task的個數。
  • setReducerClass:設置Reducer,默認爲IdentityReducer
  • setOutputFormat:設置任務的輸出格式,默認爲TextOutputFormat
  • FileInputFormat.addInputPath:設置輸入文件的路徑,可使一個文件,一個路徑,一個通配符。能夠被調用屢次添加多個路徑。
  • FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job運行前此路徑不該該存在

固然不用全部的都設置,由上面的例子,能夠編寫Map-Reduce程序以下:

public class MapTemperature {
   public static void main (String[] args)throws IOException{
     if(args.length != 2){
         System.err.println("Usage: MaxTemperature <input path> <output path>");
         System.exit(-1);
      }
        JobConf conf = new JobConf(MaxTemperature.class);

        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);

        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);

    }

}

三、Map-Reduce數據流(data flow)

Map-Reduce的處理過程主要涉及如下四部分:
  • 客戶端Client:用於提交Map-Reduce任務job
  • JobTracker:協調整個job的運行,其爲一個Java進程,其main class爲JobTracker
  • TaskTracker:運行此job的task,處理input split,其爲一個java進程,其main class爲TaskTracker
  • HDFS:hadoop分佈式文件系統,用於在各個進程間共享Job相關的文件

image

3.一、任務提交

JobClient.runJob()建立一個新的JobClient實例,調用其submitJob()函數。
  • 向JobTracker請求一個新的job ID
  • 檢測此job的output配置
  • 計算此job的input split
  • 將Job運行所需的資源拷貝到JobTracker的文件系統中的文件夾中,包括job.jar文件、job.xml配置文件,input splits
  • 通知JobTracker此Job已經能夠運行了

提交任務後,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。

3.二、任務初始化

當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務並初始化任務。初始化首先建立一個對象來封裝job運行的tasks、status以及progress。

在建立task以前,job調度器首先從共享文件系統中得到JobClient計算出的input split。其爲每一個input split建立一個map task。每一個task被分配一個ID。

3.三、任務分配

TaskTracker週期性的向JobTracker發送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已經準備運行一個新的task,JobTracker將分配給其一個task。

在JobTracker爲TaskTracker選擇一個task以前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。TaskTracker有固定數量的位置來運行map task 或者 reduce task。

默認的調度器對待map task優先於reduce task

當選擇reduce task的時候,JobTracker並不在多個task之間進行選擇,而是直接取下一個,由於reduce task沒有數據本地化的概念。

3.四、任務執行

TaskTracker被分配了一個task,下面便要運行此task。

首先,TaskTracker將此job的jar從共享文件系統中拷貝到TaskTracker的文件系統中。

TaskTracker從distributed cache中將job運行所須要的文件拷貝到本地磁盤。

其次,其爲每一個task建立一個本地的工做目錄,將jar解壓縮到文件目錄中。

其三,其建立一個TaskRunner來運行task。

TaskRunner建立一新的JVM來運行task。

被建立的child JVM和TaskTracker通訊來報告運行進度。

3.4.一、Map的過程

MapRunnable從input split中讀取一個個的record,而後依次調用Mapper的map函數,將結果輸出。

map的輸出並非直接寫入硬盤,而是將其寫入緩存memory buffer。

當buffer中數據的到達必定的大小,一個背景線程將數據開始寫入硬盤。

在寫入硬盤以前,內存中的數據經過partitioner分紅多個partition。

在同一個partition中,背景線程會將數據按照key在內存中排序。

每次從內存向硬盤flush數據,都生成一個新的spill文件。

當此task結束以前,全部的spill文件被合併爲一個整的被partition的並且排好序的文件。

reducer能夠經過http協議請求map的輸出文件,tracker.http.threads能夠設置http服務線程數。

3.4.二、Reduce的過程

當map task結束後,其通知TaskTracker,TaskTracker通知JobTracker。

對於一個job,JobTracker知道TaskTracer和map輸出的對應關係。

reducer中一個線程週期性的向JobTracker請求map輸出的位置,直到其取得了全部的map輸出。

reduce task須要其對應的partition的全部的map輸出。

reduce task中的copy過程即當每一個map task結束的時候就開始拷貝輸出,由於不一樣的map task完成時間不一樣。

reduce task中有多個copy線程,能夠並行拷貝map輸出。

當不少map輸出拷貝到reduce task後,一個背景線程將其合併爲一個大的排好序的文件。

當全部的map輸出都拷貝到reduce task後,進入sort過程,將全部的map輸出合併爲大的排好序的文件。

最後進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每一個key,最後的結果寫入HDFS。

image

3.五、任務結束

 

當JobTracker得到最後一個task的運行成功的報告後,將job得狀態改成成功。

當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。

hadoop運行痕跡http://www.cnblogs.com/forfuture1978/archive/2010/11/23/1884967.html
相關文章
相關標籤/搜索