[MapReduce] WordCount 代碼實例及具體執行過程說明

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
        public WordCount(){}

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                Configuration conf = new Configuration();
                // 調用格式 hadoop  jar  .jar   <input dir path 1 >   <input dir path 2 >  ...  <output dir path>
                // 輸入可能有多個文件夾
                String[] otherArgs =  (new GenericOptionsParser(conf,args)).getRemainingArgs();

                if(otherArgs.length < 2) {
                        System.err.println("Usage: wordcount <in> [<in>...] <out>");
                        System.exit(2);
                }
                // 建立一個job 實例
                Job job = Job.getInstance(conf, "word count");
                job.setJarByClass(WordCount.class);

                // 指定繼承了Mapper並實現了map方法的類
                job.setMapperClass(WordCount.TokenizerMapper.class);

                // 指定合併操做(實際上就是本地執行的Reduce操做), 可選
                job.setCombinerClass(WordCount.TokenizerReducer.class);

                //  指定分區操做 Map在根據分區類將不一樣的key映射到相應的分區 默認根據key值,哈希分區
                // 須要該分區的Reduce 會根據JobTracker 對Map的監控 ,當Map結束後到相應的分區經過http取數據
                // 可選 ,默認哈希分區
                // job.setPartitionerClass(xxx.class);

                // 排序將根據Key數據類型的內部Comparator方法自動排序

                //  指定分組條件, 知足一樣條件的key值將被分到同一個組,排序在最前的key值 將做爲該組的key
                //  我的理解是reduce端拉取數據後的歸併操做 從 <key,value1>, <key, value2 >... => <key, value-list>
                // 默認key 徹底相同爲一組
                // job.setOutputKeyComparatorClass(XXX.class);

                // 指定Reduce操做
                job.setReducerClass(WordCount.TokenizerReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                for(int i =0 ;i <otherArgs.length-1;i++){
                        FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
                }
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
                System.exit(job.waitForCompletion(true)?0:1);

        }

        public  static  class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
                private final  IntWritable one = new IntWritable(1);
                private Text word = new Text();
                public TokenizerMapper(){}
                public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                        String[] words = value.toString().split(" ");
                        for(String word_tmp : words){
                                this.word.set(word_tmp);
                                context.write(word, one);
                        }

                }
        }

        public static class TokenizerReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
                private IntWritable result = new IntWritable(0);
                public TokenizerReducer(){}
                public void reduce(Text key, Iterable<IntWritable> values,  Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                        int sum = 0;
                        for(IntWritable val: values){
                                sum+=val.get();
                        }
                        result.set(sum);
                        context.write(key, result);
                }

        }

}

Hadoop的shuffle過程就是從map端輸出到reduce端輸入之間的過程html

Map端

Map端輸出要通過分區->排序->(合併->) 而後各分區Merge到一塊兒 等待Reduce 取走各分區的數據java

(1)分區 Partition
默認分區是哈希分區,但存在數據傾斜問題,即有些key值的數據很是多,所以會影響Map效率,所以能夠經過自定義分區類平衡數據分佈。哪一個key到哪一個Reducer的分配過程,是由Partitioner規定的 。也就是說一個map其本地有着多個分區的數據,不一樣分區的數據會被對應不一樣的reducer拉取。apache

(2)排序 Sort
默認根據key的數據類型的內置Comparator排序,本身實現的數據類型須要自定義Comparator函數
(3)合併 Combine
map操做後會產生大量的<key,value>鍵值對,而且可能存在重複的鍵值對,而這會影響傳遞數據的效率(copy過程),所以 在map端能夠經過本地的合併操做(能夠看做本地的一次reduce), 合併一樣key的鍵值對app

Reduce端

(1)Reduce端的copy過程
reduce端可能從n個map的結果中獲取數據,而這些map的執行速度不盡相同,當其中一個map運行結束時,reduce就會從JobTracker中獲取該信息。map運行結束後TaskTracker會獲得消息,進而將消息彙報給JobTracker,reduce定時從JobTracker獲取該信息,reduce端默認有5個數據複製線程從map端複製數據。
(2) 排序 分組(歸併)
reduce端從不一樣的map拉取的數據,數據確定須要通過再一次排序才能保證其有效性。因爲分區策略的緣由,同一分區內的key值可能不一樣或不知足咱們處理數據的需求, 所以咱們須要對數據進行分組,我我的理解爲其實就是大數據技術原理課程講的歸併操做,即相同key值或知足相同條件的key值 合併爲一個<key, value-list>,其中key值爲排序的第一個key值函數

參考資料
Partitioner與自定義Partitioner
Shuffle過程那點事兒
hadoop的分區、分組與排序的理解oop

相關文章
相關標籤/搜索