上一篇文章咱們瞭解了MapReduce優化方面的知識,如今咱們經過簡單的項目,學會如何優化MapReduce性能
一、項目介紹
咱們使用簡單的成績數據集,統計出0~20、20~50、50~100這三個年齡段的男、女學生的最高分數
二、數據集
姓名 年齡 性別 成績
Alice 23 female 45
Bob 34 male 89
Chris 67 male 97
Kristine 38 female 53
Connor 25 male 27
Daniel 78 male 95
James 34 male 79
Alex 52 male 69
三、分析
基於需求,咱們經過如下幾步完成:
一、編寫Mapper類,按需求將數據集解析爲key=gender,value=name+age+score,而後輸出
二、編寫Partitioner類,按年齡段,將結果指定給不一樣的Reduce執行
三、編寫Reducer類,分別統計出男女學生的最高分
四、編寫run方法執行MapReduce做業
四、實現
package com.buaa; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @ProjectName BestScoreCount * @PackageName com.buaa * @ClassName Gender * @Description 統計不一樣年齡段內,男、女最高分數 * @Author 劉吉超 * @Date 2016-05-09 09:49:50 */ public class Gender extends Configured implements Tool { private static String TAB_SEPARATOR = "\t"; public static class GenderMapper extends Mapper<LongWritable, Text, Text, Text> { /* * 調用map解析一行數據,該行的數據存儲在value參數中,而後根據\t分隔符,解析出姓名,年齡,性別和成績 */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /* * 姓名 年齡 性別 成績 * Alice 23 female 45 * 每一個字段的分隔符是tab鍵 */ // 使用\t,分割數據 String[] tokens = value.toString().split(TAB_SEPARATOR); // 性別 String gender = tokens[2]; // 姓名 年齡 成績 String nameAgeScore = tokens[0] + TAB_SEPARATOR + tokens[1] + TAB_SEPARATOR + tokens[3]; // 輸出 key=gender value=name+age+score context.write(new Text(gender), new Text(nameAgeScore)); } } /* * 合併 Mapper輸出結果 */ public static class GenderCombiner extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException { int maxScore = Integer.MIN_VALUE; int score = 0; String name = " "; String age = " "; for (Text val : values) { String[] valTokens = val.toString().split(TAB_SEPARATOR); score = Integer.parseInt(valTokens[2]); if (score > maxScore) { name = valTokens[0]; age 
= valTokens[1]; maxScore = score; } } context.write(key, new Text(name + TAB_SEPARATOR + age + TAB_SEPARATOR + maxScore)); } } /* * 根據 age年齡段將map輸出結果均勻分佈在reduce上 */ public static class GenderPartitioner extends Partitioner<Text, Text> { @Override public int getPartition(Text key, Text value, int numReduceTasks) { String[] nameAgeScore = value.toString().split(TAB_SEPARATOR); // 學生年齡 int age = Integer.parseInt(nameAgeScore[1]); // 默認指定分區 0 if (numReduceTasks == 0) return 0; // 年齡小於等於20,指定分區0 if (age <= 20) { return 0; }else if (age <= 50) { // 年齡大於20,小於等於50,指定分區1 return 1 % numReduceTasks; }else // 剩餘年齡,指定分區2 return 2 % numReduceTasks; } } /* * 統計出不一樣性別的最高分 */ public static class GenderReducer extends Reducer<Text, Text, Text, Text> { @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int maxScore = Integer.MIN_VALUE; int score = 0; String name = " "; String age = " "; String gender = " "; // 根據key,迭代 values集合,求出最高分 for (Text val : values) { String[] valTokens = val.toString().split(TAB_SEPARATOR); score = Integer.parseInt(valTokens[2]); if (score > maxScore) { name = valTokens[0]; age = valTokens[1]; gender = key.toString(); maxScore = score; } } context.write(new Text(name), new Text("age:" + age + TAB_SEPARATOR + "gender:" + gender + TAB_SEPARATOR + "score:" + maxScore)); } } @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { // 讀取配置文件 Configuration conf = new Configuration(); Path mypath = new Path(args[1]); FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { hdfs.delete(mypath, true); } // 新建一個任務 Job job = new Job(conf, "gender"); // 主類 job.setJarByClass(Gender.class); // Mapper job.setMapperClass(GenderMapper.class); // Reducer job.setReducerClass(GenderReducer.class); // map 輸出key類型 job.setMapOutputKeyClass(Text.class); // map 輸出value類型 job.setMapOutputValueClass(Text.class); // reduce 輸出key類型 
job.setOutputKeyClass(Text.class); // reduce 輸出value類型 job.setOutputValueClass(Text.class); // 設置Combiner類 job.setCombinerClass(GenderCombiner.class); // 設置Partitioner類 job.setPartitionerClass(GenderPartitioner.class); // reduce個數設置爲3 job.setNumReduceTasks(3); // 輸入路徑 FileInputFormat.addInputPath(job, new Path(args[0])); // 輸出路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交任務 return job.waitForCompletion(true)?0:1; } public static void main(String[] args) throws Exception { String[] args0 = { "hdfs://ljc:9000/buaa/gender/gender.txt", "hdfs://ljc:9000/buaa/gender/out/" }; int ec = ToolRunner.run(new Configuration(),new Gender(), args0); System.exit(ec); } }
五、運行效果