Tools used: three Linux machines, hadoop-1.1.2, jdk1.7.0_45, Xmanager Enterprise 4, Eclipse, Java.
Goal: compute PV (page views) and UV (unique visitors).
Analyzing the log fields
Each log record consists of five parts:
1. client IP  2. access time  3. requested resource  4. HTTP status  5. bytes transferred for this request
For example: 27.19.74.143 - - [30/May/2013:17:38:20 +0800] "GET /static/p_w_picpath/common/faq.gif HTTP/1.1" 200 1127
First, clean the logs.
The MapReduce program:
package hmbbs;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HmbbsCleaner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        final Job job = new Job(new Configuration(), HmbbsCleaner.class.getSimpleName());
        job.setJarByClass(HmbbsCleaner.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new HmbbsCleaner(), args);
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text v2 = new Text();

        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            final String[] parsed = logParser.parse(value.toString());
            // Drop requests for static resources and for uc_server
            if (parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")) {
                return;
            }
            // Strip the leading "GET /" or "POST /" from the request field
            if (parsed[2].startsWith("GET /")) {
                parsed[2] = parsed[2].substring("GET /".length());
            } else if (parsed[2].startsWith("POST /")) {
                parsed[2] = parsed[2].substring("POST /".length());
            }
            // Strip the trailing " HTTP/1.1"
            if (parsed[2].endsWith(" HTTP/1.1")) {
                parsed[2] = parsed[2].substring(0, parsed[2].length() - " HTTP/1.1".length());
            }
            v2.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
            context.write(key, v2);
        }
    }

    static class MyReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
        protected void reduce(LongWritable k2, java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        }
    }

    static class LogParser {
        public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
                "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
                "yyyyMMddHHmmss");

        public static void main(String[] args) throws ParseException {
            final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/p_w_picpath/common/faq.gif HTTP/1.1\" 200 1127";
            LogParser parser = new LogParser();
            final String[] array = parser.parse(S1);
            System.out.println(S1);
            System.out.format(" ip=%s, time=%s, url=%s, status=%s, traffic=%s",
                    array[0], array[1], array[2], array[3], array[4]);
        }

        /**
         * Parse the bracketed Apache timestamp, e.g. "30/May/2013:17:38:20".
         *
         * @param string the timestamp text without the timezone suffix
         * @return the parsed Date, or null if parsing fails
         */
        private Date parseDateFormat(String string) {
            Date parse = null;
            try {
                parse = FORMAT.parse(string);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return parse;
        }

        /**
         * Split one raw log line into { ip, time, url, status, traffic }.
         *
         * @param line one line of the access log
         * @return the five parsed fields
         */
        public String[] parse(String line) {
            String ip = parseIP(line);
            String time = parseTime(line);
            String url = parseURL(line);
            String status = parseStatus(line);
            String traffic = parseTraffic(line);
            return new String[] { ip, time, url, status, traffic };
        }

        private String parseTraffic(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
            String traffic = trim.split(" ")[1];
            return traffic;
        }

        private String parseStatus(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
            String status = trim.split(" ")[0];
            return status;
        }

        private String parseURL(String line) {
            final int first = line.indexOf("\"");
            final int last = line.lastIndexOf("\"");
            String url = line.substring(first + 1, last);
            return url;
        }

        private String parseTime(String line) {
            final int first = line.indexOf("[");
            final int last = line.indexOf("+0800]");
            String time = line.substring(first + 1, last).trim();
            Date date = parseDateFormat(time);
            return dateformat1.format(date);
        }

        private String parseIP(String line) {
            String ip = line.split("- -")[0].trim();
            return ip;
        }
    }
}
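A side note on the timestamp handling: LogParser cuts off the "+0800]" suffix and converts the bracketed Apache time into a compact yyyyMMddHHmmss string. Below is a minimal standalone sketch of just that conversion (my own illustration, not part of the original program; it reuses the two SimpleDateFormat patterns from the class above):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Small demo of the date conversion performed by LogParser.parseTime() (illustration only).
public class TimeFormatDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat apacheFormat = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        SimpleDateFormat compactFormat = new SimpleDateFormat("yyyyMMddHHmmss");
        // The "+0800]" timezone suffix has already been stripped, as in parseTime()
        Date d = apacheFormat.parse("30/May/2013:17:38:20");
        System.out.println(compactFormat.format(d)); // prints 20130530173820
    }
}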
The MapReduce program for counting PV
The cleaned data uses \t as my custom field separator, so:
String[] arr = value.toString().split("\t");
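Concretely, for a hypothetical cleaned record (the URL value below is made up for illustration; the layout is the ip \t time \t url format produced by HmbbsCleaner), the split yields three fields:

// Illustration only: shows how a cleaned record maps onto arr[0..2].
public class SplitDemo {
    public static void main(String[] args) {
        String record = "27.19.74.143\t20130530173820\tforum.php?mod=viewthread&tid=1"; // hypothetical record
        String[] arr = record.split("\t");
        System.out.println("ip   = " + arr[0]); // 27.19.74.143
        System.out.println("time = " + arr[1]); // 20130530173820
        System.out.println("url  = " + arr[2]); // forum.php?mod=viewthread&tid=1
    }
}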
package hmbbs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class KPIPV extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        final Job job = new Job(new Configuration(), KPIPV.class.getSimpleName());
        job.setJarByClass(KPIPV.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new KPIPV(), args);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            Text v1 = new Text();
            // The cleaned records are tab-separated: ip \t time \t url
            String[] arr = value.toString().split("\t");
            // Count the line only if it is a complete cleaned record
            if (arr.length >= 3) {
                v1.set("1");
            } else {
                v1.set("0");
            }
            context.write(new Text("pv"), v1);
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, IntWritable> {
        private IntWritable result = new IntWritable(0);

        protected void reduce(Text k2, java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, IntWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            // Sum the "1" marks emitted by the mapper under the single "pv" key
            int count = 0;
            for (Text v2 : v2s) {
                if ("1".equals(v2.toString().trim())) {
                    count++;
                }
            }
            result.set(count);
            context.write(k2, result);
        }
    }
}
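As a variation (my own sketch, not the original code), the same PV count is often written with IntWritable values and the reducer registered as a combiner, so the "1"s are pre-summed on the map side before the shuffle. The class and job names below are placeholders:

package hmbbs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical variant of the PV job: sums IntWritable 1s and reuses the reducer as a combiner.
public class KPIPVCombined {

    static class PvMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final Text PV = new Text("pv");
        private static final IntWritable ONE = new IntWritable(1);

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // One complete cleaned record (ip \t time \t url) counts as one page view
            if (value.toString().split("\t").length >= 3) {
                context.write(PV, ONE);
            }
        }
    }

    static class PvReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws java.io.IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "pv-combined");
        job.setJarByClass(KPIPVCombined.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(PvMapper.class);
        job.setCombinerClass(PvReducer.class); // safe because summing is associative
        job.setReducerClass(PvReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The combiner mainly matters when the job runs over many map tasks; for a single small log file the result is the same either way.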
For UV, the idea is: no matter how many times a given IP clicks, it is counted only once, which gives the number of unique visitors.
So I first write one MapReduce job that counts the clicks per IP, and then a second MapReduce job that computes UV from that output. In effect, the first job acts as a cleaning step for the second.
If you do not want to reuse the data cleaned earlier, you can also work directly from the raw log; in the raw log the fields are separated by spaces.
Counting clicks per IP
package hmbbs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class KPIUV_FOUR extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        final Job job = new Job(new Configuration(), KPIUV_FOUR.class.getSimpleName());
        job.setJarByClass(KPIUV_FOUR.class);
        // FileInputFormat.setInputPaths(job, args[0]);
        FileInputFormat.setInputPaths(job, "hdfs://192.168.14.132:9000/chen/fa/");
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.14.132:9000/chen/fa/uv_1"));
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new KPIUV_FOUR(), args);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text key1 = new Text();
        Text v1 = new Text();

        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            // The raw log fields are separated by spaces
            String[] arr = value.toString().split(" ");
            for (String a : arr) {
                System.out.println(a);
            }
            // Emit (ip, "1") only when the first field really is an IP address,
            // so a non-matching line cannot reuse the previous key.
            if (arr.length >= 2
                    && arr[0].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
                key1.set(arr[0]);
                v1.set("1");
                System.out.println("key1 is " + key1);
                context.write(key1, v1);
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        // Alternative idea (discussed below the code): collect the values per IP into a
        // Map<Text, ArrayList<Text>> and use the size of each list as the count.
        // Map<Text, ArrayList<Text>> map = new HashMap<Text, ArrayList<Text>>();

        protected void reduce(Text k2, java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            Text result = new Text();
            int re = 0;
            System.out.println("k2 is " + k2);
            // ArrayList<Text> ar = new ArrayList<Text>();
            for (Text v2 : v2s) {
                // ar.add(v2);
                re += Integer.parseInt(v2.toString());
            }
            // map.put(k2, ar);
            // re = ar.size();
            result.set(String.valueOf(re));
            context.write(k2, result);
        }
    }
}
Another approach is to use a Map, e.g. Map<String, List<Integer>>, and simply take the size of each value list in the Map.
But using a Map here feels like overkill; it is simpler to stick with the original approach and accumulate the count in a local variable.
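If you want to avoid the intermediate file entirely, the two jobs can also be collapsed into one: the mapper keys every record by IP, and a single reducer counts each distinct key once and writes the total in cleanup(). The following is my own sketch, not the original code; the class name and paths are placeholders, and it assumes the space-separated raw log as input, like KPIUV_FOUR:

package hmbbs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical single-job UV counter (sketch only).
public class KPIUVSingleJob {

    static class IpMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private Text ip = new Text();

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            String[] arr = value.toString().split(" "); // raw log is space-separated
            if (arr.length >= 2 && arr[0].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
                ip.set(arr[0]);
                context.write(ip, NullWritable.get()); // group by IP; the value is irrelevant
            }
        }
    }

    static class DistinctCountReducer extends Reducer<Text, NullWritable, Text, IntWritable> {
        private int distinct = 0;

        protected void reduce(Text key, Iterable<NullWritable> values, Context context) {
            distinct++; // each reduce() call corresponds to one distinct IP
        }

        protected void cleanup(Context context)
                throws java.io.IOException, InterruptedException {
            // Emit the total once, after all groups have been processed
            context.write(new Text("uv"), new IntWritable(distinct));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "uv-single-job");
        job.setJarByClass(KPIUVSingleJob.class);
        FileInputFormat.setInputPaths(job, args[0]);
        job.setMapperClass(IpMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(DistinctCountReducer.class);
        job.setNumReduceTasks(1); // one reducer so the distinct count is global
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Compared with the two-job pipeline used here, this version shuffles one record per request rather than one per IP, so for very large logs the two-step approach (or a map-side combiner that drops duplicate IPs) can still be the better choice.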
Move the generated result into another directory:
[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -mkdir /chen/csl
[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -mv /chen/fa/uv_1/part-r-00000 /chen/csl
Then write a second MapReduce job to compute UV.
package hmbbs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class KPIUV_FIVE extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        final Job job = new Job(new Configuration(), KPIUV_FIVE.class.getSimpleName());
        job.setJarByClass(KPIUV_FIVE.class);
        // FileInputFormat.setInputPaths(job, args[0]);
        FileInputFormat.setInputPaths(job, "hdfs://192.168.14.132:9000/chen/csl/");
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.14.132:9000/chen/csl/uv_1"));
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new KPIUV_FIVE(), args);
    }

    static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text key1 = new Text();
        Text v1 = new Text();

        protected void map(LongWritable key, Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            // The input is the per-IP output of KPIUV_FOUR: ip \t count
            String[] arr = value.toString().split("\t");
            for (String a : arr) {
                System.out.println(a);
            }
            // Each valid line represents one distinct IP, so emit one "1" under the single key "uv"
            if (arr.length == 2
                    && arr[0].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {
                key1.set(arr[0]);
                v1.set("1");
                System.out.println("key1 is " + key1);
                context.write(new Text("uv"), v1);
            }
        }
    }

    static class MyReducer extends Reducer<Text, Text, Text, Text> {
        protected void reduce(Text k2, java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            System.out.println("k2 is " + k2);
            Text result = new Text();
            int re = 0;
            for (Text v2 : v2s) {
                re += Integer.parseInt(v2.toString());
            }
            result.set(String.valueOf(re));
            context.write(k2, result);
        }
    }
}
Result:
[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -cat /chen/csl/uv_1/part-r-00000
uv      10509