項目狀況介紹:
基於Python 3.6.6 ,實現對nginx訪問的日誌分析代碼,實現了對日誌中code的佔比統計和瀏覽器類型和訪問狀況統計
實現的代碼段有:
1.編寫窗戶函數,實如今必定的時間內對數據進行分析
2.經過正則表達式對日誌進行匹配,加載日誌文件,提取出文本里每行的日誌信息
3.編寫消費端代碼,即便得提取到的數據可以按照消費端的代碼進行處理
4.消息分發代碼實現,經過queue,將提取的的文本放到隊列裏,供消費端代碼處理
項目代碼以下nginx
import random import datetime import time from queue import Queue import threading import re from pathlib import Path from user_agents import parse """ 這段代碼,實現了再一段時間內得到數據,經過不一樣的handler(即消費端函數) 對獲取到的同一份數據進行處理,主要是兩段消費函數,網頁返回的code的統計和瀏覽器的分析 這段代碼,窗口函數中,data = src.get(),使得沒有新的數據產生時,該代碼會阻塞,直到有新的數據生成,再次進行處理 """ pattern = '''(?P<remote>[\d.]{7,}\s-\s-\s\[(?P<datetime>[^\[\]]+)\])\s\ "(?P<method>.*)\s(?P<url>.*)\s(?P<protocol>.*)"\s(?P<status>\d{3})\s(?P<size>\d+)\s"[^"]+"\s"(?P<useragent>[^"]+)"''' #編譯 regex = re.compile(pattern) #構造字典 ops = { 'datetime': lambda datestr: datetime.datetime.strptime(datestr, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int, 'useragent': lambda ua: parse(ua) } #提取信息 def extract(line: str) -> dict: matcher = regex.match(line) if matcher: return {name: ops.get(name, lambda x: x)(data) for name, data in matcher.groupdict().items()} # 打開文件 def openfile(path: str): """裝載日誌文件""" with open(path) as f: for line in f: fields = extract(line) if fields: yield fields else: continue #裝載文件,判斷文件類型已是否存在 def load(*paths): for item in paths: p = Path(item) if not p.exists(): continue if p.is_dir(): for file in p.iterdir(): if file.is_file(): yield from openfile(str(file)) elif p.is_file(): yield from openfile(str(p)) # 隨機生成100之內的數字 def source(second=1): """生成數據""" while True: yield { 'datetime': datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=8))), 'value': random.randint(1, 100) } time.sleep(second) # 滑動窗口函數 def window(src: Queue, handler, width: int, interval: int): ''' 窗口函數,表示間隔一段時間取出必定的數據進行處理 :param src:數據源,這裏是緩存隊列,用於獲取數據 :param handler:數據處理的函數 :param width:時間窗口函數,秒 :param interval:處理時間間隔,秒 ''' start = datetime.datetime.strptime('20170101 000000 +0800', '%Y%m%d %H%M%S %z') current = datetime.datetime.strptime('20170101 010000 +0800', '%Y%m%d %H%M%S %z') buffer = [] delta = datetime.timedelta(seconds=width - interval) while True: # 從數據源獲取數據 data = src.get() # 這個代碼會阻塞,等待數據輸入,沒有數據輸入就阻塞 if data: buffer.append(data) current = data['datetime'] # 存入臨時緩衝等待計算 # 每隔interval從新計算buffer中的一次數據 if (current - start).total_seconds() >= interval: ret = handler(buffer) start = current # 清除超出width的數據 buffer = [x for x in buffer if x['datetime'] > current - delta] # 隨機數平均的測算函數 source() def handler(iterable): #return sum(map(lambda x: x['value'], iterable)) / len(iterable) print(sum(map(lambda x:x['value'],iterable))/len(iterable)) # 測試函數 def donothing_handler(iterable): #return iterable print(iterable) # 狀態碼佔比 def status_handler(iterable): # 時間窗口內的一批數據 status = {} for item in iterable: key = item['status'] status[key] = status.get(key, 0) + 1 total = len(iterable) print({k:float( "{:.2f}".format(status[k] / total)) for k, v in status.items()}) return {k: status[k] / total for k, v in status.items()} # 瀏覽器分析 allbrowsers = {} def browser_handler(iterable): browsers = {} for item in iterable: ua = item['useragent'] key = (ua.browser.family, ua.browser.version_string) browsers[key] = browsers.get(key, 0) + 1 allbrowsers[key] = allbrowsers.get(key, 0) + 1 print(sorted(allbrowsers.items(), key=lambda x: x[1], reverse=True)[:10]) return browsers # 分發器 def dispatcher(src): # 分發器中記錄handler,同時保存各自的隊列 handlers = [] queues = [] def reg(handler, width: int, interval: int): """ 註冊窗口處理函數 :param handler:註冊數據處理函數 :param width:時間窗口寬度 :param interval:時間間隔 """ q = Queue() queues.append(q) # 多線程,數據並行 h = threading.Thread(target=window, args=(q, handler, width, interval)) handlers.append(h) def run(): # 啓動線程處理數據 for t in handlers: t.start() # 將獲取到的數據分發到全部的隊列中 for item in src: for q in queues: q.put(item) # print(q.get()) return reg, run if __name__ == "__main__": import sys path = '/tmp/test.log' """ 如下的代碼爲測試用的,用於統計每隔5s統計10s內的隨機數字的平均值 reg, run = dispatcher(source()) reg(handler, 10, 5) """ reg, run = dispatcher(load(path)) #每隔5s返回過去10s的數據,可是不作處理 reg(donothing_handler, 10, 5) #每隔5s統計10s內的返回狀態碼的佔比狀況 reg(status_handler, 10, 5) # 每隔5s統計10s內的瀏覽器類型佔比狀況,展現排行10s內訪問量前十的瀏覽器 reg(browser_handler,10,5) run()