Redis return stale data and update data Asynchronouslypython
python#!/usr/bin/env python # coding=utf-8 import os import sys import redis from binascii import crc32 import json import logging root_path = [os.path.dirname(os.path.dirname(os.path.abspath(__file__)))] sys.path += root_path class RedisCache(object): redis_pool = None def __init__(self, hosts=['localhost'], duration=600): self.duration = duration self.redis_pool = self.redis_pool or [redis.Redis(connection_pool=redis.ConnectionPool(host=h, port=p, db=d), socket_timeout=0.5) for h,p,d in hosts] def _get_redis(self, key=None): if not key: return self.redis_pool[0] idx = crc32(key) % len(self.redis_pool) return self.redis_pool[idx] def get_value(self, key): v = self._get_redis(key).get(key) if v: try: l = json.loads(v) except Exception, e: raise e else: return l return None def set_value(self, key, value, duration=60): self._get_redis(key).set(key, json.dumps(value)) self._get_redis(key).expire(key, int(duration or self.duration)) def delete(self, key):python self._get_redis(key).delete(key)
設置stale時段,在此期間的get請求即時返回數據以後,經過異步任務更新數據
這裏用了 tornado.ioloop; 任意語言的的異步機制固然沒有問題redis
pythonfrom tornado.ioloop import IOLoop class StaleRedisCache(RedisCache): #def __init__(self, hosts=['localhost'], duration=600, stale=100): def __init__(self, hosts=['localhost'], duration=600, stale=7200): self.stale = stale RedisCache.__init__(self, hosts, duration) def get_value(self, key, callback=None, *args, **kwargs): l = None r = self._get_redis(key) res = r.pipeline().ttl(key).get(key).execute() if res[1]: try: l = json.loads(res[1]) except Exception, e: raise e if not res[0] or res[0] < self.stale and callback: def func(): value = callback and callback(*args, **kwargs) logging.info("set_value for key %s" % key) #r.pipeline().set(key, json.dumps(value)).expire(key, kwargs.get('duration', self.duration)).execute() r.pipeline().set(key, json.dumps(value)).expire(key, int(kwargs.get('duration', self.duration)) + self.stale).execute() return value # redis-cli版本不一樣,res[0] 可能爲 None(<=2.6.*) or -2(>=2.6) if not res[0] or res[0] == -2: return func() IOLoop.current().add_timeout(IOLoop.current().time(), func) return l # 這裏已經不須要單獨的 set_value 接口,由於再 get_value 中已經實現 def set_value(self, key, value, duration=60): pass
pythonstale_cache = StaleRedisCache([('localhost', 6379, 0)]) def sc_callback: return sth.from.mongo or bla bla bla... def abc: value = stale_cache.get_value('key1', sc_callback)
有沒有很簡潔呢~json
在第一個圖片示意圖中,設置了stale=100
, 比 self.duration=600
還小;
這樣其實並無發揮 stale異步更新數據的優點;異步
stale Redis 部分的代碼作了2處改動socket
這樣一來,最終的效果是:tornado