大數據教程(9.2)MR內部的shuffle過程詳解&combiner的運行機制及代碼實現

        以前的文章已經簡單介紹過mapreduce的運做流程,不過其內部的shuffle過程並未深刻講解;本篇博客將分享shuffle的全過程。java

        1、mapreduce運做流程長卷圖(其中[深]硃紅色表明是能夠用戶自定義的部分,固然它們有默認實現)apache

        2、shuffle過程當中的combiner自定義實現服務器

               首先combiner組件有什麼做用呢?它能夠減小咱們在shuffle歸併排序是的次數、reduce階段處理的數據次數,同時能夠有效提供程序的執行效率。app

               如下是wordcount使用combiner實現的代碼框架

               (1) maper實現:ide

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordcountMapper.java的實現描述: KEYIN: 默認狀況下,是mr框架所讀到的一行文本的起始偏移量,Long,
 * 可是在hadoop中有本身的更精簡的序列化接口,因此不直接用Long,而用LongWritable
 * VALUEIN:默認狀況下,是mr框架所讀到的一行文本的內容,String,同上,用Text
 * KEYOUT:是用戶自定義邏輯處理完成以後輸出數據中的key,在此處是單詞,String,同上,用Text
 * VALUEOUT:是用戶自定義邏輯處理完成以後輸出數據中的value,在此處是單詞次數,Integer,同上,用IntWritable 類
 * 
 * @author arron 2018年12月4日 下午9:30:09
 */

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * map階段的業務邏輯就寫在自定義的map()方法中 maptask會對每一行輸入數據調用一次咱們自定義的map()方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //將maptask傳給咱們的文本內容先轉換成String
        String line = value.toString();
        //根據空格將這一行切分紅單詞
        String[] words = line.split(" ");

        //將單詞輸出爲<單詞,1>
        for (String word : words) {
            //將單詞做爲key,將次數1做爲value,以便於後續的數據分發,能夠根據單詞分發,以便於相同單詞會到相同的reduce task
            context.write(new Text(word), new IntWritable(1));
        }

    }

}

               (2) reducer實現實現:oop

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 類 WordcountReducer.java的實現描述:KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT類型對應
 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數據類型 KEYOUT是單詞 VLAUEOUT是總次數
 * 
 * @author arron 2018年12月4日 下午9:51:15
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>
     * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
     * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
     * 入參key,是一組相同單詞kv對的key
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int count = 0;
        /*
         * Iterator<IntWritable> iterator = values.iterator();
         * while(iterator.hasNext()){ count += iterator.next().get(); }
         */

        for (IntWritable value : values) {

            count += value.get();
        }

        context.write(key, new IntWritable(count));

    }

}

               (3) combiner實現實現:大數據

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 類 WordcountCombiner.java的實現描述:輸如爲map的輸出
 * 
 * @author arron 2018年12月4日 下午9:29:25
 */
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int count = 0;
        for (IntWritable v : values) {

            count += v.get();
        }

        context.write(key, new IntWritable(count));

    }

}

               (4) mapreduce主程序驅動類實現:3d

package com.empire.hadoop.mr.wccombinerdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 類 WordcountDriver.java的實現描述:至關於一個yarn集羣的客戶端 須要在此封裝咱們的mr程序的相關運行參數,指定jar包
 * 最後提交給yarn
 * 
 * @author arron 2018年12月4日 下午9:29:48
 */
public class WordcountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        //是否運行爲本地模式,就是看這個參數值是否爲local,默認就是local
        /* conf.set("mapreduce.framework.name", "local"); */

        //本地模式運行mr程序時,輸入輸出的數據能夠在本地,也能夠在hdfs上
        //到底在哪裏,就看如下兩行配置你用哪行,默認就是file:///
        /* conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */
        /* conf.set("fs.defaultFS", "file:///"); */

        //運行集羣模式,就是把程序提交到yarn中去運行
        //要想運行爲集羣模式,如下3個參數要指定爲集羣上的值
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         * conf.set("fs.defaultFS", "hdfs://mini1:9000/");
         */
        Job job = Job.getInstance(conf);

        job.setJar("c:/wc.jar");
        //指定本程序的jar包所在的本地路徑
        /* job.setJarByClass(WordcountDriver.class); */

        //指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        //指定mapper輸出數據的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定最終輸出的數據的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定須要使用combiner,以及用哪一個類做爲combiner的邏輯
        /* job.setCombinerClass(WordcountCombiner.class); */
        job.setCombinerClass(WordcountReducer.class);

        //若是不設置InputFormat,它默認用的是TextInputformat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

        //指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的輸出結果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行
        /* job.submit(); */
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);

    }

}

        3、最後總結code

               雖然combiner組件在shuffle階段使用的話,能夠提升程序效率;可是,它有一個使用限制條件,那就是不能影響最後的執行結果;例如:這裏講述一個反例,對多個輸入的數進行求平均數,若是此時使用combiner將不能獲得正確的結果。       

        最後寄語,以上是博主本次文章的所有內容,若是你們以爲博主的文章還不錯,請點贊;若是您對博主其它服務器大數據技術或者博主本人感興趣,請關注博主博客,而且歡迎隨時跟博主溝通交流。

相關文章
相關標籤/搜索