1.圖解MapReduce MapReduce總體流程圖 並行讀取文本中的內容,而後進行MapReduce操做 Map過程:並行讀取三行,對讀取的單詞進行map操做,每一個詞都以形式生成 reduce操做是對map的結果進行排序,合併,最後得出詞頻。 2.簡單過程: Input: Hello World Bye World Hello Hadoop Bye Hadoop Bye Hadoop Hello Hadoop Map: Sort: Combine: Reduce: MergeSort的過程(ps:2012-10-18) Map: MergeSort: | || | || ||| || ||| | ||| || ||| MergeArray結果: ||| || ||| | ||| || ||| 在|||這一層級 MergeArray結果: || | || 在||這一層級 MergeArray結果: | 在|這一層級 MergeArray結果: 排序完成 3.代碼實例: 複製代碼 package com.felix; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; /** * * 描述:WordCount explains by Felix * @author Hadoop Dev Group */ public class WordCount { /** * MapReduceBase類:實現了Mapper和Reducer接口的基類(其中的方法只是實現接口,而未做任何事情) * Mapper接口: * WritableComparable接口:實現WritableComparable的類能夠相互比較。全部被用做key的類應該實現此接口。 * Reporter 則可用於報告整個應用的運行進度,本例中未使用。 * */ public static class Map extends MapReduceBase implements Mapper { /** * LongWritable, IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 數據類型的類,這些類實現了WritableComparable接口, * 都可以被串行化從而便於在分佈式環境中進行數據交換,你能夠將它們分別視爲long,int,String 的替代品。 */ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); /** * Mapper接口中的map方法: * void map(K1 key, V1 value, OutputCollector output, Reporter reporter) * 映射一個單個的輸入k/v對到一箇中間的k/v對 * 輸出對不須要和輸入對是相同的類型,輸入對能夠映射到0個或多個輸出對。 * OutputCollector接口:收集Mapper和Reducer輸出的對。 * OutputCollector接口的collect(k, v)方法:增長一個(k,v)對到output */ public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { /** * JobConf:map/reduce的job配置類,向hadoop框架描述map-reduce執行的工做 * 構造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 */ JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); //設置一個用戶定義的job名稱 conf.setOutputKeyClass(Text.class); //爲job的輸出數據設置Key類 conf.setOutputValueClass(IntWritable.class); //爲job輸出設置value類 conf.setMapperClass(Map.class); //爲job設置Mapper類 conf.setCombinerClass(Reduce.class); //爲job設置Combiner類 conf.setReducerClass(Reduce.class); //爲job設置Reduce類 conf.setInputFormat(TextInputFormat.class); //爲map-reduce任務設置InputFormat實現類 conf.setOutputFormat(TextOutputFormat.class); //爲map-reduce任務設置OutputFormat實現類 /** * InputFormat描述map-reduce中對job的輸入定義 * setInputPaths():爲map-reduce job設置路徑數組做爲輸入列表 * setInputPath():爲map-reduce job設置路徑數組做爲輸出列表 */ FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); //運行一個job } } 複製代碼java