1. Count the total number of visits to each page in request.dat, and sort the data in the output file in descending order of visit count.
2. Count the total number of visits to each page in request.dat, and output only the top 5 entries.
Task 1: compute with two separate MapReduce jobs
Flow diagram
Key technical point:
The MapReduce framework has a built-in sorting mechanism: both map tasks and reduce tasks sort their data by key, so the final output is always ordered by key.
Approach:
In this case we can exploit that mechanism to satisfy the requirement:
1. First write a MapReduce job that counts the total number of visits to each page.
2. Then write a second MapReduce job:
Map phase: read the result file produced by the first job, parse each record into a Java object, PageCount (wrapping a URL and its total visit count), then emit that object as the key and null as the value.
Key point: this Java object must implement the WritableComparable interface, so that the worker can call the object's compareTo method to sort the keys.
Reduce phase: since the worker has already sorted the incoming data using PageCount's compareTo method, the reduce method only needs to write the data out; the final result is naturally ordered by total visit count.
Test data: visit date + page visited
2019/05/29 qq.com/a
2019/05/29 qq.com/bx
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 sina.com/news/socail
2019/05/29 163.com/ac
2019/05/29 sina.com/news/socail
2019/05/29 163.com/sport
2019/05/29 163.com/ac
2019/05/29 sina.com/play
2019/05/29 163.com/sport
2019/05/29 163.com/ac
2019/05/29 sina.com/movie
2019/05/29 sina.com/play
2019/05/29 sina.com/movie
2019/05/29 163.com/sport
2019/05/29 sina.com/movie
2019/05/29 163.com/ac
2019/05/29 163.com/ac
2019/05/29 163.com/acc
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 163.com/sport
2019/05/29 sina.com/movie
2019/05/29 sina.com/news/socail
2019/05/29 sina.com/movie
2019/05/29 qq.com/news
2019/05/29 163.com/bb
2019/05/29 163.com/cc
2019/05/29 sina.com/lady/
2019/05/29 163.com/cc
2019/05/29 qq.com/news
2019/05/29 qq.com/by
2019/05/29 qq.com/by3
2019/05/29 sina.com/lady/
2019/05/29 qq.com/by3
2019/05/29 sina.com/lady/
2019/05/29 qq.com/by3
2019/05/29 qq.com/news
2019/05/29 qq.com/by3
2019/05/29 163.com/sport
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 sina.com/lady/
2019/05/29 sina.com/play
2019/05/29 sina.com/movie
2019/05/29 sina.com/music
2019/05/29 sina.com/sport
2019/05/29 sina.com/sport
2019/05/29 163.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 sohu.com/lady/
2019/05/29 sohu.com/play
2019/05/29 sohu.com/movie
2019/05/29 sohu.com/music
2019/05/29 sohu.com/sport
2019/05/29 sohu.com/sport
2019/05/29 sina.com/news/socail
2019/05/29 baidu.com/lady/
2019/05/29 baidu.com/play
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/music
2019/05/29 baidu.com/movie
2019/05/29 baidu.com/music
2019/05/29 baidu.com/sport
2019/05/29 baidu.com/sport
PageCount: the serializable bean
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PageCount implements WritableComparable<PageCount> {

    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    // sort by count in descending order; break ties by page name
    public int compareTo(PageCount o) {
        return o.getCount() - this.count == 0
                ? this.page.compareTo(o.getPage())
                : o.getCount() - this.count;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.page);
        dataOutput.writeInt(this.count);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.page = dataInput.readUTF();
        this.count = dataInput.readInt();
    }

    @Override
    public String toString() {
        return "PageCount{" +
                "page='" + page + '\'' +
                ", count=" + count +
                '}';
    }
}
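One caveat worth noting: subtraction-based comparators can overflow once counts approach Integer.MAX_VALUE. If that is a concern, an equivalent overflow-safe form (a sketch; a drop-in replacement for the compareTo above) is:

    // same ordering as above: descending by count, ties broken by page name,
    // but using Integer.compare to avoid integer-overflow pitfalls
    public int compareTo(PageCount o) {
        int byCount = Integer.compare(o.getCount(), this.count);
        return byCount != 0 ? byCount : this.page.compareTo(o.getPage());
    }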
1. First write a MapReduce job that counts the total number of visits to each page
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageCountStep1 {

    public static class PageCountStep1Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // each line looks like "2019/05/29 qq.com/a"; emit (url, 1)
            String line = value.toString();
            String[] split = line.split(" ");
            context.write(new Text(split[1]), new IntWritable(1));
        }
    }

    public static class PageCountStep1Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum up all the 1s emitted for this url
            int count = 0;
            for (IntWritable v : values) {
                count += v.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

    public static void main(String[] args) throws Exception {
        // load configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(PageCountStep1.class);
        job.setMapperClass(PageCountStep1Mapper.class);
        job.setReducerClass(PageCountStep1Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/input"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/outputout"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
2. Then write a second MapReduce job that sorts the results
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageCountStep2 {

    public static class PageCountStep2Mapper extends Mapper<LongWritable, Text, PageCount, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // step 1 output is "url\tcount"; wrap it in a PageCount key so the
            // shuffle sorts by our compareTo (descending count)
            String[] split = value.toString().split("\t");
            PageCount pageCount = new PageCount();
            pageCount.set(split[0], Integer.parseInt(split[1]));
            context.write(pageCount, NullWritable.get());
        }
    }

    public static class PageCountStep2Reduce extends Reducer<PageCount, NullWritable, PageCount, NullWritable> {
        @Override
        protected void reduce(PageCount key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // keys already arrive sorted; just write them out
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        // load configuration
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(PageCountStep2.class);
        job.setMapperClass(PageCountStep2Mapper.class);
        job.setReducerClass(PageCountStep2Reduce.class);

        job.setMapOutputKeyClass(PageCount.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(PageCount.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/outputout"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/sortout"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
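If you would rather launch both steps with a single command, a minimal driver sketch is shown below (PageCountChain is a hypothetical class name; it simply reuses the mapper/reducer classes and paths from the two jobs above and chains them on waitForCompletion):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// hypothetical single driver that chains step 1 and step 2
public class PageCountChain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job step1 = Job.getInstance(conf, "page count");
        step1.setJarByClass(PageCountChain.class);
        step1.setMapperClass(PageCountStep1.PageCountStep1Mapper.class);
        step1.setReducerClass(PageCountStep1.PageCountStep1Reduce.class);
        step1.setMapOutputKeyClass(Text.class);
        step1.setMapOutputValueClass(IntWritable.class);
        step1.setOutputKeyClass(Text.class);
        step1.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(step1, new Path("f:/mrdata/url/input"));
        FileOutputFormat.setOutputPath(step1, new Path("f:/mrdata/url/outputout"));

        // step 2 consumes step 1's output, so only run it if step 1 succeeded
        if (!step1.waitForCompletion(true)) System.exit(1);

        Job step2 = Job.getInstance(conf, "page count sort");
        step2.setJarByClass(PageCountChain.class);
        step2.setMapperClass(PageCountStep2.PageCountStep2Mapper.class);
        step2.setReducerClass(PageCountStep2.PageCountStep2Reduce.class);
        step2.setMapOutputKeyClass(PageCount.class);
        step2.setMapOutputValueClass(NullWritable.class);
        step2.setOutputKeyClass(PageCount.class);
        step2.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(step2, new Path("f:/mrdata/url/outputout"));
        FileOutputFormat.setOutputPath(step2, new Path("f:/mrdata/url/sortout"));

        System.exit(step2.waitForCompletion(true) ? 0 : 1);
    }
}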
1. Total visit count per page (output of the first job; note it is ordered by URL, because Text keys sort lexicographically, not by count)
163.com/ac 5
163.com/acc 1
163.com/bb 1
163.com/cc 2
163.com/sport 8
baidu.com/lady/ 1
baidu.com/movie 6
baidu.com/music 7
baidu.com/play 1
baidu.com/sport 2
qq.com/a 1
qq.com/bx 1
qq.com/by 3
qq.com/by3 6
qq.com/news 5
sina.com/lady/ 4
sina.com/movie 6
sina.com/music 1
sina.com/news/socail 7
sina.com/play 3
sina.com/sport 2
sohu.com/lady/ 1
sohu.com/movie 1
sohu.com/music 1
sohu.com/play 1
sohu.com/sport 2
2. Sorted output (output of the second job)
PageCount{page='163.com/sport', count=8}
PageCount{page='baidu.com/music', count=7}
PageCount{page='sina.com/news/socail', count=7}
PageCount{page='baidu.com/movie', count=6}
PageCount{page='qq.com/by3', count=6}
PageCount{page='sina.com/movie', count=6}
PageCount{page='163.com/ac', count=5}
PageCount{page='qq.com/news', count=5}
PageCount{page='sina.com/lady/', count=4}
PageCount{page='qq.com/by', count=3}
PageCount{page='sina.com/play', count=3}
PageCount{page='163.com/cc', count=2}
PageCount{page='baidu.com/sport', count=2}
PageCount{page='sina.com/sport', count=2}
PageCount{page='sohu.com/sport', count=2}
PageCount{page='163.com/acc', count=1}
PageCount{page='163.com/bb', count=1}
PageCount{page='baidu.com/lady/', count=1}
PageCount{page='baidu.com/play', count=1}
PageCount{page='qq.com/a', count=1}
PageCount{page='qq.com/bx', count=1}
PageCount{page='sina.com/music', count=1}
PageCount{page='sohu.com/lady/', count=1}
PageCount{page='sohu.com/movie', count=1}
PageCount{page='sohu.com/music', count=1}
PageCount{page='sohu.com/play', count=1}
Task 2: use a single MapReduce job with an in-memory map cache as the intermediary for counting; only suitable for scenarios with a small data volume
Analysis diagram
The PageCount entity class (this version only needs Comparable rather than WritableComparable, because it lives solely in reducer memory and is never serialized by Hadoop)
public class PageCount implements Comparable<PageCount> {

    private String page;
    private int count;

    public void set(String page, int count) {
        this.page = page;
        this.count = count;
    }

    public String getPage() {
        return page;
    }

    public void setPage(String page) {
        this.page = page;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    // sort by count in descending order; break ties by page name
    public int compareTo(PageCount o) {
        return o.getCount() - this.count == 0
                ? this.page.compareTo(o.getPage())
                : o.getCount() - this.count;
    }
}
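As a quick sanity check of the comparator, here is a small hypothetical demo (PageCountDemo is not part of the original code) showing how a TreeMap orders PageCount keys:

import java.util.TreeMap;

// hypothetical demo: TreeMap orders PageCount keys by descending count
public class PageCountDemo {
    public static void main(String[] args) {
        TreeMap<PageCount, Object> map = new TreeMap<PageCount, Object>();
        PageCount a = new PageCount();
        a.set("163.com/sport", 8);
        PageCount b = new PageCount();
        b.set("qq.com/a", 1);
        PageCount c = new PageCount();
        c.set("baidu.com/music", 7);
        map.put(b, null);
        map.put(a, null);
        map.put(c, null);
        // firstKey() is the smallest key per compareTo, i.e. the highest count
        System.out.println(map.firstKey().getPage()); // prints 163.com/sport
    }
}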
PageTopMapper: reads the data
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PageTopMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // each line looks like "2019/05/29 qq.com/a"; emit (url, 1)
        String line = value.toString();
        String[] split = line.split(" ");
        context.write(new Text(split[1]), new IntWritable(1));
    }
}
PageTopReducer: aggregates and ranks the data
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class PageTopReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // cache every (page, count) pair; the TreeMap keeps them sorted by
    // PageCount.compareTo, i.e. by count in descending order
    TreeMap<PageCount, Object> treeMap = new TreeMap<PageCount, Object>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        PageCount pageCount = new PageCount();
        pageCount.set(key.toString(), count);
        treeMap.put(pageCount, null);
    }

    /**
     * Called once after all data has been processed.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        int top = conf.getInt("top.n", 5);
        Set<Map.Entry<PageCount, Object>> entrySet = treeMap.entrySet();
        int i = 0;
        for (Map.Entry<PageCount, Object> entry : entrySet) {
            context.write(new Text(entry.getKey().getPage()),
                    new IntWritable(entry.getKey().getCount()));
            i++;
            // stop once the top n entries have been written
            if (i == top) return;
        }
    }
}
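Note that this reducer caches every distinct page in the TreeMap, which is exactly why the approach only suits small data volumes. If memory matters, one common variation (a sketch, not part of the original code) reads top.n once in setup() and evicts the smallest entry after each insert, so the map never holds more than n entries:

import java.io.IOException;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// hypothetical memory-bounded variant of PageTopReducer
public class BoundedPageTopReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final TreeMap<PageCount, Object> treeMap = new TreeMap<PageCount, Object>();
    private int top;

    @Override
    protected void setup(Context context) {
        // read the threshold once, before any reduce() call
        top = context.getConfiguration().getInt("top.n", 5);
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        PageCount pageCount = new PageCount();
        pageCount.set(key.toString(), count);
        treeMap.put(pageCount, null);
        if (treeMap.size() > top) {
            // PageCount sorts descending by count, so the last entry
            // holds the current smallest count; drop it
            treeMap.pollLastEntry();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (PageCount pc : treeMap.keySet()) {
            context.write(new Text(pc.getPage()), new IntWritable(pc.getCount()));
        }
    }
}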
JobSubmitter: the driver program
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitter {

    public static void main(String[] args) throws Exception {
        // load configuration
        Configuration conf = new Configuration();

        // four ways to pass in top.n:
        // 1. set it directly in code
        // conf.setInt("top.n", 3);
        // 2. pass it as a main() argument
        // conf.setInt("top.n", Integer.parseInt(args[0]));
        // 3. read it from a properties file on the classpath
        Properties props = new Properties();
        props.load(JobSubmitter.class.getClassLoader()
                .getResourceAsStream("topn.properties"));
        conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));
        // 4. put it in an XML config file, which Configuration loads by default

        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubmitter.class);
        job.setMapperClass(PageTopMapper.class);
        job.setReducerClass(PageTopReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("f:/mrdata/url/input"));
        FileOutputFormat.setOutputPath(job, new Path("f:/mrdata/url/output"));

        // job.setNumReduceTasks(3);
        // (keep the default single reducer so the top-n is global)
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
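For option 3 to work, a topn.properties file must be on the classpath (for example under src/main/resources). Its content is just the single key that JobSubmitter reads; the value 5 below matches the top-5 output shown next:

# topn.properties
top.n=5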
Statistics output: top 5
163.com/sport 8
baidu.com/music 7
sina.com/news/socail 7
baidu.com/movie 6
qq.com/by3 6