Implementing Log Cleaning with Hadoop MapReduce (Part 3)

5. Cleaning and Parsing Forum Request Logs

Cleaning the request logs mainly means filtering out data that is irrelevant to the downstream statistics, including crawler traffic, requests for static resources, and unused columns. Where needed, the cleaning step can also transform some of the fields, such as dates, to simplify the later data processing and statistical analysis.

Logically, the cleaning job is again split into a map function, a reduce function and a run (main) function. Because extracting and filtering the log fields from each input line is fairly involved, the line-parsing logic is usually written as a separate parser class whose methods are called from map.

5.1 Parsing the Individual Log Fields

The parser class, written separately and called from the map function:

package com.leeyk99.hadoop;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
 * Parses each data column of a log record. The fields are roughly:
 * IP, "-", "-", TIME, URL, STATUS, STREAM (traffic), ?, ? and so on.
 * @author LIN
 */
public class FieldParser {

    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
    public static final SimpleDateFormat FORMAT = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);

    /**
     * Parses one log record.
     * @return an array of five elements: IP, TIME, URL, STATUS and traffic
     */
    public String[] parseLog(String line) {
        String ip = parseIP(line);
        String time = parseTime(line);
        String url = parseURL(line);
        String status = parseStatus(line);
        String stream = parseStream(line);
        String[] fields = new String[5];
        fields[0] = ip;
        fields[1] = time;
        fields[2] = url;
        fields[3] = status;
        fields[4] = stream;
        //String[] fields = new String[]{ip, time, url, status, stream};
        return fields;
    }

    private String parseStream(String line) {
        try {
            final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
            String stream = trim.split(" ")[1];
            return stream;
        } catch (ArrayIndexOutOfBoundsException e) {
            e.printStackTrace();
            System.out.println(line);
            return null; // fall back to null for malformed lines
        }
    }

    private String parseStatus(String line) {
        final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
        String status = trim.split(" ")[0];
        return status;
    }

    private String parseURL(String line) {
        final String trim = line.split("\"")[1].trim();
        String url = trim;
        return url;
    }

    private String parseTime(String line) {
        final String trim = line.split("\"")[0].trim();
        String time = trim.split(" ")[3].substring(1);
        Date date = parseDateFormat(time); // parse the raw string into a Date so it can be reformatted into the target pattern
        time = dateFormat.format(date);    // e.g. 20180903101923
        return time;
    }

    private String parseIP(String line) {
        final String trim = line.split(" ")[0].trim();
        String ip = trim;
        return ip;
    }

    /**
     * Converts the log timestamp, e.g. 18/Sep/2013:16:16:16
     * @author LIN
     * @param time a timestamp such as 18/Sep/2013:16:16:16
     */
    private Date parseDateFormat(String time) {
        Date formatTime = new Date();
        try {
            formatTime = FORMAT.parse(time); // FORMAT.parse turns a String into a Date; FORMAT.format turns a Date back into a String
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return formatTime;
    }
}
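For a quick sanity check of the parser outside of MapReduce, a minimal sketch like the one below can be used. The sample line is an assumed Apache-style forum access-log record, not data from the original post; adjust it if the real log layout differs.

package com.leeyk99.hadoop;

// Minimal local check of FieldParser. The sample line is an assumed
// Apache-style access-log record, not taken from the original post.
public class FieldParserDemo {
    public static void main(String[] args) {
        String sample = "27.19.74.143 - - [18/Sep/2013:16:16:16 +0800] "
                + "\"GET /forum.php?mod=viewthread&tid=123 HTTP/1.1\" 200 1127";
        String[] fields = new FieldParser().parseLog(sample);
        // Expected order: IP, time (yyyyMMddHHmmss), URL, status, traffic
        for (String field : fields) {
            System.out.println(field);
        }
    }
}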

5.2 Writing the map Function

This also demonstrates how to pass multiple fields through to the output; see the sketch after the mapper class.

package com.leeyk99.hadoop.mapreduce;

import com.leeyk99.hadoop.FieldParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //super.map(key, value, context);
        String line = value.toString();
        FieldParser fieldParser = new FieldParser();
        String[] record = fieldParser.parseLog(line);

        /* Data pre-processing */
        // 1. Filter out records whose URL starts with the given prefixes (uc_server, static resources)
        if (record[2].startsWith("GET /uc_server") || record[2].startsWith("GET /static")) {
            return;
        }
        // 2. Field transformation, here just string trimming of the HTTP method prefix
        if (record[2].startsWith("GET /")) {
            record[2] = record[2].substring("GET /".length() - 1); // keeps the leading "/"
        } else if (record[2].startsWith("POST /")) {
            record[2] = record[2].substring("POST /".length() - 1);
        }
        if (record[2].endsWith(" HTTP/1.1")) {
            //System.out.println("1" + record[2]);
            record[2] = record[2].substring(0, record[2].length() - " HTTP/1.1".length());
            //System.out.println("2" + record[2]);
        }
        // 3. Column pruning: keep only the selected columns
        Text outPutValue = new Text();
        outPutValue.set(record[0] + "\001" + record[1] + "\001" + record[2]); // fields joined with the \001 separator
        /* Map output: the output key is the LongWritable line offset rather than the usual Text dimension key,
           and the output value is Text rather than IntWritable/DoubleWritable, because the reducer does not
           perform any group-by style aggregation here. */
        context.write(key, outPutValue);
    }
}
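To make the multi-field output concrete, the sketch below replays the same trimming and \001 joining on one assumed sample record (the values are illustrative only, not from the original dataset).

// Illustrative sketch of the mapper's record transformation (assumed sample values).
public class LogMapperIoExample {
    public static void main(String[] args) {
        // the URL field as FieldParser would hand it to the mapper
        String url = "GET /forum.php?mod=viewthread&tid=123 HTTP/1.1";
        // the same trimming the mapper applies
        url = url.substring("GET /".length() - 1);                   // drop the "GET " prefix, keep "/"
        url = url.substring(0, url.length() - " HTTP/1.1".length()); // drop the protocol suffix
        // fields joined with \001, exactly as written into outPutValue
        String outputValue = "27.19.74.143" + "\001" + "20130918161616" + "\001" + url;
        System.out.println(outputValue);
    }
}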

5.3 Writing the Reducer

package com.leeyk99.hadoop.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //super.reduce(key, values, context);
        // Simply emit every cleaned record; no aggregation is needed.
        for (Text value : values) {
            context.write(value, NullWritable.get());
        }
    }
}

5.4 Writing the Entry Point (main / run Function)

package com.leeyk99.hadoop.mapreduce;

//import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//import java.io.File;
import java.net.URI;

public class LogParser extends Configured implements Tool {

    /* Note: lowercase "override" is not valid. Annotating this interface-implementing method with @Override
       kept being flagged by the IDE (red squiggle) as non-compliant even though the program ran fine;
       tracking down the warning at this spot took quite a while, so the annotation is omitted here. */
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        //getClass(), getConf()
        // Option 1: the "Hadoop: The Definitive Guide" style
        /*Job job = new Job(getConf(), "Log parser");
        job.setJarByClass(getClass());*/

        // Option 2: the plain main() style, the simplest
        Job job = new Job();
        job.setJarByClass(getClass()); // getClass() resolves to LogParser.class
        job.setJobName("Log parser");

        // Option 3: the Configuration style commonly seen online
        /*Configuration conf = new Configuration();
        //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "Job_001");  // create a Job object and give it a job name
        job.setJarByClass(LogParser.class);  // specify the driver class
        //FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // input path
        //FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path*/

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(LongWritable.class); // differs from the reducer's output types, so it must be set explicitly
        job.setMapOutputValueClass(Text.class);
        /* With this combiner enabled, the map stage stayed stuck at 22%: the map output is <LongWritable, Text>,
           but a combiner based on LogReducer emits <Text, NullWritable>, which cannot serve as the reducer's input
           because the reducer expects <LongWritable, Text>. */
        //job.setCombinerClass(LogReducer.class);
        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Delete the HDFS output directory if it already exists
        FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LogParser(), args);
        System.exit(exitCode);
    }

    // When running locally (not against HDFS), how should the output directory be deleted? See the sketch after this class.
    /*private static void delDir(String path) {
        File f = new File(path);
        if (f.exists()) {
            if (f.isDirectory()) {
                String[] items = f.list();
                for (String item : items) {
                    File f2 = new File(path + "/" + item);
                    if (f2.isDirectory()) {
                        delDir(path + "/" + item);
                    } else {
                        f2.delete();
                    }
                }
            }
            f.delete(); // delete the file, or the now-empty directory
        } else {
            System.out.println("Output directory does not exist.");
        }
    }*/
}
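As for the question above about removing the output directory when running against the local file system: one option, sketched below under the assumption that the job runs in local mode with a plain local path, is to let Hadoop's own FileSystem/FileUtil APIs do the recursive delete instead of walking the directory by hand. This is a hedged sketch, not part of the original program.

package com.leeyk99.hadoop.mapreduce;

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Hedged sketch: two ways to remove a local output directory recursively,
// assuming the job runs in local mode with a plain local path such as "output/".
public class LocalOutputCleaner {

    public static void deleteWithFileSystem(String path, Configuration conf) throws Exception {
        // LocalFileSystem supports the same recursive delete(Path, true) used for HDFS.
        FileSystem localFs = FileSystem.getLocal(conf);
        Path outPath = new Path(path);
        if (localFs.exists(outPath)) {
            localFs.delete(outPath, true);
        }
    }

    public static void deleteWithFileUtil(String path) {
        // FileUtil.fullyDelete removes a local directory and all of its contents.
        FileUtil.fullyDelete(new File(path));
    }
}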