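This logging module does not depend on any of my other packages: copy the file and it runs. It can also be downloaded from PyPI or installed with pip.
1. The module ships with 7 formatter templates. Templates 4 and 5 make each log line clickable, jumping to the exact file and line that produced it. I have not seen this done elsewhere.
2. ColorHandler, not the official StreamHandler, is the default console handler, so logs come out in color. In a sea of log output you can tell at a glance which lines are debug, which are errors, and which are warnings or critical. Green means debug, sky blue means info, yellow means warning, pink means error, and blood red means critical, so the colors follow the usual intuition. The exact shades depend on your PyCharm theme and color scheme (a dark theme is recommended) and, to some extent, on the PyCharm version.
3. Log file rotation is process-safe, using a third-party handler (ConcurrentRotatingFileHandler from concurrent_log_handler).
4. A mail handler is included that supports Chinese mail providers such as QQ and 163, with rate limiting on outgoing mail.
5. MongoHandler is included; it splits each log record into fields automatically and inserts them into MongoDB.
6.1. None of these handlers has to be created and attached by hand. They are all switched on through arguments: set a file name and a file log is created automatically, pass a MongoDB connection URL and MongoHandler is added, and so on.
6.2. To understand why loggers and handlers are separate things, how the logger namespace works, and how handlers relate to loggers, you need to understand the observer pattern, one of the 23 classic design patterns. Once that pattern is clear, you can write whatever handlers you need.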
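The observer relationship from 6.2 can be sketched with a hand-written handler: the logger is the subject, and every attached handler is an observer that receives each record. The block below is a minimal sketch, not the module's own ColorHandler. `SimpleColorHandler`, `build_demo_logger`, `CLICKABLE_FORMAT` and `LEVEL_COLORS` are hypothetical names. The format string is copied from template 5, and the color codes follow the POSIX values used in the code further down.

```python
import logging
import sys

# Template 5 from the post: the quoted "path:line" part is what makes the line clickable.
CLICKABLE_FORMAT = ('%(asctime)s - %(name)s - "%(pathname)s:%(lineno)d" - '
                    '%(funcName)s - %(levelname)s - %(message)s')

# ANSI foreground color per level (assumed mapping, POSIX values).
LEVEL_COLORS = {
    logging.DEBUG: 32,     # green
    logging.INFO: 36,      # cyan
    logging.WARNING: 33,   # yellow
    logging.ERROR: 35,     # magenta
    logging.CRITICAL: 31,  # red
}


class SimpleColorHandler(logging.StreamHandler):
    """Hypothetical, stripped-down stand-in for the post's ColorHandler."""

    def emit(self, record):
        try:
            msg = self.format(record)
            color = LEVEL_COLORS.get(record.levelno)
            if color is not None:
                # Wrap the whole formatted line in one color and reset afterwards.
                msg = f'\033[0;{color}m{msg}\033[0m'
            self.stream.write(msg + self.terminator)
            self.flush()
        except Exception:
            self.handleError(record)


def build_demo_logger(name, stream=None):
    """Return a logger that notifies exactly one colored observer (handler)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = SimpleColorHandler(sys.stdout if stream is None else stream)
    handler.setFormatter(logging.Formatter(CLICKABLE_FORMAT, '%Y-%m-%d %H:%M:%S'))
    logger.addHandler(handler)
    return logger
```

Calling `build_demo_logger('demo').warning('disk almost full')` prints one yellow line. Any further handler added to the same logger would receive the same record independently.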
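Why use logging at all?
My colleagues used to print everything, and it was painful. The project has several hundred thousand lines; once it runs, modules import each other in nested fashion and print from everywhere. You have no idea where a given line of output came from, you cannot easily switch it off, and you cannot attach handlers to it. In every respect it is far worse than logging.
Using print as a log is a very beginner-level habit in Python.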
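To make the contrast concrete, here is a small sketch using only the standard library. The logger names `crawler.noisy_module` and `crawler.core` and the helper `make_logger` are made up for illustration. It shows the two things print cannot do: every line carries its origin, and one module can be silenced with a single call.

```python
import io
import logging


def make_logger(name, stream):
    """Create a logger that writes 'name - LEVEL - message' lines to stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    if not logger.handlers:
        handler = logging.StreamHandler(stream)
        handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
        logger.addHandler(handler)
    return logger


buffer = io.StringIO()
noisy = make_logger('crawler.noisy_module', buffer)
quiet = make_logger('crawler.core', buffer)

noisy.debug('chatty detail')
quiet.info('started')

# Silence one module by name; scattered print() calls offer no equivalent switch.
logging.getLogger('crawler.noisy_module').setLevel(logging.WARNING)
noisy.debug('now suppressed')
noisy.warning('still visible')

print(buffer.getvalue(), end='')
```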
The code below is long because it adds many kinds of handlers and automatically de-duplicates handlers of the same type.
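The same-type de-duplication boils down to a type check before `addHandler`. The sketch below mirrors that idea with a hypothetical helper, `add_handler_once`. The full module additionally patches `logging.Logger.callHandlers` so that parent loggers do not repeat a handler type, which this sketch does not attempt.

```python
import logging


def add_handler_once(logger, handler):
    """Attach handler unless the logger already has one of exactly the same type.

    Returns True if the handler was attached, False if it was skipped.
    """
    for existing in logger.handlers:
        if type(existing) is type(handler):
            return False
    logger.addHandler(handler)
    return True


logger = logging.getLogger('dedup_demo')
first = add_handler_once(logger, logging.StreamHandler())   # attached
second = add_handler_once(logger, logging.StreamHandler())  # skipped, same type
third = add_handler_once(logger, logging.NullHandler())     # attached, new type
```

Because the check compares exact types, calling the setup code twice for the same logger name never produces doubled output lines.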
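The simplified version just monkey-patches print so that its output is colored automatically. With that in place you do not need this logging module at all.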
# coding=utf8
"""
Log management. Supports printing logs to the console and writing them to rotating files,
MongoDB, email, DingTalk robots, Elasticsearch and Kafka.

Usage:
    logger = LogManager('logger_name').get_logger_and_add_handlers(
        log_level_int=1, is_add_stream_handler=True, log_path=None, log_filename=None,
        log_file_size=10, mongo_url=None, formatter_template=2)
or:
    logger = LogManager('logger_name').get_logger_without_handlers()
The second form returns a logger with no handlers, so nothing is recorded yet. Later, at one
central switch point, call get_logger_and_add_handlers per logger name to attach handlers.

Creating a logger with a mail handler uses a separate constructor:
    logger = LogManager.bulid_a_logger_with_mail_handler(
        'mail_logger_name', mail_time_interval=10,
        toaddrs=('909686xxx@qq.com', 'yangxx4508@dingtalk.com'), subject='your subject')

ConcurrentRotatingFileHandler from concurrent_log_handler fixes the rotation errors that the
stdlib RotatingFileHandler shows under multiple processes. Rotation works in multi-process
scenarios on both Windows and Linux.

1. Uses ColorHandler instead of StreamHandler to print logs in 5 colors by level, so severe
   logs stand out at a glance.
2. Ships with several handlers: mail, mongo, stream, file.
3. Supports clicking a log line in PyCharm to jump to the corresponding file and line.
4. Handlers of the same type can be added to loggers in the same namespace any number of
   times without duplicate records. The caller does not need to check.
"""
import json
import traceback
from queue import Queue
import socket
import datetime
import sys
import os
from elasticsearch import Elasticsearch, helpers
from threading import Lock, Thread
import unittest
import time
from collections import OrderedDict
import pymongo
import logging
from logging import handlers
from concurrent_log_handler import ConcurrentRotatingFileHandler  # must be installed: concurrent-log-handler==0.9.1
from kafka import KafkaProducer
from app import config as app_config

os_name = os.name

DING_TALK_TOKEN = '3ddxxxxxxxxx'  # DingTalk alert robot

EMAIL_HOST = ('smtp.sohu.com', 465)
EMAIL_FROMADDR = 'yxx@sohu.com'
EMAIL_TOADDRS = ('chao.xx@ab.com', 'yxx@cd.com',)
EMAIL_CREDENTIALS = ('yxx@sohu.com', 'acb1xxx')

ELASTIC_HOST = '1xx.90.89.xx'
ELASTIC_PORT = 9200

ALWAYS_ADD_ES_HANDLER_IN_TEST_ENVIRONENT = True

KAFKA_BOOTSTRAP_SERVERS = ['1xx.90.89.xx:9092']


# noinspection PyProtectedMember,PyUnusedLocal,PyIncorrectDocstring
def very_nb_print(*args, sep=' ', end='\n', file=None):
    """
    A souped-up replacement for print that prefixes the caller's file and line.
    :param args: values to print
    :return:
    """
    # Line number at which this function was called
    line = sys._getframe().f_back.f_lineno
    # File name of the module that called this function
    file_name = sys._getframe(1).f_code.co_filename
    # sys.stdout.write(f'"{__file__}:{sys._getframe().f_lineno}" {x}\n')
    args = (str(arg) for arg in args)  # REMIND numbers cannot be joined, so convert to str first
    sys.stdout.write(f'"{file_name}:{line}" {time.strftime("%H:%M:%S")} \033[0;94m{"".join(args)}\033[0m\n')  # 36 93 96 94


# noinspection PyShadowingBuiltins
# print = very_nb_print

formatter_dict = {
    # Template 1 uses Chinese field labels on purpose.
    1: logging.Formatter(
        '日誌時間【%(asctime)s】 - 日誌名稱【%(name)s】 - 文件【%(filename)s】 - 第【%(lineno)d】行 - 日誌等級【%(levelname)s】 - 日誌信息【%(message)s】',
        "%Y-%m-%d %H:%M:%S"),
    2: logging.Formatter(
        '%(asctime)s - %(name)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s',
        "%Y-%m-%d %H:%M:%S"),
    3: logging.Formatter(
        '%(asctime)s - %(name)s - 【 File "%(pathname)s", line %(lineno)d, in %(funcName)s 】 - %(levelname)s - %(message)s',
        "%Y-%m-%d %H:%M:%S"),  # imitates a traceback line, so it can jump to where the log was emitted
    4: logging.Formatter(
        '%(asctime)s - %(name)s - "%(filename)s" - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s - File "%(pathname)s", line %(lineno)d ',
        "%Y-%m-%d %H:%M:%S"),  # this one also supports jumping to the source
    5: logging.Formatter(
        '%(asctime)s - %(name)s - "%(pathname)s:%(lineno)d" - %(funcName)s - %(levelname)s - %(message)s',
        "%Y-%m-%d %H:%M:%S"),  # the best template in my opinion, recommended
    6: logging.Formatter('%(name)s - %(asctime)-15s - %(filename)s - %(lineno)d - %(levelname)s: %(message)s',
                         "%Y-%m-%d %H:%M:%S"),
    7: logging.Formatter('%(levelname)s - %(filename)s - %(lineno)d - %(message)s'),  # shows only the short file name and line number
}


# noinspection PyMissingOrEmptyDocstring
class LogLevelException(Exception):
    def __init__(self, log_level):
        err = 'Log level was set to {0}, which is invalid; use a number from 1 to 5'.format(log_level)
        Exception.__init__(self, err)


# noinspection PyMissingOrEmptyDocstring
class MongoHandler(logging.Handler):
    """
    A MongoDB log handler. Logs are written to MongoDB in a separate collection per logger name.
    """

    # msg_pattern = re.compile('(\d+-\d+-\d+ \d+:\d+:\d+) - (\S*?) - (\S*?) - (\d+) - (\S*?) 
- ([\s\S]*)') def __init__(self, mongo_url, mongo_database='logs'): """ :param mongo_url: mongo鏈接 :param mongo_database: 保存日誌的數據庫,默認使用logs數據庫 """ logging.Handler.__init__(self) mongo_client = pymongo.MongoClient(mongo_url) self.mongo_db = mongo_client.get_database(mongo_database) def emit(self, record): # noinspection PyBroadException, PyPep8 try: """如下使用解析日誌模板的方式提取出字段""" # msg = self.format(record) # logging.LogRecord # msg_match = self.msg_pattern.search(msg) # log_info_dict = {'time': msg_match.group(1), # 'name': msg_match.group(2), # 'file_name': msg_match.group(3), # 'line_no': msg_match.group(4), # 'log_level': msg_match.group(5), # 'detail_msg': msg_match.group(6), # } level_str = None if record.levelno == 10: level_str = 'DEBUG' elif record.levelno == 20: level_str = 'INFO' elif record.levelno == 30: level_str = 'WARNING' elif record.levelno == 40: level_str = 'ERROR' elif record.levelno == 50: level_str = 'CRITICAL' log_info_dict = OrderedDict() log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S') log_info_dict['name'] = record.name log_info_dict['file_path'] = record.pathname log_info_dict['file_name'] = record.filename log_info_dict['func_name'] = record.funcName log_info_dict['line_no'] = record.lineno log_info_dict['log_level'] = level_str log_info_dict['detail_msg'] = record.msg col = self.mongo_db.get_collection(record.name) col.insert_one(log_info_dict) except (KeyboardInterrupt, SystemExit): raise except Exception: self.handleError(record) class KafkaHandler(logging.Handler): """ 日誌批量寫入kafka中。 """ ES_INTERVAL_SECONDS = 0.5 host_name = socket.gethostname() host_process = f'{host_name} -- {os.getpid()}' script_name = sys.argv[0] task_queue = Queue() last_es_op_time = time.time() has_start_do_bulk_op = False kafka_producer = None es_index_prefix = 'pylog-' def __init__(self, bootstrap_servers, **configs): """ :param elastic_hosts: es的ip地址,數組類型 :param elastic_port: es端口 :param index_prefix: index名字前綴。 """ logging.Handler.__init__(self) producer 
= KafkaProducer(bootstrap_servers=bootstrap_servers, **configs) if not self.__class__.kafka_producer: self.__class__.kafka_producer = producer t = Thread(target=self._do_bulk_op) t.setDaemon(True) t.start() @classmethod def __add_task_to_bulk(cls, task): cls.task_queue.put(task) # noinspection PyUnresolvedReferences @classmethod def __clear_bulk_task(cls): cls.task_queue.queue.clear() @classmethod def _do_bulk_op(cls): if cls.has_start_do_bulk_op: return cls.has_start_do_bulk_op = True # very_nb_print(cls.kafka_producer) while 1: try: if cls.task_queue.qsize() > 10000: very_nb_print('kafka防止意外日誌積累太多了,內存泄漏') cls.__clear_bulk_task() return # noinspection PyUnresolvedReferences tasks = list(cls.task_queue.queue) cls.__clear_bulk_task() for task in tasks: topic = (cls.es_index_prefix + task['name']).replace('.', '').replace('_', '').replace('-', '') # very_nb_print(topic) cls.kafka_producer.send(topic, json.dumps(task).encode()) cls.last_es_op_time = time.time() except Exception as e: very_nb_print(e) finally: time.sleep(cls.ES_INTERVAL_SECONDS) def emit(self, record): # noinspection PyBroadException, PyPep8 try: level_str = None if record.levelno == 10: level_str = 'DEBUG' elif record.levelno == 20: level_str = 'INFO' elif record.levelno == 30: level_str = 'WARNING' elif record.levelno == 40: level_str = 'ERROR' elif record.levelno == 50: level_str = 'CRITICAL' log_info_dict = OrderedDict() log_info_dict['@timestamp'] = datetime.datetime.utcfromtimestamp(record.created).isoformat() log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S') log_info_dict['name'] = record.name log_info_dict['host'] = self.host_name log_info_dict['host_process'] = self.host_process log_info_dict['file_path'] = record.pathname log_info_dict['file_name'] = record.filename log_info_dict['func_name'] = record.funcName log_info_dict['line_no'] = record.lineno log_info_dict['log_level'] = level_str log_info_dict['msg'] = str(record.msg) log_info_dict['script'] = self.script_name 
log_info_dict['es_index'] = f'{self.es_index_prefix}{record.name.lower()}' self.__add_task_to_bulk(log_info_dict) except (KeyboardInterrupt, SystemExit): raise except Exception: self.handleError(record) class ElasticHandler000(logging.Handler): """ 日誌批量寫入es中。 """ ES_INTERVAL_SECONDS = 2 host_name = socket.gethostname() def __init__(self, elastic_hosts: list, elastic_port, index_prefix='pylog-'): """ :param elastic_hosts: es的ip地址,數組類型 :param elastic_port: es端口 :param index_prefix: index名字前綴。 """ logging.Handler.__init__(self) self._es_client = Elasticsearch(elastic_hosts, port=elastic_port) self._index_prefix = index_prefix self._task_list = [] self._task_queue = Queue() self._last_es_op_time = time.time() t = Thread(target=self._do_bulk_op) t.setDaemon(True) t.start() def __add_task_to_bulk(self, task): self._task_queue.put(task) def __clear_bulk_task(self): # noinspection PyUnresolvedReferences self._task_queue.queue.clear() def _do_bulk_op(self): while 1: try: if self._task_queue.qsize() > 10000: very_nb_print('防止意外日誌積累太多了,不插入es了。') self.__clear_bulk_task() return # noinspection PyUnresolvedReferences tasks = list(self._task_queue.queue) self.__clear_bulk_task() helpers.bulk(self._es_client, tasks) self._last_es_op_time = time.time() except Exception as e: very_nb_print(e) finally: time.sleep(1) def emit(self, record): # noinspection PyBroadException, PyPep8 try: level_str = None if record.levelno == 10: level_str = 'DEBUG' elif record.levelno == 20: level_str = 'INFO' elif record.levelno == 30: level_str = 'WARNING' elif record.levelno == 40: level_str = 'ERROR' elif record.levelno == 50: level_str = 'CRITICAL' log_info_dict = OrderedDict() log_info_dict['@timestamp'] = datetime.datetime.utcfromtimestamp(record.created).isoformat() log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S') log_info_dict['name'] = record.name log_info_dict['host'] = self.host_name log_info_dict['file_path'] = record.pathname log_info_dict['file_name'] = record.filename 
log_info_dict['func_name'] = record.funcName log_info_dict['line_no'] = record.lineno log_info_dict['log_level'] = level_str log_info_dict['msg'] = str(record.msg) self.__add_task_to_bulk({ "_index": f'{self._index_prefix}{record.name.lower()}', "_type": f'{self._index_prefix}{record.name.lower()}', "_source": log_info_dict }) # self.__add_task_to_bulk({ # "_index": f'{self._index_prefix}{record.name.lower()}', # "_type": f'{self._index_prefix}{record.name.lower()}', # "_source": log_info_dict # }) # if time.time() - self._last_es_op_time > self.ES_INTERVAL_SECONDS: # self._do_bulk_op() except (KeyboardInterrupt, SystemExit): raise except Exception: self.handleError(record) # noinspection PyUnresolvedReferences class ElasticHandler(logging.Handler): """ 日誌批量寫入es中。 """ ES_INTERVAL_SECONDS = 0.5 host_name = socket.gethostname() host_process = f'{host_name} -- {os.getpid()}' script_name = sys.argv[0] task_queue = Queue() last_es_op_time = time.time() has_start_do_bulk_op = False def __init__(self, elastic_hosts: list, elastic_port, index_prefix='pylog-'): """ :param elastic_hosts: es的ip地址,數組類型 :param elastic_port: es端口 :param index_prefix: index名字前綴。 """ logging.Handler.__init__(self) self._es_client = Elasticsearch(elastic_hosts, port=elastic_port) self._index_prefix = index_prefix t = Thread(target=self._do_bulk_op) t.setDaemon(True) t.start() @classmethod def __add_task_to_bulk(cls, task): cls.task_queue.put(task) # noinspection PyUnresolvedReferences @classmethod def __clear_bulk_task(cls): cls.task_queue.queue.clear() def _do_bulk_op(self): if self.__class__.has_start_do_bulk_op: return self.__class__.has_start_do_bulk_op = True while 1: try: if self.__class__.task_queue.qsize() > 10000: very_nb_print('防止意外日誌積累太多了,不插入es了。') self.__clear_bulk_task() return tasks = list(self.__class__.task_queue.queue) self.__clear_bulk_task() helpers.bulk(self._es_client, tasks) self.__class__.last_es_op_time = time.time() except Exception as e: very_nb_print(e) finally: 
time.sleep(self.ES_INTERVAL_SECONDS) def emit(self, record): # noinspection PyBroadException, PyPep8 try: level_str = None if record.levelno == 10: level_str = 'DEBUG' elif record.levelno == 20: level_str = 'INFO' elif record.levelno == 30: level_str = 'WARNING' elif record.levelno == 40: level_str = 'ERROR' elif record.levelno == 50: level_str = 'CRITICAL' log_info_dict = OrderedDict() log_info_dict['@timestamp'] = datetime.datetime.utcfromtimestamp(record.created).isoformat() log_info_dict['time'] = time.strftime('%Y-%m-%d %H:%M:%S') log_info_dict['name'] = record.name log_info_dict['host'] = self.host_name log_info_dict['host_process'] = self.host_process log_info_dict['file_path'] = record.pathname log_info_dict['file_name'] = record.filename log_info_dict['func_name'] = record.funcName log_info_dict['line_no'] = record.lineno log_info_dict['log_level'] = level_str log_info_dict['msg'] = str(record.msg) log_info_dict['script'] = self.script_name self.__add_task_to_bulk({ "_index": f'{self._index_prefix}{record.name.lower()}', "_type": f'{self._index_prefix}{record.name.lower()}', "_source": log_info_dict }) # self.__add_task_to_bulk({ # "_index": f'{self._index_prefix}{record.name.lower()}', # "_type": f'{self._index_prefix}{record.name.lower()}', # "_source": log_info_dict # }) # if time.time() - self._last_es_op_time > self.ES_INTERVAL_SECONDS: # self._do_bulk_op() except (KeyboardInterrupt, SystemExit): raise except Exception: self.handleError(record) class ColorHandler000(logging.Handler): """彩色日誌handler,根據不一樣級別的日誌顯示不一樣顏色""" bule = 96 if os_name == 'nt' else 36 yellow = 93 if os_name == 'nt' else 33 def __init__(self): logging.Handler.__init__(self) self.formatter_new = logging.Formatter( '%(asctime)s - %(name)s - "%(filename)s" - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s', "%Y-%m-%d %H:%M:%S") # 對控制檯日誌單獨優化顯示和跳轉,單獨對字符串某一部分使用特殊顏色,主要用於第四種模板,以避免filehandler和mongohandler中帶有\033 @classmethod def _my_align(cls, string, length): if len(string) > 
length * 2: return string custom_length = 0 for w in string: custom_length += 1 if cls._is_ascii_word(w) else 2 if custom_length < length: place_length = length - custom_length string += ' ' * place_length return string @staticmethod def _is_ascii_word(w): if ord(w) < 128: return True def emit(self, record): """ 30 40 黑色 31 41 紅色 32 42 綠色 33 43 黃色 34 44 藍色 35 45 紫紅色 36 46 青藍色 37 47 白色 :type record:logging.LogRecord :return: """ if self.formatter is formatter_dict[4] or self.formatter is self.formatter_new: self.formatter = self.formatter_new if os.name == 'nt': self.__emit_for_fomatter4_pycahrm(record) # 使用模板4並使用pycharm時候 else: self.__emit_for_fomatter4_linux(record) # 使用模板4並使用linux時候 else: self.__emit(record) # 其餘模板 def __emit_for_fomatter4_linux(self, record): """ 當使用模板4針對linxu上的終端打印優化顯示 :param record: :return: """ # noinspection PyBroadException,PyPep8 try: msg = self.format(record) file_formatter = ' ' * 10 + '\033[7mFile "%s", line %d\033[0m' % (record.pathname, record.lineno) if record.levelno == 10: print('\033[0;32m%s' % self._my_align(msg, 150) + file_formatter) elif record.levelno == 20: print('\033[0;34m%s' % self._my_align(msg, 150) + file_formatter) elif record.levelno == 30: print('\033[0;33m%s' % self._my_align(msg, 150) + file_formatter) elif record.levelno == 40: print('\033[0;35m%s' % self._my_align(msg, 150) + file_formatter) elif record.levelno == 50: print('\033[0;31m%s' % self._my_align(msg, 150) + file_formatter) except (KeyboardInterrupt, SystemExit): raise except Exception: self.handleError(record) def __emit_for_fomatter4_pycahrm(self, record): """ 當使用模板4針對pycahrm的打印優化顯示 :param record: :return: """ # \033[0;93;107mFile "%(pathname)s", line %(lineno)d, in %(funcName)s\033[0m # noinspection PyBroadException try: msg = self.format(record) # for_linux_formatter = ' ' * 10 + '\033[7m;File "%s", line %d\033[0m' % (record.pathname, record.lineno) file_formatter = ' ' * 10 + '\033[0;93;107mFile "%s", line %d\033[0m' % (record.pathname, 
record.lineno) if record.levelno == 10: print('\033[0;32m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 綠色 elif record.levelno == 20: print('\033[0;36m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 青藍色 elif record.levelno == 30: print('\033[0;92m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 藍色 elif record.levelno == 40: print('\033[0;35m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 紫紅色 elif record.levelno == 50: print('\033[0;31m%s\033[0m' % self._my_align(msg, 200) + file_formatter) # 血紅色 except (KeyboardInterrupt, SystemExit): raise except: # NOQA self.handleError(record) def __emit(self, record): # noinspection PyBroadException try: msg = self.format(record) if record.levelno == 10: print('\033[0;32m%s\033[0m' % msg) # 綠色 elif record.levelno == 20: print('\033[0;%sm%s\033[0m' % (self.bule, msg)) # 青藍色 36 96 elif record.levelno == 30: print('\033[0;%sm%s\033[0m' % (self.yellow, msg)) elif record.levelno == 40: print('\033[0;35m%s\033[0m' % msg) # 紫紅色 elif record.levelno == 50: print('\033[0;31m%s\033[0m' % msg) # 血紅色 except (KeyboardInterrupt, SystemExit): raise except: # NOQA self.handleError(record) class ColorHandler(logging.Handler): """ A handler class which writes logging records, appropriately formatted, to a stream. Note that this class does not close the stream, as sys.stdout or sys.stderr may be used. """ terminator = '\n' bule = 96 if os_name == 'nt' else 36 yellow = 93 if os_name == 'nt' else 33 def __init__(self, stream=None, is_pycharm_2019=False): """ Initialize the handler. If stream is not specified, sys.stderr is used. """ logging.Handler.__init__(self) if stream is None: stream = sys.stdout # stderr無彩。 self.stream = stream self._is_pycharm_2019 = is_pycharm_2019 self._display_method = 7 if os_name == 'posix' else 0 def flush(self): """ Flushes the stream. 
""" self.acquire() try: if self.stream and hasattr(self.stream, "flush"): self.stream.flush() finally: self.release() def emit0(self, record): """ Emit a record. If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an 'encoding' attribute, it is used to determine how to do the output to the stream. """ # noinspection PyBroadException try: msg = self.format(record) stream = self.stream if record.levelno == 10: # msg_color = ('\033[0;32m%s\033[0m' % msg) # 綠色 msg_color = ('\033[%s;%sm%s\033[0m' % (self._display_method, 34 if self._is_pycharm_2019 else 32, msg)) # 綠色 elif record.levelno == 20: msg_color = ('\033[%s;%sm%s\033[0m' % (self._display_method, self.bule, msg)) # 青藍色 36 96 elif record.levelno == 30: msg_color = ('\033[%s;%sm%s\033[0m' % (self._display_method, self.yellow, msg)) elif record.levelno == 40: msg_color = ('\033[%s;35m%s\033[0m' % (self._display_method, msg)) # 紫紅色 elif record.levelno == 50: msg_color = ('\033[%s;31m%s\033[0m' % (self._display_method, msg)) # 血紅色 else: msg_color = msg # print(msg_color,'***************') stream.write(msg_color) stream.write(self.terminator) self.flush() except Exception: self.handleError(record) def emit(self, record): """ Emit a record. If a formatter is specified, it is used to format the record. The record is then written to the stream with a trailing newline. If exception information is present, it is formatted using traceback.print_exception and appended to the stream. If the stream has an 'encoding' attribute, it is used to determine how to do the output to the stream. 
""" # noinspection PyBroadException try: # very_nb_print(record) msg = self.format(record) stream = self.stream msg1, msg2 = self.__spilt_msg(record.levelno, msg) if record.levelno == 10: # msg_color = ('\033[0;32m%s\033[0m' % msg) # 綠色 msg_color = f'\033[0;32m{msg1}\033[0m \033[7;32m{msg2}\033[0m' # 綠色 elif record.levelno == 20: # msg_color = ('\033[%s;%sm%s\033[0m' % (self._display_method, self.bule, msg)) # 青藍色 36 96 msg_color = f'\033[0;{self.bule}m{msg1}\033[0m \033[7;{self.bule}m{msg2}\033[0m' elif record.levelno == 30: # msg_color = ('\033[%s;%sm%s\033[0m' % (self._display_method, self.yellow, msg)) msg_color = f'\033[0;{self.yellow}m{msg1}\033[0m \033[7;{self.yellow}m{msg2}\033[0m' elif record.levelno == 40: # msg_color = ('\033[%s;35m%s\033[0m' % (self._display_method, msg)) # 紫紅色 msg_color = f'\033[0;35m{msg1}\033[0m \033[7;35m{msg2}\033[0m' elif record.levelno == 50: # msg_color = ('\033[%s;31m%s\033[0m' % (self._display_method, msg)) # 血紅色 msg_color = f'\033[0;31m{msg1}\033[0m \033[7;31m{msg2}\033[0m' else: msg_color = msg # print(msg_color,'***************') stream.write(msg_color) stream.write(self.terminator) self.flush() except Exception as e: very_nb_print(e) very_nb_print(traceback.format_exc()) # self.handleError(record) @staticmethod def __spilt_msg(log_level, msg: str): split_text = '- 級別 -' if log_level == 10: split_text = '- DEBUG -' elif log_level == 20: split_text = '- INFO -' elif log_level == 30: split_text = '- WARNING -' elif log_level == 40: split_text = '- ERROR -' elif log_level == 50: split_text = '- CRITICAL -' msg_split = msg.split(split_text, maxsplit=1) return msg_split[0] + split_text, msg_split[-1] def __repr__(self): level = logging.getLevelName(self.level) name = getattr(self.stream, 'name', '') if name: name += ' ' return '<%s %s(%s)>' % (self.__class__.__name__, name, level) class CompatibleSMTPSSLHandler(handlers.SMTPHandler): """ 官方的SMTPHandler不支持SMTP_SSL的郵箱,這個能夠兩個都支持,而且支持郵件發送頻率限制 """ def __init__(self, mailhost, 
fromaddr, toaddrs: tuple, subject, credentials=None, secure=None, timeout=5.0, is_use_ssl=True, mail_time_interval=0): """ :param mailhost: :param fromaddr: :param toaddrs: :param subject: :param credentials: :param secure: :param timeout: :param is_use_ssl: :param mail_time_interval: 發郵件的時間間隔,能夠控制日誌郵件的發送頻率,爲0不進行頻率限制控制,若是爲60,表明1分鐘內最多發送一次郵件 """ # noinspection PyCompatibility # very_nb_print(credentials) super().__init__(mailhost, fromaddr, toaddrs, subject, credentials, secure, timeout) self._is_use_ssl = is_use_ssl self._current_time = 0 self._time_interval = 3600 if mail_time_interval < 3600 else mail_time_interval # 60分鐘發一次羣發郵件,之後用釘釘代替郵件,郵件頻率限制的太死了。 self._msg_map = dict() # 是一個內容爲鍵時間爲值得映射 self._lock = Lock() def emit0(self, record: logging.LogRecord): """ 不用這個判斷內容 """ from threading import Thread if sys.getsizeof(self._msg_map) > 10 * 1000 * 1000: self._msg_map.clear() if record.msg not in self._msg_map or time.time() - self._msg_map[record.msg] > self._time_interval: self._msg_map[record.msg] = time.time() # print('發送郵件成功') Thread(target=self.__emit, args=(record,)).start() else: very_nb_print(f' 郵件發送太頻繁間隔不足60分鐘,這次不發送這個郵件內容: {record.msg} ') def emit(self, record: logging.LogRecord): """ Emit a record. Format the record and send it to the specified addressees. 
""" from threading import Thread with self._lock: if time.time() - self._current_time > self._time_interval: self._current_time = time.time() Thread(target=self.__emit, args=(record,)).start() else: very_nb_print(f' 郵件發送太頻繁間隔不足60分鐘,這次不發送這個郵件內容: {record.msg} ') def __emit(self, record): # noinspection PyBroadException try: import smtplib from email.message import EmailMessage import email.utils t_start = time.time() port = self.mailport if not port: port = smtplib.SMTP_PORT smtp = smtplib.SMTP_SSL(self.mailhost, port, timeout=self.timeout) if self._is_use_ssl else smtplib.SMTP( self.mailhost, port, timeout=self.timeout) msg = EmailMessage() msg['From'] = self.fromaddr msg['To'] = ','.join(self.toaddrs) msg['Subject'] = self.getSubject(record) msg['Date'] = email.utils.localtime() msg.set_content(self.format(record)) if self.username: if self.secure is not None: smtp.ehlo() smtp.starttls(*self.secure) smtp.ehlo() smtp.login(self.username, self.password) smtp.send_message(msg) smtp.quit() # noinspection PyPep8 very_nb_print( f'發送郵件給 {self.toaddrs} 成功,' f'用時{round(time.time() - t_start, 2)} ,發送的內容是--> {record.msg} \033[0;35m!!!請去郵箱檢查,可能在垃圾郵件中\033[0m') except Exception as e: # self.handleError(record) very_nb_print( f'[log_manager.py] {time.strftime("%H:%M:%S", time.localtime())} \033[0;31m !!!!!! 
郵件發送失敗,緣由是: {e} \033[0m') class DingTalkHandler(logging.Handler): def __init__(self, ding_talk_token=None, time_interval=60): super().__init__() self._ding_talk_url = f'https://oapi.dingtalk.com/robot/send?access_token={ding_talk_token}' self._current_time = 0 self._time_interval = time_interval # 最好別頻繁發。 self._lock = Lock() def emit(self, record): # from threading import Thread with self._lock: if time.time() - self._current_time > self._time_interval: # very_nb_print(self._current_time) self.__emit(record) # Thread(target=self.__emit, args=(record,)).start() self._current_time = time.time() else: very_nb_print(f' 這次離上次發送釘釘消息時間間隔不足 {self._time_interval} 秒,這次不發送這個釘釘內容: {record.msg} ') def __emit(self, record): import requests message = self.format(record) data = {"msgtype": "text", "text": {"content": message, "title": '這裏的標題能起做用嗎??'}} try: resp = requests.post(self._ding_talk_url, json=data, timeout=(30, 40)) very_nb_print(f'釘釘返回 : {resp.text}') except requests.RequestException as e: very_nb_print(f"發送消息給釘釘機器人失敗 {e}") def revision_call_handlers(self, record): # 對logging標準模塊打猴子補丁。主要是使父命名空間的handler不重複記錄當前命名空間日誌已有種類的handler。 """ 重要。這可使同名logger或父logger隨意添加同種類型的handler,確保不會重複打印。 :param self: :param record: :return: """ """ Pass a record to all relevant handlers. Loop through all handlers for this logger and its parents in the logger hierarchy. If no handler was found, output a one-off error message to sys.stderr. Stop searching up the hierarchy whenever a logger with the "propagate" attribute set to zero is found - that will be the last logger whose handlers are called. 
""" c = self found = 0 hdlr_type_set = set() while c: for hdlr in c.handlers: hdlr_type = type(hdlr) if hdlr_type == ColorHandler: hdlr_type = logging.StreamHandler found = found + 1 if record.levelno >= hdlr.level: if hdlr_type not in hdlr_type_set: hdlr.handle(record) hdlr_type_set.add(hdlr_type) if not c.propagate: c = None # break out else: c = c.parent # noinspection PyRedundantParentheses if (found == 0): if logging.lastResort: if record.levelno >= logging.lastResort.level: logging.lastResort.handle(record) elif logging.raiseExceptions and not self.manager.emittedNoHandlerWarning: sys.stderr.write("No handlers could be found for logger" " \"%s\"\n" % self.name) self.manager.emittedNoHandlerWarning = True logging.Logger.callHandlers = revision_call_handlers # 打猴子補丁。 # noinspection PyTypeChecker def get_logs_dir_by_folder_name(folder_name='/app/'): """獲取app文件夾的路徑,如獲得這個路徑 D:/coding/hotel_fares/app 若是沒有app文件夾,就在當前文件夾新建 """ three_parts_str_tuple = (os.path.dirname(__file__).replace('\\', '/').partition(folder_name)) # print(three_parts_str_tuple) if three_parts_str_tuple[1]: return three_parts_str_tuple[0] + three_parts_str_tuple[1] + 'logs/' # noqa else: return three_parts_str_tuple[0] + '/logs/' # NOQA def get_logs_dir_by_disk_root(): """ 返回磁盤根路徑下的pythonlogs文件夾,當使用文件日誌時候自動建立這個文件夾。 :return: """ from pathlib import Path return str(Path(Path(__file__).absolute().root) / Path('pythonlogs')) # noinspection PyMissingOrEmptyDocstring,PyPep8 class LogManager(object): """ 一個日誌管理類,用於建立logger和添加handler,支持將日誌打印到控制檯打印和寫入日誌文件和mongodb和郵件。 """ logger_name_list = [] logger_list = [] def __init__(self, logger_name=None, is_pycharm_2019=False): """ :param logger_name: 日誌名稱,當爲None時候建立root命名空間的日誌,通常狀況下千萬不要傳None,除非你肯定須要這麼作和是在作什麼 """ self._logger_name = logger_name self.logger = logging.getLogger(logger_name) self._is_pycharm_2019 = is_pycharm_2019 # 此處可使用*args ,**kwargs減小不少參數,但爲了pycharm更好的自動智能補全提示放棄這麼作 @classmethod def bulid_a_logger_with_mail_handler(cls, logger_name, 
log_level_int=10, *, is_add_stream_handler=True, do_not_use_color_handler=False, log_path=get_logs_dir_by_disk_root(), log_filename=None, log_file_size=100, mongo_url=None, is_add_elastic_handler=False, is_add_kafka_handler=False, ding_talk_token=DING_TALK_TOKEN, ding_talk_time_interval=60, formatter_template=5, mailhost: tuple = EMAIL_HOST, # ('smtpdm.aliyun.com', 465), # 公司郵箱有頻率限制影響業務 fromaddr: str = EMAIL_FROMADDR, # 'matafyhotel-techl@matafy.com', toaddrs: tuple = EMAIL_TOADDRS, subject: str = '馬踏飛燕日誌報警測試', credentials: tuple = EMAIL_CREDENTIALS, # ('matafyhotel-techl@matafy.com', 'DDMkXzmlZtlNXB81YrYH'), secure=None, timeout=5.0, is_use_ssl=True, mail_time_interval=60): """ 建立一個附帶郵件handler的日誌 :param logger_name: :param log_level_int: 能夠用1 2 3 4 5 ,用能夠用官方logging模塊的正規的10 20 30 40 50,兼容。 :param is_add_stream_handler: :param do_not_use_color_handler: :param log_path: :param log_filename: :param log_file_size: :param mongo_url: :param is_add_elastic_handler: 是否添加eshandler :param is_add_kafka_handler: 日誌是否發佈到kafka。 :param ding_talk_token:釘釘機器人token :param ding_talk_time_interval : 時間間隔,少於這個時間不發送釘釘消息 :param formatter_template: :param mailhost: :param fromaddr: :param toaddrs: :param subject: :param credentials: :param secure: :param timeout: :param is_use_ssl: :param mail_time_interval: 郵件的頻率控制,爲0不限制,若是爲100,表明100秒內相同內容的郵件最多發送一次郵件 :return: """ if log_filename is None: log_filename = f'{logger_name}.log' logger = cls(logger_name).get_logger_and_add_handlers(log_level_int=log_level_int, is_add_stream_handler=is_add_stream_handler, do_not_use_color_handler=do_not_use_color_handler, log_path=log_path, log_filename=log_filename, log_file_size=log_file_size, mongo_url=mongo_url, is_add_elastic_handler=is_add_elastic_handler, is_add_kafka_handler=is_add_kafka_handler, ding_talk_token=ding_talk_token, ding_talk_time_interval=ding_talk_time_interval, formatter_template=formatter_template, ) smtp_handler = CompatibleSMTPSSLHandler(mailhost, fromaddr, toaddrs, subject, 
credentials, secure, timeout, is_use_ssl, mail_time_interval, ) log_level_int = log_level_int * 10 if log_level_int < 10 else log_level_int smtp_handler.setLevel(log_level_int) smtp_handler.setFormatter(formatter_dict[formatter_template]) logger.addHandler(smtp_handler) return logger # 加*是爲了強制在調用此方法時候使用關鍵字傳參,若是以位置傳參強制報錯,由於此方法後面的參數中間可能之後隨時會增長更多參數,形成以前的使用位置傳參的代碼參數意義不匹配。 # noinspection PyAttributeOutsideInit def get_logger_and_add_handlers(self, log_level_int: int = 10, *, is_add_stream_handler=True, do_not_use_color_handler=False, log_path=get_logs_dir_by_disk_root(), log_filename=None, log_file_size=100, mongo_url=None, is_add_elastic_handler=False, is_add_kafka_handler=False, ding_talk_token=None, ding_talk_time_interval=60, formatter_template=5): """ :param log_level_int: 日誌輸出級別,設置爲 1 2 3 4 5,分別對應原生logging.DEBUG(10),logging.INFO(20),logging.WARNING(30),logging.ERROR(40),logging.CRITICAL(50)級別,如今能夠直接用10 20 30 40 50了,兼容了。 :param is_add_stream_handler: 是否打印日誌到控制檯 :param do_not_use_color_handler :是否禁止使用color彩色日誌 :param log_path: 設置存放日誌的文件夾路徑 :param log_filename: 日誌的名字,僅當log_path和log_filename都不爲None時候才寫入到日誌文件。 :param log_file_size :日誌大小,單位M,默認10M :param mongo_url : mongodb的鏈接,爲None時候不添加mongohandler :param is_add_elastic_handler: 是否記錄到es中。 :param is_add_kafka_handler: 日誌是否發佈到kafka。 :param ding_talk_token:釘釘機器人token :param ding_talk_time_interval : 時間間隔,少於這個時間不發送釘釘消息 :param formatter_template :日誌模板,1爲formatter_dict的詳細模板,2爲簡要模板,5爲最好模板 :type log_level_int :int :type is_add_stream_handler :bool :type log_path :str :type log_filename :str :type mongo_url :str :type log_file_size :int """ self._logger_level = log_level_int * 10 if log_level_int < 10 else log_level_int self._is_add_stream_handler = is_add_stream_handler self._do_not_use_color_handler = do_not_use_color_handler self._log_path = log_path self._log_filename = log_filename self._log_file_size = log_file_size self._mongo_url = mongo_url self._is_add_elastic_handler = is_add_elastic_handler 
        self._is_add_kafka_handler = is_add_kafka_handler
        self._ding_talk_token = ding_talk_token
        self._ding_talk_time_interval = ding_talk_time_interval
        self._formatter = formatter_dict[formatter_template]
        self.logger.setLevel(self._logger_level)
        self.__add_handlers()
        # self.logger_name_list.append(self._logger_name)
        # self.logger_list.append(self.logger)
        return self.logger

    def get_logger_without_handlers(self):
        """Return a logger without handlers."""
        return self.logger

    # noinspection PyMethodMayBeStatic,PyMissingOrEmptyDocstring
    def look_over_all_handlers(self):
        very_nb_print(f'All handlers of the logger named {self._logger_name} --> {self.logger.handlers}')

    def remove_all_handlers(self):
        # Iterate over a copy: removeHandler mutates self.logger.handlers, so looping over it directly skips entries.
        for hd in list(self.logger.handlers):
            self.logger.removeHandler(hd)

    def remove_handler_by_handler_class(self, handler_class: type):
        """
        Remove handlers of the given type.
        :param handler_class: one of logging.StreamHandler, ColorHandler, MongoHandler, ConcurrentRotatingFileHandler, CompatibleSMTPSSLHandler, ElasticHandler, DingTalkHandler
        :return:
        """
        if handler_class not in (logging.StreamHandler, ColorHandler, MongoHandler, ConcurrentRotatingFileHandler,
                                 CompatibleSMTPSSLHandler, ElasticHandler, DingTalkHandler):
            raise TypeError('The given handler type is not valid')
        # Iterate over a copy for the same reason as in remove_all_handlers.
        for handler in list(self.logger.handlers):
            if isinstance(handler, handler_class):
                self.logger.removeHandler(handler)

    def __add_a_hanlder(self, handlerx: logging.Handler):
        for hdlr in self.logger.handlers:
            if type(hdlr) is type(handlerx):  # a handler of exactly this type is already attached, so skip
                return
        handlerx.setLevel(10)
        handlerx.setFormatter(self._formatter)
        self.logger.addHandler(handlerx)

    def __add_handlers(self):
        # REMIND add the console log handler
        if self._is_add_stream_handler:
            # Use the custom colored handler unless it has been disabled, in which case fall back to StreamHandler.
            handler = ColorHandler(is_pycharm_2019=self._is_pycharm_2019) if not self._do_not_use_color_handler else logging.StreamHandler()
            # handler = logging.StreamHandler()
            self.__add_a_hanlder(handler)

        # REMIND add the multi-process-safe rotating file log handler
        if all([self._log_path, self._log_filename]):
            if not os.path.exists(self._log_path):
                os.makedirs(self._log_path)
            log_file = os.path.join(self._log_path,
                                    self._log_filename)
            rotate_file_handler = None
            if os_name == 'nt':
                # Used on Windows; noted as not process-safe
                rotate_file_handler = ConcurrentRotatingFileHandler(log_file, maxBytes=self._log_file_size * 1024 * 1024, backupCount=3,
                                                                    encoding="utf-8")
            if os_name == 'posix':
                # On Linux ConcurrentRotatingFileHandler can be used; it is a process-safe way of logging
                rotate_file_handler = ConcurrentRotatingFileHandler(log_file, maxBytes=self._log_file_size * 1024 * 1024, backupCount=3,
                                                                    encoding="utf-8")
            if rotate_file_handler is not None:  # os.name may be neither 'nt' nor 'posix'
                self.__add_a_hanlder(rotate_file_handler)

        # REMIND add the mongo log handler.
        if self._mongo_url:
            self.__add_a_hanlder(MongoHandler(self._mongo_url))

        # REMIND add the ES log handler.
        # if app_config.env == 'test' and self._is_add_elastic_handler:
        if app_config.env == 'testxxx':  # Use kafka; do not write to ES directly.
            """
            The production environment uses Alibaba Cloud OSS logs instead of this.
            """
            self.__add_a_hanlder(ElasticHandler([ELASTIC_HOST], ELASTIC_PORT))

        # REMIND add the kafka log handler.
        # if self._is_add_kafka_handler:
        if app_config.env == 'test':
            self.__add_a_hanlder(KafkaHandler(KAFKA_BOOTSTRAP_SERVERS, ))

        # REMIND add the DingTalk log handler.
        if self._ding_talk_token:
            self.__add_a_hanlder(DingTalkHandler(self._ding_talk_token, self._ding_talk_time_interval))


def get_logger(log_name):
    return LogManager(log_name).get_logger_and_add_handlers(log_filename=f'{log_name}.log')


class LoggerMixin(object):
    subclass_logger_dict = {}

    @property
    def logger_extra_suffix(self):
        return self.__logger_extra_suffix

    @logger_extra_suffix.setter
    def logger_extra_suffix(self, value):
        # noinspection PyAttributeOutsideInit
        self.__logger_extra_suffix = value

    @property
    def logger_full_name(self):
        try:
            # noinspection PyUnresolvedReferences
            return type(self).__name__ + '-' + self.logger_extra_suffix
        except AttributeError:
            # very_nb_print(type(e))
            return type(self).__name__

    @property
    def logger(self):
        logger_name_key = self.logger_full_name + '1'
        if logger_name_key not in self.subclass_logger_dict:
            logger_var = LogManager(self.logger_full_name).get_logger_and_add_handlers()
            self.subclass_logger_dict[logger_name_key] = logger_var
            return logger_var
        else:
            return self.subclass_logger_dict[logger_name_key]

    @property
    def logger_with_file(self):
        logger_name_key = self.logger_full_name + '2'
        if logger_name_key not in self.subclass_logger_dict:
            logger_var = LogManager(self.logger_full_name).get_logger_and_add_handlers(log_filename=self.logger_full_name + '.log', log_file_size=50)
            self.subclass_logger_dict[logger_name_key] = logger_var
            return logger_var
        else:
            return self.subclass_logger_dict[logger_name_key]

    @property
    def logger_with_file_mongo(self):
        from app import config
        logger_name_key = self.logger_full_name + '3'
        if logger_name_key not in self.subclass_logger_dict:
            logger_var = LogManager(self.logger_full_name).get_logger_and_add_handlers(log_filename=self.logger_full_name + '.log', log_file_size=50,
                                                                                       mongo_url=config.connect_url)
            self.subclass_logger_dict[logger_name_key] = logger_var
            return logger_var
        else:
            return self.subclass_logger_dict[logger_name_key]


class LoggerMixinDefaultWithFileHandler(LoggerMixin):
    subclass_logger_dict = {}

    @property
    def logger(self):
        logger_name_key = self.logger_full_name + '3'
        if logger_name_key not in self.subclass_logger_dict:
            logger_var = LogManager(self.logger_full_name).get_logger_and_add_handlers(log_filename=self.logger_full_name + '.log', log_file_size=50)
            self.subclass_logger_dict[logger_name_key] = logger_var
            return logger_var
        else:
            return self.subclass_logger_dict[logger_name_key]


class LoggerLevelSetterMixin:
    # noinspection PyUnresolvedReferences
    def set_log_level(self, log_level=10):
        try:
            self.logger.setLevel(log_level)
        except AttributeError as e:
            very_nb_print(e)

        return self


simple_logger = LogManager('simple').get_logger_and_add_handlers()
defaul_logger = LogManager('hotel').get_logger_and_add_handlers(do_not_use_color_handler=True, formatter_template=7)
file_logger = LogManager('hotelf').get_logger_and_add_handlers(do_not_use_color_handler=True,
                                                               log_filename='hotel_' + time.strftime("%Y-%m-%d", time.localtime()) + ".log",
                                                               formatter_template=7)


# noinspection PyMethodMayBeStatic,PyNestedDecorators,PyArgumentEqualDefault
class _Test(unittest.TestCase):
    # noinspection PyMissingOrEmptyDocstring
    @classmethod
    def tearDownClass(cls):
        """

        """
        time.sleep(1)

    @unittest.skip
    def test_repeat_add_handlers_(self):
        """Test adding handlers repeatedly."""
        LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
        LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
        LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
        test_log = LogManager('test').get_logger_and_add_handlers(log_path='../logs', log_filename='test.log')
        print('The next line will not be printed or written to the log four times')
        time.sleep(1)
        test_log.debug('This line will not be printed or written to the log four times')

    @unittest.skip
    def test_get_logger_without_hanlders(self):
        """Test a logger without handlers."""
        log = LogManager('test2').get_logger_without_handlers()
        print('The next line will not be printed')
        time.sleep(1)
        log.info('This line will not be printed')

    @unittest.skip
    def test_add_handlers(self):
        """This way debug and info logs can be written freely anywhere; setting the level at the central switch is enough to filter them."""
        LogManager('test3').get_logger_and_add_handlers(2)
        log1 = LogManager('test3').get_logger_without_handlers()
        print('The next line is info level and can be printed')
        time.sleep(1)
        log1.info('This line is info level and can be printed')
        print('The next line is debug level and cannot be printed')
        time.sleep(1)
        log1.debug('This line is debug level and cannot be printed')

    @unittest.skip
    def test_only_write_log_to_file(self):  # NOQA
        """Write to the log file only."""
        log5 = LogManager('test5').get_logger_and_add_handlers(20)
        log6 = LogManager('test6').get_logger_and_add_handlers(is_add_stream_handler=False, log_filename='test6.log')
        print('The next line is written to the file only')
        log5.debug('This line is written to the file only')
        log6.debug('This line is written to the file only')

    @unittest.skip
    def test_get_app_logs_dir(self):  # NOQA
        print(get_logs_dir_by_folder_name())
        print(get_logs_dir_by_disk_root())

    @unittest.skip
    def test_none(self):
        # noinspection PyUnusedLocal
        log1 = LogManager('log1').get_logger_and_add_handlers()
        LogManager().get_logger_and_add_handlers()

        LogManager().get_logger_and_add_handlers()
        log1 = LogManager('log1').get_logger_and_add_handlers()
        LogManager().get_logger_and_add_handlers()
        LogManager('log1').get_logger_and_add_handlers(log_filename='test_none.log')
        log1.debug('How many times is this printed?')

    @unittest.skip
    def test_formater(self):
        logger2 = LogManager('test_formater2').get_logger_and_add_handlers(formatter_template=6)
        logger2.debug('Testing log template 2')
        logger5 = LogManager('test_formater5').get_logger_and_add_handlers(formatter_template=5)
        logger5.error('Testing log template 5')
        defaul_logger.debug('dddddd')
        file_logger.info('ffffff')

    @unittest.skip
    def test_bulid_a_logger_with_mail_handler(self):
        """
        Test sending logs to a mailbox.
        :return:
        """
        logger = LogManager.bulid_a_logger_with_mail_handler('mail_logger_name', mail_time_interval=60, toaddrs=(
            '909686xxx@qq.com', 'yanxx@dingtalk.com',
        ))
        for _ in range(100):
            logger.warning('Content of the mail log test...')
            time.sleep(10)

    @unittest.skip
    def test_ding_talk(self):
        logger = LogManager('testdinding').get_logger_and_add_handlers(ding_talk_token=DING_TALK_TOKEN, ding_talk_time_interval=10)
        logger.debug('La la la Demacia 1')
        logger.debug('La la la Demacia 2')
        time.sleep(10)
        logger.debug('La la la Demacia 3')

    @unittest.skip
    def test_remove_handler(self):
        logger = LogManager('test13').get_logger_and_add_handlers()
        logger.debug('Before removing ColorHandler')
        LogManager('test13').remove_handler_by_handler_class(ColorHandler)
        logger.debug('After removing ColorHandler; this record will not be printed')

    @unittest.skip
    def test_logging(self):
        # The namespace of the bare logging functions is root, which causes duplicate log output; do not use them directly.
        logger = LogManager('test14').get_logger_and_add_handlers(formatter_template=4)
        logger.debug('xxxx')
        logging.warning('yyyyyyy')
        logger.warning('zzzzzzzzz')

    @unittest.skip
    def test_logger_level_setter_mixin(self):
        """
        Test the mixin class that allows setting the log level.
        :return:
        """
        print('Testing the very awesome print')

        class A(LoggerMixin, LoggerLevelSetterMixin):
            pass

        a = A().set_log_level(20)
        a.logger.debug('This line cannot be shown')  # this line is not printed
        a.logger.error('This line can be shown')

    # @unittest.skip
    def test_color_and_mongo_hanlder(self):
        """Test colored logs and writing logs to mongodb."""
        very_nb_print('Testing colors and mongo')

        logger = LogManager('helloMongo', is_pycharm_2019=False).get_logger_and_add_handlers(mongo_url=app_config.connect_url,
                                                                                             formatter_template=5)
        logging.error('xxxx')
        # logger = LogManager('helloMongo', is_pycharm_2019=False).get_logger_and_add_handlers(formatter_template=5)
        for i in range(100000):
            time.sleep(1)
            logger.debug('A debug-level log. ' * 5)
            logger.info('An info-level log. ' * 5)
            logger.warning('A warning-level log. ' * 5)
            logger.error('An error-level log. ' * 5)
            logger.critical('A critical-level log. ' * 5)


if __name__ == "__main__":
    unittest.main()
    # raise Exception
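
The module depends on third-party packages (pymongo, kafka, elasticsearch) and on a project-specific `app.config`, so it is hard to try in isolation. Below is a minimal sketch, using only the standard `logging` library, of three behaviours the code above relies on: mapping the shorthand levels 1 to 5 onto 10 to 50, refusing to attach a second handler of the same type, and removing handlers by class. The helper names `normalize_level`, `add_handler_once` and `remove_handlers_of_type` and the logger name `dedup_demo` are hypothetical and chosen for illustration; they are not part of the module.

```python
import logging


def normalize_level(log_level_int: int) -> int:
    """Map shorthand levels 1-5 onto logging's 10-50; values >= 10 pass through unchanged."""
    return log_level_int * 10 if log_level_int < 10 else log_level_int


def add_handler_once(logger: logging.Logger, handler: logging.Handler) -> bool:
    """Attach handler unless the logger already holds one of exactly the same type."""
    for existing in logger.handlers:
        if type(existing) is type(handler):
            return False  # same handler type already present, nothing added
    logger.addHandler(handler)
    return True


def remove_handlers_of_type(logger: logging.Logger, handler_class: type) -> int:
    """Remove every handler that is an instance of handler_class and return the count."""
    removed = 0
    # Loop over a copy, because removeHandler mutates logger.handlers.
    for existing in list(logger.handlers):
        if isinstance(existing, handler_class):
            logger.removeHandler(existing)
            removed += 1
    return removed


demo_logger = logging.getLogger('dedup_demo')  # hypothetical logger name
demo_logger.setLevel(normalize_level(2))       # shorthand 2 becomes logging.INFO (20)
first_added = add_handler_once(demo_logger, logging.StreamHandler())
second_added = add_handler_once(demo_logger, logging.StreamHandler())
handler_count = len(demo_logger.handlers)
print(first_added, second_added, handler_count)  # True False 1
```

Comparing with `type(...) is type(...)` rather than `isinstance` means a subclass of `StreamHandler` counts as a different handler type, which is why a console handler and a file handler can coexist on one logger even though file handlers derive from `StreamHandler`. Removal uses `isinstance`, so removing by a base class also removes its subclasses.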