在構建一個系統時,咱們經常須要記錄當前發生的事情,以及記錄特定消息出現的頻率,根據出現頻率的高低來決定消息的排列信息,幫助咱們找到重要的信息。redis
常見記錄日誌的方法有兩種:安全
syslog的轉發功能能夠將不一樣的日誌分別存儲在同一臺服務器的多個文件裏面,對於長時間地記錄日誌很是有幫助。咱們可使用redis來存儲與時間緊密相關的日誌,從而在功能上替代那些須要在短時間內被存儲的syslog消息。服務器
咱們須要使用 「列表」 來存儲最新日誌文件,使用LPUSH命令將日誌消息推入到列表中。若是咱們以後想要查看已有日誌消息的話,可使用LRANGE命令來拉取列表中的消息。測試
咱們還要命名不一樣的日誌消息隊列,根據問題的嚴重性對日誌進行分級。this
import time import logging import unittest import redis from datetime import datetime # 設置一個字典,將大部分日誌的安全級別映射爲字符串 SEVERITY = { logging.DEBUG: 'debug', logging.INFO: 'info', logging.WARNING: 'warning', logging.ERROR: 'error', logging.CRITICAL: 'critical', } SEVERITY.update((name, name) for name in SEVERITY.values()) """ 存儲最新日誌文件,命名不一樣的日誌消息隊列,根據問題的嚴重性對日誌進行分級 @param {object} @param {string} name 消息隊列名稱 @param {string} message 消息 @param {string} severity安全級別 @param {object} pip pipline """ def logRecent(conn, name, message, severity=logging.INFO, pip=None): # 將日誌的安全級別轉換爲簡單的字符串 severity = str(SEVERITY.get(severity, severity)).lower() # 建立要保存的redis列表key destination = 'recent:%s:%s'%(name, severity) # 將當前時間加到消息裏面,用於記錄消息的發送時間 message = time.asctime() + ' ' + message # 使用流水線來將通訊往返次數下降爲一次 pipe = pip or conn.pipeline() # 將消息添加到列表的最前面 pipe.lpush(destination, message) # 修剪日誌列表,讓它只包含最新的100條消息 pipe.ltrim(destination, 0, 99) pipe.execute()
咱們須要記錄較高頻率出現的日誌,使用「有序集合」,將消息做爲成員,消息出現的頻率爲成員的分值。debug
爲了確保咱們看到的常見消息都是最新的,須要以每小時一次的頻率對消息進行輪換,並在輪換日誌的時候保留上一個小時記錄的常見消息,從而防止沒有任何消息存儲的狀況出現。日誌
""" 記錄較高頻率出現的日誌,每小時一次的頻率對消息進行輪換,並在輪換日誌的時候保留上一個小時記錄的常見消息 @param {object} @param {string} name 消息隊列名稱 @param {string} message 消息 @param {string} severity安全級別 @param {int} timeout 執行超時時間 """ def logCommon(conn, name, message, severity=logging.INFO, timeout=5): # 設置日誌安全級別 severity = str(SEVERITY.get(severity, severity)).lower() # 負責存儲近期的常見日誌消息的鍵 destination = 'common:%s:%s'%(name, severity) # 每小時須要輪換一第二天志,須要記錄當前的小時數 start_key = destination + ':start' pipe = conn.pipeline() end = time.time() + timeout while time.time() < end: try: # 對記錄當前小時數的鍵進行監聽,確保輪換操做能夠正常進行 pipe.watch(start_key) # 當前時間 now = datetime.utcnow().timetuple() # 取得當前所處的小時數 hour_start = datetime(*now[:4]).isoformat() existing = pipe.get(start_key) # 開始事務 pipe.multi() # 若是這個常見日誌消息記錄的是上個小時的日誌 if existing and existing < hour_start: # 將這些舊的常見日誌歸檔 pipe.rename(destination, destination + ':last') pipe.rename(start_key, destination + ':pstart') # 更新當前所處的小時數 pipe.set(start_key, hour_start) elif not existing: pipe.set(start_key, hour_start) # 記錄日誌出現次數 pipe.zincrby(destination, message) # 將日誌記錄到日誌列表中,調用excute logRecent(pipe, name, message, severity, pipe) return except redis.exceptions.WatchError: continue
測試
測試代碼以下:code
class TestLog(unittest.TestCase): def setUp(self): import redis self.conn = redis.Redis(db=15) self.conn.flushdb def tearDown(self): self.conn.flushdb() del self.conn print print def testLogRecent(self): import pprint conn = self.conn print "Let's write a few logs to the recent log" for msg in xrange(5): logRecent(conn, 'test', 'this is message %s'%msg) recent = conn.lrange('recent:test:info', 0, -1) print 'The current recent message log has this many message:', len(recent) print 'Those message include:' pprint.pprint(recent[:10]) self.assertTrue(len(recent) >= 5) def testLogCommon(self): import pprint conn = self.conn print "Let's writ a few logs to the common log" for count in xrange(1, 6): for i in xrange(count): logCommon(conn, 'test', 'message-%s'%count) common = conn.zrevrange('common:test:info', 0, -1, withscores=True) print 'The current common message log has this many message:', len(common) print 'Those common message include:' pprint.pprint(common) self.assertTrue(len(common) >= 5) if __name__ == '__main__': unittest.main()