統計PV、UV


使用的工具:三臺linux、hadoop-1.1.二、jdk1.7.0_4五、Xmanager Enterprise 四、eclipse、java


目標統計:pv、uvlinux


對日誌字段進行分析apache

wKioL1OmwUChEOIwAAOI0VtDlYE320.jpg



每行記錄有5部分組成:bash

  1. 訪問ip 2.訪問時間 3.訪問資源 4.訪問狀態 5.本次流量app

     

 

先對日誌進行清理eclipse

mapreduce程序ide

package hmbbs;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HmbbsCleaner extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final Job job = new Job(new Configuration(),
HmbbsCleaner.class.getSimpleName());
job.setJarByClass(HmbbsCleaner.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new HmbbsCleaner(), args);
}
static class MyMapper extends
Mapper<LongWritable, Text, LongWritable, Text> {
LogParser logParser = new LogParser();
Text v2 = new Text();
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
throws java.io.IOException, InterruptedException {
final String[] parsed = logParser.parse(value.toString());
// 
if (parsed[2].startsWith("GET /static/")
|| parsed[2].startsWith("GET /uc_server")) {
return;
}
// 
if (parsed[2].startsWith("GET /")) {
parsed[2] = parsed[2].substring("GET /".length());
} else if (parsed[2].startsWith("POST /")) {
parsed[2] = parsed[2].substring("POST /".length());
}
// 
if (parsed[2].endsWith(" HTTP/1.1")) {
parsed[2] = parsed[2].substring(0, parsed[2].length()
- " HTTP/1.1".length());
}
v2.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
context.write(key, v2);
};
}
static class MyReducer extends
Reducer<LongWritable, Text, Text, NullWritable> {
protected void reduce(
LongWritable k2,
java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
throws java.io.IOException, InterruptedException {
for (Text v2 : v2s) {
context.write(v2, NullWritable.get());
}
};
}
static class LogParser {
public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
"d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
"yyyyMMddHHmmss");
public static void main(String[] args) throws ParseException {
final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/p_w_picpath/common/faq.gif HTTP/1.1\" 200 1127";
LogParser parser = new LogParser();
final String[] array = parser.parse(S1);
System.out.println( S1);
System.out.format(
" ip=%s, time=%s, url=%s, status=%s, traffic=%s",
array[0], array[1], array[2], array[3], array[4]);
}
/**
 * 
 * 
 * @param string
 * @return
 * @throws ParseException
 */
private Date parseDateFormat(String string) {
Date parse = null;
try {
parse = FORMAT.parse(string);
} catch (ParseException e) {
e.printStackTrace();
}
return parse;
}
/**
 *
 * 
 * @param line
 * @return 
 */
public String[] parse(String line) {
String ip = parseIP(line);
String time = parseTime(line);
String url = parseURL(line);
String status = parseStatus(line);
String traffic = parseTraffic(line);
return new String[] { ip, time, url, status, traffic };
}
private String parseTraffic(String line) {
final String trim = line.substring(line.lastIndexOf("\"") + 1)
.trim();
String traffic = trim.split(" ")[1];
return traffic;
}
private String parseStatus(String line) {
final String trim = line.substring(line.lastIndexOf("\"") + 1)
.trim();
String status = trim.split(" ")[0];
return status;
}
private String parseURL(String line) {
final int first = line.indexOf("\"");
final int last = line.lastIndexOf("\"");
String url = line.substring(first + 1, last);
return url;
}
private String parseTime(String line) {
final int first = line.indexOf("[");
final int last = line.indexOf("+0800]");
String time = line.substring(first + 1, last).trim();
Date date = parseDateFormat(time);
return dateformat1.format(date);
}
private String parseIP(String line) {
String ip = line.split("- -")[0].trim();
return ip;
}
}
}


wKioL1OmwLng8VRwAAcZJsnJMYA984.jpg

wKiom1OmwOeSBvQiAAJMkh6B1kA459.jpg


統計pv的mapreduce工具

清洗後的數據以我自定義以\t爲隔,因此oop

String[] arr = value.toString().split("\t");
package hmbbs;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class KPIPV extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final Job job = new Job(new Configuration(),
KPIPV.class.getSimpleName());
job.setJarByClass(KPIPV.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new KPIPV(), args);
}
static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
throws java.io.IOException, InterruptedException {
Text v1 = new Text();
// 每行以製表符\t分隔
String[] arr = value.toString().split("\t");
// 每行請求不爲空
if (arr.length >= 0) {
v1.set("1");
} else {
v1.set("0");
}
context.write(new Text("pv"), v1);
};
}
static class MyReducer extends Reducer<Text, Text, Text, IntWritable> {
private IntWritable result = new IntWritable(0);
private Integer value2 = new Integer(0);
protected void reduce(
Text k2,
java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, IntWritable>.Context context)
throws java.io.IOException, InterruptedException {
for (Text v2 : v2s) {
value2 += Integer
.parseInt((v2.toString().trim().equals("1")) ? "1"
: "0");
}
result.set(value2);
context.write(k2, result);
};
}
}


