nginx日誌分析工具

nginx日誌分析工具

日誌正則匹配

nginx的日誌輸出爲以下的格式:python

183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0

如今須要把remote ip,請求時間,請求方法,請求url以及請求協議等使用正則表達式的分組功能一一匹配出來,正則的匹配模式書寫以下:linux

(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"

在書寫相似的正則時不要陷入目標數據對應的正則書寫,如[07/Apr/2017:09:32:39 +0800],能夠理解爲在一個[]中括號中出現了許多字符,但不會包含[]這樣的符號,因此就可寫成\[(?P&lt;datetime&gt;[^\[\]]+)\]這樣的形式。nginx

代碼驗證:正則表達式

import re
import datetime

logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \
(Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'''

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \
(?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

matcher = regex.match(logfile)
if matcher:
    print(matcher.groupdict().items())

輸出後端

dict_items([('remote', '183.69.210.164'), ('datetime', '07/Apr/2017:09:32:39 +0800'), ('method', 'GET'), ('url', '/member/'), ('protocol', 'HTTP/1.1'), ('status', '302'), ('size', '31'), ('useragent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0')])

這樣就獲得一個類字典的數據,把一條日誌是的各個部份進行了分解,但各個分組中的數據類型都是字符串類型。像('datetime', '07/Apr/2017:09:32:39 +0800')是一個時間,能夠轉換成時間類型,('status', '302')是狀態碼,能夠轉換爲整形,以便後續在分析時直接就拿到相應的數據類型。瀏覽器

日誌中數據類型轉換

對字符串時間進行數據轉換緩存

import datetime

s = '07/Apr/2017:09:32:39 +0800'
dt = datetime.datetime.strptime(s, '%d/%b/%Y:%H:%M:%S %z')
print(type(dt), dt)

輸出多線程

<class 'datetime.datetime'> 2017-04-07 09:32:39+08:00

對status和size能夠直接使用int進行數據類型轉換,這種類型轉換函數能夠單獨定義在一個字典中,當一個功能來提供,以下:app

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
}

這樣當日志格式變化後,能夠很靈活在opt這個字典中增長相應的轉換函數。dom

實現單行日誌抽取

結合上邊的轉換函數就能夠把一行日誌轉換成字典,該字典存放了正則匹配出的分組信息,並相應的數據已進行了類型轉換,發便後期分析時使用。完整代碼以下:

import re
import datetime

logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \
(Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'''

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
}

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \
(?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

def extract(line):
    matcher = regex.match(line)
    if matcher:
        info = {}
        for k, v in matcher.groupdict().items():
            info[k] = ops.get(k, lambda x: x)(v)  # 巧用字典的get方法及默認值是一個匿名函數
        return info
        # 上邊的代碼能夠用一行實現
        # return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    else:
        return None

print(extract(logfile))

運行後輸出

{'remote': '183.69.210.164', 'datetime': datetime.datetime(2017, 4, 7, 9, 32, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'url': '/member/', 'protocol': 'HTTP/1.1', 'status': 302, 'size': 31, 'useragent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}

輸出一個字典,相應的數據類型也完成了轉換。

日誌處理窗口函數

日誌文件被讀取並一條一條的送達給窗口函數,窗口函數須要緩存指定一段時間內的的日誌,因日誌是以時間序列產生的,當這個日誌中的時間差值大於定義的窗口大小時,代表這時咱們就須要對已緩存的日誌進行分析,如:在這個時間窗口內訪問的url有哪些,各個url訪問的數量,若是在一個時間窗口內對一個url訪問的數量特別的多,那可能這個url在被cc***,這時就能夠觸發告警,告知相關人員。

窗口函數

先看一個窗口函數的事例:

import random
import datetime
import time

def source():
    # 模擬數據源
    while True:
        tz_utc_8 = datetime.timezone(datetime.timedelta(hours=8))   # 建立時區UTC+8:00
        datetime_now = datetime.datetime.now()
        dt = datetime_now.replace(tzinfo=tz_utc_8)  # 強制設置爲UTC+8:00
        yield {'value': random.randint(1, 100), 'datetime': dt}
        time.sleep(1)

def window(src, handler, width: int, interval: int):
    """
    窗口函數
    :param src: 數據源,一個生成器
    :param handler: 數據處理函數
    :param width: 時間窗口寬度,秒
    :param interval: 處理時間間隔,秒,表示在nginx的日誌文件中的請求時間的差值在於interval時,就須要調用handler函數進行處理在這段時間內的這一批日誌
    :return:
    """

    # start和current都設置爲linux元年時間,後邊會被日誌中的時間替代,做初始化值用
    start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')
    current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)

    buffer = []  # 時間窗口內日誌的緩存列表
    while True:
        for x in src:
            if x:
                # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)}
                buffer.append(x)
                current = x['datetime']

            # 數據中的時間與定義的start相減後大於等於interval後,處理buffer中的時間間隔就到了
            if (current - start).total_seconds() >= interval:
                ret = handler(buffer)  # 數據處理函數
                start = current  # 時間點移動

                # buffer中的數據處理
                # x['datetime'] > current - delta 知足這個條件的數據須要保留
                buffer = [x for x in buffer if x['datetime'] > current - delta]

def do_nothing_handler(iterable: list):
    # 模擬處理函數
    print(iterable)
    print(len(iterable))

window(source(), do_nothing_handler, 8, 6)

運行後輸出

[{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
1
[{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 56, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 8, 967087, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 70, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 9, 971149, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 10, 975855, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 77, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 11, 978261, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
7
[{'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 7, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 14, 987289, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 13, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 15, 990374, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 96, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 16, 992201, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 17, 992857, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
8
[{'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 89, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 21, 3320, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 45, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 22, 7064, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 23, 9548, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 24, 11991, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
8
[{'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 27, 23564, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 11, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 28, 27336, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 29, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 29, 29979, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 21, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 30, 34198, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 79, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 31, 37143, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 62, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 32, 39595, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}]
8

當運行穩定後,每次處理的buffer中的數據爲8條,即爲width的值,而interval設置爲6,這說明上一次buffer中的數據和下一次buffer中的數據有重複的,這在某些場景是容許有數據重複,當width = interval時,buffer中的數據不會有重複,若是width < interval時,這時就有時間丟失了,這是不能容許出現的。

整合日誌處理

import re
import datetime

logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \
(Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"'''

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int
}

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \
(?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

# 這裏還會面臨一個問題,若是有一行日誌因一些緣由不是按照定義的格式記錄的,那「regex.match(line)」會是一個None,
# 那上邊的代碼就會報異常。因此這裏還會面臨異常的處理,加判斷便可
def extract(line):
    matcher = regex.match(line)
    if matcher:
        return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}  # ops.get(k, lambda x: x)(v) 轉換相應的值
    else:
        return None

def window(src, handler, width: int, interval: int):
    """
    窗口函數
    :param src: 數據源,生成器
    :param handler: 數據處理函數
    :param width: 時間窗口寬度,秒
    :param interval: 處理時間間隔,秒,表示在nginx的日誌文件中的請求時間的差值在於interval時,就須要調用handler函數進行處理在這段時間內的這一批日誌
    :return: None
    """

    start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')  # 也能夠設置爲None
    current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)

    buffer = []
    for x in src:
        if x:
            # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)}
            buffer.append(x)
            current = x['datetime']
        if (current - start).total_seconds() >= interval:  # 數據中的時間與定義的start相減後大於等於interval後,處理buffer中的時間間隔就到了
            ret = handler(buffer)  # 數據處理函數
            start = current  # 時間點移動

            # buffer中的數據處理
            # x['datetime'] > current - delta 知足這個條件的數據須要保留
            buffer = [x for x in buffer if x['datetime'] > current - delta]

# 裝載函數
def load(path):
    with open(path) as f:
        for line in f:
            field = extract(line)
            if field:
                yield field
            else:
                continue  # 解析失敗時拋棄或打印日誌

# 處理函數
def do_nothing_handler(iterable: list):
    print(iterable)
    print(len(iterable))

# test.log 日誌文件爲測試文件
window(load('test.log'), do_nothing_handler, 300, 300)

引入隊列和分發器

對日誌的分析函數多種多樣,如:在時間窗口內請求的狀態碼的佔比,在時間窗口內請求url的數量,各類useragent訪問的數量(不是時間窗口內的統計)等。而這些分析函數每每是須要並行執行的,因此這裏也會引入分發器,在分發器中使用多線程來執行相應的分析處理函數。

分發器事例

def dispatcher(src):  # src參數是一個生成器,便是數據源,通過extract函數處理過一行數據
    queues = []
    threads = []

    def reg(handler, width, interval):
        # handler: 分析函數
        # width: 時間窗口
        # interval: 分析函數調用時間間隔
        # 註冊時須要分配消費者函數window各自的隊列,建立各自的線程對象
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        # 啓動線程
        for t in threads:
            t.start()

        # 把日誌裝載函數load返回的數據put進各個消費者的隊列,相似於廣播形式
        for x in src:
            for q in queues:
                q.put(x)

    return reg, run # 返回註冊函數和運行函數

window函數的第一個參數爲src一個生成器,如今加入分發器後,第一參數變成了一個q隊列,因此window函數會作相應的變動。

狀態碼分析函數

在一個時間窗口內請求狀態碼的佔比能反應出後端服務的健康情況。在window這個窗口函數中會把一個指定時間內的日誌append到一個list中,若是觸發了時間窗口,那就會調用相應的handler函數對list中的日誌進行分析,因此狀態碼分析函數接收一個list做爲參數。

from collections import defaultdict

def status_handler(iterable: list):
    status = defaultdict(lambda: 0)  # 使用默認字典,初始化值爲0
    for item in iterable:
        key = item['status']
        status[key] += 1
    total = sum(status.values())
    # total = len(iterable)  # 數據總條數求值有多種方式,也能夠上for循環中計數
    result = {k: v/total*100 for k, v in status.items()} # 求狀態碼在該時間窗口內所佔的百分比
    # print(result)
    return result  # 返回一個字典, {'200': 95, '502': 2.4}

useragent統計函數

客戶端請求時使用的是什麼瀏覽器也能夠進行統計,只是這個統計不針對時間窗口內的日誌進行統計,只針對時間窗口內統計意義不太大,而是須要進行累計統計。對useragent有專門的包能夠對其進行解析,名稱爲user_agents,先看一個事例:

from user_agents import parse  # pip install user-agents

ua = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"

u = parse(ua)
print(u.browser)
print(u.browser.family, u.browser.version_string)

輸出

Browser(family='Sogou Explorer', version=(1, 0), version_string='1.0')
Sogou Explorer 1.0

因此能夠利用user_agents包解析出user agent的名稱和相應的版本。

要對user agent進行統計,那在對日誌文件中的一行日誌文件進行轉換時,須要增長對user agent作相應的轉換,在opt這個字典中增長相應的轉換函數,以下:

from user_agents import parse

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda useragent: parse(useragent)
}

User agent統計代碼以下:

# 統計結果放在函數useragent_handler以外,記錄當前處理的日誌文件,若是放在useragent_handler內則是統計時間窗口內的日誌條數
ua_dict = defaultdict(lambda: 0)

# useragent分析函數
def useragent_handler(iterable: list):
    for item in iterable:
        ua = item['useragent'] # ua是一個 user_agents.parsers.UserAgent 對象
        key = (ua.browser.family, ua.browser.version_string) # key爲一個元組
        ua_dict[key] += 1
    # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True))  # 以useragent的數量升序排序
    return ua_dict

日誌分析工具完整代碼

import re
import datetime
from queue import Queue
import threading
from collections import defaultdict
from pathlib import Path
from user_agents import parse

ops = {
    'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'),
    'status': int,
    'size': int,
    'useragent': lambda useragent: parse(useragent)
}

pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \
(?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"'''
regex = re.compile(pattern)

# 日誌抽取函數
def extract(line) -> dict:
    matcher = regex.match(line)
    if matcher:
        return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()}
    else:
        return None

def window(src: Queue, handler, width: int, interval: int):
    """
    窗口函數
    :param src: 是一個queue
    :param handler: 數據處理函數
    :param width: 時間窗口寬度,秒
    :param interval: 處理時間間隔,秒
    :return:
    """

    start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')  # 也能夠設置爲None
    current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z')
    delta = datetime.timedelta(seconds=width - interval)

    buffer = []
    while True:
        data = src.get()  # 阻塞模式
        if data:
            buffer.append(data)
            current = data['datetime']
        if (current - start).total_seconds() >= interval:  # 數據中的時間與定義的start相減後大於等於interval後,處理buffer中的時間間隔就到了
            ret = handler(buffer)  # 數據處理函數
            start = current  # 時間點移動
            # buffer中的數據處理
            # x['datetime'] > current - delta 知足這個條件的數據須要保留,結合「01-日誌分析回顧及數據載入」的圖形看
            buffer = [x for x in buffer if x['datetime'] > current - delta]
            print(ret)  # ret是觸發調用函數的執行返回值,在此能夠窗口內的分析結果進行相應的判斷,好比502的狀態碼佔比大於多少時作什麼

# 抽取出日誌文件讀取函數,是一個生成器函數
def open_file(path: str):
    with open(str(path)) as f:
        for line in f:
            field = extract(line)
            if field:
                yield field
            else:
                continue  # 解析失敗時拋棄或打印日誌

# 裝載函數,可受多個路徑,能夠是目錄,若是是目錄只迭代一該目錄下的全部文件,目錄忽略
def load(*path):
    for item in path:
        p = Path(item)
        if not p.exists():
            continue
        if p.is_dir():
            for file in p.iterdir():
                if file.is_file():  # 只處理文件,目錄無論
                    yield from open_file(str(file))

        elif p.is_file():
            yield from open_file(str(p))

# 處理函數
def do_nothing_handler(iterable: list):
    print(iterable)
    print(len(iterable))

# 狀態碼分析處理函數,返回各狀態碼的佔比
def status_handler(iterable: list):
    status = defaultdict(lambda: 0)
    for item in iterable:
        key = item['status']
        status[key] += 1
    total = sum(status.values())
    # total = len(iterable)  # 數據總條數求值有多種方式,也能夠上for循環中計數
    result = {k: v/total*100 for k, v in status.items()}
    # print(result)
    return result

# 統計結果放在函數useragent_handler以外,記錄當前處理的日誌文件,若是放在useragent_handler內則是統計時間窗口內的日誌條數
ua_dict = defaultdict(lambda: 0)

# useragent分析函數
def useragent_handler(iterable: list):
    for item in iterable:
        ua = item['useragent']  # ua是一個 user_agents.parsers.UserAgent 對象
        key = (ua.browser.family, ua.browser.version_string)  # key爲一個元組
        ua_dict[key] += 1
    # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True))  # 以useragent的數量升序排序
    return ua_dict

# 分發器函數
# 1. 每個window函數擁有本身的隊列,相應的處理函數,以及時間窗口及處理時間間隔
# 2. window函數至關於就是生產者消費者模型中的消費者,在實際的業務環境中消費者可能許多,拿到相同的一份數據後進行各自的handler處理
def dispatcher(src):  # src參數是一個生成器,便是數據源
    queues = []
    threads = []

    def reg(handler, width, interval):
        # 註冊時須要分配消費都函數window各息的隊列,建立各息的線程對象
        q = Queue()
        queues.append(q)
        t = threading.Thread(target=window, args=(q, handler, width, interval))
        threads.append(t)

    def run():
        # 啓動線程
        for t in threads:
            t.start()

        # 把日誌裝載函數load返回的數據put進各個消費者的隊列
        for x in src:
            for q in queues:
                q.put(x)

    return reg, run

path = 'test.log'  # 接受可變參數傳遞,如 path = 'test.log, /var/logs/'
regs, runs = dispatcher(load(path))

# 註冊窗口,能夠註冊多個,根據各自的日誌處理邏輯傳入handler, width, interval便可。這樣每一個window函數都在各自的線程中運行,互不影響。
# regs(do_nothing_handler, 10, 5)

# 註冊狀態碼處理函數
regs(status_handler, 10, 5)

# 註冊useragent分析函數
regs(useragent_handler, 10, 10)  # useragent分析不是對時間窗口內的數據求和或求平均值,數據不須要有重複

# 運行
runs()
相關文章
相關標籤/搜索