Hadoop-MR Log Cleaning (Part 3)
Cleaning request logs mainly means filtering out data that is irrelevant to the downstream statistics: crawler traffic, static-resource requests, and unused columns. Where needed, the cleaning step can also transform some fields (dates, for example) so that later processing and statistical analysis are simpler.
Logically, log cleaning is again split into map, reduce, and run (main) functions. Because extracting and filtering fields from raw log lines is relatively involved, the parsing logic is usually written as a separate parser class whose methods the map function calls.
The parser class, written separately and called by the map function:
package com.leeyk99.hadoop;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
 * Parses each field of a log record. The fields of a log line are roughly:
 * IP, "-", "-", TIME, URL, STATUS, STREAM (bytes), ... and so on.
 * @author LIN
 */
public class FieldParser {

    public static final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
    public static final SimpleDateFormat FORMAT =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);

    /**
     * Parses one log record.
     * @return an array of five elements: IP, TIME, URL, STATUS and traffic (bytes)
     */
    public String[] parseLog(String line) {
        String ip = parseIP(line);
        String time = parseTime(line);
        String url = parseURL(line);
        String status = parseStatus(line);
        String stream = parseStream(line);
        String[] fields = new String[5];
        fields[0] = ip;
        fields[1] = time;
        fields[2] = url;
        fields[3] = status;
        fields[4] = stream;
        // Equivalent: String[] fields = new String[]{ip, time, url, status, stream};
        return fields;
    }

    private String parseStream(String line) {
        try {
            final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
            return trim.split(" ")[1];
        } catch (ArrayIndexOutOfBoundsException e) {
            // Malformed record: print it and return null instead of failing the task.
            // Note: do not "return null" from a finally block here -- that would override
            // the normal return value and make this method always return null.
            e.printStackTrace();
            System.out.println(line);
            return null;
        }
    }

    private String parseStatus(String line) {
        final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();
        return trim.split(" ")[0];
    }

    private String parseURL(String line) {
        return line.split("\"")[1].trim();
    }

    private String parseTime(String line) {
        final String trim = line.split("\"")[0].trim();
        String time = trim.split(" ")[3].substring(1);
        // The raw string must first be parsed into a Date before it can be
        // re-formatted into the target representation.
        Date date = parseDateFormat(time);
        time = dateFormat.format(date); // e.g. 20180903101923
        return time;
    }

    private String parseIP(String line) {
        return line.split(" ")[0].trim();
    }

    /**
     * Converts a log timestamp such as 18/Sep/2013:16:16:16 into a Date.
     * FORMAT.parse turns a String into a Date; FORMAT.format turns a Date back into a String.
     * @author LIN
     */
    private Date parseDateFormat(String time) {
        Date formatTime = new Date();
        try {
            formatTime = FORMAT.parse(time);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return formatTime;
    }
}
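As a quick sanity check, the parser can be exercised outside Hadoop with a plain main method. The sketch below is not part of the original code: the class name FieldParserTest and the sample log line are made up for illustration and only assume the access-log layout described above.

package com.leeyk99.hadoop;

// Hypothetical test driver: runs FieldParser on a hand-written sample line.
public class FieldParserTest {
    public static void main(String[] args) {
        String line = "27.19.74.143 - - [18/Sep/2013:16:16:16 +0000] "
                + "\"GET /forum.php?mod=viewthread&tid=100 HTTP/1.1\" 200 1127";
        String[] fields = new FieldParser().parseLog(line);
        // Expected order: IP, TIME (yyyyMMddHHmmss), URL, STATUS, traffic.
        // Note that FieldParser returns the raw request string; stripping the
        // GET/POST prefix and the trailing " HTTP/1.1" is done later in the mapper.
        for (String field : fields) {
            System.out.println(field);
        }
    }
}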
The mapper below also demonstrates how to pass multiple fields through as a single output value.
package com.leeyk99.hadoop.mapreduce;

import com.leeyk99.hadoop.FieldParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        FieldParser fieldParser = new FieldParser();
        String[] record = fieldParser.parseLog(line);

        /* Data cleaning */
        // 1. Filter out records whose request starts with the given prefixes (static resources etc.).
        if (record[2].startsWith("GET /uc_server") || record[2].startsWith("GET /static")) {
            return;
        }
        // 2. Field transformation: strip the HTTP method and protocol from the request string.
        if (record[2].startsWith("GET /")) {
            record[2] = record[2].substring("GET /".length() - 1); // keep from the leading "/" onward
        } else if (record[2].startsWith("POST /")) {
            record[2] = record[2].substring("POST /".length() - 1);
        }
        if (record[2].endsWith(" HTTP/1.1")) {
            record[2] = record[2].substring(0, record[2].length() - " HTTP/1.1".length());
        }
        // 3. Column pruning: keep only the needed columns, joined with the "\001" separator.
        Text outPutValue = new Text();
        outPutValue.set(record[0] + "\001" + record[1] + "\001" + record[2]);

        /* Map output. Unlike the usual pattern, the output key is the LongWritable line offset
           rather than a Text dimension, and the output value is Text rather than IntWritable or
           DoubleWritable -- the reducer does not do any group-by style aggregation here. */
        context.write(key, outPutValue);
    }
}
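One optional refinement, not part of the original code: since map() is invoked once per input line, the FieldParser and the output Text can be created once per mapper instance and reused, avoiding two allocations per record. A minimal sketch of that idiom (the class name LogMapperReuse is made up for illustration; the cleaning steps are the same as in LogMapper above and are omitted):

package com.leeyk99.hadoop.mapreduce;

import com.leeyk99.hadoop.FieldParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Object-reuse sketch: parser and output Text are instance fields, created once per mapper.
public class LogMapperReuse extends Mapper<LongWritable, Text, LongWritable, Text> {

    private final FieldParser fieldParser = new FieldParser();
    private final Text outPutValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] record = fieldParser.parseLog(value.toString());
        // ... apply the same filtering and trimming as in LogMapper ...
        outPutValue.set(record[0] + "\001" + record[1] + "\001" + record[2]);
        context.write(key, outPutValue);
    }
}

The reducer, which simply writes out the cleaned records, follows: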
package com.leeyk99.hadoop.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Pass-through: emit every cleaned record as the output key, with no value.
        for (Text value : values) {
            context.write(value, NullWritable.get());
        }
    }
}
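Because the reducer only passes records through, the number of part files in the output directory equals the number of reduce tasks. If a single consolidated file of cleaned records is preferred, the driver below could additionally pin the job to one reduce task; this is a standard Job API call, not something the original code does:

job.setNumReduceTasks(1); // optional: produce a single output file

The driver class: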
package com.leeyk99.hadoop.mapreduce;

//import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
//import java.io.File;

import java.net.URI;

public class LogParser extends Configured implements Tool {

    /* Note: the @Override annotation is left off this interface-method implementation.
       The IDE kept flagging it with a red squiggle as non-compliant even though the
       program ran fine, and tracing that warning back to this spot took a long time. */
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        // Available helpers: getClass(), getConf()

        // Option 1: the style used in "Hadoop: The Definitive Guide"
        /*Job job = new Job(getConf(), "Log parser");
        job.setJarByClass(getClass());*/

        // Option 2: plain main()-style construction, the simplest
        Job job = new Job();
        job.setJarByClass(getClass()); // getClass() resolves to LogParser.class
        job.setJobName("Log parser");

        // Option 3: explicit Configuration, the style commonly seen online
        /*Configuration conf = new Configuration();
        //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "Job_001");    // create the job and give it a name
        job.setJarByClass(LogParser.class);    // set the jar by class
        //FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // input path
        //FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path*/

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(LogMapper.class);
        job.setMapOutputKeyClass(LongWritable.class); // differs from the reducer's output types, so it must be set explicitly
        job.setMapOutputValueClass(Text.class);

        /* With the combiner enabled, the map stage stalled at 22%: the map output is
           <LongWritable, Text>, but LogReducer used as a combiner emits <Text, NullWritable>,
           which cannot be fed to the reducer because it expects <LongWritable, Text> as input. */
        //job.setCombinerClass(LogReducer.class);

        job.setReducerClass(LogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Delete the HDFS output directory if it already exists.
        FileSystem fs = FileSystem.get(new URI(args[0]), getConf());
        Path outPath = new Path(args[1]);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new LogParser(), args);
        System.exit(exitCode);
    }

    // When running against the local filesystem, how should the output directory be deleted?
    /*private static void delDir(String path) {
        File f = new File(path);
        if (f.exists()) {
            if (f.isDirectory()) {
                String[] items = f.list();
                for (String item : items) {
                    File f2 = new File(path + "/" + item);
                    if (f2.isDirectory()) {
                        delDir(path + "/" + item);
                    } else {
                        f2.delete();
                    }
                }
            }
            f.delete(); // deletes the file, or the final empty directory
        } else {
            System.out.println("Output directory does not exist.");
        }
    }*/
}
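After packaging the classes into a jar, the job is submitted the usual way, e.g. hadoop jar <your-jar> com.leeyk99.hadoop.mapreduce.LogParser <input> <output>, where the jar name and paths are placeholders.

As for the question in the commented-out delDir method: when running against the local filesystem, the same FileSystem API can be used by obtaining a local FileSystem instance, so a hand-written recursive delete is not needed. A minimal sketch, intended to go inside run() and reusing the imports already present:

// Sketch: delete a local output directory via the FileSystem API
// (an alternative to the hand-written recursive delDir above).
FileSystem localFs = FileSystem.getLocal(getConf());
Path localOut = new Path(args[1]);
if (localFs.exists(localOut)) {
    localFs.delete(localOut, true); // recursive delete
}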