The combiner component in one simple sentence: it trims the map tasks' output before the shuffle, cutting the amount of data the reduce tasks have to receive and therefore the network load.
How it works:
A map task is allowed to run a local aggregation pass over its own output before that output is handed to the reduce tasks; that pass is the combiner. The combiner behaves just like a reducer: it receives a key with its list of values and emits a key/value pair.
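For instance, a single combiner call in a word count job might receive hello with the local values [1, 1, 1] and emit hello / 3 (illustrative values, not from the original post).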
Notes:
1. The combiner's output becomes the reducer's input.
2. The combiner is pluggable, i.e. the framework is free to run it or skip it, so it must never change the final result.
3. The combiner is purely an optimization and cannot be used everywhere: it only fits scenarios where the reducer's input and output key/value types are identical and where running the combiner does not affect the final result (see the counter-example right below).
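A quick counter-example for note 3, with made-up numbers: suppose we want the average of the values 1, 2, 3 (seen by map task 1) and 4, 5 (seen by map task 2). The true average is 15 / 5 = 3. A naive "average" combiner would emit 2 from the first map task and 4.5 from the second, and a reducer averaging those would return (2 + 4.5) / 2 = 3.25, a different result. Summing counts in WordCount is safe precisely because addition can be applied in stages.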
Example: in the WordCount program, which counts how many times each word appears, each map task can first perform a local aggregation (the combiner) and hand the aggregated result to the reduce task, which then performs the final aggregation over the records from all map tasks that share the same key.
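A small illustrative trace with made-up input and two map tasks (not from the original post):

Map task 1 output:    (hello,1) (hello,1) (world,1)
After its combiner:   (hello,2) (world,1)
Map task 2 output:    (hello,1) (world,1) (world,1)
After its combiner:   (hello,1) (world,2)
Shuffled to reduce:   hello -> [2, 1]   world -> [1, 2]
Reduce output:        (hello,3) (world,3)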
Combiner code:
The combiner class: a combiner is written as a Reducer (Job.setCombinerClass() accepts a Reducer subclass), so we simply extend Reducer and register our own class as the combiner when the job is configured and submitted.
package com.itheima.hadoop.mapreduce.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all partial counts for this key and emit a single combined pair.
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
Mapper class:
package com.itheima.hadoop.mapreduce.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountCombinerMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();      // one line of input
        String[] words = line.split(" ");    // split it into words
        for (String word : words) {
            // emit each word with a count of 1
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
Driver class:
package com.itheima.hadoop.drivers;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import com.itheima.hadoop.mapreduce.combiner.WordCountCombiner;
import com.itheima.hadoop.mapreduce.mapper.WordCountCombinerMapper;

public class WordCountCombinerDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        /*
         * Five steps to submit the job:
         * 1. create the job
         * 2. set the map/reduce classes
         * 3. set the map and reduce output key/value types
         * 4. set the input and output paths
         * 5. submit the job and wait for it to finish
         */
        Configuration conf = getConf();   // use the configuration ToolRunner passed in
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountCombinerDriver.class);

        job.setMapperClass(WordCountCombinerMapper.class);

        /*** the point of this post: register the combiner ***/
        job.setCombinerClass(WordCountCombiner.class);
        /*** the point of this post: register the combiner ***/

        // The reduce logic is identical to the combiner logic, and the combiner
        // is itself a Reducer subclass, so it is reused as the reducer here.
        job.setReducerClass(WordCountCombiner.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
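To check how much the combiner actually trims, one option (not part of the original post) is to print the job's task counters after it finishes. A minimal sketch assuming Hadoop 2.x, two extra imports (org.apache.hadoop.mapreduce.Counters and org.apache.hadoop.mapreduce.TaskCounter), and replacing the last line of run():

boolean ok = job.waitForCompletion(true);
Counters counters = job.getCounters();
// Compare what the map tasks emitted with what was left after the combiner ran.
long mapOut     = counters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
long combineIn  = counters.findCounter(TaskCounter.COMBINE_INPUT_RECORDS).getValue();
long combineOut = counters.findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS).getValue();
System.out.println("MAP_OUTPUT_RECORDS     = " + mapOut);
System.out.println("COMBINE_INPUT_RECORDS  = " + combineIn);
System.out.println("COMBINE_OUTPUT_RECORDS = " + combineOut);
return ok ? 0 : 1;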
Main class:
package com.itheima.hadoop.runner;

import org.apache.hadoop.util.ToolRunner;

import com.itheima.hadoop.drivers.WordCountCombinerDriver;

public class WordCountCombinerRunner {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new WordCountCombinerDriver(), args);
        System.exit(res);
    }
}
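Submitting the job would look roughly like this; the jar name and the HDFS paths below are placeholders, only the main class comes from the code above:

hadoop jar wordcount-combiner.jar com.itheima.hadoop.runner.WordCountCombinerRunner /wordcount/input /wordcount/output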
Run result: