03 The Top N Problem in Hadoop

The Top N problem comes in two flavors: one where the key is unique, and one where the key is not. This post handles the unique-key case (a sketch of one way to tolerate non-unique keys follows the code listing).

The "key" here means the first column of the data described below.

 

We have a pile of comma-separated rows and want to find the Top 10 ranked by the first column.

The key idea: both the map and reduce phases use a TreeMap, which keeps its keys sorted in ascending order, so the first entry is always the smallest. We cap its size at 10; whenever it grows past ten entries, we remove the first (smallest) one.
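The same trick in isolation, as a minimal standalone sketch (the class and variable names here are illustrative, not from the job below):

import java.util.SortedMap;
import java.util.TreeMap;

public class BoundedTopN {
    public static void main(String[] args) {
        int n = 3; // cap; the job below uses N = 10
        SortedMap<Double, String> top = new TreeMap<Double, String>();
        double[] weights = {5.0, 1.0, 9.0, 7.0, 3.0};
        for (double w : weights) {
            top.put(w, "row-" + w);         // keys stay sorted ascending
            if (top.size() > n) {
                top.remove(top.firstKey()); // evict the current smallest
            }
        }
        System.out.println(top.keySet());   // [5.0, 7.0, 9.0]
    }
}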

 

The code is as follows:

package com.book.topn;

import java.io.IOException;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopN {

    static class Mapper1 extends Mapper<LongWritable, Text, NullWritable, Text> {
        // Per-mapper buffer: a TreeMap keeps its keys sorted ascending.
        private SortedMap<Double, Text> top10cats = new TreeMap<Double, Text>();
        private int N = 10;

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {

            String[] fields = value.toString().split(",");
            Double weight = Double.parseDouble(fields[0]);
            // Buffer the line keyed by its first column; copy the Text,
            // since Hadoop reuses the same object across map() calls.
            top10cats.put(weight, new Text(value));

            // If the map has grown past N entries, evict the smallest key.
            if (top10cats.size() > N) {
                top10cats.remove(top10cats.firstKey());
            }
        }

        // After the map phase has read and compared every line, emit the TreeMap's contents.
        @Override
        protected void cleanup(Mapper<LongWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {

            // Emit in ascending key order; the reducer re-ranks globally.
            for (Text t : top10cats.values()) {
                context.write(NullWritable.get(), t);
            }

        }

    }

    static class Reduce1 extends Reducer<NullWritable, Text, NullWritable, Text> {

        // Buffer for the global Top N; every row arrives under the same null key.
        private SortedMap<Double, Text> finalTop = new TreeMap<Double, Text>();
        private int N = 10;

        @Override
        protected void reduce(NullWritable arg0, Iterable<Text> values,
                Reducer<NullWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {

            for (Text value : values) {
                String[] fields = value.toString().split(",");
                finalTop.put(Double.parseDouble(fields[0]), new Text(value));
                // Same bounded-TreeMap trick as the mapper: evict the smallest.
                if (finalTop.size() > N) {
                    finalTop.remove(finalTop.firstKey());
                }
            }

            // Write the global Top N to the output, smallest key first.
            for (Text t : finalTop.values()) {
                context.write(NullWritable.get(), t);
            }

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TopN.class);

        job.setMapperClass(Mapper1.class);
        job.setReducerClass(Reduce1.class);
        // Make the single global reducer explicit (the default is also 1).
        job.setNumReduceTasks(1);

        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Must match the reducer's output: NullWritable key, Text value.
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Input file and output directory (local paths for this demo).
        FileInputFormat.setInputPaths(job, new Path("/Users/mac/Desktop/TopN.txt"));

        FileOutputFormat.setOutputPath(job, new Path("/Users/mac/Desktop/flowresort"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);

    }

}
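One caveat: TreeMap.put overwrites the existing entry when two rows share the same key, which is why this implementation assumes the first column is unique; a duplicate key would silently replace an earlier row. A minimal sketch of one way to tolerate non-unique keys, keeping a list of rows per key (my own illustration, not part of the job above):

import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

public class TopNWithTies {
    public static void main(String[] args) {
        int n = 2;    // cap on total rows kept
        int held = 0; // rows currently buffered
        SortedMap<Double, List<String>> top = new TreeMap<Double, List<String>>();
        String[] rows = {"3.0,a", "3.0,b", "9.0,c", "1.0,d"};
        for (String row : rows) {
            double weight = Double.parseDouble(row.split(",")[0]);
            if (!top.containsKey(weight)) {
                top.put(weight, new ArrayList<String>());
            }
            top.get(weight).add(row);
            held++;
            if (held > n) {
                // Evict the most recently added row under the smallest key;
                // drop the bucket entirely once it is empty.
                List<String> smallest = top.get(top.firstKey());
                smallest.remove(smallest.size() - 1);
                if (smallest.isEmpty()) {
                    top.remove(top.firstKey());
                }
                held--;
            }
        }
        System.out.println(top); // {3.0=[3.0,a], 9.0=[9.0,c]}
    }
}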

Result:

The output file contains the ten rows with the largest first-column values, one per line, from smallest to largest (a TreeMap iterates its keys in ascending order).

Notes:

Be sure to keep the following firmly in mind: the final setOutputKeyClass/setOutputValueClass pair must match the reducer's actual output types (NullWritable key, Text value here), and the mapper must copy each Text value (new Text(value)) before buffering it, because Hadoop reuses the same object across calls.
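To try the job end to end, one could generate an input file in the format the code expects, comma-separated with a numeric first column. The path below matches the one hard-coded in main; the helper class and the rows themselves are invented for illustration:

import java.io.FileWriter;
import java.io.IOException;

public class MakeTestInput {
    public static void main(String[] args) throws IOException {
        // Twelve rows with keys 1.0 .. 12.0; the job should keep the ten largest.
        FileWriter out = new FileWriter("/Users/mac/Desktop/TopN.txt");
        for (int i = 1; i <= 12; i++) {
            out.write(i + ".0,row-" + i + "\n");
        }
        out.close();
    }
}

Running TopN against this file should write the rows for 3.0 through 12.0, smallest first.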
