MapReduce編程模型簡介和總結

MapReduce應用普遍的緣由之一就是其易用性,提供了一個高度抽象化而變得很是簡單的編程模型,它是在總結大量應用的共同特色的基礎上抽象出來的分佈式計算框架,在其編程模型中,任務能夠被分解成相互獨立的子問題。MapReduce編程模型給出了分佈式編程方法的5個步驟:java

  1. 迭代,遍歷輸入數據,將其解析成key/value對;
  2. 將輸入key/value對映射map成另一些key/value對;
  3. 根據key對中間結果進行分組(grouping);
  4. 以組爲單位對數據進行歸約;
  5. 迭代,將最終產生的key/value對保存到輸出文件中。

下面就簡要總結一下編程模型中用到的主要組件以及在其中的做用: node

仍然以示例開始:算法

package hadoop;

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
/**
 * 

* <p>Title: ParquetNewMR</p>  

* <p>Description: </p>  

* @author zjhua

* @date 2019年4月7日
 */
public class ParquetNewMR {
 
    /**
     * map模型
    
    * <p>Title: WordCountMap</p>  
    
    * <p>Description: </p>  
    
    * @author zjhua
    
    * @date 2019年4月23日
     */
    public static class WordCountMap extends
            Mapper<LongWritable, Text, Text, IntWritable> {
 
        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }
 
    /**
     * reduce模型
    
    * <p>Title: WordCountReduce</p>  
    
    * <p>Description: </p>  
    
    * @author zjhua
    
    * @date 2019年4月23日
     */
    public static class WordCountReduce extends
            Reducer<Text, IntWritable, Void, Group> {
        private SimpleGroupFactory factory;
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Group group = factory.newGroup()
                    .append("name",  key.toString())
                    .append("age", sum);
            context.write(null,group);
        }
 
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
        }
    }
 
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String writeSchema = "message example {\n" +
                "required binary name;\n" +
                "required int32 age;\n" +
                "}";
        conf.set("parquet.example.schema",writeSchema);
//        conf.set("dfs.client.use.datanode.hostname", "true");
 
        Job job = Job.getInstance(conf); // new Job()接口過時了
        job.setJarByClass(ParquetNewMR.class);
        job.setJobName("parquet");
 
        String in = "hdfs://192.168.223.150:8020/user/hadoop1/wordcount/input";
        String out = "hdfs://192.168.223.150:8020/user/hadoop1/pq_out_" + UUID.randomUUID().toString();
 
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
 
        job.setOutputValueClass(Group.class);
 
        job.setMapperClass(WordCountMap.class);  // Map實現類
        job.setReducerClass(WordCountReduce.class);  //Reduce實現類
 
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ParquetOutputFormat.class);
 
        FileInputFormat.addInputPath(job, new Path(in));
        ParquetOutputFormat.setOutputPath(job, new Path(out));
        ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
 
        job.waitForCompletion(true);
    }
}

 

1. InputFormat

主要用於描述輸入數據的格式,提供數據切分功能,按照某種方式將輸入數據且分紅若干個split,肯定map task的個數,以及爲Mapper提供輸入數據,給定某個split,讓其解析成一個個key/value對。

InputFormat中的getSplits方法主要完成數據切分的功能,會嘗試着將輸入數據且分紅numSplits個進行存儲。InputSplit中只記錄了分片的元數據信息,好比起始位置、長度以及所在的節點列表。

在Hadoop中對象的序列化主要用在進程間通訊以及數據的永久存儲。Client端會調用Job中的InputFormat中的getSplits函數,看成業提交到JobTracker端對做業初始化時,能夠直接讀取該文件,解析出全部InputSplit,並建立對應的MapTask。

而重要的方法就是getRecordReader,其返回一個RecordReader,將輸入的InputSplit解析成若干個key/value對。MapReduce框架在Map Task執行過程當中,不斷地調用RecordReader對象中的方法,獲取key/value對交給map函數處理,僞代碼以下:

apache

K1 key = input.createKey();
V1 value = input.createValue();
while(input.next(key, value)){
     //invoke map()
}
input.close();


對於FileInputFormat,這是一個採用統一的方法對各類輸入文件進行切分的InputFormat,也是好比TextInputFormat, KeyValueInputFormat等類的基類。其中最重要的是getSplits函數,最核心的兩個算法就是文件切分算法以及host選擇算法。

文件切分算法主要用於肯定InputSplit的個數以及每一個InputSplit對應的數據段。

在InputSplit切分方案完成後,就須要肯定每一個InputSplit的元數據信息: <file, start, length, host>,表示InputSplit所在文件,起始位置,長度以及所在的host節點列表,其中host節點列表是最難肯定的。

