Big Data Learning Series: Web Log Analysis

Scenario

The nginx log directory is /usr/local/nginx/logs, and the log format looks like this:

123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg HTTP/1.1" 206 51934 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"
120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 16631 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)"
123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg HTTP/1.1" 206 53119 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"
219.137.119.16 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/gamenet/icon/icon_0_506_0.jpg HTTP/1.1" 404 1035 "-" "Dalvik/v3.3.110_update3 (Linux; U; Android 2.2.1-R-20151127.1131; ET_35 Build/KTU84Q)"
120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 40719 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)"

Each log line is space-delimited into 12 fields:

1. client IP
2. blank (remote logname)
3. blank (authenticated remote user)
4. request time
5. time zone (UTC offset)
6. request method
7. requested resource
8. HTTP protocol version
9. status code
10. bytes sent
11. referer
12. client user-agent information (not split further)
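As a quick sanity check, the field positions can be verified by splitting a sample line in Python; the 0-based indices below are exactly the ones the MapReduce jobs later rely on:

line = ('123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] '
        '"GET /AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg HTTP/1.1" '
        '206 51934 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg" '
        '"Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)"')

fields = line.split()
print fields[0] # client IP:    123.13.17.13
print fields[3] # request time: [25/Aug/2016:00:00:01
print fields[8] # status code:  206
print fields[9] # bytes sent:   51934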

An HDFS upload script is deployed on the nginx servers to push the nginx logs to HDFS on a schedule.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import subprocess
import datetime

webid = 'test1' # log identifier in HDFS; the other web server uses 'test2'
currdate = datetime.datetime.now().strftime('%Y%m%d')

logspath = '/usr/local/nginx/logs/access.log' # local log path
logname = 'access.log.' + webid

try:
    # Create the HDFS directory (format: nginx/20160825). wait() makes the parent
    # process block until the child finishes; by default subprocess.Popen returns
    # immediately without waiting for the child's result.
    subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop', 'fs', '-mkdir', '-p',
                      'hdfs:///user/root/nginx/' + currdate],
                     stdout=subprocess.PIPE).wait()
except Exception as e:
    pass # the directory may already exist; ignore errors

# Upload the local log to HDFS
putinfo = subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop', 'fs', '-put', logspath,
                            'hdfs:///user/root/nginx/' + currdate + '/' + logname],
                           stdout=subprocess.PIPE)

for line in putinfo.stdout:
    print line
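One caveat with this script: hadoop fs -put fails if the target file already exists, so re-running it on the same day will error out. Hadoop 2.x accepts an -f flag on -put to overwrite instead, if that behavior is preferred.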

Deploy the upload script in crontab so it runs daily at midnight:

0 0 * * * /usr/bin/python /root/hadooptest/hdfsput.py >> /dev/null 2>&1

Once the logs are uploaded to HDFS, the listing looks like this:

[root@wx ~]# hadoop fs -ls /user/root/nginx/20160825
Found 2 items
-rw-r--r-- 1 root supergroup 15 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test1
-rw-r--r-- 1 root supergroup 28 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test2

Traffic Statistics

To measure site traffic with per-minute granularity, the mapper takes each log line's minute as the key and the corresponding bytes-sent field as the value; the reducer then sums the values of identical keys.

Using MRJob:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from mrjob.job import MRJob
import re

class MRCounter(MRJob):
    def mapper(self, key, line):
        hm = None
        for i, field in enumerate(line.split()):
            if i == 3: # field 4 is the timestamp, e.g. '[25/Aug/2016:00:00:01'
                timerow = field.split(':')
                hm = timerow[1] + ':' + timerow[2] # 'hour:minute' is the key
            if i == 9 and hm is not None and re.match(r'\d{1,}', field): # field 10, bytes sent, is the value
                yield hm, int(field)

    def reducer(self, key, occurrences):
        yield key, sum(occurrences) # sum the values of each 'hour:minute' key

if __name__ == '__main__':
    MRCounter.run()
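Before submitting to the cluster, the job can be smoke-tested locally against a copy of the log (with no -r option, mrjob uses its local inline runner; the sample file name here is illustrative):

python httpflow.py access_sample.log

Each output line is a "HH:MM" key followed by the summed byte count for that minute.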

Generate and run the Hadoop job:

python /root/hadoop/httpflow.py -r hadoop -o hdfs:///output/httpflow hdfs:///user/root/nginx
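Note that Hadoop refuses to start a job whose -o output directory already exists; remove it first (hadoop fs -rm -r /output/httpflow) when re-running.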

The analysis results are periodically imported into MySQL to generate reports.
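A minimal sketch of that import step, assuming a pymysql connection and a pre-created httpflow(minute, bytes) table — the driver, credentials, and table name are all hypothetical, since the original setup is not shown:

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Hypothetical sketch: load the reducer output from HDFS into MySQL for reporting.

import subprocess
import pymysql # assumed driver; MySQLdb would work the same way

conn = pymysql.connect(host='127.0.0.1', user='root', password='xxx', db='weblog')
cur = conn.cursor()

# Each mrjob output line looks like: "00:01"<TAB>51934
cat = subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop', 'fs', '-cat',
                        'hdfs:///output/httpflow/part-*'], stdout=subprocess.PIPE)
for line in cat.stdout:
    minute, nbytes = line.strip().split('\t')
    cur.execute('INSERT INTO httpflow (minute, bytes) VALUES (%s, %s)',
                (minute.strip('"'), int(nbytes)))

conn.commit()
conn.close()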

HTTP Status Code Statistics

Status code counts help us gauge the health of the site; the job below is built with MRJob's multi-step chaining.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from mrjob.job import MRJob
import re

class MRCounter(MRJob):
    def mapper(self, key, line):
        for i, httpcode in enumerate(line.split()):
            if i == 8 and re.match(r'\d{1,3}', httpcode): # field 9 is the HTTP status code, used as the key
                yield httpcode, 1 # initial value 1 so the reducer can simply sum

    def reducer(self, httpcode, occurrences):
        yield httpcode, sum(occurrences) # sum the counts of each status code

    def steps(self):
        # chain the processing steps explicitly
        return [self.mr(mapper=self.mapper), self.mr(reducer=self.reducer)]

if __name__ == '__main__':
    MRCounter.run()
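Strictly speaking, steps() is optional here: when a job defines only one mapper and one reducer, MRJob builds the single-step pipeline automatically, so the explicit list mainly marks where additional steps would be chained in a longer pipeline.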

Generate and run the Hadoop job:

python httpstatus.py -r hadoop -o hdfs:///output/httpstatus hdfs:///user/root/nginx

Results:

[root@wx hadooptest]# hadoop fs -cat /output/httpstatus/part-00000
"200" 608997
"206" 2802574
"302" 1
"304" 34600
"400" 30
"401" 1
"404" 1653791
"416" 180358
"499" 2689

Source IP Statistics

Counting source IPs shows how the site's users are distributed and helps security staff trace attack origins. A regular expression that matches IP addresses yields the key, each match is given an initial value of 1, and the reducer sums the counts.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

from mrjob.job import MRJob
import re

IP_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') # IPv4 pattern

class MRCounter(MRJob):
    def mapper(self, key, line):
        # emit key:value pairs with the IP address as key and an initial value of 1;
        # note that findall() matches every IPv4-like token on the line, not just the client IP field
        for ip in IP_RE.findall(line):
            yield ip, 1

    def reducer(self, ip, occurrences):
        yield ip, sum(occurrences) # sum the counts of each IP

Run the job:

python ipstat.py -r hadoop -o hdfs:///output/ipstat hdfs:///user/root/nginx
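To pick out the heaviest sources, the job output can then be sorted by count (output path as above):

[root@wx hadooptest]# hadoop fs -cat /output/ipstat/part-00000 | sort -k2 -rn | head -10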
