nginx的日誌輸出爲以下的格式:python
183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0
如今須要把remote ip
,請求時間,請求方法,請求url以及請求協議等使用正則表達式的分組功能一一匹配出來,正則的匹配模式書寫以下:linux
(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"
在書寫相似的正則時不要陷入目標數據對應的正則書寫,如[07/Apr/2017:09:32:39 +0800]
,能夠理解爲在一個[]
中括號中出現了許多字符,但不會包含[
,]
這樣的符號,因此就可寫成\[(?P<datetime>[^\[\]]+)\]
這樣的形式。nginx
代碼驗證:正則表達式
import re import datetime logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \ (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"''' pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) matcher = regex.match(logfile) if matcher: print(matcher.groupdict().items())
輸出後端
dict_items([('remote', '183.69.210.164'), ('datetime', '07/Apr/2017:09:32:39 +0800'), ('method', 'GET'), ('url', '/member/'), ('protocol', 'HTTP/1.1'), ('status', '302'), ('size', '31'), ('useragent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0')])
這樣就獲得一個類字典的數據,把一條日誌是的各個部份進行了分解,但各個分組中的數據類型都是字符串類型。像('datetime', '07/Apr/2017:09:32:39 +0800')
是一個時間,能夠轉換成時間類型,('status', '302')
是狀態碼,能夠轉換爲整形,以便後續在分析時直接就拿到相應的數據類型。瀏覽器
對字符串時間進行數據轉換緩存
import datetime s = '07/Apr/2017:09:32:39 +0800' dt = datetime.datetime.strptime(s, '%d/%b/%Y:%H:%M:%S %z') print(type(dt), dt)
輸出多線程
<class 'datetime.datetime'> 2017-04-07 09:32:39+08:00
對status和size能夠直接使用int進行數據類型轉換,這種類型轉換函數能夠單獨定義在一個字典中,當一個功能來提供,以下:app
ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int }
這樣當日志格式變化後,能夠很靈活在opt
這個字典中增長相應的轉換函數。dom
結合上邊的轉換函數就能夠把一行日誌轉換成字典,該字典存放了正則匹配出的分組信息,並相應的數據已進行了類型轉換,發便後期分析時使用。完整代碼以下:
import re import datetime logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \ (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"''' ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int } pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) def extract(line): matcher = regex.match(line) if matcher: info = {} for k, v in matcher.groupdict().items(): info[k] = ops.get(k, lambda x: x)(v) # 巧用字典的get方法及默認值是一個匿名函數 return info # 上邊的代碼能夠用一行實現 # return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()} else: return None print(extract(logfile))
運行後輸出
{'remote': '183.69.210.164', 'datetime': datetime.datetime(2017, 4, 7, 9, 32, 39, tzinfo=datetime.timezone(datetime.timedelta(0, 28800))), 'method': 'GET', 'url': '/member/', 'protocol': 'HTTP/1.1', 'status': 302, 'size': 31, 'useragent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'}
輸出一個字典,相應的數據類型也完成了轉換。
日誌文件被讀取並一條一條的送達給窗口函數,窗口函數須要緩存指定一段時間內的的日誌,因日誌是以時間序列產生的,當這個日誌中的時間差值大於定義的窗口
大小時,代表這時咱們就須要對已緩存的日誌進行分析,如:在這個時間窗口
內訪問的url
有哪些,各個url
訪問的數量,若是在一個時間窗口內對一個url訪問的數量特別的多,那可能這個url在被cc***,這時就能夠觸發告警,告知相關人員。
先看一個窗口函數的事例:
import random import datetime import time def source(): # 模擬數據源 while True: tz_utc_8 = datetime.timezone(datetime.timedelta(hours=8)) # 建立時區UTC+8:00 datetime_now = datetime.datetime.now() dt = datetime_now.replace(tzinfo=tz_utc_8) # 強制設置爲UTC+8:00 yield {'value': random.randint(1, 100), 'datetime': dt} time.sleep(1) def window(src, handler, width: int, interval: int): """ 窗口函數 :param src: 數據源,一個生成器 :param handler: 數據處理函數 :param width: 時間窗口寬度,秒 :param interval: 處理時間間隔,秒,表示在nginx的日誌文件中的請求時間的差值在於interval時,就須要調用handler函數進行處理在這段時間內的這一批日誌 :return: """ # start和current都設置爲linux元年時間,後邊會被日誌中的時間替代,做初始化值用 start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') delta = datetime.timedelta(seconds=width - interval) buffer = [] # 時間窗口內日誌的緩存列表 while True: for x in src: if x: # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)} buffer.append(x) current = x['datetime'] # 數據中的時間與定義的start相減後大於等於interval後,處理buffer中的時間間隔就到了 if (current - start).total_seconds() >= interval: ret = handler(buffer) # 數據處理函數 start = current # 時間點移動 # buffer中的數據處理 # x['datetime'] > current - delta 知足這個條件的數據須要保留 buffer = [x for x in buffer if x['datetime'] > current - delta] def do_nothing_handler(iterable: list): # 模擬處理函數 print(iterable) print(len(iterable)) window(source(), do_nothing_handler, 8, 6)
運行後輸出
[{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 1 [{'value': 82, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 7, 963440, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 56, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 8, 967087, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 70, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 9, 971149, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 10, 975855, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 77, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 11, 978261, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 7 [{'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 12, 980210, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 14, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 13, 983762, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 7, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 14, 987289, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 13, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 15, 990374, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 96, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 16, 992201, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 17, 992857, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 8 [{'value': 30, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 18, 995864, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 47, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 19, 998886, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 89, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 21, 3320, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 45, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 22, 7064, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 58, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 23, 9548, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 24, 11991, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 8 [{'value': 67, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 25, 15751, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 8, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 26, 18675, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 64, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 27, 23564, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 11, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 28, 27336, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 29, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 29, 29979, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 21, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 30, 34198, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 79, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 31, 37143, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}, {'value': 62, 'datetime': datetime.datetime(2020, 9, 26, 22, 11, 32, 39595, tzinfo=datetime.timezone(datetime.timedelta(0, 28800)))}] 8
當運行穩定後,每次處理的buffer中的數據爲8條,即爲width
的值,而interval
設置爲6,這說明上一次buffer中的數據和下一次buffer中的數據有重複的,這在某些場景是容許有數據重複,當width
= interval
時,buffer中的數據不會有重複,若是width
< interval
時,這時就有時間丟失了,這是不能容許出現的。
import re import datetime logfile = '''183.69.210.164 - - [07/Apr/2017:09:32:39 +0800] "GET /member/ HTTP/1.1" 302 31 "-" "Mozilla/5.0 \ (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0"''' ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int } pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) # 這裏還會面臨一個問題,若是有一行日誌因一些緣由不是按照定義的格式記錄的,那「regex.match(line)」會是一個None, # 那上邊的代碼就會報異常。因此這裏還會面臨異常的處理,加判斷便可 def extract(line): matcher = regex.match(line) if matcher: return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()} # ops.get(k, lambda x: x)(v) 轉換相應的值 else: return None def window(src, handler, width: int, interval: int): """ 窗口函數 :param src: 數據源,生成器 :param handler: 數據處理函數 :param width: 時間窗口寬度,秒 :param interval: 處理時間間隔,秒,表示在nginx的日誌文件中的請求時間的差值在於interval時,就須要調用handler函數進行處理在這段時間內的這一批日誌 :return: None """ start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') # 也能夠設置爲None current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') delta = datetime.timedelta(seconds=width - interval) buffer = [] for x in src: if x: # {'value': 31, 'datetime': datetime.datetime(2020, 5, 3, 16, 4, 33, 501124)} buffer.append(x) current = x['datetime'] if (current - start).total_seconds() >= interval: # 數據中的時間與定義的start相減後大於等於interval後,處理buffer中的時間間隔就到了 ret = handler(buffer) # 數據處理函數 start = current # 時間點移動 # buffer中的數據處理 # x['datetime'] > current - delta 知足這個條件的數據須要保留 buffer = [x for x in buffer if x['datetime'] > current - delta] # 裝載函數 def load(path): with open(path) as f: for line in f: field = extract(line) if field: yield field else: continue # 解析失敗時拋棄或打印日誌 # 處理函數 def do_nothing_handler(iterable: list): print(iterable) print(len(iterable)) # test.log 日誌文件爲測試文件 window(load('test.log'), do_nothing_handler, 300, 300)
對日誌的分析函數多種多樣,如:在時間窗口內請求的狀態碼的佔比,在時間窗口內請求url的數量,各類useragent訪問的數量(不是時間窗口內的統計)等。而這些分析函數每每是須要並行執行的,因此這裏也會引入分發器,在分發器中使用多線程來執行相應的分析處理函數。
def dispatcher(src): # src參數是一個生成器,便是數據源,通過extract函數處理過一行數據 queues = [] threads = [] def reg(handler, width, interval): # handler: 分析函數 # width: 時間窗口 # interval: 分析函數調用時間間隔 # 註冊時須要分配消費者函數window各自的隊列,建立各自的線程對象 q = Queue() queues.append(q) t = threading.Thread(target=window, args=(q, handler, width, interval)) threads.append(t) def run(): # 啓動線程 for t in threads: t.start() # 把日誌裝載函數load返回的數據put進各個消費者的隊列,相似於廣播形式 for x in src: for q in queues: q.put(x) return reg, run # 返回註冊函數和運行函數
原window
函數的第一個參數爲src
一個生成器,如今加入分發器後,第一參數變成了一個q
隊列,因此window
函數會作相應的變動。
在一個時間窗口內請求狀態碼的佔比能反應出後端服務的健康情況。在window
這個窗口函數中會把一個指定時間內的日誌append到一個list中,若是觸發了時間窗口,那就會調用相應的handler
函數對list中的日誌進行分析,因此狀態碼分析函數接收一個list做爲參數。
from collections import defaultdict def status_handler(iterable: list): status = defaultdict(lambda: 0) # 使用默認字典,初始化值爲0 for item in iterable: key = item['status'] status[key] += 1 total = sum(status.values()) # total = len(iterable) # 數據總條數求值有多種方式,也能夠上for循環中計數 result = {k: v/total*100 for k, v in status.items()} # 求狀態碼在該時間窗口內所佔的百分比 # print(result) return result # 返回一個字典, {'200': 95, '502': 2.4}
客戶端請求時使用的是什麼瀏覽器也能夠進行統計,只是這個統計不針對時間窗口內的日誌進行統計,只針對時間窗口內統計意義不太大,而是須要進行累計統計。對useragent有專門的包能夠對其進行解析,名稱爲user_agents
,先看一個事例:
from user_agents import parse # pip install user-agents ua = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0" u = parse(ua) print(u.browser) print(u.browser.family, u.browser.version_string)
輸出
Browser(family='Sogou Explorer', version=(1, 0), version_string='1.0') Sogou Explorer 1.0
因此能夠利用user_agents
包解析出user agent的名稱和相應的版本。
要對user agent進行統計,那在對日誌文件中的一行日誌文件進行轉換時,須要增長對user agent作相應的轉換,在opt
這個字典中增長相應的轉換函數,以下:
from user_agents import parse ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int, 'useragent': lambda useragent: parse(useragent) }
User agent統計代碼以下:
# 統計結果放在函數useragent_handler以外,記錄當前處理的日誌文件,若是放在useragent_handler內則是統計時間窗口內的日誌條數 ua_dict = defaultdict(lambda: 0) # useragent分析函數 def useragent_handler(iterable: list): for item in iterable: ua = item['useragent'] # ua是一個 user_agents.parsers.UserAgent 對象 key = (ua.browser.family, ua.browser.version_string) # key爲一個元組 ua_dict[key] += 1 # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True)) # 以useragent的數量升序排序 return ua_dict
import re import datetime from queue import Queue import threading from collections import defaultdict from pathlib import Path from user_agents import parse ops = { 'datetime': lambda time_str: datetime.datetime.strptime(time_str, '%d/%b/%Y:%H:%M:%S %z'), 'status': int, 'size': int, 'useragent': lambda useragent: parse(useragent) } pattern = '''(?P<remote>[\d.]{7,}) - - \[(?P<datetime>[^\[\]]+)\] "(?P<method>\w+) (?P<url>\S+) \ (?P<protocol>[\w+/\d.]+)" (?P<status>\d+) (?P<size>\d+) .+ "(?P<useragent>.+)"''' regex = re.compile(pattern) # 日誌抽取函數 def extract(line) -> dict: matcher = regex.match(line) if matcher: return {k: ops.get(k, lambda x: x)(v) for k, v in matcher.groupdict().items()} else: return None def window(src: Queue, handler, width: int, interval: int): """ 窗口函數 :param src: 是一個queue :param handler: 數據處理函數 :param width: 時間窗口寬度,秒 :param interval: 處理時間間隔,秒 :return: """ start = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') # 也能夠設置爲None current = datetime.datetime.strptime('1970/01/01 00:00:00 +0800', '%Y/%m/%d %H:%M:%S %z') delta = datetime.timedelta(seconds=width - interval) buffer = [] while True: data = src.get() # 阻塞模式 if data: buffer.append(data) current = data['datetime'] if (current - start).total_seconds() >= interval: # 數據中的時間與定義的start相減後大於等於interval後,處理buffer中的時間間隔就到了 ret = handler(buffer) # 數據處理函數 start = current # 時間點移動 # buffer中的數據處理 # x['datetime'] > current - delta 知足這個條件的數據須要保留,結合「01-日誌分析回顧及數據載入」的圖形看 buffer = [x for x in buffer if x['datetime'] > current - delta] print(ret) # ret是觸發調用函數的執行返回值,在此能夠窗口內的分析結果進行相應的判斷,好比502的狀態碼佔比大於多少時作什麼 # 抽取出日誌文件讀取函數,是一個生成器函數 def open_file(path: str): with open(str(path)) as f: for line in f: field = extract(line) if field: yield field else: continue # 解析失敗時拋棄或打印日誌 # 裝載函數,可受多個路徑,能夠是目錄,若是是目錄只迭代一該目錄下的全部文件,目錄忽略 def load(*path): for item in path: p = Path(item) if not p.exists(): continue if p.is_dir(): for file in p.iterdir(): if file.is_file(): # 只處理文件,目錄無論 yield from open_file(str(file)) elif p.is_file(): yield from open_file(str(p)) # 處理函數 def do_nothing_handler(iterable: list): print(iterable) print(len(iterable)) # 狀態碼分析處理函數,返回各狀態碼的佔比 def status_handler(iterable: list): status = defaultdict(lambda: 0) for item in iterable: key = item['status'] status[key] += 1 total = sum(status.values()) # total = len(iterable) # 數據總條數求值有多種方式,也能夠上for循環中計數 result = {k: v/total*100 for k, v in status.items()} # print(result) return result # 統計結果放在函數useragent_handler以外,記錄當前處理的日誌文件,若是放在useragent_handler內則是統計時間窗口內的日誌條數 ua_dict = defaultdict(lambda: 0) # useragent分析函數 def useragent_handler(iterable: list): for item in iterable: ua = item['useragent'] # ua是一個 user_agents.parsers.UserAgent 對象 key = (ua.browser.family, ua.browser.version_string) # key爲一個元組 ua_dict[key] += 1 # print(sorted(ua_dict.items(), key=lambda x: x[1], reverse=True)) # 以useragent的數量升序排序 return ua_dict # 分發器函數 # 1. 每個window函數擁有本身的隊列,相應的處理函數,以及時間窗口及處理時間間隔 # 2. window函數至關於就是生產者消費者模型中的消費者,在實際的業務環境中消費者可能許多,拿到相同的一份數據後進行各自的handler處理 def dispatcher(src): # src參數是一個生成器,便是數據源 queues = [] threads = [] def reg(handler, width, interval): # 註冊時須要分配消費都函數window各息的隊列,建立各息的線程對象 q = Queue() queues.append(q) t = threading.Thread(target=window, args=(q, handler, width, interval)) threads.append(t) def run(): # 啓動線程 for t in threads: t.start() # 把日誌裝載函數load返回的數據put進各個消費者的隊列 for x in src: for q in queues: q.put(x) return reg, run path = 'test.log' # 接受可變參數傳遞,如 path = 'test.log, /var/logs/' regs, runs = dispatcher(load(path)) # 註冊窗口,能夠註冊多個,根據各自的日誌處理邏輯傳入handler, width, interval便可。這樣每一個window函數都在各自的線程中運行,互不影響。 # regs(do_nothing_handler, 10, 5) # 註冊狀態碼處理函數 regs(status_handler, 10, 5) # 註冊useragent分析函數 regs(useragent_handler, 10, 10) # useragent分析不是對時間窗口內的數據求和或求平均值,數據不須要有重複 # 運行 runs()