Django多進程日誌文件問題

Django多進程日誌文件問題

最近使用Django做一個項目。在部署的時候發現日誌文件不能滾動(我使用的是RotatingFileHandler),只有一個日誌文件。查看Log發現一個錯誤消息:PermissionError: [WinError 32] 另外一個程序正在使用此文件

由於我有一些進程須要使用Django的模型層來操作數據庫,因此在這些單獨的進程中引入了Django:

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
import django
django.setup()

估計這就是主要緣由:在使用Django時,Django自己就會使用settings中的log配置來初始化日誌模塊,而我正好使用的是基於文件的Handler,因此多個進程啓動後,都會按照這個方式來初始化日誌模塊,導致多個進程都在引用此日誌文件。在日誌文件滾動時,須要把當前日誌文件重命名爲xxx.1或者xxx.2,可是因爲其餘進程也在使用此文件,因此不能修改文件名。

後來在網上查python的多進程日誌文件的滾動問題,發現你們都有相似問題。看來這是python自身的一個常見問題,可是尚未什麼標準的解決方法。有的是採用多進程共享queue的方式:多個進程把日誌往queue中寫,而後一個線程負責把queue中的日誌消息往文件中寫。但多進程共享queue在linux上和windows上表現還有差別,真是愈來愈煩。具體討論請參考這個文章: https://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python/894284

代碼以下:

import logging
import multiprocessing
import sys
import threading
import time
import traceback
from logging.handlers import RotatingFileHandler


class MultiProcessingLogHandler(logging.Handler):
    """Queue-based handler for logging from multiple processes.

    Each process's ``emit`` pushes a pre-formatted record onto a shared
    ``multiprocessing.Queue``; a single daemon thread drains the queue
    and forwards records to one ``RotatingFileHandler``, so only one
    writer ever touches the log file (avoids the Windows
    ``PermissionError`` during rollover).
    """

    def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):
        logging.Handler.__init__(self)

        # The single real file handler; only the receiver thread uses it.
        self._handler = RotatingFileHandler(filename, mode, maxBytes, backupCount, encoding, delay)
        self.queue = multiprocessing.Queue(-1)

        # Daemon receiver thread: drains the queue for the process lifetime.
        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        """Propagate the formatter to the underlying file handler."""
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        """Receiver loop: forward queued records to the file handler."""
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                # Queue feeder was closed -- nothing more will arrive.
                break
            except Exception:
                # Never let a logging error kill the receiver thread.
                # (Original referenced traceback/sys without importing them.)
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        """Enqueue a prepared record without blocking the caller."""
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # Stringify args and exc_info up front so the record is
        # picklable and smaller when it crosses the process boundary.
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            # format() caches exc_text on the record as a side effect.
            self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        """Pickle-safe emit: push the record onto the shared queue."""
        try:
            self.send(self._format_record(record))
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)

    def close(self):
        """Close the underlying file handler, then this handler."""
        self._handler.close()
        logging.Handler.close(self)

