1、源碼
2、日誌採集客戶端
3、日誌採集服務端
# Minimal TCP log receiver: accepts one client at a time, prints every chunk
# the client sends to stdout and answers each chunk with the one-byte ack '0'.
import socket
import traceback
import sys
import os

NET_OPERATION_TIMEOUT = 5  # seconds allowed for each blocking op on a client
HOST = socket.gethostname()
PORT = 1738


def process_worker():
    """Serve clients forever, one connection at a time.

    An error while handling one client is printed and the loop moves on to
    the next ``accept``; only non-``Exception`` errors (e.g.
    KeyboardInterrupt) stop the server.
    """
    while True:
        try:
            _recv_send()
        except Exception:
            # print_exc() writes the traceback itself and returns None, so it
            # must not be wrapped in another print (that printed "None").
            traceback.print_exc()


def _recv_send(server=None):
    """Accept one client and relay what it sends until it disconnects.

    :param server: listening socket to accept from; defaults to the
        module-level socket ``s`` created by ``_main``.
    :raises socket.timeout: if the client stays idle longer than
        NET_OPERATION_TIMEOUT; other socket errors propagate as well.

    The client socket is closed on every exit path.
    """
    if server is None:
        server = s
    client, _address = server.accept()
    try:
        client.settimeout(NET_OPERATION_TIMEOUT)
        while True:
            data = client.recv(4096)
            if not data:
                # An empty read means the peer closed its side; stop instead
                # of spinning forever on a dead connection.
                break
            print(data)
            client.sendall(b'0')
    finally:
        client.close()


def _main():
    """Bind the listening socket to HOST:PORT and serve forever."""
    global s
    s = socket.socket()
    print('%s %s' % (HOST, PORT))
    s.bind((HOST, PORT))
    s.listen(5)
    process_worker()


if __name__ == "__main__":
    _main()
import tornado.ioloop
import tornado.web
import tornado.httpserver
import tornado.gen
from tornado.concurrent import run_on_executor
import logging
from logging.handlers import RotatingFileHandler
import os
import re
import traceback
import fcntl
import errno

PWD = os.path.dirname(os.path.realpath(__file__))

log_file = '%s/tcollector_server.log' % PWD
LOG = logging.getLogger('tcollector_server')
LOG.setLevel(logging.DEBUG)
# NOTE(review): this handler is created before server.start(0) forks, so every
# worker process writes to and independently rotates the same file; the stdlib
# does not support logging to one file from multiple processes. Consider a
# per-process log file or external rotation.
ch = RotatingFileHandler(log_file, 'a', 400 * 1024 * 1024, 10)
ch.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s]'
                                  '[%(process)d][%(filename)s]%(message)s'))
LOG.addHandler(ch)

# Format of each line sent by collector clients: "<prod> <date> <data>",
# separated by single spaces; <data> is the remainder of the line.
PAT = re.compile(r'^(?P<prod>[^ ]+) (?P<date>[^ ]+) (?P<data>.+)')
# Date field format, e.g. 2015-06-30.
DATE_PAT = re.compile(r'^\d{4}-\d{2}-\d{2}$')

LOG_DIR = PWD + '/data'
try:
    os.makedirs(LOG_DIR)
except OSError as exc:
    # An already-existing directory is fine; anything else is fatal.
    if exc.errno == errno.EEXIST and os.path.isdir(LOG_DIR):
        pass
    else:
        raise

# Per-process cache of open output files, keyed by output file path.
file_fd = {}
# Once this many handles are cached, all of them are closed before opening
# another, so long-running workers cannot exhaust file descriptors.
MAX_CACHED_FILES = 100


def _close_cached_files():
    """Close every cached output handle and empty the cache."""
    for handle in list(file_fd.values()):
        try:
            handle.close()
        except (IOError, OSError):
            LOG.error(traceback.format_exc())
    file_fd.clear()


