咱們須要使用 「列表」 來存儲最新日誌文件,使用LPUSH命令將日誌消息推入到列表中。若是咱們以後想要查看已有日誌消息的話,可使用LRANGE命令來拉取列表中的消息。測試
import time import logging import unittest import redis from datetime import datetime # 設置一個字典,將大部分日誌的安全級別映射爲字符串 SEVERITY = { logging.DEBUG: 'debug', logging.INFO: 'info', logging.WARNING: 'warning', logging.ERROR: 'error', logging.CRITICAL: 'critical', } SEVERITY.update((name, name) for name in SEVERITY.values()) """ 存儲最新日誌文件,命名不一樣的日誌消息隊列,根據問題的嚴重性對日誌進行分級 @param {object} @param {string} name 消息隊列名稱 @param {string} message 消息 @param {string} severity安全級別 @param {object} pip pipline """ def logRecent(conn, name, message, severity=logging.INFO, pip=None): # 將日誌的安全級別轉換爲簡單的字符串 severity = str(SEVERITY.get(severity, severity)).lower() # 建立要保存的redis列表key destination = 'recent:%s:%s'%(name, severity) # 將當前時間加到消息裏面,用於記錄消息的發送時間 message = time.asctime() + ' ' + message # 使用流水線來將通訊往返次數下降爲一次 pipe = pip or conn.pipeline() # 將消息添加到列表的最前面 pipe.lpush(destination, message) # 修剪日誌列表,讓它只包含最新的100條消息 pipe.ltrim(destination, 0, 99) pipe.execute()
""" 記錄較高頻率出現的日誌,每小時一次的頻率對消息進行輪換,並在輪換日誌的時候保留上一個小時記錄的常見消息 @param {object} @param {string} name 消息隊列名稱 @param {string} message 消息 @param {string} severity安全級別 @param {int} timeout 執行超時時間 """ def logCommon(conn, name, message, severity=logging.INFO, timeout=5): # 設置日誌安全級別 severity = str(SEVERITY.get(severity, severity)).lower() # 負責存儲近期的常見日誌消息的鍵 destination = 'common:%s:%s'%(name, severity) # 每小時須要輪換一第二天志,須要記錄當前的小時數 start_key = destination + ':start' pipe = conn.pipeline() end = time.time() + timeout while time.time() < end: try: # 對記錄當前小時數的鍵進行監聽,確保輪換操做能夠正常進行 pipe.watch(start_key) # 當前時間 now = datetime.utcnow().timetuple() # 取得當前所處的小時數 hour_start = datetime(*now[:4]).isoformat() existing = pipe.get(start_key) # 開始事務 pipe.multi() # 若是這個常見日誌消息記錄的是上個小時的日誌 if existing and existing < hour_start: # 將這些舊的常見日誌歸檔 pipe.rename(destination, destination + ':last') pipe.rename(start_key, destination + ':pstart') # 更新當前所處的小時數 pipe.set(start_key, hour_start) elif not existing: pipe.set(start_key, hour_start) # 記錄日誌出現次數 pipe.zincrby(destination, message) # 將日誌記錄到日誌列表中,調用excute logRecent(pipe, name, message, severity, pipe) return except redis.exceptions.WatchError: continue
class TestLog(unittest.TestCase): def setUp(self): import redis self.conn = redis.Redis(db=15) self.conn.flushdb def tearDown(self): self.conn.flushdb() del self.conn print print def testLogRecent(self): import pprint conn = self.conn print "Let's write a few logs to the recent log" for msg in xrange(5): logRecent(conn, 'test', 'this is message %s'%msg) recent = conn.lrange('recent:test:info', 0, -1) print 'The current recent message log has this many message:', len(recent) print 'Those message include:' pprint.pprint(recent[:10]) self.assertTrue(len(recent) >= 5) def testLogCommon(self): import pprint conn = self.conn print "Let's writ a few logs to the common log" for count in xrange(1, 6): for i in xrange(count): logCommon(conn, 'test', 'message-%s'%count) common = conn.zrevrange('common:test:info', 0, -1, withscores=True) print 'The current common message log has this many message:', len(common) print 'Those common message include:' pprint.pprint(common) self.assertTrue(len(common) >= 5) if __name__ == '__main__': unittest.main()