目錄java
Map階段:若干個maptask併發實例,徹底並行運行,互不相干。apache
Reduce階段:若干個reducetask併發實例,徹底並行運行,可是他們的數據依賴於Map階段的輸出。編程
注意:MapReduce模型只能包含一個map階段和一個reduce階段;若是業務邏輯很是複雜,就只能使用多個MapReduce程序,串行運行。windows
1. 編寫Mapper緩存
// 注意:hadoop1.0版本中是mapred下包,hadoop2.0是mapreduce下的包 import org.apache.hadoop.mapreduce.Mapper; // 繼承Mapper父類,泛型爲輸入和輸出的<K, V>;並重寫父類的map方法 public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * 每行文本都會執行一次map方法. * * @param key 文本偏移量. * @param value 一行文本. * @param context 上下文對象. * @throws IOException . * @throws InterruptedException 當阻塞方法收到中斷請求時拋出. */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split("\\s+"); // 拆分一行中的單詞 for (String word : words) { context.write(new Text(word), new IntWritable(1)); // 輸出一個<K, V> } } }
2. 編寫Reducer併發
// 繼承Reducer類,輸入的<K, V>類型爲map端輸出<K, V>類型 public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * 相同的key只會執行一次reduce方法 * * @param key map端輸出的key * @param values 相同key的value集合 * @param context 上下文對象 * @throws IOException . * @throws InterruptedException . */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 當前的 key出現了多少次 int count = 0; // values中的數據是反序列化過來的,最好不要直接使用values中的bean for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); // 輸出 } }
3. 編寫Driverapp
// Driver的做用是將這個Mapper和Reducer程序打包成一個Job,並提交該Job public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 不須要爲 conf設置HDFS等參數,由於conf會調用系統默認的配置文件, // 因此這個mr程序在哪裏運行就會調用哪裏的配置文件,在集羣上運行就會使用集羣的設置文件。 Configuration conf = new Configuration(); // 刪除輸出文件,或者手動刪除 // FileHelper.deleteDir(args[1], conf); // 根據配置文件實例化一個 Job,並取個名字 Job job = Job.getInstance(conf, "MyWordCount"); // 設置 Jar的位置 job.setJarByClass(WordCountDriver.class); // 設置 Mapper運行類,以及輸出的key和value的類型 job.setMapperClass(WordCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 設置 Reducer的運行類,以及輸出的key和value的類型 job.setReducerClass(WordCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 設置分區(能夠不用設置) // 當設置的分區數大於實際分區數時,能夠正常執行,多出的分區爲空文件; // 當設置的分區數小於實際分區數時,會報錯。 job.setNumReduceTasks(4); // 若是設置的 numReduceTasks大於 1,而又沒有設置自定義的 PartitionerClass // 則會調用系統默認的 HashPartitioner實現類來計算分區。 job.setPartitionerClass(WordCountPartitioner.class); // 設置combine job.setCombinerClass(WordCountCombiner.class); // 設置輸入和輸出文件的位置 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交任務,等待執行結果,參數爲 true表示打印信息 boolean result = job.waitForCompletion(true); // 根據 job的返回值自定義退出 System.exit(result?0:1); } }
4. 運行框架