1、問題描述2、分析2.1 logging模塊實現日誌回滾2.2 多進程日誌安全輸出到同一文件方案3、解決方案3.1 使用ConcurrentRotatingFileHandler包3.2 對日誌輸出加鎖3.3 重寫FileHandler類3.4 單獨進程負責日誌事件3.5 logging.SocketHandler的方案4、參考文獻html
項目中,使用RotatingFileHandler
根據日誌文件大小來切分日誌。設置文件的MaxBytes
爲1GB
, backupCount
大小爲5。python
經查看,發現日誌文件的大小均小於10MB
,且每一個回滾日誌文件的寫入時間也都比較接近。nginx
日誌文件太小,猜想是代碼有問題,或者是文件內容有丟失;日誌寫入時間接近猜想是同時寫入的問題。web
經檢查,代碼沒有問題,排除此緣由。考慮當前使用gunicorn
的多進程啓動程序,多半是多個進程同時寫入當個文件形成日誌文件丟失。安全
logging
模塊是線程安全的,但並非進程安全的。服務器
如何解決此問題呢?首先先過一遍Python
的logging
模塊在處理日誌回滾的具體實現方法。併發
logging
中RotatingFileHandler
類和TimedRotatingFileHandler
類分別實現按照日誌文件大小和日誌文件時間來切分文件,均繼承自BaseRotatingHandler
類。app
BaseRotatingHandler
類中實現了文件切分的觸發和執行,具體過程以下:dom
1def emit(self, record):
2 """
3 Emit a record.
4 Output the record to the file, catering for rollover as described
5 in doRollover().
6 """
7 try:
8 if self.shouldRollover(record):
9 self.doRollover()
10 logging.FileHandler.emit(self, record)
11 except Exception:
12 self.handleError(record)
複製代碼
具體的執行過程shouldRollover(record)
和doRollover()
函數則在RotatingFileHandler
類和TimedRotatingFileHandler
類中實現。socket
以RotatingFileHandler
類爲例,doRollover()
函數流程以下:
1def doRollover(self):
2 if self.stream:
3 self.stream.close()
4 self.stream = None
5 if self.backupCount > 0:
6 for i in range(self.backupCount - 1, 0, -1): # 從backupCount,依次到1
7 sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
8 dfn = self.rotation_filename("%s.%d" % (self.baseFilename,
9 i + 1))
10 if os.path.exists(sfn):
11 if os.path.exists(dfn):
12 os.remove(dfn)
13 os.rename(sfn, dfn) # 實現將xx.log.i->xx.log.i+1
14 dfn = self.rotation_filename(self.baseFilename + ".1")
15 # ---------start-----------
16 if os.path.exists(dfn): # 判斷若是xx.log.1存在,則刪除xx.log.1
17 os.remove(dfn)
18 self.rotate(self.baseFilename, dfn) # 將xx.log->xx.log.1
19 # ----------end------------
20 if not self.delay:
21 self.stream = self._open() # 執行新的xx.log
複製代碼
分析如上過程,整個步驟是:
self.baseFilename
,該值self.baseFilename = os.path.abspath(filename)
是設置的日誌文件的絕對路徑,假設baseFilename
爲error.log
。error.log.i
重命名爲error.log.i+1
。error.log.1
是否存在,若存在,則刪除,將當前日誌文件error.log
重命名爲error.log.1
。self.stream
從新指向新建error.log
文件。當程序啓動多進程時,每一個進程都會執行doRollover
過程,如有多個進程進入臨界區,則會致使dfn
被刪除屢次等多種混亂操做。
相應的解決方法:
Queue
和QueueHandler
將全部日誌事件發送至一個進程中)Lock
類來序列化對進程的文件訪問)SocketHandler
,而後用一個實現了套接字服務器的單獨進程一邊從套接字中讀取一邊將日誌記錄至文件(Python
手冊中提供)該方法就屬於加鎖方案。
ConcurrentLogHandler
能夠在多進程環境下安全的將日誌寫入到同一個文件,而且能夠在日誌文件達到特定大小時,分割日誌文件(支持按文件大小分割)。但ConcurrentLogHandler
不支持按時間分割日誌文件的方式。
ConcurrentLogHandler
模塊使用文件鎖定,以便多個進程同時記錄到單個文件,而不會破壞日誌事件。該模塊提供與RotatingFileHandler
相似的文件循環方案。
該模塊的首要任務是保留您的日誌記錄,這意味着日誌文件將大於指定的最大大小(RotatingFileHandler
是嚴格遵照最大文件大小),若是有多個腳本的實例同時運行並寫入同一個日誌文件,那麼全部腳本都應該使用ConcurrentLogHandler
,不該該混合和匹配這這個類。
併發訪問經過使用文件鎖來處理,該文件鎖應確保日誌消息不會被丟棄或破壞。這意味着將爲寫入磁盤的每一個日誌消息獲取並釋放文件鎖。(在Windows上,您可能還會遇到臨時狀況,必須爲每一個日誌消息打開和關閉日誌文件。)這可能會影響性能。在個人測試中,性能綽綽有餘,可是若是您須要大容量或低延遲的解決方案,建議您將其放在其餘地方。
這個包捆綁了portalocker
來處理文件鎖定。因爲使用了portalocker
模塊,該模塊當前僅支持「nt」
和「posix」
平臺。
安裝:
1pip install ConcurrentLogHandler
複製代碼
該模塊支持
Python2.6
及之後版本。
ConcurrentLogHandler
的使用方法與其餘handler
類一致,如與RotatingFileHandler
的使用方法同樣。
初始化函數及參數:
1class ConcurrentRotatingFileHandler(BaseRotatingHandler):
2 """
3 Handler for logging to a set of files, which switches from one file to the
4 next when the current file reaches a certain size. Multiple processes can
5 write to the log file concurrently, but this may mean that the file will
6 exceed the given size.
7 """
8 def __init__(self, filename, mode='a', maxBytes=0, backupCount=0,
9 encoding=None, debug=True, delay=0):
複製代碼
參數含義同Python
內置RotatingFileHandler
類相同,具體可參考上一篇博文Python 日誌模塊logging分析及使用-1。一樣繼承自BaseRotatingHandler
類。
簡單的示例:
1import logging
2from cloghandler import ConcurrentRotatingFileHandler
3
4logger = logging.getLogger()
5rotateHandler = ConcurrentRotatingFileHandler('./logs/my_logfile.log', "a", 1024*1024, 5)
6logger.addHandler(rotateHandler)
7logger.setLevel(logging.DEBUG)
8
9logger.info('This is a info message.')
複製代碼
爲了適應沒有ConcurrentRotatingFileHandler
包的狀況,增長回退使用RotatingFileHandler
的代碼:
1try:
2 from cloghandler import ConcurrentRotatingFileHandler as RFHandler
3except ImportError:
4 from warning import warn
5 warn('ConcurrentRotatingFileHandler package not installed, Using builtin log handler')
6 from logging.handlers import RotatingFileHandler as RFHandler
複製代碼
運行後能夠發現,會自動建立一個.lock
文件,經過鎖的方式來安全的寫日誌文件。
TimedRotatingFileHandler
類doRollover
函數的主要部分以下:
1def doRollover(self):
2 ....
3 dfn = self.rotation_filename(self.baseFilename + "." +
4 time.strftime(self.suffix, timeTuple))
5 # -------begin-------
6 if os.path.exists(dfn): # 判斷若是存在dfn,則刪除
7 os.remove(dfn)
8 self.rotate(self.baseFilename, dfn) # 將當前日誌文件重命名爲dfn
9 # --------end--------
10 if self.backupCount > 0:
11 for s in self.getFilesToDelete():
12 os.remove(s)
13 if not self.delay:
14 self.stream = self._open()
15 ....
複製代碼
修改思路:
判斷dfn
文件是否已經存在,若是存在,表示已經被rename
過了;若是不存在,則只容許一個進程去rename
,其餘進程需等待。
新建一個類繼承自TimeRotatingFileHandler
,修改doRollover
函數,只需處理上面代碼的註釋部分便可。以下:
1class MPTimeRotatingFileHandler(TimeRotatingFileHandler):
2 def doRollover(self):
3 ....
4 dfn = self.rotation_filename(self.baseFilename + "." +
5 time.strftime(self.suffix, timeTuple))
6 # ----modify start----
7 if not os.path.exists(dfn):
8 f = open(self.baseFilename, 'a')
9 fcntl.lockf(f.fileno(), fcntl.LOCK_EX)
10 if os.path.exists(self.baseFilename): # 判斷baseFilename是否存在
11 self.rotate(self.baseFilename, dfn)
12 # ----modify end-----
13 if self.backupCount > 0:
14 for s in self.getFilesToDelete():
15 os.remove(s)
16 ....
複製代碼
logging.handlers.py
中各種的繼承關係以下圖所示:
TimeRotatingFileHandler
類就是繼承自該類,在FileHandler
類中增長一些處理。
具體可參考如下博文:
在
Python
官方手冊中,提供了多進程中日誌記錄至單個文件的方法。
logging
是線程安全的,將單個進程中的多個線程日誌記錄至單個文件也是支持的。但將多個進程中的日誌記錄至單個文件中則不支持,由於在Python
中並無在多個進程中實現對單個文件訪問的序列化的標準方案。
將多個進程中日誌記錄至單個文件中,有如下幾個方案:
SocketHandler
,而後用一個實現了套接字服務器的單獨進程一邊從套接字中讀取一邊將日誌記錄至文件。Queue
和 QueueHandler 將全部的日誌事件發送至你的多進程應用的一個進程中。一個單獨監聽進程負責監聽其餘進程的日誌事件,並根據本身的配置記錄。
示例:
1import logging
2import logging.handlers
3import multiprocessing
4
5from random import choice, random
6import time
7
8def listener_configurer():
9 root = logging.getLogger()
10 h = logging.handlers.RotatingFileHandler('test.log', 'a', 300,10) # rotate file設置的很小,以便於查看結果
11 f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
12 h.setFormatter(f)
13 root.addHandler(h)
14
15def listenser_process(queue, configurer):
16 configurer()
17 while True:
18 try:
19 record = queue.get()
20 if record is None:
21 break
22 logger = logging.getLogger(record.name)
23 logger.handle(record)
24 except Exception:
25 import sys, traceback
26 print('Whoops! Problem:', file=sys.stderr)
27 trackback.print_exc(file=sys.stderr)
28
29LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
30 logging.ERROR, logging.CRITICAL]
31
32LOGGERS = ['a.b.c', 'd.e.f']
33
34MESSAGES = [
35 'Random message #1',
36 'Random message #2',
37 'Random message #3',
38]
39
40def worker_configurer(queue):
41 h = logging.handlers.QueueHandler(queue)
42 root = logging.getLogger()
43 root.addHandler(h)
44 root.setLevel(logging.DEBUG)
45
46# 該循環僅記錄10個事件,這些事件具備隨機的介入延遲,而後終止
47def worker_process(queue, configurer):
48 configurer(queue)
49 name = multiprocessing.current_process().name
50 print('Worker started:%s'%name)
51 for i in range(10):
52 time.sleep(random())
53 logger = logging.getLogger(choice(LOGGERS))
54 level = choice(LEVELS)
55 message = choice(MESSAGES)
56 logger.log(level, message)
57# 建立隊列,建立並啓動監聽器,建立十個工做進程並啓動它們,等待它們完成,而後將None發送到隊列以通知監聽器完成
58def main():
59 queue = multiprocessing.Queue(-1)
60 listener = multiprocessing.Process(target=listener_process,
61 args=(queue, listener_configurer))
62 listener.start()
63 workers = []
64 for i in range(10):
65 worker = multiprocessing.Process(target=worker_process,
66 args=(queue, listener_configurer))
67 workers.append(worker)
68 worker.start()
69 for w in workers:
70 w.join()
71 queue.put_nowait(None)
72 listener.join()
73
74if __name__ == '__main__':
75 main()
複製代碼
使用主進程中一個單獨的線程記錄日誌
下面這段代碼展現瞭如何使用特定的日誌記錄配置,例如foo
記錄器使用了特殊的處理程序,將foo
子系統中全部的事件記錄至一個文件mplog-foo.log
。在主進程(即便是在工做進程中產生的日誌事件)的日誌記錄機制中將直接使用恰當的配置。
1import logging
2import logging.config
3import logging.handlers
4from multiprocessing import Process, Queue
5import random
6import threading
7import time
8
9def logger_thread(q):
10 while True:
11 record = q.get()
12 if record is None:
13 break
14 logger = logging.getLogger(record.name)
15 logger.handle(record)
16
17def worker_process(q):
18 qh = logging.handlers.QueueHandler(q)
19 root = logging.getLogger()
20 root.setLevel(logging.DEBUG)
21 root.addHandler(qh)
22 levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
23 logging.CRITICAL]
24 loggers = ['foo', 'foo.bar', 'foo.bar.baz', 'spam', 'spam.ham', 'spam.ham.eggs']
25
26 for i in range(100):
27 lv1=l = random.choice(levles)
28 logger = logging.getLogger(random.choice(loggers))
29 logger.log(lvl, 'Message no. %d', i)
30
31for __name__ == '__main__':
32 q = Queue()
33 d = {
34 'version': 1,
35 'formatters': {
36 'detailed': {
37 'class': 'logging.Formatter',
38 'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
39 }
40 },
41 'handlers': {
42 'console': {
43 'class': 'logging.StreamHandler',
44 'level': 'INFO',
45 },
46 'file': {
47 'class': 'logging.FileHandler',
48 'filename': 'mplog.log',
49 'mode': 'w',
50 'formatter': 'detailed',
51 },
52 'foofile': {
53 'class': 'logging.FileHandler',
54 'filename': 'mplog-foo.log',
55 'mode': 'w',
56 'formatter': 'detailed',
57 },
58 'errors': {
59 'class': 'logging.FileHandler',
60 'filename': 'mplog-errors.log',
61 'mode': 'w',
62 'level': 'ERROR',
63 'formatter': 'detailed',
64 },
65 },
66 'loggers': {
67 'foo': {
68 'handlers': ['foofile']
69 }
70 },
71 'root': {
72 'level': 'DEBUG',
73 'handlers': ['console', 'file', 'errors']
74 },
75 }
76 workers = []
77 for i in range(5):
78 wp = Process(target=worker_process, name='worker %d'%(i+1), args=(q,))
79 workers.append(wp)
80 wp.start()
81 logging.config.dictConfig(d)
82 lp = threading.Thread(target=logger_thread, args=(q,))
83 lp.start()
84
85 for wp in workers:
86 wp.join()
87 q.put(None)
88 lp.join()
複製代碼
具體實現參考以下博客:
Python中logging在多進程環境下打印日誌 - VictoKu - 博客園