Map-Reduce的邏輯過程

一、Map-Reduce的邏輯過程緩存

假設咱們須要處理一批有關天氣的數據,其格式以下:app


按照ASCII碼存儲,每行一條記錄分佈式

每一行字符從0開始計數,第15個到第18個字符爲年ide

第25個到第29個字符爲溫度,其中第25位是符號+/-函數

0067011990999991950051507+0000+oop


0043011990999991950051512+0022+命令行


0043011990999991950051518-0011+線程


0043012650999991949032412+0111+orm


0043012650999991949032418+0078+xml


0067011990999991937051507+0001+


0043011990999991937051512-0002+


0043011990999991945051518+0001+


0043012650999991945032412+0002+


0043012650999991945032418+0078+


如今須要統計出每一年的最高溫度。


Map-Reduce主要包括兩個步驟:Map和Reduce


每一步都有key-value對做爲輸入和輸出:


map階段的key-value對的格式是由輸入的格式所決定的,若是是默認的TextInputFormat,則每行做爲一個記錄進程處理,其中key爲此行的開頭相對於文件的起始位置,value就是此行的字符文本

map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相對應

對於上面的例子,在map過程,輸入的key-value對以下:


(0, 0067011990999991950051507+0000+)


(33, 0043011990999991950051512+0022+)


(66, 0043011990999991950051518-0011+)


(99, 0043012650999991949032412+0111+)


(132, 0043012650999991949032418+0078+)


(165, 0067011990999991937051507+0001+)


(198, 0043011990999991937051512-0002+)


(231, 0043011990999991945051518+0001+)


(264, 0043012650999991945032412+0002+)


(297, 0043012650999991945032418+0078+)


在map過程當中,經過對每一行字符串的解析,獲得年-溫度的key-value對做爲輸出:


(1950, 0)


(1950, 22)


(1950, -11)


(1949, 111)


(1949, 78)


(1937, 1)


(1937, -2)


(1945, 1)


(1945, 2)


(1945, 78)


在reduce過程,將map過程當中的輸出,按照相同的key將value放到同一個列表中做爲reduce的輸入


(1950, [0, 22, –11])


(1949, [111, 78])


(1937, [1, -2])


(1945, [1, 2, 78])


在reduce過程當中,在列表中選擇出最大的溫度,將年-最大溫度的key-value做爲輸出:


(1950, 22)


(1949, 111)


(1937, 1)


(1945, 78)


其邏輯過程可用以下圖表示:


image


二、編寫Map-Reduce程序

編寫Map-Reduce程序,通常須要實現兩個函數:mapper中的map函數和reducer中的reduce函數。


通常遵循如下格式:


map: (K1, V1)  ->  list(K2, V2)

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {


  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)


  throws IOException;


}


reduce: (K2, list(V))  ->  list(K3, V3) 

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {


  void reduce(K2 key, Iterator<V2> values,


              OutputCollector<K3, V3> output, Reporter reporter)


    throws IOException;


}


 


對於上面的例子,則實現的mapper以下:


public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {


    @Override


    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {


        String line = value.toString();


        String year = line.substring(15, 19);


        int airTemperature;


        if (line.charAt(25) == '+') {


            airTemperature = Integer.parseInt(line.substring(26, 30));


        } else {


            airTemperature = Integer.parseInt(line.substring(25, 30));


        }


        output.collect(new Text(year), new IntWritable(airTemperature));


    }


}


實現的reducer以下:


public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {


    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {


        int maxValue = Integer.MIN_VALUE;


        while (values.hasNext()) {


            maxValue = Math.max(maxValue, values.next().get());


        }


        output.collect(key, new IntWritable(maxValue));


    }


}


 


欲運行上面實現的Mapper和Reduce,則須要生成一個Map-Reduce得任務(Job),其基本包括如下三部分:


輸入的數據,也即須要處理的數據

