TopN 的問題分爲兩種:一種是鍵是唯一的,還有一種是鍵非唯一的。咱們這邊作的就是鍵是唯一的情形。
這裏的鍵指的是:下面數據的第一列。
有一堆數據,想根據第一列找出裏面的 Top10。
以下:
關鍵:在 map 和 reduce 階段都使用了 TreeMap 這個數據結構,它有從小到大的排序功能,因此排第一的最小,依次增大。限定大小爲 10,只要超過 10,就把排在第一個(最小)的值給刪除。
代碼以下:
package com.book.topn; import java.io.IOException; import java.util.Iterator; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TopN { static class Mapper1 extends Mapper<LongWritable, Text, NullWritable, Text> { public SortedMap<Double, Text> top10cats = new TreeMap<Double, Text>(); public int N = 10; @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { String[] lines = value.toString().split(","); Double weight = Double.parseDouble(lines[0]); // 一行讀完,而後把數據 top10cats.put(weight, new Text(value)); // 若是Map if (top10cats.size() > N) { top10cats.remove(top10cats.firstKey()); } } // 待執行完map的讀取比較操做後,就把TreeMap裏面的數據打印出來。 @Override protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { Set<Double> set = top10cats.keySet(); Iterator<Double> iterator = set.iterator(); while (iterator.hasNext()) { context.write(NullWritable.get(), top10cats.get(iterator.next())); } } } static class reduce1 extends Reducer<NullWritable, Text, NullWritable, Text> { SortedMap<Double, Text> finalTop = new TreeMap<Double, Text>(); private int N = 10; @Override protected void reduce(NullWritable arg0, Iterable<Text> values, Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException { for (Text value : values) { String[] finalresult = value.toString().split(","); 
finalTop.put(Double.parseDouble(finalresult[0]), new Text(value)); if (finalTop.size() > N) { finalTop.remove(finalTop.firstKey()); } ; } Set<Double> set = finalTop.keySet(); Iterator<Double> iterator = set.iterator(); // 依次寫入到文件中 while (iterator.hasNext()) { context.write(NullWritable.get(), finalTop.get(iterator.next())); } } } public static void main(String[] args) throws Exception, IOException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TopN.class); job.setMapperClass(Mapper1.class); job.setReducerClass(reduce1.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); // 指定輸入的數據的目錄 FileInputFormat.setInputPaths(job, new Path("/Users/mac/Desktop/TopN.txt")); FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort")); boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
結果:
注意點:
上面的注意點必定要切記。