7,MapReduce基礎

MapReduce基礎

1、關於MapReduce

1.1 爲何要MapReduce

  • 單機資源有限:因爲單臺計算機的資源有限,計算能力不足以處理海量數據;因此須要多臺計算機組成分佈式集羣來處理海量數據。
  • 分佈式計算較複雜:在分佈式計算中,計算任務的分發,各個主機之間的協做;程序的啓動以及運行過程當中的監控、容錯、重試等都會變得很複雜。因此引入了MapReduce框架,框架解決了分佈式開發中的複雜性,開發人員只須要將大部分工做集中在業務邏輯的開發上,從而極大的提升了工做效率。

1.2 MapReduce的定義

  • MapReduce是一個分佈式運算程序的編程框架,用於大規模數據集(大於1TB)的並行計算;Map(映射)和reduce(歸約)是它的主要思想;它極大地方便了編程人員在不會分佈式並行編程的狀況下,將本身的程序運行在分佈式系統上。

2、MapReduce的優缺點

2.1 優勢:

  • 易於編程:只須要實現一些接口,就能夠完成一個分佈式程序的編寫;跟編寫一個串行程序同樣;
  • 良好的擴展性:當計算資源不足時,只須要簡單的增長機器來擴展它的計算能力;
  • 高容錯性:當一個機器掛了以後,會自動把上面的計算任務轉移到另外一個節點上運行,無需人工干預;
  • 海量:適合PB級海量數據的離線處理。

2.2 缺點:

  • 不適合實時計算:MapReduce因爲過程較爲複雜,IO次數較多,因此沒法作到毫秒或秒級響應;
  • 不適合流式計算:流式計算的輸入是動態的,能夠不斷添加,而MapReduce的輸入是靜態的;
  • 不適合DAG(有向圖)計算:對於多個程序之間有依賴關係,即後一個程序的輸入是前一個程序的輸出;雖然MapReduce也能夠完成,但都是經過磁盤來傳遞中間數據,形成大量的磁盤IO,性能極低。

3、MapReduce的執行階段

3.1 執行的兩個階段

  • Map階段:若干個maptask併發實例,徹底並行運行,互不相干。apache

  • Reduce階段:若干個reducetask併發實例,徹底並行運行,可是他們的數據依賴於Map階段的輸出。編程

  • 注意:MapReduce模型只能包含一個map階段和一個reduce階段;若是業務邏輯很是複雜,就只能使用多個MapReduce程序,串行運行。windows

4、編寫MapReduce程序

  • 用戶須要編寫的三個部分:Mapper、Reducer、Driver(提交MR程序)。

4.1 以WordCount爲例:

1. 編寫Mapper緩存

// 注意:hadoop1.0版本中是mapred下包,hadoop2.0是mapreduce下的包
import org.apache.hadoop.mapreduce.Mapper;
// 繼承Mapper父類,泛型爲輸入和輸出的<K, V>;並重寫父類的map方法
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * 每行文本都會執行一次map方法.
     *
     * @param key     文本偏移量.
     * @param value   一行文本.
     * @param context 上下文對象.
     * @throws IOException          .
     * @throws InterruptedException 當阻塞方法收到中斷請求時拋出.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\s+");   // 拆分一行中的單詞
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));   // 輸出一個<K, V>
        }
    }
}

2. 編寫Reducer併發

// 繼承Reducer類,輸入的<K, V>類型爲map端輸出<K, V>類型
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * 相同的key只會執行一次reduce方法
     *
     * @param key     map端輸出的key
     * @param values  相同key的value集合
     * @param context 上下文對象
     * @throws IOException          .
     * @throws InterruptedException .
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // 當前的 key出現了多少次
        int count = 0;
        // values中的數據是反序列化過來的,最好不要直接使用values中的bean
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));  // 輸出
    }
}

3. 編寫Driverapp

// Driver的做用是將這個Mapper和Reducer程序打包成一個Job,並提交該Job
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 不須要爲 conf設置HDFS等參數,由於conf會調用系統默認的配置文件,
        // 因此這個mr程序在哪裏運行就會調用哪裏的配置文件,在集羣上運行就會使用集羣的設置文件。
        Configuration conf = new Configuration();
        // 刪除輸出文件,或者手動刪除
        // FileHelper.deleteDir(args[1], conf);

        // 根據配置文件實例化一個 Job,並取個名字
        Job job = Job.getInstance(conf, "MyWordCount");

        // 設置 Jar的位置
        job.setJarByClass(WordCountDriver.class);

        // 設置 Mapper運行類,以及輸出的key和value的類型
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 設置 Reducer的運行類,以及輸出的key和value的類型
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // 設置分區(能夠不用設置)
        // 當設置的分區數大於實際分區數時,能夠正常執行,多出的分區爲空文件;
        // 當設置的分區數小於實際分區數時,會報錯。
        job.setNumReduceTasks(4);
        // 若是設置的 numReduceTasks大於 1,而又沒有設置自定義的 PartitionerClass
        // 則會調用系統默認的 HashPartitioner實現類來計算分區。
        job.setPartitionerClass(WordCountPartitioner.class);
        // 設置combine
        job.setCombinerClass(WordCountCombiner.class);

        // 設置輸入和輸出文件的位置
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交任務,等待執行結果,參數爲 true表示打印信息
        boolean result = job.waitForCompletion(true);
        // 根據 job的返回值自定義退出
        System.exit(result?0:1);
    }
}

4. 運行框架

  • 若是在Hadoop集羣上運行還須要將這個project打包成jar包,因此通常是先在windows上運行調試。
  • 因爲要從命令行輸入input和output參數,因此這裏配置一下輸入和輸出的位置。

5、MapReduce的主要執行流程

  1. job.waitForCompletion(true):將這個MapReduce任務(Job)提交,默認是提交到本地運行;部署到集羣時,是提交給YARN運行。
  2. map():在父類Mapper的run()方法中會調用子類重寫的map()方法。輸入文件的每一行都會調用一次map()方法,map()方法的參數中:key爲當前輸入行的偏移量,LongWritable類型;value爲當前輸入行的數據,Text類型;context爲上下文對象。父類Mapper是一個泛型類,泛型的類型表示map()方法輸入和輸出的<K, V>類型,子類在繼承時要傳入實際輸入輸出的<K, V>類型。map()使用context.write(k, v)來輸出數據到shuffle階段的環形緩衝區。
  3. shuffle階段簡述:shuffle階段起到承上啓下的做用;從接收map()方法的輸出,到執行reduce()方法以前都屬於shuffle階段。shuffle接收map()輸出<K,V>並經過K計算出分區號,而後與元數據一塊兒寫入環形緩存區;環形緩衝區溢寫時會將數據排序並寫入小文件,而後歸併成一個大的分區文件。一個ReducerTask主機會到全部MapTask主機上拉取對應的分區文件,歸併全部分區文件後會對相同的key進行合併,再執行reduce方法。
  4. reduce():在父類Reducer的run()方法中會調用子類重寫的reduce()方法。相同的key只會調用一次reduce()方法,reduce()方法的參數中:key爲相同key合併後的第一個key,與map()的輸出key類型相同;values爲相同key的value列表,類型是Iterable<map()的輸出value類型>。與Mapper類相似,子類在繼承Reducer時輸入的<K, V>類型是Mapper輸出的<K, V>類型、Reducer輸出的<K, V>類型是context.write(K, V)中<K, V>的類型。reduce中的context.write(K, V)最終會寫入到輸出文件中,就是此次MapReduce的結果。
相關文章
相關標籤/搜索