而後把這個類寫在settings.py中的handler的class中。我測試的時候仍是沒有好使,不知道什麼緣由 :( 我寶貴的時間呀!

使用zeromq的方案卻是吸引了我,由於我項目中正好使用zeromq,比較簡單可靠。說白了就是把日誌發給一個socket,而後socket服務端讀取消息,並寫入到日誌文件中。這個方案涉及到要自定義一個Handler,這就要查查python如何自定義handler(zeromq好像已經提供了一個PUBHandler: http://pyzmq.readthedocs.io/en/latest/api/zmq.log.handlers.html#zmq.log.handlers.PUBHandler)。

關於如何自定義handler,這裏有篇文章講了: http://www.cnblogs.com/shizioo/p/python_logging_handler_custom.html

# One formatter per standard level; every severity shares the same
# "[logger_name] message" layout.
_LEVELS = (logging.DEBUG, logging.INFO, logging.WARN,
           logging.ERROR, logging.CRITICAL)
formatters = {
    level: logging.Formatter("[%(name)s] %(message)s") for level in _LEVELS
}

# This one will be used by publishing processes
class PUBLogger:
    """Per-process logger that publishes its records over a zmq PUB socket."""

    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        # Module-level logger; everything at DEBUG and above goes to the
        # PUB handler configured below.
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)

        # Connect a PUB socket to the central listener.
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        address = 'tcp://{0}:{1}'.format(socket.gethostbyname(host), port)
        self.pub.connect(address)

        # Route records through pyzmq's PUBHandler with per-level formatters.
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        """The configured ``logging.Logger`` instance."""
        return self._logger

# This one will be used by listener process
class SUBLogger:
    """Listener side: binds a zmq SUB socket and logs to a rotating file.

    Fixes vs. the original paste:
    - the two property methods were indented 2 spaces inside a 4-space
      class body (IndentationError);
    - ``zmq.SUBSCRIBE`` requires a bytes value on Python 3;
    - the bind address used ``{1}`` with an unused ``ip`` argument --
      now formats the port directly (same resulting string).
    """

    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()  # root logger collects everything
        self._logger.setLevel(logging.DEBUG)

        # SUB socket bound on all interfaces; subscribe to every topic.
        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{0}'.format(port))
        self._sub.setsockopt(zmq.SUBSCRIBE, b"")  # py3: option value must be bytes

        # 100 MiB per file, 10 backups.
        handler = handlers.RotatingFileHandler(
            os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

    @property
    def sub(self):
        """The bound SUB socket (polled by the listener loop)."""
        return self._sub

    @property
    def logger(self):
        """Root logger wired to the rotating file handler."""
        return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    """Pull (topic, message) frames off the SUB socket and replay them
    through the logging module until *event* is set.

    Fixes vs. the original: multipart frames are ``bytes`` on Python 3,
    so decode before using them as an attribute name / message; ZMQ
    errors other than EAGAIN are re-raised instead of being swallowed.
    """
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            # Topic is the level name (e.g. b"INFO") -> logging.info
            log_func = getattr(logging, topic.decode("ascii").lower())
            log_func(message.decode("utf-8", errors="replace"))
        except zmq.ZMQError as zmq_error:
            # EAGAIN just means "no message yet"; anything else is real.
            if zmq_error.errno != zmq.EAGAIN:
                raise


# Publisher processes loggers should be initialized as follows:

class Publisher:
    """Worker-process side: owns a PUBLogger and emits messages.

    Fixes vs. the original paste: ``run`` was mis-indented
    (IndentationError) and referenced the bare name ``proc_id``
    instead of ``self.proc_id``.
    """

    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

    def run(self):
        self._logger.info("{0} - Sending message".format(self.proc_id))

def run_worker(event, proc_id):
    """Entry point for one publisher process: build it and run it."""
    Publisher(event, proc_id).run()

# Starting subscriber process so we won't lose publisher's messages.
# NOTE: the original line had unbalanced parentheses and passed
# args=('127.0.0.1') -- a plain string, not a tuple -- fixed below;
# the `processes` list was also never initialized.
processes = []
sub_logger_process = Process(target=run_sub_logger,
                             args=('127.0.0.1', stop_event))
sub_logger_process.start()

# Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                             args=(stop_event, i)))
for p in processes:
    p.start()

上面是別人寫的關於zeromq方式多進程日誌的解決方法。上面看起來有些複雜,最後使用的是zeromq本身提供的PUBHandler,同時本身再寫一個接收端,並使用Python自身的logging模塊往文件中記錄。

PUBHandler使用的是PUB方式:

self.ctx = context or zmq.Context()
self.socket = self.ctx.socket(zmq.PUB)
self.socket.bind(interface_or_socket)

其中inproc://log方式不行,由於這個是線程間通訊,而並非進程間通訊(參考:http://www.cnblogs.com/fengbohello/p/4328772.html)。因此想使用進程間通訊須要用ipc://address,可是看到此方式只在UNIX系統上徹底實現了,心又涼一半,你讓我在windows上怎麼測試?看來只能換成tcp方式了。具體實現請看這篇文章:http://seewind.blog.51cto.com/249547/288343

使用PUB是不行的,由於多個進程調用此處會致使Address in use的錯誤。哎~~~,繼續調查,最後決定使用PUSH和PULL模式。

最後的代碼以下:

# -*- coding:utf-8 -*-
import logging

import zmq
from zmq.utils.strtypes import cast_bytes

default_formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(pathname)s[line:%(lineno)d] %(message)s')


class PUSHHandler(logging.Handler):
    """zmq's PUBHandler reworked to use PUSH (client) / PULL (server).

    Each process connects its own PUSH socket to the single logging
    server, so many processes can log concurrently without the
    "Address in use" error that a PUB/bind handler causes.
    """

    socket = None

    # Per-level formatters; all standard levels share the same layout.
    formatters = {
        logging.DEBUG: default_formatter,
        logging.INFO: default_formatter,
        logging.WARN: default_formatter,
        logging.ERROR: default_formatter,
        logging.CRITICAL: default_formatter
    }

    def __init__(self, interface_or_socket, context=None):
        """Accept either an existing zmq socket or a "tcp://host:port" address."""
        logging.Handler.__init__(self)
        if isinstance(interface_or_socket, zmq.Socket):
            self.socket = interface_or_socket
            self.ctx = self.socket.context
        else:
            self.ctx = context or zmq.Context()
            self.socket = self.ctx.socket(zmq.PUSH)
            self.socket.connect(interface_or_socket)

    def format(self, record):
        """Format a record, falling back to the default formatter for
        non-standard levels (the original raised KeyError for e.g. 25)."""
        return self.formatters.get(record.levelno, default_formatter).format(record)

    def emit(self, record):
        """Emit a log message on my socket."""
        try:
            bmsg = cast_bytes(self.format(record))
        except Exception:
            self.handleError(record)
            return

        try:
            self.socket.send(bmsg)
        except Exception:
            # A dead socket must not crash the application's log call.
            self.handleError(record)

在django的settings.py中使用此handler:

'handlers': {
        'console': {
            'level': 'INFO',
            'class': 'logging.StreamHandler',
            'formatter': 'detail',
        },
        'file': {
            'level': 'INFO',
            'class': 'common.zeromq_loghandler.PUSHHandler',
            'interface_or_socket': 'tcp://localhost:%s' % LOG_LISTEN_PORT,
        },
    },

最後log的服務端代碼以下:

# -*- coding:utf-8 -*-
import logging

import zmq

from common.message_log import get_logger
from myproject.settings import LOG_BASE_DIR, LOG_LISTEN_PORT

simple_formatter = logging.Formatter('%(message)s')

logger = get_logger(LOG_BASE_DIR + '/django.log', formatter=simple_formatter)


class LoggingServer(object):
    """Log-receiving server (solves the multi-process shared-log-file
    problem): PULLs already-formatted messages from all processes and
    appends each one to the file via the module-level ``logger``.
    """

    def run(self):
        """Bind the PULL socket and forward every incoming message, forever."""
        logger.info("start LoggingServer")

        ctx = zmq.Context()
        pull_sock = ctx.socket(zmq.PULL)  # renamed: 'socket' shadows the stdlib module
        pull_sock.bind("tcp://*:%s" % LOG_LISTEN_PORT)
        logger.info("Listen on %s" % LOG_LISTEN_PORT)

        while True:
            try:
                logger.info(pull_sock.recv_string())
            except Exception as e:
                # Best-effort: keep serving even if one message fails.
                logger.error("Error logging server: %s" % e)


def main():
    """Start the logging server; log any fatal error instead of crashing."""
    try:
        LoggingServer().run()
    except Exception as e:
        logger.error('Error: %s' % e)


if __name__ == "__main__":
    main()

就這麼簡單的一個日誌問題,花費了我一天時間,很是不開心 :-(

關於開源的python第三方logging模塊,沒敢用,在Github上有structlog, logzero, python-logstash。不知道哪一個好,或者有什麼坑,你們試試而後告訴我 :)


參考:

相關文章
相關標籤/搜索