host列表選擇策略直接影響到運行過程當中的任務本地性。Hadoop中HDFS文件是以block爲單位存儲的,一個大文件對應的block可能會遍及整個集羣,InputSplit的劃分算法可能致使一個InputSplit對應的多個block位於不一樣的節點上。

hadoop將數據本地性分紅三個等級:node locality, rack locality和data center locality。在進行任務調度時,會依次考慮3個節點的locality,優先讓空閒資源處理本節點的數據,其次同一個機架上的數據,最差是處理其餘機架上的數據。

雖然InputSplit對應的block可能位於多個節點上,但考慮到任務調度的效率,一般不會將全部節點到InputSplit的host列表中,而是選擇數據總量最大的前幾個節點,做爲任務調度時判斷任務是否具備本地性的主要憑據。對於FileInputFormat設計了一個簡單有效的啓發式算法:按照rack包含的數據量對rack進行排序,在rack內部按照每一個node包含的數據量對node排序,取前N個node的host做爲InputSplit的host列表(N爲block的副本數,默認爲3)。

當InputSplit的尺寸大於block的尺寸時,MapTask不能實現徹底的數據本地性,總有一部分數據須要從遠程節點中獲取,所以當使用基於FileInputFormat實現InputFormat時,爲了提升Map Task的數據本地性,應該儘可能使得InputSplit大小與block大小相同。(雖然理論上是這麼說,可是這會致使過多的MapTask,使得任務初始時佔用的資源很大)。

2. OutputFormat

OutputFormat主要用於描述輸出數據的格式,可以將用戶提供的key/value對寫入特定格式的文件中。其中與InputFormat相似,OutputFormat接口中有一個重要的方法就是getRecordWriter,返回的RecordWriter接收一個key/value對,並將之寫入文件。Task執行過程當中,MapReduce框架會將map或reduce函數產生的結果傳入write方法:編程

public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException{
     output.collect(newKey, newValue);
}


hadoop中全部基於文件的OutputFormat都是從FileOutputFormat中派生的,事實上這也是最經常使用的OutputFormat。總結髮現,FileOutputFormat實現的主要功能有兩點:多線程

  1. 爲防止用戶配置的輸出目錄數據被意外覆蓋,實現checkOutputSpecs接口,在輸出目錄存在時拋出異常;
  2. 處理side-effect file。hadoop可能會在一個做業執行過程當中加入一些推測式任務,所以,hadoop中reduce端執行的任務並不會真正寫入到輸出目錄,而是會爲每個Task的數據創建一個side-effect file,將產生的數據臨時寫入該文件,待Task完成後,再移動到最終輸出目錄。


默認狀況下,看成業成功完成後,會在最終結果目錄下生成空文件_SUCCESS,該文件主要爲高層應用提供做業運行完成的標識(好比oozie工做流就能夠根據這個判斷任務是否執行成功)。

3. Mapper和Reducer

Mapper的過程主要包括初始化、Map操做執行和清理三個部分。Reducer過程與Mapper過程基本相似。app

  1. 初始化,Mapper中的configure方法容許經過JobConf參數對Mapper進行初始化工做;
  2. Map操做,經過前面介紹的InputFormat中的RecordReader從InputSplit獲取一個key/value對,交給實際的map函數進行處理;
  3. 經過繼承Closable接口,得到close方法,實現對Mapper的清理。

對於一個MapReduce應用,不必定非要存在Mapper,MapReduce框架提供了比Mapper更加通用的接口:org.apache.hadoop.mapred.MapRunnable,能夠直接實現該接口定製本身的key/value處理邏輯(相對於MapReduce階段中固定的map階段,能夠跳過Map階段,好比Hadoop Pipes中的將數據發送給其餘進程處理)。
MapRunner是其固定實現,直接調用用戶job中設置的Mapper Class,此外,hadoop中還提供了一個多線程的MapRunnable實現,用於非CPU類型的做業提供吞吐率。

4. Partitioner

Partitoner的做用是對Mapper產生的中間結果進行分片,將同一分組的數據交給一個Reducer來處理,直接影響這Reducer階段的負載均衡。其中最重要的方法就是getPartition,包含三個參數,key,value,以及Reducer的個數numPartions。

MapReduce提供兩個Partitioner實現,HashPartitoner和TotalOrderPartitioner。HashPartitioner是默認實現,基於哈希值進行分片;TotalOrderPartitoner提供了一種基於區間分片的方法,一般用在數據的全排序中。例如歸併排序,若是Map Task進行局部排序後Reducer端進行全局排序,那麼Reducer端只能設置成1個,這會成爲性能瓶頸,爲了提升全局排序的性能和擴展性,並保證一個區間中的全部數據都大於前一個區間的數據,就會用到TotalOrderPartitioner。負載均衡

相關文章
相關標籤/搜索