讓Redis 異步: 即時返回數據並異步更新數據

Redis return stale data and update data Asynchronouslypython

Redis return stale data and update data Asynchronously

常規的存取redis

python#!/usr/bin/env python
# coding=utf-8
import os
import sys
import redis
from binascii import crc32
import json
import logging

root_path = [os.path.dirname(os.path.dirname(os.path.abspath(__file__)))]
sys.path += root_path


class RedisCache(object):

    redis_pool = None

    def __init__(self, hosts=['localhost'], duration=600):
        self.duration = duration
        self.redis_pool = self.redis_pool or [redis.Redis(connection_pool=redis.ConnectionPool(host=h, port=p, db=d), socket_timeout=0.5) for h,p,d in hosts]

    def _get_redis(self, key=None):
        if not key:
            return self.redis_pool[0]
        idx = crc32(key) % len(self.redis_pool)
        return self.redis_pool[idx]

    def get_value(self, key):
        v = self._get_redis(key).get(key)
        if v:
            try:
                l = json.loads(v)
            except Exception, e:
                raise e
            else:
                return l

        return None

    def set_value(self, key, value, duration=60):
        self._get_redis(key).set(key, json.dumps(value))
        self._get_redis(key).expire(key, int(duration or self.duration))

    def delete(self, key):python
        self._get_redis(key).delete(key)

Stale Redis

設置stale時段,在此期間的get請求即時返回數據以後,經過異步任務更新數據
這裏用了 tornado.ioloop; 任意語言的的異步機制固然沒有問題redis

pythonfrom tornado.ioloop import IOLoop

class StaleRedisCache(RedisCache):
    #def __init__(self, hosts=['localhost'], duration=600, stale=100):
    def __init__(self, hosts=['localhost'], duration=600, stale=7200):
        self.stale = stale
        RedisCache.__init__(self, hosts, duration)
    def get_value(self, key, callback=None, *args, **kwargs):
        l = None
        r = self._get_redis(key)
        res = r.pipeline().ttl(key).get(key).execute()
        if res[1]:
            try:
                l = json.loads(res[1])
            except Exception, e:
                raise e

        if not res[0] or res[0] < self.stale and callback:
            def func():
                value = callback and callback(*args, **kwargs)
                logging.info("set_value for key %s" % key)
                #r.pipeline().set(key, json.dumps(value)).expire(key, kwargs.get('duration', self.duration)).execute()
                r.pipeline().set(key, json.dumps(value)).expire(key, int(kwargs.get('duration', self.duration)) + self.stale).execute()
                return value
            # redis-cli版本不一樣,res[0] 可能爲 None(<=2.6.*) or -2(>=2.6)
            if not res[0] or res[0] == -2:
                return func()

            IOLoop.current().add_timeout(IOLoop.current().time(), func)

        return l

    # 這裏已經不須要單獨的 set_value 接口,由於再 get_value 中已經實現
    def set_value(self, key, value, duration=60):
        pass

使用場景

pythonstale_cache = StaleRedisCache([('localhost', 6379, 0)])

def sc_callback:
    return sth.from.mongo or bla bla bla...

def abc:
    value = stale_cache.get_value('key1', sc_callback)

有沒有很簡潔呢~json

將stale 設置更大

在第一個圖片示意圖中,設置了stale=100, 比 self.duration=600還小;
這樣其實並無發揮 stale異步更新數據的優點;異步

stale Redis 部分的代碼作了2處改動socket

  • self.stale = 7200
  • 異步更新數據後,設置其過時時間爲 self.duration + self.stale

這樣一來,最終的效果是:tornado

  1. 若是數據過時,沒有命中(miss),當即執行callback並返回值,而後將值存入redis,並設置過時時間爲600+7200
  2. 若是hit命中,ttl過時時間大於7200(距離上次更新數據不到600s),直接返回數據
  3. 若是hit命中,ttl過時時間小於7200(距離上次更新數據超過600s),返回數據並異步更新數據
  4. 距離上一次更新數據超過7200+600以後,尚未請求,則數據過時

圖片描述

相關文章
相關標籤/搜索