統計UV,分析思路是無論某個IP點擊了多少次,咱們都只統計一次,看用戶量多少。url

因此,我先寫一個mapreduce統計各IP點擊次數,而後再寫一個mapreduce統計PV。這樣至關於前一個mapreduce爲後個mapreduce作清洗。

若是你不想用前面已經清洗過的數據,你也能夠直接用原日誌來清洗。原日誌是字段間是以空字符串來隔開的。

統計各IP點擊次數


package hmbbs;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class KPIUV_FOUR extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final Job job = new Job(new Configuration(),
KPIUV_FOUR.class.getSimpleName());
job.setJarByClass(KPIUV_FOUR.class);
// FileInputFormat.setInputPaths(job, args[0]);
FileInputFormat.setInputPaths(job,
"hdfs://192.168.14.132:9000/chen/fa/");
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://192.168.14.132:9000/chen/fa/uv_1"));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new KPIUV_FOUR(), args);
}
static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
Text key1 = new Text();
Text v1 = new Text();
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
throws java.io.IOException, InterruptedException {
// 每行以空格分隔
String[] arr = value.toString().split(" ");
for (String a : arr) {
System.out.println(a);
}
if (arr.length >= 2) {
if (arr[0].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
key1.set(arr[0]);
}
}
v1.set("1");
System.out.println("s1的值是" + key1);
context.write(key1, v1);
};
}
static class MyReducer extends Reducer<Text, Text, Text, Text> {
// Map<Text, ArrayList<Text>> map = new HashMap<Text,
// ArrayList<Text>>();
protected void reduce(
Text k2,
java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text>.Context context)
throws java.io.IOException, InterruptedException {
Text result = new Text();
Integer re = new Integer(0);
System.out.println(k2 + "k2的值");
ArrayList<Text> ar = new ArrayList<Text>();
while (v2s.iterator().hasNext()) {
// ar.add(v2s.iterator().next());
re += Integer.parseInt(v2s.iterator().next().toString());
}
// map.put(k2, ar);
// re = ar.size();
result.set(String.valueOf(re));
context.write(k2, result);
};
}
}



還有種方法是由於能夠用Map思想,Map<String,List<int>這種咱們只要統計Map中value中list的大小。

不過用Map的話感受畫蛇添足。還不如用原方法設置變量的時用局部變量來統計。


把生成的結果導到別一文件裏。

[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -mkdir /chen/csl
[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -mv /chen/fa/uv_1/part-r-00000 /chen/csl

而後再寫一個mapreduce統計uv

package hmbbs;
import hmbbs.KPIUV_FOUR.MyMapper;
import hmbbs.KPIUV_FOUR.MyReducer;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class KPIUV_FIVE extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final Job job = new Job(new Configuration(),
KPIUV_FIVE.class.getSimpleName());
job.setJarByClass(KPIUV_FIVE.class);
// FileInputFormat.setInputPaths(job, args[0]);
FileInputFormat.setInputPaths(job,
"hdfs://192.168.14.132:9000/chen/csl/");
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://192.168.14.132:9000/chen/csl/uv_1"));
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new KPIUV_FIVE(), args);
}
static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
Text key1 = new Text();
Text v1 = new Text();
protected void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
throws java.io.IOException, InterruptedException {
// 每行以空格分隔
String[] arr = value.toString().split("\t");
for (String a : arr) {
System.out.println(a);
}
if (arr.length == 2) {
if (arr[0].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
key1.set(arr[0]);
}
}
v1.set("1");
System.out.println("s1的值是" + key1);
context.write(new Text("uv"), v1);
};
}
static class MyReducer extends Reducer<Text, Text, Text, Text> {
Text result = new Text();
Integer re = new Integer(0);
protected void reduce(
Text k2,
java.lang.Iterable<Text> v2s,
org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text>.Context context)
throws java.io.IOException, InterruptedException {
System.out.println(k2 + "k2的值");
while (v2s.iterator().hasNext()) {
re += Integer.parseInt(v2s.iterator().next().toString());
}
result.set(String.valueOf(re));
context.write(k2, result);
};
}
}


結果

[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -cat /chen/csl/uv_1/part-r-00000 
uv  10509
相關文章
相關標籤/搜索