The log collection tool tcollector

    tcollector is part of OpenTSDB. It collects logs on the client side and ships them to the server-side database. Here I only use it to collect logs; OpenTSDB itself is not involved.

1. Source code

tcollector's core is only a little over a thousand lines and consists mainly of these parts: the reader threads, the sender thread, and the main loop.
The code is a nice set of learning demos: daemonizing a process, the logging, optparse, and Queue modules, multithreading, dynamic module loading, and subprocess-based plugins.
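Of these, the dynamic module loading pattern is worth a closer look. Below is a minimal sketch of the idea using Python 3's importlib (tcollector itself is Python 2 and does this differently; the module name and collect() function are made up for illustration):

```python
import importlib.util
import os
import tempfile

def load_module_from_path(path):
    """Load a Python source file as a module at runtime,
    similar in spirit to how tcollector discovers collector scripts."""
    name = os.path.splitext(os.path.basename(path))[0]
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

# Demo: write a throwaway module to disk, then load and call it.
with tempfile.TemporaryDirectory() as d:
    mod_path = os.path.join(d, "my_collector.py")
    with open(mod_path, "w") as f:
        f.write("def collect():\n    return 'demo 2015-06-30 hello'\n")
    mod = load_module_from_path(mod_path)
    line = mod.collect()

print(line)  # → demo 2015-06-30 hello
```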

2. Log collection client

You can modify tcollector's line-processing and sending code and enlarge the Queue to turn it into a log-collection client framework. Then write collector programs and put them under the collectors/0/ directory; the framework starts them, each collector writes its log lines to stdout, and the framework forwards the lines to the server. The official demos are good references for writing collectors.
Next, implement a log collection server to receive and process the logs. It can be a TCP server, or you can switch the sender to HTTP and build the server on Tornado; either is easy to implement.
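A collector dropped into collectors/0/ can be as simple as a loop that formats one record per line and writes it to stdout with a flush. A sketch (the space-separated prod/date/data layout matches what the server demos below parse; the field values are made up):

```python
import sys
import time

def make_line(prod, date, data):
    """Build one record in the space-separated 'prod date data'
    layout that the server demos in this article expect."""
    return '%s %s %s' % (prod, date, data)

def emit(line, out=sys.stdout):
    # The framework reads the collector's stdout line by line,
    # so flush after every write to avoid stdio buffering delays.
    out.write(line + '\n')
    out.flush()

emit(make_line('myapp', time.strftime('%Y-%m-%d'), 'hello world'))
```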

3. Log collection server

 
TCP Server demo
 
import socket
import traceback
 
 
NET_OPERATION_TIMEOUT = 5
HOST = socket.gethostname()
PORT = 1738
 
 
def process_worker():
    while True:
        try:
            _recv_send()
        except Exception:
            traceback.print_exc()
 
 
def _recv_send():
    client, address = s.accept()
    client.settimeout(NET_OPERATION_TIMEOUT)
    while True:
        data = client.recv(4096)
        if not data:
            # An empty read means the peer closed the connection.
            break
        print data
        client.send('0')  # one-byte ack per received chunk
    client.close()
 
 
def _main():
    global s
    s = socket.socket()
    host = HOST
    port = PORT
    print host, port
    s.bind((host, port))
    s.listen(5)
 
    process_worker()
 
 
if __name__ == "__main__":
    _main()
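On the sending side, the protocol above boils down to: connect, send a chunk, read the one-byte '0' ack. A self-contained sketch in Python 3 (bytes on the wire, unlike the Python 2 demo; the stub server thread exists only so the example runs on its own — in practice you would connect to the demo server's HOST:PORT):

```python
import socket
import threading

def stub_server(listener):
    """Accept one connection and ack the received chunk with b'0',
    mimicking the TCP server demo above. For illustration only."""
    client, _ = listener.accept()
    data = client.recv(4096)
    if data:
        client.send(b'0')
    client.close()

def send_line(host, port, line):
    s = socket.socket()
    s.settimeout(5)
    s.connect((host, port))
    s.sendall(line.encode())
    ack = s.recv(1)  # the server replies b'0' after each chunk
    s.close()
    return ack

listener = socket.socket()
listener.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
listener.listen(1)
port = listener.getsockname()[1]
t = threading.Thread(target=stub_server, args=(listener,))
t.start()
ack = send_line('127.0.0.1', port, 'myapp 2015-06-30 hello\n')
t.join()
listener.close()
print(ack)  # → b'0'
```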
 

 

Tornado HTTP Server demo
 
import tornado.ioloop
import tornado.web
import tornado.httpserver
import tornado.gen
from tornado.concurrent import run_on_executor
import logging
from logging.handlers import RotatingFileHandler
import os
import re
import traceback
import fcntl
import errno
 
 
PWD = os.path.dirname(os.path.realpath(__file__))
 
 
log_file = '%s/tcollector_server.log' % PWD
LOG = logging.getLogger('tcollector_server')
LOG.setLevel(logging.DEBUG)
ch = RotatingFileHandler(log_file, 'a', 400 * 1024 * 1024, 10)
ch.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s]'
                                  '[%(process)d][%(filename)s]%(message)s'))
LOG.addHandler(ch)
 
# Log line format sent by the client: [prod] [date] [data]
PAT = re.compile(r'^(?P<prod>[^ ]+) (?P<date>[^ ]+) (?P<data>.+)')
# Date format: 2015-06-30
DATE_PAT = re.compile(r'^\d{4}-\d{2}-\d{2}$')
LOG_DIR = PWD + '/data'
 
try:
    os.makedirs(LOG_DIR)
except OSError as exc:
    if exc.errno == errno.EEXIST and os.path.isdir(LOG_DIR):
        pass
    else:
        raise
 
file_fd = {}
 
 
class AutoLock(object):
    """進程鎖
    依賴文件實現進程鎖
    """
    def __init__(self, file_name):
        self.__file_name = file_name
        self.__handle = open(file_name, 'w')
 
    def __enter__(self):
        fcntl.flock(self.__handle, fcntl.LOCK_EX)
 
    def __exit__(self, exception_type, exception_val, trace):
        fcntl.flock(self.__handle, fcntl.LOCK_UN)
        self.__handle.close()
 
 
class HandlerBase(tornado.web.RequestHandler):
    """ handler
    """
    def initialize(self):
        pass
 
    def get(self):
        self.post()
 
    @tornado.gen.coroutine
    def post(self):
        LOG.info(str(self.request.remote_ip))
        body = self.parse_request(self.request)
        self.process_request(body)
        self.finish()
 
    @staticmethod
    def parse_request(request):
        return request.body
 
    @staticmethod
    def save_data(info):
        file_path = '{log_dir}/{prod}_{date}.dat'.format(log_dir=LOG_DIR,
                                                          prod=info['prod'],
                                                          date=info['date'])
        if file_path not in file_fd:
            file_fd[file_path] = open(file_path, 'a+')
 
        with AutoLock(file_path + '.lock'):
            file_fd[file_path].write(info['data'])
            file_fd[file_path].flush()
 
        # Cap the number of cached file handles.
        if len(file_fd) > 100:
            for value in file_fd.itervalues():
                value.close()
            file_fd.clear()
 
    def process_request(self, body):
        lines = body.split('\n')
        for line in lines:
            try:
                line_match = PAT.match(line)
                if not line_match:
                    LOG.error('error line:' + line)
                else:
                    date_match = DATE_PAT.match(line_match.groupdict()['date'])
                    if date_match:
                        self.save_data(line_match.groupdict())
            except:
                LOG.error(traceback.format_exc())
                LOG.error('exception line:' + line)
 
 
class LogHandler(HandlerBase):
    pass
 
 
application = tornado.web.Application(
    [
        (r"/.*", LogHandler),
        ]
)
 
 
if __name__ == '__main__':
    server = tornado.httpserver.HTTPServer(application, xheaders=True)
    server.bind(8016)
    server.start(0)  # fork as many worker processes as there are CPU cores
    tornado.ioloop.IOLoop.instance().start()
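The request body this server expects is just newline-separated 'prod date data' records. Here is a quick sanity check of that format on the sending side, reusing the same regexes (the record values are made up); the resulting body can then be POSTed to port 8016 with any HTTP client:

```python
import re

# The same patterns the Tornado server above compiles.
PAT = re.compile(r'^(?P<prod>[^ ]+) (?P<date>[^ ]+) (?P<data>.+)')
DATE_PAT = re.compile(r'^\d{4}-\d{2}-\d{2}$')

def build_body(records):
    """records: iterable of (prod, date, data) tuples -> POST body."""
    return '\n'.join('%s %s %s' % r for r in records)

body = build_body([('myapp', '2015-06-30', 'something happened'),
                   ('myapp', '2015-06-30', 'something else')])
for line in body.split('\n'):
    m = PAT.match(line)
    # Every line must match both the record and the date pattern,
    # or the server would log it as an error line.
    assert m is not None and DATE_PAT.match(m.group('date'))
parsed = m.groupdict()
print(parsed)
```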
 