This script compares today's visit counts for key URLs against the counts in the same hour of the previous day; when the configured threshold is exceeded it sends an alarm notification and writes a log entry.
#! /usr/local/bin/python3
import bz2
import re
import contextlib
import shelve
import datetime
import sys
#enviroment.py is an in-house library of shared helper functions; the ones used below are quoted here
import enviroment as eo
'''
#These helpers need: import logging, subprocess, urllib.parse, urllib.request
def get_log(log_filename,
            log_format='[%(asctime)-15s] [%(levelname)s] [%(filename)s] [%(funcName)s] [%(lineno)d] %(message)s',
            log_level='debug'):
    if log_level=='debug':
        logging.basicConfig(format=log_format,filename=log_filename,level=logging.DEBUG)
def get_local_ip_tag():
    #Last octet of the eth0 address, used to tag which host sent the alarm
    short_ip=(subprocess.check_output(
        '/sbin/ifconfig | grep eth0 -A1 | tail -1 | awk \'{print $2}\' | awk -F "." \'{print $4}\'',
        shell=True)
        .decode().strip())
    return short_ip+'log:'
#The eo.sms_XX(message) functions used below are wrappers around this
#function that supply the receivers argument.
def send_message(receivers,message):
    for receiver in receivers:
        warn_url='http_sendmessage_interface?'
        query_args={'username':receivers[receiver],'message':get_local_ip_tag()+message}
        encoded_args=urllib.parse.urlencode(query_args)
        warn_url=warn_url+encoded_args
        response=urllib.request.urlopen(warn_url)
'''
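#A minimal sketch of what one eo.sms_XX wrapper could look like, assuming the
#send_message above; the wrapper name and receiver map here are hypothetical:
#    def sms_ops(message):
#        receivers={'oncall':'ops_user'}
#        send_message(receivers,message)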
def generate_previous_hour():
    fmt='%d/%b/%Y:%H'
    #Derive both values from the same instant so the pair stays consistent
    #(and wraps correctly to 23 across midnight, unlike tm_hour-1).
    previous=datetime.datetime.now()-datetime.timedelta(hours=1)
    return previous.hour,previous.strftime(fmt)
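#For example (hypothetical run time): at 14:20 on 02/Jan/2024 this returns
#(13, '02/Jan/2024:13'), matching the '[02/Jan/2024:13:...' timestamps in
#the access log.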
def check_hour_logs(logpath,keys,examine_hour):
    result={key:0 for key in keys}
    regex_previous_hour=re.compile(examine_hour)
    regexes={key:re.compile(key) for key in keys}
    #The log is in chronological order, and within one month the timestamp
    #field line.split(' ')[3] compares correctly as a plain string, so this
    #could be upgraded to a real time comparison but does not need to be.
    #The cron job is deliberately not scheduled during hour 0, so the
    #cross-month hole in lexicographic ordering is never triggered; there are
    #also other deployment-specific reasons to keep the string comparison.
    #To locate the target hour, leap forward through the file in big steps.
    #The step is 10 MB here; tune it to the actual log size.
    step=10*1024*1024
    #Binary mode so that seeking to computed offsets is well defined.
    with open(logpath,'rb') as file:
        line=file.readline().decode()
        while line:
            time_field=line.split(' ')[3][1:]
            if time_field>examine_hour:
                #Overshot: back up one step (plus a safety margin), then
                #discard one line so the next read starts on a complete line.
                file.seek(max(file.tell()-step-10240,0))
                file.readline()
                break
            file.seek(file.tell()+step)
            file.readline()
            line=file.readline().decode().strip()
        for raw in file:
            words=raw.decode().strip().split(' ')
            #words[3] is the timestamp field, words[6] the requested URL.
            if len(words)>6 and regex_previous_hour.search(words[3]):
                for key in keys:
                    if regexes[key].search(words[6]):
                        result[key]+=1
    return result
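#A quick standalone check of the field layout assumed above, using a made-up
#(hypothetical) combined-log line:
def _demo_field_layout():
    sample='1.2.3.4 - - [02/Jan/2024:13:05:59 +0800] "GET /important/url HTTP/1.1" 200 512'
    words=sample.split(' ')
    assert words[3]=='[02/Jan/2024:13:05:59'
    assert words[6]=='/important/url'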
def compare_data(db,previous_hour_time,examine_keys):
    with contextlib.closing(shelve.open(db)) as dba:
        yesterday_key='yesterday'
        today_key='today'
        if yesterday_key in dba:
            try:
                yesterday=dba[yesterday_key]
                today=dba[today_key]
                for key in examine_keys:
                    #Alarm threshold: today's count for the URL exceeds 400 and
                    #is more than double yesterday's count for the same hour.
                    if int(today[previous_hour_time][key])>2*int(yesterday[previous_hour_time][key]) and int(today[previous_hour_time][key])>400:
                        eo.logging.error('alarm. %s too large. the number of visits is %s',key,today[previous_hour_time][key])
                        message='alarm. {0} too large. the number of visits is {1}'.format(key,today[previous_hour_time][key])
                        eo.sms_XX(message)
            except KeyError:
                eo.logging.warning('%s',sys.exc_info()[0:2])
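#Worked example of the threshold: yesterday==300 and today==700 for the same
#hour fires the alarm (700>600 and 700>400); yesterday==500 and today==900
#stays quiet (900<1000).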
def store_data(db,previous_hour_data,previous_hour_time):
    yesterday_key='yesterday'
    today_key='today'
    with contextlib.closing(shelve.open(db)) as dba:
        #Assign back explicitly; mutating the fetched dict alone would not
        #persist a freshly created 'today' entry.
        today=dba.get(today_key,{})
        today[previous_hour_time]=previous_hour_data
        dba[today_key]=today
        #After the 23:00 run, roll today's counts over into yesterday's slot.
        if int(previous_hour_time)==23:
            dba[yesterday_key]=today.copy()
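#Sketch of the shelf layout maintained above (illustrative values only):
#    dba['today']     == {13: {'/important/url': 523, ...}, ...}
#    dba['yesterday'] == {13: {'/important/url': 241, ...}, ...}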
#The helpers below make it easy to work with the db; for example, use
#print_all_data to inspect it. They could also live in enviroment.py.
def put_data(filename,key,saved_data):
    with contextlib.closing(shelve.open(filename)) as dba:
        dba[key]=saved_data
def get_data(filename,key):
    with contextlib.closing(shelve.open(filename)) as dba:
        return dba[key]
def print_all_data(filename):
    with contextlib.closing(shelve.open(filename)) as dba:
        for key in dba:
            print(key+':'+str(dba[key]))
def delete_data(filename,key=None):
    with contextlib.closing(shelve.open(filename)) as dba:
        try:
            if key is not None:
                del dba[key]
            else:
                dba.clear()
        except KeyError:
            print('KeyError: no such key: '+str(key))
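#Example use of the helpers from a Python shell (hypothetical db path):
#    put_data('/tmp/test.db','k',{'a':1})
#    get_data('/tmp/test.db','k')      # -> {'a': 1}
#    print_all_data('/tmp/test.db')
#    delete_data('/tmp/test.db','k')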
def main(log_name,db,examine_keys,my_log):
    eo.get_log(my_log)
    previous_hour_time,previous_hour=generate_previous_hour()
    previous_hour_data=check_hour_logs(log_name,examine_keys,previous_hour)
    store_data(db,previous_hour_data,previous_hour_time)
    compare_data(db,previous_hour_time,examine_keys)
if __name__ == '__main__':
    #Site-specific settings, left blank here: the access log to scan, the
    #base directory for state, and the URL patterns to watch.
    log_name=''
    basedir=''
    db=basedir+'/examine_important_url_hours.db'
    my_log=basedir+'/run.log'
    examine_keys=[]
    main(log_name,db,examine_keys,my_log)
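#A possible crontab entry matching the comments above: run a few minutes past
#every hour except hour 0, so the cross-month lexicographic hole is never hit
#(paths are hypothetical):
#    5 1-23 * * * /usr/local/bin/python3 /path/to/examine_important_url.py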