Hadoop Log Analysis System
Project requirements:
We need to count how often certain events appear in the online logs each day; as a simple example, the total number of requests and the number of exception requests per day. There are a few dozen Linux servers online, and each produces roughly 4 to 5 GB of logs per day. Assuming 30 servers at 5 GB each, that is about 150 GB of logs per day.
Options considered:
Option 1: the traditional approach. Write some Java log-analysis code and deploy it to every server for processing. This is time-consuming to deploy and hard to maintain.
Option 2: distributed processing with Hadoop. Log analysis is exactly what a Hadoop cluster is good at, and 150 GB of logs per day is a reasonably large volume, so a simple Hadoop cluster is a good fit for the job.
Setting up the Hadoop cluster:
See these two articles:
http://www.cnblogs.com/cstar/archive/2012/12/16/2820209.html
http://www.cnblogs.com/cstar/archive/2012/12/16/2820220.html
Our cluster uses two machines, each with 8 cores, 32 GB of RAM, and 500 GB of disk.
日誌準備工做:
因爲日誌分散在各個服務器,因此咱們先須要將全部的日誌拷貝到咱們的集羣系統當中,這個能夠經過linux服務器下rsync或者scp
服務來執行。這裏咱們經過scp服務來拷貝,因爲都是內網的機器,因此拷貝幾個G的日誌能夠很快就完成。下面是拷貝日誌的腳本,腳本
仍是有一些須要注意的地方,咱們只須要拷貝前一天的數據,實際保存的數據多是好幾天的,因此咱們只要把咱們須要的這一天的數據
SCP過去就能夠了。
#!/bin/sh
workdir=/home/myproj/bin/log/
files=`ls $workdir`
pre1date=`date +"%Y%m%d" -d "-1 days"`
pre1date1=`date +"%Y-%m-%d" -d "-1 days"`
curdate=`date +"%Y%m%d"`
hostname=`uname -n`
echo $pre1date $curdate
uploadpath="/home/hadoop/hadoop/mytest/log/"$pre1date1"/"$hostname
echo $uploadpath
cd $workdir
mintime=240000
secondmintime=0
# Find the earliest modification time among files modified today:
# the first file written after midnight may still hold the tail of
# yesterday's log.
for file in $files;do
    filedate=`stat $file | grep Modify | awk '{print $2}' | sed -e 's/-//g'`
    filetime=`stat $file | grep Modify | awk '{print $3}' | cut -d"." -f1 | sed -e 's/://g' | sed 's/^0\+//'`
    if [ $filedate -eq $curdate ]; then
        if [ $filetime -lt $mintime ]; then
            secondmintime=$mintime
            mintime=$filetime
        fi
    fi
done
echo "mintime:"$mintime
step=1000
mintime=`expr $mintime + $step`
echo "mintime+1000:"$mintime
# Copy every info.log file dated yesterday, plus any file written early
# enough today to contain yesterday's data.
for file in $files;do
    filedate=`stat $file | grep Modify | awk '{print $2}' | sed -e 's/-//g'`
    filetime=`stat $file | grep Modify | awk '{print $3}' | cut -d"." -f1 | sed -e 's/://g' | sed 's/^0\+//'`
    filename=`echo $file | cut -c 1-8`
    startchars="info.log"
    if [ $filename = $startchars ]; then
        if [ $filedate -eq $pre1date ]; then
            scp -rp $file dir@antix2:$uploadpath
        elif [ $filedate -eq $curdate ]; then
            if [ $filetime -lt $mintime ]; then
                scp -rp $file dir@antix2:$uploadpath
            fi
        fi
    fi
done
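On each log server this copy step can be scheduled to run once a day. A minimal crontab sketch, assuming the script is saved as /home/myproj/bin/copylog.sh and that an early-morning run time is acceptable (both the path and the 02:30 time are assumptions, not from the original setup; the script also assumes passwordless scp to dir@antix2 via an SSH key):

```shell
# crontab -e on each log server: run the copy script daily at 02:30.
# Stdout/stderr are appended to a log file for troubleshooting.
30 2 * * * /bin/sh /home/myproj/bin/copylog.sh >> /home/myproj/bin/copylog.cron.log 2>&1
```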
MapReduce code:
Next we write the MapReduce code, using Eclipse as the development environment. This requires the Hadoop Eclipse plugin; our Hadoop machines run version 1.1.1, so we use hadoop-eclipse-plugin-1.1.1.jar. Just copy the plugin into Eclipse's plugins directory, then create a new MapReduce project:
With the project created, we can write our MapReduce code.
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class LogAnalysis {

    public static class LogMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private Text hourWord = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            SimpleDateFormat formatter2 = new SimpleDateFormat("yy-MM-dd");
            java.util.Date d1 = new Date();
            d1.setTime(System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
            String strDate = formatter2.format(d1);
            // Only count lines carrying yesterday's date.
            if (line.contains(strDate)) {
                String[] strArr = line.split(",");
                int len = strArr[0].length();
                String time = strArr[0].substring(1, len - 1);
                String[] timeArr = time.split(":");
                String strHour = timeArr[0];
                String hour = strHour.substring(strHour.length() - 2, strHour.length());
                String hourKey = "";
                if (line.contains("StartASocket")) {
                    // Emit both a daily total and a per-hour count.
                    word.set("SocketCount");
                    context.write(word, one);
                    hourKey = "SocketCount:" + hour;
                    hourWord.set(hourKey);
                    context.write(hourWord, one);
                    word.clear();
                    hourWord.clear();
                }
                if (line.contains("SocketException")) {
                    word.set("SocketExceptionCount");
                    context.write(word, one);
                    hourKey = "SocketExceptionCount:" + hour;
                    hourWord.set(hourKey);
                    context.write(hourWord, one);
                    word.clear();
                    hourWord.clear();
                }
            }
        }
    }

    public static class LogReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: loganalysis <in> <out>");
            System.exit(2);
        }
        FileSystem fileSys = FileSystem.get(conf);
        String inputPath = "input/" + args[0];
        // Copy the local log file into HDFS.
        fileSys.copyFromLocalFile(new Path(args[0]), new Path(inputPath));
        Job job = new Job(conf, "loganalysis");
        job.setJarByClass(LogAnalysis.class);
        job.setMapperClass(LogMapper.class);
        job.setCombinerClass(LogReducer.class);
        job.setReducerClass(LogReducer.class);
        // Set the output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        Date startTime = new Date();
        System.out.println("Job started: " + startTime);
        int ret = job.waitForCompletion(true) ? 0 : 1;
        // Fetch the result back to the local filesystem, then clean up HDFS.
        fileSys.copyToLocalFile(new Path(otherArgs[1]), new Path(otherArgs[1]));
        fileSys.delete(new Path(inputPath), true);
        fileSys.delete(new Path(otherArgs[1]), true);
        Date end_time = new Date();
        System.out.println("Job ended: " + end_time);
        System.out.println("The job took "
                + (end_time.getTime() - startTime.getTime()) / 1000 + " seconds.");
        return ret;
    }

    public static void main(String[] args) {
        try {
            int ret = run(args);
            System.exit(ret);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(e.getMessage());
        }
    }
}
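Before trusting the cluster results, it helps to sanity-check them against a plain grep over a sample of one server's log: for a given day the MapReduce totals should match the grep counts. A minimal sketch (the sample lines below are invented to mimic the time-prefixed, comma-separated format the mapper parses, and the date filter the mapper applies is ignored here):

```shell
# Build a tiny sample log (hypothetical lines, for illustration only).
cat > /tmp/sample.log <<'EOF'
[09:15:02.123],StartASocket,conn=1
[09:16:40.555],StartASocket,conn=2
[10:01:11.002],SocketException,conn=2
EOF

# Equivalent single-machine counts for the two keys the job emits:
grep -c "StartASocket" /tmp/sample.log        # SocketCount, prints 2
grep -c "SocketException" /tmp/sample.log     # SocketExceptionCount, prints 1
```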
Deploying to the Hadoop cluster:
Once the code was finished and tested, we deployed it to the cluster. Since we have a few dozen servers, each server's logs are processed as a separate job.
workdir="/home/hadoop/hadoop/mytest"
cd $workdir
pre1date=`date +"%Y-%m-%d" -d "-1 days"`
servers=(mach1 mach2 mach3)
for i in ${servers[@]};do
    inputPath="log/"$pre1date"/"$i
    outputPath="output/log/"$pre1date"/"$i
    echo $inputPath $outputPath
    echo "start job "$i" date:"`date`
    hadoop jar LogAnalysis.jar loganalysis $inputPath $outputPath
    echo "end job "$i" date:"`date`
done
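Because each server runs as its own job, the per-server outputs still need to be summed to get cluster-wide daily totals. A hedged sketch using awk (the directory layout mirrors the per-server outputPath idea above, but the part-file names and the sample counts are invented for illustration):

```shell
# Simulate two servers' reducer outputs (key<TAB>count per line).
mkdir -p /tmp/agg/mach1 /tmp/agg/mach2
printf 'SocketCount\t120\nSocketExceptionCount\t3\n' > /tmp/agg/mach1/part-r-00000
printf 'SocketCount\t80\nSocketExceptionCount\t1\n'  > /tmp/agg/mach2/part-r-00000

# Sum the counts per key across all servers' output files.
awk -F'\t' '{sum[$1] += $2} END {for (k in sum) print k "\t" sum[k]}' \
    /tmp/agg/*/part-r-00000
```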