Key MR Program Component - the Combiner

    The combiner component in one simple sentence: it shrinks the map tasks' output, cutting the amount of data shipped to the reduce tasks and thereby lowering the network load.

    How it works:

        A map task is allowed to run a local aggregation pass over its output before handing it to the reduce tasks; that pass is the combiner component. A combiner follows the same behavioral contract as a reducer: it receives a key with a list of values and produces key/value output.
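        For example, if a single map task has buffered ("hello", 1) three times, one combiner call receives ("hello", [1, 1, 1]) and emits ("hello", 3), so one record per key crosses the network instead of three.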

        

    Notes:

    1. The combiner's output becomes the reducer's input.

    2. Because the combiner is pluggable (the framework may run it zero, one, or many times), it must never change the final result.

    3. The combiner is an optimization, but it cannot be applied everywhere: it is only usable when the reducer's input and output key/value types are identical and applying it does not affect the final result (see the counter-example below).
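    A classic case where the reducer cannot double as the combiner is computing an average: averaging per-map partial averages gives the wrong answer. For values 1 and 2 on one map task and 3 on another, avg(avg(1, 2), avg(3)) = avg(1.5, 3) = 2.25, while the true avg(1, 2, 3) = 2. Summation, by contrast, is associative and commutative, which is why WordCount can combine safely.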

    Example: in the WordCount program, where we count how many times each word appears, each map task can first run a local aggregation (the combiner) and then hand the combined result to the reduce task, which performs the final overall aggregation of data sharing the same key across all map tasks, as sketched below:
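    A minimal data-flow sketch of the WordCount pipeline with a combiner:

        map 1 output:      (hello,1) (hello,1) (world,1)
        combiner (map 1):  (hello,2) (world,1)
        map 2 output:      (hello,1) (world,1) (world,1)
        combiner (map 2):  (hello,1) (world,2)
        shuffle + reduce:  (hello,3) (world,3)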

    

Combiner code:

    If you look at the source, a combiner is simply a subclass of the Reducer class, so we extend Reducer directly and just specify our combiner class when submitting the job.

package com.itheima.hadoop.mapreduce.combiner;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends
        Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the partial counts emitted by this map task for one word.
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Emit a single combined (word, count) pair for the key.
        context.write(key, new LongWritable(count));
    }

}

Mapper class:

package com.itheima.hadoop.mapreduce.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountCombinerMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {

        String line = value.toString();   // read one line of input
        String[] words = line.split(" "); // split the line into words
        for (String word : words) {
            // emit each word with a count of 1
            context.write(new Text(word), new LongWritable(1));
        }
    }

}
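A common micro-optimization in Hadoop mappers, not part of the code above, is to reuse the writable objects across map() calls instead of allocating a new Text and LongWritable per word; this is safe because context.write serializes the pair before returning. A minimal sketch of a drop-in variant of the same mapper:

package com.itheima.hadoop.mapreduce.mapper;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountCombinerMapper extends
        Mapper<LongWritable, Text, Text, LongWritable> {

    // Reused across calls to avoid per-record allocation.
    private final Text outKey = new Text();
    private final LongWritable one = new LongWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        for (String word : value.toString().split(" ")) {
            outKey.set(word);           // overwrite the reused key
            context.write(outKey, one); // no new objects per record
        }
    }
}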

Driver class:

package com.itheima.hadoop.drivers;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import com.itheima.hadoop.mapreduce.combiner.WordCountCombiner;
import com.itheima.hadoop.mapreduce.mapper.WordCountCombinerMapper;

public class WordCountCombinerDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        /**
         * The five steps of job submission:
         * 1. Create the job
         * 2. Specify the Mapper/Reducer classes
         * 3. Specify the MapReduce output data types
         * 4. Specify the input and output paths
         * 5. Submit the job
         */
        // Reuse the Configuration populated by ToolRunner (it carries any
        // -D options from the command line) instead of creating a new one.
        Job job = Job.getInstance(getConf());
        job.setJarByClass(WordCountCombinerDriver.class);
        job.setMapperClass(WordCountCombinerMapper.class);
        
        /*** The small interlude this post is about: the combiner ***/
        job.setCombinerClass(WordCountCombiner.class);
        /*** The small interlude this post is about: the combiner ***/

        // The reduce logic matches the combiner logic, and the combiner
        // already extends Reducer, so the same class serves as the reducer.
        job.setReducerClass(WordCountCombiner.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

}

Main class:

package com.itheima.hadoop.runner;

import org.apache.hadoop.util.ToolRunner;

import com.itheima.hadoop.drivers.WordCountCombinerDriver;

public class WordCountCombinerRunner {

    public static void main(String[] args) throws Exception {
        
        int res = ToolRunner.run(new WordCountCombinerDriver(), args);
        System.exit(res);
    }
}
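A minimal way to run the job; the jar name and HDFS paths here are placeholders, not from the original post:

hadoop jar wordcount-combiner.jar com.itheima.hadoop.runner.WordCountCombinerRunner /wordcount/input /wordcount/output

Because the driver goes through ToolRunner, generic Hadoop options such as -D mapreduce.job.reduces=2 can be passed before the path arguments without recompiling.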

Run result: when the job finishes, the counter summary printed by waitForCompletion(true) includes "Combine input records" and "Combine output records"; the drop from the former to the latter shows how much map output the combiner collapsed before the shuffle.