Map-Reduce程序,也即上面實現的Mapper和Reducer

此任務的配置項JobConf

欲配置JobConf,須要大體瞭解Hadoop運行job的基本原理:


Hadoop將Job分紅task進行處理,共兩種task:map task和reduce task

Hadoop有兩類的節點控制job的運行:JobTracker和TaskTracker

JobTracker協調整個job的運行,將task分配到不一樣的TaskTracker上

TaskTracker負責運行task,並將結果返回給JobTracker

Hadoop將輸入數據分紅固定大小的塊,咱們稱之input split

Hadoop爲每個input split建立一個task,在此task中依次處理此split中的一個個記錄(record)

Hadoop會盡可能讓輸入數據塊所在的DataNode和task所執行的DataNode(每一個DataNode上都有一個TaskTracker)爲同一個,能夠提升運行效率,因此input split的大小也通常是HDFS的block的大小。

Reduce task的輸入通常爲Map Task的輸出,Reduce Task的輸出爲整個job的輸出,保存在HDFS上。

在reduce中,相同key的全部的記錄必定會到同一個TaskTracker上面運行,然而不一樣的key能夠在不一樣的TaskTracker上面運行,咱們稱之爲partition

partition的規則爲:(K2, V2) –> Integer, 也即根據K2,生成一個partition的id,具備相同id的K2則進入同一個partition,被同一個TaskTracker上被同一個Reducer進行處理。

public interface Partitioner<K2, V2> extends JobConfigurable {


  int getPartition(K2 key, V2 value, int numPartitions);


}


下圖大概描述了Map-Reduce的Job運行的基本原理:


image


 


下面咱們討論JobConf,其有不少的項能夠進行配置:


setInputFormat:設置map的輸入格式,默認爲TextInputFormat,key爲LongWritable, value爲Text

setNumMapTasks:設置map任務的個數,此設置一般不起做用,map任務的個數取決於輸入的數據所能分紅的input split的個數

setMapperClass:設置Mapper,默認爲IdentityMapper

setMapRunnerClass:設置MapRunner, map task是由MapRunner運行的,默認爲MapRunnable,其功能爲讀取input split的一個個record,依次調用Mapper的map函數

setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式

setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式

setPartitionerClass和setNumReduceTasks:設置Partitioner,默認爲HashPartitioner,其根據key的hash值來決定進入哪一個partition,每一個partition被一個reduce task處理,因此partition的個數等於reduce task的個數

setReducerClass:設置Reducer,默認爲IdentityReducer

setOutputFormat:設置任務的輸出格式,默認爲TextOutputFormat

FileInputFormat.addInputPath:設置輸入文件的路徑,可使一個文件,一個路徑,一個通配符。能夠被調用屢次添加多個路徑

FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job運行前此路徑不該該存在

固然不用全部的都設置,由上面的例子,能夠編寫Map-Reduce程序以下:


public class MaxTemperature {


    public static void main(String[] args) throws IOException {


        if (args.length != 2) {


            System.err.println("Usage: MaxTemperature <input path> <output path>");


            System.exit(-1);


        }


        JobConf conf = new JobConf(MaxTemperature.class);


        conf.setJobName("Max temperature");


        FileInputFormat.addInputPath(conf, new Path(args[0]));


        FileOutputFormat.setOutputPath(conf, new Path(args[1]));


        conf.setMapperClass(MaxTemperatureMapper.class);


        conf.setReducerClass(MaxTemperatureReducer.class);


        conf.setOutputKeyClass(Text.class);


        conf.setOutputValueClass(IntWritable.class);


        JobClient.runJob(conf);


    }


}


三、Map-Reduce數據流(data flow)

Map-Reduce的處理過程主要涉及如下四個部分:


客戶端Client:用於提交Map-reduce任務job

JobTracker:協調整個job的運行,其爲一個Java進程,其main class爲JobTracker

TaskTracker:運行此job的task,處理input split,其爲一個Java進程,其main class爲TaskTracker

HDFS:hadoop分佈式文件系統,用於在各個進程間共享Job相關的文件

image


3.一、任務提交

JobClient.runJob()建立一個新的JobClient實例,調用其submitJob()函數。


向JobTracker請求一個新的job ID

檢測此job的output配置

計算此job的input splits

將Job運行所需的資源拷貝到JobTracker的文件系統中的文件夾中,包括job jar文件,job.xml配置文件,input splits

通知JobTracker此Job已經能夠運行了

提交任務後,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務運行完畢。


 


3.二、任務初始化

 


當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務並初始化任務。


初始化首先建立一個對象來封裝job運行的tasks, status以及progress。


在建立task以前,job調度器首先從共享文件系統中得到JobClient計算出的input splits。


其爲每一個input split建立一個map task。


每一個task被分配一個ID。


 


3.三、任務分配

 


TaskTracker週期性的向JobTracker發送heartbeat。


在heartbeat中,TaskTracker告知JobTracker其已經準備運行一個新的task,JobTracker將分配給其一個task。


在JobTracker爲TaskTracker選擇一個task以前,JobTracker必須首先按照優先級選擇一個Job,在最高優先級的Job中選擇一個task。


TaskTracker有固定數量的位置來運行map task或者reduce task。


默認的調度器對待map task優先於reduce task


當選擇reduce task的時候,JobTracker並不在多個task之間進行選擇,而是直接取下一個,由於reduce task沒有數據本地化的概念。


 


3.四、任務執行

 


TaskTracker被分配了一個task,下面便要運行此task。


首先,TaskTracker將此job的jar從共享文件系統中拷貝到TaskTracker的文件系統中。


TaskTracker從distributed cache中將job運行所須要的文件拷貝到本地磁盤。


其次,其爲每一個task建立一個本地的工做目錄,將jar解壓縮到文件目錄中。


其三,其建立一個TaskRunner來運行task。


TaskRunner建立一個新的JVM來運行task。


被建立的child JVM和TaskTracker通訊來報告運行進度。


 

3.4.一、Map的過程

MapRunnable從input split中讀取一個個的record,而後依次調用Mapper的map函數,將結果輸出。


map的輸出並非直接寫入硬盤,而是將其寫入緩存memory buffer。


當buffer中數據的到達必定的大小,一個背景線程將數據開始寫入硬盤。


在寫入硬盤以前,內存中的數據經過partitioner分紅多個partition。


在同一個partition中,背景線程會將數據按照key在內存中排序。


每次從內存向硬盤flush數據,都生成一個新的spill文件。


當此task結束以前,全部的spill文件被合併爲一個整的被partition的並且排好序的文件。


reducer能夠經過http協議請求map的輸出文件,tracker.http.threads能夠設置http服務線程數。


3.4.二、Reduce的過程

當map task結束後,其通知TaskTracker,TaskTracker通知JobTracker。


對於一個job,JobTracker知道TaskTracer和map輸出的對應關係。


reducer中一個線程週期性的向JobTracker請求map輸出的位置,直到其取得了全部的map輸出。


reduce task須要其對應的partition的全部的map輸出。


reduce task中的copy過程即當每一個map task結束的時候就開始拷貝輸出,由於不一樣的map task完成時間不一樣。


reduce task中有多個copy線程,能夠並行拷貝map輸出。


當不少map輸出拷貝到reduce task後,一個背景線程將其合併爲一個大的排好序的文件。


當全部的map輸出都拷貝到reduce task後,進入sort過程,將全部的map輸出合併爲大的排好序的文件。


最後進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每一個key,最後的結果寫入HDFS。


 


image


 


3.五、任務結束

當JobTracker得到最後一個task的運行成功的報告後,將job得狀態改成成功。

當JobClient從JobTracker輪詢的時候,發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。

相關文章
相關標籤/搜索