MapReduce: two approaches to sorting website PV (page view) counts

Business requirements:

1. Count the total number of visits to each page in request.dat, and sort the records in the output file in descending order of visit count.
2. Count the total number of visits to each page in request.dat, and output only the top 5 pages.

Task one: compute with two separate MapReduce jobs

Flow chart (diagram not reproduced here)

Key technical point:

MapReduce has a built-in sorting mechanism:

both map tasks and reduce tasks sort their data by key,

so each reducer's final output is guaranteed to be in key order.

Approach:

In this case we can exploit that mechanism to meet the requirement:

1. First write a MapReduce job that counts the total number of visits to each page.

2. Then write a second MapReduce job:

Map phase: read the result file produced by the first job, parse each record into a Java object URLCountBean that wraps a URL and its total count (in the code below this bean is named PageCount), then emit the object as the key and null (NullWritable) as the value.

Key point: this Java object must implement the WritableComparable interface so that the framework can call its compareTo method to sort the keys.

Reduce phase: since the framework has already sorted the incoming data by the bean's compareTo method, the reduce method only needs to write each key out; the final result is naturally ordered by total visit count.

Test data: visit date + URL

2019/05/29 qq.com/a
2019/05/29 qq.com/bx
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 sina.com/news/socail
2019/05/29 163.com/ac
2019/05/29 sina.com/news/socail
2019/05/29 163.com/sport
2019/05/29 163.com/ac
2019/05/29 sina.com/play
2019/05/29 163.com/sport
2019/05/29 163.com/ac
2019/05/29 sina.com/movie
2019/05/29 sina.com/play
2019/05/29 sina.com/movie
2019/05/29 163.com/sport
2019/05/29 sina.com/movie
2019/05/29 163.com/ac
2019/05/29 163.com/ac
2019/05/29 163.com/acc
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 163.com/sport
2019/05/29 sina.com/movie
2019/05/29 sina.com/news/socail
2019/05/29 sina.com/movie
2019/05/29 qq.com/news
2019/05/29 163.com/bb
2019/05/29 163.com/cc
2019/05/29 sina.com/lady/
2019/05/29 163.com/cc
2019/05/29 qq.com/news
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 sina.com/lady/
2019/05/29 qq.com/by3
2019/05/29 sina.com/lady/
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 qq.com/by3
2019/05/29 163.com/sport
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 sina.com/lady/
2019/05/29 sina.com/play
2019/05/29 sina.com/movie
2019/05/29 sina.com/music
2019/05/29 sina.com/sport
2019/05/29 sina.com/sport
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 sohu.com/lady/
2019/05/29 sohu.com/play
2019/05/29 sohu.com/movie
2019/05/29 sohu.com/music
2019/05/29 sohu.com/sport
2019/05/29 sohu.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 baidu.com/lady/
2019/05/29 baidu.com/play
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/sport
2019/05/29 baidu.com/sport

Code implementation

PageCount (serializable bean)

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PageCount implements WritableComparable<PageCount> {

    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    // Descending by count; ties broken by page name so that different pages
    // with the same count are not treated as equal keys.
    public int compareTo(PageCount o) {
        return o.getCount() - this.count == 0
                ? this.page.compareTo(o.getPage())
                : o.getCount() - this.count;
    }

    // Hadoop serialization: write fields in the same order they are read back in readFields
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.page);
        dataOutput.writeInt(this.count);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.page=dataInput.readUTF();
        this.count=dataInput.readInt();
    }

    @Override
    public String toString() {
        return "PageCount{" +
                "page='" + page + '\'' +
                ", count=" + count +
                '}';
    }
}
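
A note on the comparator: computing the order with plain int subtraction can overflow for very large counts. That is harmless for the counts in this example, but a slightly safer drop-in variant for the compareTo above (a sketch, not shown in the original code) would be:

    // overflow-safe alternative: descending by count, ties broken by page name
    public int compareTo(PageCount o) {
        int byCount = Integer.compare(o.getCount(), this.count);
        return byCount != 0 ? byCount : this.page.compareTo(o.getPage());
    }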

1. First MapReduce job: count the total number of visits to each page

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PageCountStep1 {
    public static class PageCountStep1Mapper extends Mapper<LongWritable,Text,Text,IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // input line format: "date url"; emit (url, 1)
            String line = value.toString();
            String[] split = line.split(" ");
            context.write(new Text(split[1]), new IntWritable(1));
        }
    }

    public static class PageCountStep1Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count=0;
            for(IntWritable v:values){
                count+=v.get();
            }
            context.write(key,new IntWritable(count));
        }
    }

    public static void main(String[] args)throws Exception {
        // load configuration
        Configuration conf = new Configuration();


        Job job = Job.getInstance(conf);

        job.setJarByClass(PageCountStep1.class);

        job.setMapperClass(PageCountStep1Mapper.class);
        job.setReducerClass(PageCountStep1Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/input"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/outputout"));

        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
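
Since the step-1 reducer is a plain sum, it could also double as a combiner to shrink the shuffle. This is optional and not part of the job as written above; a one-line sketch:

        // optional: reuse the sum reducer as a combiner (local pre-aggregation on the map side)
        job.setCombinerClass(PageCountStep1Reduce.class);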

2. Second MapReduce job: sort the results

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PageCountStep2 {
    public static class PageCountStep2Mapper extends Mapper<LongWritable,Text,PageCount,NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each line of the step-1 output is "page<TAB>count"
            String[] split = value.toString().split("\t");
            PageCount pageCount = new PageCount();
            pageCount.set(split[0], Integer.parseInt(split[1]));

            context.write(pageCount,NullWritable.get());
        }
    }
    public static class PageCountStep2Reduce extends Reducer<PageCount,NullWritable,PageCount,NullWritable>{
        @Override
        protected void reduce(PageCount key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key,NullWritable.get());
        }
    }

    public static void main(String[] args)throws Exception {
        // load configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(PageCountStep2.class);
        job.setMapperClass(PageCountStep2Mapper.class);
        job.setReducerClass(PageCountStep2Reduce.class);
        job.setMapOutputKeyClass(PageCount.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(PageCount.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/outputout"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/sortout"));
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
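
One caveat: the globally sorted output relies on this second job running with a single reduce task, which is the Hadoop default. If the reducer count were raised, each part file would only be sorted within itself, so it is safest to pin it explicitly; a minimal sketch:

        // keep a single reducer so the final output is one globally sorted file
        job.setNumReduceTasks(1);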

Output

1. Total visit count for each page

163.com/ac	5
163.com/acc	1
163.com/bb	1
163.com/cc	2
163.com/sport	8
baidu.com/lady/	1
baidu.com/movie	6
baidu.com/music	7
baidu.com/play	1
baidu.com/sport	2
qq.com/a	1
qq.com/bx	1
qq.com/by	3
qq.com/by3	6
qq.com/news	5
sina.com/lady/	4
sina.com/movie	6
sina.com/music	1
sina.com/news/socail	7
sina.com/play	3
sina.com/sport	2
sohu.com/lady/	1
sohu.com/movie	1
sohu.com/music	1
sohu.com/play	1
sohu.com/sport	2

2. Sorted output

PageCount{page='163.com/sport', count=8}
PageCount{page='baidu.com/music', count=7}
PageCount{page='sina.com/news/socail', count=7}
PageCount{page='baidu.com/movie', count=6}
PageCount{page='qq.com/by3', count=6}
PageCount{page='sina.com/movie', count=6}
PageCount{page='163.com/ac', count=5}
PageCount{page='qq.com/news', count=5}
PageCount{page='sina.com/lady/', count=4}
PageCount{page='qq.com/by', count=3}
PageCount{page='sina.com/play', count=3}
PageCount{page='163.com/cc', count=2}
PageCount{page='baidu.com/sport', count=2}
PageCount{page='sina.com/sport', count=2}
PageCount{page='sohu.com/sport', count=2}
PageCount{page='163.com/acc', count=1}
PageCount{page='163.com/bb', count=1}
PageCount{page='baidu.com/lady/', count=1}
PageCount{page='baidu.com/play', count=1}
PageCount{page='qq.com/a', count=1}
PageCount{page='qq.com/bx', count=1}
PageCount{page='sina.com/music', count=1}
PageCount{page='sohu.com/lady/', count=1}
PageCount{page='sohu.com/movie', count=1}
PageCount{page='sohu.com/music', count=1}
PageCount{page='sohu.com/play', count=1}

Task two: use an in-memory map cache (a TreeMap inside the reducer) as the intermediate store for counting and top-N selection; suitable only when the data volume is small

Analysis diagram (not reproduced here)

PageCount entity class (plain Comparable; it is only used inside the reducer, so Hadoop serialization is not needed)

public class PageCount implements Comparable<PageCount>{

    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    // Descending by count; ties broken by page name so different pages with the
    // same count are kept as distinct keys in the TreeMap.
    public int compareTo(PageCount o) {
        return o.getCount() - this.count == 0
                ? this.page.compareTo(o.getPage())
                : o.getCount() - this.count;
    }
}
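
A small standalone sketch (not part of the original post, and assuming it sits next to the Comparable PageCount above) showing why the tie-break on page name matters: without it, a TreeMap keyed by PageCount would treat two different pages with the same count as the same key and silently drop one of them.

import java.util.TreeMap;

public class PageCountDemo {
    public static void main(String[] args) {
        TreeMap<PageCount, Object> top = new TreeMap<PageCount, Object>();

        PageCount a = new PageCount(); a.set("qq.com/news", 5);
        PageCount b = new PageCount(); b.set("163.com/ac", 5);   // same count, different page
        PageCount c = new PageCount(); c.set("163.com/sport", 8);

        top.put(a, null);
        top.put(b, null);
        top.put(c, null);

        // iterates in descending count order; equal counts fall back to page-name order
        for (PageCount p : top.keySet()) {
            System.out.println(p.getPage() + "\t" + p.getCount());
        }
    }
}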

PageTopMapper: read the data

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PageTopMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // input line format: "date url"; emit (url, 1)
        String line = value.toString();
        String[] split = line.split(" ");
        context.write(new Text(split[1]),new IntWritable(1));
    }
}

PageTopReducer: aggregate the counts and keep the top N

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class PageTopReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

    // In-memory cache of every page's count seen by this reduce task, ordered by
    // PageCount.compareTo (descending count). For a truly global top N the job
    // must run with a single reduce task.
    TreeMap<PageCount, Object> treeMap = new TreeMap<PageCount, Object>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int count =0;
        for(IntWritable value :values){
            count +=value.get();
        }
        PageCount pageCount = new PageCount();
        pageCount.set(key.toString(),count);
        treeMap.put(pageCount,null);
    }
    /**
     * Called once after this reduce task has processed all of its input keys.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        Configuration conf = context.getConfiguration();
        int top = conf.getInt("top.n", 5);
        Set<Map.Entry<PageCount, Object>> entrySet = treeMap.entrySet();
        int i=0;
        for (Map.Entry<PageCount,Object> entry:entrySet) {
            context.write(new Text(entry.getKey().getPage()),new IntWritable(entry.getKey().getCount()));
            i++;
            // emit only the top n entries
            if (i == top) return;
        }
    }
}

JobSubmitter: the driver program

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Properties;

public class JobSubmitter {

    public static void main(String[] args) throws Exception{
        // load configuration
        Configuration conf = new Configuration();
        // option 1: set the value directly in code
        //conf.setInt("top.n", 3);
        // option 2: pass it in as a command-line argument to main
        //conf.setInt("top.n", Integer.parseInt(args[0]));
        // option 3: read it from a properties file on the classpath
        Properties props = new Properties();
        props.load(JobSubmitter.class.getClassLoader().
                getResourceAsStream("topn.properties"));
        conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));
        // option 4: put it in an XML configuration file that Hadoop loads by default
        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubmitter.class);

        job.setMapperClass(PageTopMapper.class);
        job.setReducerClass(PageTopReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/input"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/output"));

        //job.setNumReduceTasks(3);

        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
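
The topn.properties file itself is not shown in the post; assuming it sits on the classpath (for example under src/main/resources), it only needs a single entry:

# topn.properties (assumed layout): how many top pages to emit
top.n=5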

Output: top 5

163.com/sport    8
baidu.com/music    7
sina.com/news/socail    7
baidu.com/movie    6
qq.com/by3    6

Copyright @ 須臾之餘 https://my.oschina.net/u/3995125
