import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public WordCount(){} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 調用格式 hadoop jar .jar <input dir path 1 > <input dir path 2 > ... <output dir path> // 輸入可能有多個文件夾 String[] otherArgs = (new GenericOptionsParser(conf,args)).getRemainingArgs(); if(otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } // 建立一個job 實例 Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); // 指定繼承了Mapper並實現了map方法的類 job.setMapperClass(WordCount.TokenizerMapper.class); // 指定合併操做(實際上就是本地執行的Reduce操做), 可選 job.setCombinerClass(WordCount.TokenizerReducer.class); // 指定分區操做 Map在根據分區類將不一樣的key映射到相應的分區 默認根據key值,哈希分區 // 須要該分區的Reduce 會根據JobTracker 對Map的監控 ,當Map結束後到相應的分區經過http取數據 // 可選 ,默認哈希分區 // job.setPartitionerClass(xxx.class); // 排序將根據Key數據類型的內部Comparator方法自動排序 // 指定分組條件, 知足一樣條件的key值將被分到同一個組,排序在最前的key值 將做爲該組的key // 我的理解是reduce端拉取數據後的歸併操做 從 <key,value1>, <key, value2 >... => <key, value-list> // 默認key 徹底相同爲一組 // job.setOutputKeyComparatorClass(XXX.class); // 指定Reduce操做 job.setReducerClass(WordCount.TokenizerReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(int i =0 ;i <otherArgs.length-1;i++){ FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); System.exit(job.waitForCompletion(true)?0:1); } public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final IntWritable one = new IntWritable(1); private Text word = new Text(); public TokenizerMapper(){} public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); for(String word_tmp : words){ this.word.set(word_tmp); context.write(word, one); } } } public static class TokenizerReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(0); public TokenizerReducer(){} public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val: values){ sum+=val.get(); } result.set(sum); context.write(key, result); } } }
Hadoop的shuffle過程就是從map端輸出到reduce端輸入之間的過程html
Map端輸出要通過分區->排序->(合併->) 而後各分區Merge到一塊兒 等待Reduce 取走各分區的數據java
(1)分區 Partition
默認分區是哈希分區,但存在數據傾斜問題,即有些key值的數據很是多,所以會影響Map效率,所以能夠經過自定義分區類平衡數據分佈。哪一個key到哪一個Reducer的分配過程,是由Partitioner規定的 。也就是說一個map其本地有着多個分區的數據,不一樣分區的數據會被對應不一樣的reducer拉取。apache
(2)排序 Sort
默認根據key的數據類型的內置Comparator排序,本身實現的數據類型須要自定義Comparator函數
(3)合併 Combine
map操做後會產生大量的<key,value>鍵值對,而且可能存在重複的鍵值對,而這會影響傳遞數據的效率(copy過程),所以 在map端能夠經過本地的合併操做(能夠看做本地的一次reduce), 合併一樣key的鍵值對app
(1)Reduce端的copy過程
reduce端可能從n個map的結果中獲取數據,而這些map的執行速度不盡相同,當其中一個map運行結束時,reduce就會從JobTracker中獲取該信息。map運行結束後TaskTracker會獲得消息,進而將消息彙報給JobTracker,reduce定時從JobTracker獲取該信息,reduce端默認有5個數據複製線程從map端複製數據。
(2) 排序 分組(歸併)
reduce端從不一樣的map拉取的數據,數據確定須要通過再一次排序才能保證其有效性。因爲分區策略的緣由,同一分區內的key值可能不一樣或不知足咱們處理數據的需求, 所以咱們須要對數據進行分組,我我的理解爲其實就是大數據技術原理課程講的歸併操做,即相同key值或知足相同條件的key值 合併爲一個<key, value-list>,其中key值爲排序的第一個key值函數
參考資料
Partitioner與自定義Partitioner
Shuffle過程那點事兒
hadoop的分區、分組與排序的理解oop