class AutoLock(object):
    """Exclusive inter-process lock implemented with fcntl.flock on a file.

    Use as ``with AutoLock(path): ...``. The lock file is opened (created or
    truncated) on construction, locked on enter, and unlocked and closed on
    exit.
    """

    def __init__(self, file_name):
        self.__file_name = file_name
        self.__handle = open(file_name, 'w')

    def __enter__(self):
        try:
            fcntl.flock(self.__handle, fcntl.LOCK_EX)
        except BaseException:
            # Don't leak the lock-file handle if acquiring the lock fails.
            self.__handle.close()
            raise
        return self

    def __exit__(self, exception_type, exception_val, trace):
        try:
            fcntl.flock(self.__handle, fcntl.LOCK_UN)
        finally:
            self.__handle.close()


class HandlerBase(tornado.web.RequestHandler):
    """Accept newline-separated log lines over HTTP and append each valid
    line to ``<LOG_DIR>/<prod>_<date>.dat``."""

    def initialize(self):
        pass

    def get(self):
        self.post()

    # The handler never yields, so it runs synchronously. The former
    # @tornado.web.asynchronous (removed in Tornado 6) and @gen.coroutine
    # decorators were unnecessary; finish() is called explicitly.
    def post(self):
        LOG.info(str(self.request.remote_ip))
        body = self.parse_request(self.request)
        self.process_request(body)
        self.finish()

    @staticmethod
    def parse_request(request):
        return request.body

    @staticmethod
    def save_data(info):
        """Append ``info['data']`` as one line to its per-product/day file.

        :param info: mapping with 'prod', 'date' and 'data' keys (the
            groupdict() of a PAT match).

        Handles are cached in ``file_fd``. Writes are serialised across
        worker processes with an flock on ``<file>.lock``, and each write is
        flushed while the lock is held.
        """
        file_path = '{log_dir}/{prod}_{date}.dat'.format(
            log_dir=LOG_DIR, prod=info['prod'], date=info['date'])
        handle = file_fd.get(file_path)
        if handle is None:
            # Cap the cache on the number of handles (not the path length),
            # and clear it so no closed handle is ever reused.
            if len(file_fd) >= MAX_CACHED_FILES:
                _close_cached_files()
            handle = file_fd[file_path] = open(file_path, 'a+')
        with AutoLock(file_path + '.lock'):
            # process_request split the body on '\n', so restore the record
            # separator; otherwise all records run together on one line.
            handle.write(info['data'] + '\n')
            handle.flush()

    def process_request(self, body):
        """Parse ``body`` line by line and persist every well-formed line.

        Blank lines are skipped. Lines that don't match PAT, have a date not
        matching DATE_PAT, have a product name containing '/', or fail while
        saving are logged and skipped. One bad line never aborts the rest.
        """
        for line in body.split('\n'):
            if not line:
                # Trailing newline or blank line: not a record.
                continue
            try:
                line_match = PAT.match(line)
                if not line_match:
                    LOG.error('error line:' + line)
                    continue
                info = line_match.groupdict()
                if not DATE_PAT.match(info['date']):
                    LOG.error('error date line:' + line)
                    continue
                if '/' in info['prod']:
                    # prod is client-controlled and becomes part of a file
                    # path; refuse anything that could escape LOG_DIR.
                    LOG.error('error prod line:' + line)
                    continue
                self.save_data(info)
            except Exception:
                LOG.error(traceback.format_exc())
                LOG.error('exception line:' + line)


class LogHandler(HandlerBase):
    """Concrete handler mounted on every URL path."""
    pass


application = tornado.web.Application(
    [
        (r"/.*", LogHandler),
    ]
)

if __name__ == '__main__':
    server = tornado.httpserver.HTTPServer(application, xheaders=True)
    server.bind(8016)
    server.start(0)  # fork one worker process per CPU core
    tornado.ioloop.IOLoop.instance().start()