MapReduce programming template: writing a program to analyze the basic website metric UV

1. Basic website metric concepts

PV: page view (number of page views)

The number of times pages are viewed; one view is recorded each time a user opens a page.

UV: unique visitor (number of unique visitors)

The number of distinct people who visit a site within one day (identified by cookie, for example). Note that if a user deletes the browser cookie and visits again, the count is affected.

VV: visit view (number of visits)

Records how many times all visitors visited the site within one day; a visit is counted from when the visitor arrives until the browser is closed.

IP: number of unique IPs

The number of distinct IP addresses used to access the site within one day.

2. Writing the MapReduce programming template

Driver

package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MRDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // create the job
        Job job = Job.getInstance(this.getConf(), "mr-demo");
        job.setJarByClass(MRDriver.class);

        // input: reads from HDFS by default and turns each line into a key-value pair
        Path inPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, inPath);

        // map: the map method is called once per line, splitting and processing that line
        job.setMapperClass(null);
        job.setMapOutputKeyClass(null);
        job.setMapOutputValueClass(null);

        // shuffle (a sample partitioner is sketched after the Reducer template below)
        job.setPartitionerClass(null);        // partitioning
        job.setGroupingComparatorClass(null); // grouping
        job.setSortComparatorClass(null);     // sorting

        // reduce: the reduce method is called once per key, with all values for that key
        job.setReducerClass(null);
        job.setOutputKeyClass(null);
        job.setOutputValueClass(null);

        // output
        Path outPath = new Path(args[1]);
        // this.getConf() comes from the parent class; it starts out empty, so you can set configuration entries yourself
        FileSystem fileSystem = FileSystem.get(this.getConf());
        // delete the output directory if it already exists
        if (fileSystem.exists(outPath)) {
            // the second argument must be true to delete a directory recursively
            fileSystem.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Mapper

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MRModelMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /**
         * implement your own business logic here
         */
    }
}

Reducer

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MRModelReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        /**
         * implement according to your own business requirements
         */
    }
}
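
The setPartitionerClass, setGroupingComparatorClass and setSortComparatorClass hooks in the Driver template above are where custom shuffle behaviour is plugged in. As a minimal sketch (the class name MRModelPartitioner is an illustrative addition, not part of the original post, and it simply reproduces the default hash partitioning), a partitioner for the template's Text/LongWritable map output could look like this:

package mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MRModelPartitioner extends Partitioner<Text, LongWritable> {

    // decide which reduce task receives this key; here the key is hashed, like the default HashPartitioner
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Registering it is just a matter of replacing the null placeholder in the driver: job.setPartitionerClass(MRModelPartitioner.class);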

3. Counting the UV for each city

Requirements analysis:

UV: unique visitor count; each user is counted once.

map:

key: CityId (city id), type: Text

value: guid (user id), type: Text

 

shuffle:

key: CityId

value: {guid, guid, guid, ...}

 

reduce:

key: CityId

value: the visit count, i.e. the size of the set of guids gathered in the shuffle output for that key

 

output:

key: CityId

value: visit count (UV)

 

MRDriver.java: the MapReduce job driver

 
package mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MRDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // create the job
        Job job = Job.getInstance(this.getConf(), "mr-demo");
        job.setJarByClass(MRDriver.class);

        // input: reads from HDFS by default and turns each line into a key-value pair
        Path inPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, inPath);

        // map: the map method is called once per line, splitting and processing that line
        job.setMapperClass(MRMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        /* shuffle (the defaults are used here)
        job.setPartitionerClass(null);        // partitioning
        job.setGroupingComparatorClass(null); // grouping
        job.setSortComparatorClass(null);     // sorting
        */

        // reduce
        job.setReducerClass(MRReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // output
        Path outPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(this.getConf());
        // delete the output directory if it already exists
        if (fileSystem.exists(outPath)) {
            // the second argument must be true to delete a directory recursively
            fileSystem.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            int status = ToolRunner.run(configuration, new MRDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

MRMapper.java

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MRMapper extends Mapper<LongWritable, Text, Text, Text> {
    private Text mapOutKey = new Text();
    private Text mapOutValue = new Text();

    // the map method is called once per line; split that line into fields
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // get the content of the line
        String str = value.toString();
        // split the line into fields on the tab character
        String[] items = str.split("\t");

        // field 24 is the city id and field 5 is the guid (user id);
        // only emit a <CityId, guid> pair when both fields are present
        if (items.length > 24 && !items[24].isEmpty() && !items[5].isEmpty()) {
            this.mapOutKey.set(items[24]);
            this.mapOutValue.set(items[5]);
            context.write(mapOutKey, mapOutValue);
        }
    }

}

MRReducer.java

package mapreduce;

import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MRReducer extends Reducer<Text, Text, Text, IntWritable> {

    // the reduce method is called once per key (city id), with all guids for that city
    @Override
    protected void reduce(Text key, Iterable<Text> texts, Context context)
            throws IOException, InterruptedException {

        // deduplicate the guids with a HashSet; its size is the UV of this city
        HashSet<String> set = new HashSet<String>();

        for (Text text : texts) {
            set.add(text.toString());
        }

        context.write(key, new IntWritable(set.size()));

    }
}
 

4. Understanding how MapReduce executes wordcount

input: reads data from HDFS by default

 Path inPath = new Path(args[0]);
 FileInputFormat.setInputPaths(job,inPath);

Each line of data is converted into a key-value pair (the split); this step is done automatically by the MapReduce framework.

The key is the byte offset of the line and the value is the line's content; for example, the first line "Hello World Bye World" becomes the pair <0, Hello World Bye World>.

 

 

mapper: tokenize and emit

Data filtering, data completion, and field formatting happen here.

Input: the output of the input step.

The split <key, value> pairs are handed to the user-defined map method for processing, which produces new <key, value> pairs.

The map method is called once per line.

The map step for word count:
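
A minimal sketch of that map step (the class name WordCountMapper and the use of StringTokenizer are illustrative here, not taken from the original post) emits <word, 1> for every word on the line:

package mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    // called once per line: split the line into words and emit <word, 1> for each word
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

For the input line "Hello World Bye World", one call of this map method emits <Hello,1>, <World,1>, <Bye,1>, <World,1>.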

shuffle: partitioning, grouping, sorting

Output (for the example line "Hello World Bye World"):

<Bye,1>

<Hello,1>

<World,1,1>

The <key, value> pairs emitted by map are sorted and grouped by key, which gives the mapper's final output.

Reduce: the reduce method is called once per key, with all of the values for that key.

The values in the List<value> for the same key are summed.
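
A matching sketch of the word-count reduce step (again, WordCountReducer is an illustrative name, not from the original post) sums the 1s collected for each word:

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // called once per key: sum all the counts emitted for the same word
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

For the grouped pair <World,1,1> from the shuffle output above, this produces <World,2>.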

output: the reduce output is written to HDFS.
