以前的文章已經簡單介紹過mapreduce的運做流程,不過其內部的shuffle過程並未深刻講解;本篇博客將分享shuffle的全過程。java
1、mapreduce運做流程長卷圖(其中[深]硃紅色表明是能夠用戶自定義的部分,固然它們有默認實現)apache
2、shuffle過程當中的combiner自定義實現服務器
首先combiner組件有什麼做用呢?它能夠減小咱們在shuffle歸併排序是的次數、reduce階段處理的數據次數,同時能夠有效提供程序的執行效率。app
如下是wordcount使用combiner實現的代碼框架
(1) maper實現:ide
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * WordcountMapper.java的實現描述: KEYIN: 默認狀況下,是mr框架所讀到的一行文本的起始偏移量,Long, * 可是在hadoop中有本身的更精簡的序列化接口,因此不直接用Long,而用LongWritable * VALUEIN:默認狀況下,是mr框架所讀到的一行文本的內容,String,同上,用Text * KEYOUT:是用戶自定義邏輯處理完成以後輸出數據中的key,在此處是單詞,String,同上,用Text * VALUEOUT:是用戶自定義邏輯處理完成以後輸出數據中的value,在此處是單詞次數,Integer,同上,用IntWritable 類 * * @author arron 2018年12月4日 下午9:30:09 */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map階段的業務邏輯就寫在自定義的map()方法中 maptask會對每一行輸入數據調用一次咱們自定義的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將maptask傳給咱們的文本內容先轉換成String String line = value.toString(); //根據空格將這一行切分紅單詞 String[] words = line.split(" "); //將單詞輸出爲<單詞,1> for (String word : words) { //將單詞做爲key,將次數1做爲value,以便於後續的數據分發,能夠根據單詞分發,以便於相同單詞會到相同的reduce task context.write(new Text(word), new IntWritable(1)); } } }
(2) reducer實現實現:oop
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 類 WordcountReducer.java的實現描述:KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT類型對應 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數據類型 KEYOUT是單詞 VLAUEOUT是總次數 * * @author arron 2018年12月4日 下午9:51:15 */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1> * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1> * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1> * 入參key,是一組相同單詞kv對的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; /* * Iterator<IntWritable> iterator = values.iterator(); * while(iterator.hasNext()){ count += iterator.next().get(); } */ for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
(3) combiner實現實現:大數據
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 類 WordcountCombiner.java的實現描述:輸如爲map的輸出 * * @author arron 2018年12月4日 下午9:29:25 */ public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for (IntWritable v : values) { count += v.get(); } context.write(key, new IntWritable(count)); } }
(4) mapreduce主程序驅動類實現:3d
package com.empire.hadoop.mr.wccombinerdemo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 類 WordcountDriver.java的實現描述:至關於一個yarn集羣的客戶端 須要在此封裝咱們的mr程序的相關運行參數,指定jar包 * 最後提交給yarn * * @author arron 2018年12月4日 下午9:29:48 */ public class WordcountDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //是否運行爲本地模式,就是看這個參數值是否爲local,默認就是local /* conf.set("mapreduce.framework.name", "local"); */ //本地模式運行mr程序時,輸入輸出的數據能夠在本地,也能夠在hdfs上 //到底在哪裏,就看如下兩行配置你用哪行,默認就是file:/// /* conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */ /* conf.set("fs.defaultFS", "file:///"); */ //運行集羣模式,就是把程序提交到yarn中去運行 //要想運行爲集羣模式,如下3個參數要指定爲集羣上的值 /* * conf.set("mapreduce.framework.name", "yarn"); * conf.set("yarn.resourcemanager.hostname", "mini1"); * conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */ Job job = Job.getInstance(conf); job.setJar("c:/wc.jar"); //指定本程序的jar包所在的本地路徑 /* job.setJarByClass(WordcountDriver.class); */ //指定本業務job要使用的mapper/Reducer業務類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //指定mapper輸出數據的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定最終輸出的數據的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //指定須要使用combiner,以及用哪一個類做爲combiner的邏輯 /* job.setCombinerClass(WordcountCombiner.class); */ job.setCombinerClass(WordcountReducer.class); //若是不設置InputFormat,它默認用的是TextInputformat.class job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); CombineTextInputFormat.setMinInputSplitSize(job, 2097152); //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path(args[1])); //將job中配置的相關參數,以及job所用的java類所在的jar包,提交給yarn去運行 /* job.submit(); */ boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }
3、最後總結code
雖然combiner組件在shuffle階段使用的話,能夠提升程序效率;可是,它有一個使用限制條件,那就是不能影響最後的執行結果;例如:這裏講述一個反例,對多個輸入的數進行求平均數,若是此時使用combiner將不能獲得正確的結果。
最後寄語,以上是博主本次文章的所有內容,若是你們以爲博主的文章還不錯,請點贊;若是您對博主其它服務器大數據技術或者博主本人感興趣,請關注博主博客,而且歡迎隨時跟博主溝通交流。