Acquire lock when time expired: GETSET lock.foo <current Unix timestamp + lock timeout + 1>
After the GETSET, if the timestamp C4 gets back is still an expired one, that means C4 has successfully acquired the lock.
If some other client, say C5, executed the operation above just one step ahead of C4, then the timestamp C4 gets back is a value that has not yet expired. In that case C4 has not acquired the lock and must wait or retry. Note that although C4 did not get the lock, it has overwritten the expiry value that C5 set; this tiny skew is tolerable, because locks are normally held for a very short time, so the error introduced by this race is acceptable. (As the Redis docs put it: even if C4 set the key a few seconds in the future, this is not a problem.)
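The acquisition flow above (SETNX first, then a GETSET race when the old lock has expired) can be sketched as follows. `FakeRedis` is a hypothetical in-memory stand-in for the two Redis commands, used here only so the race-handling logic is visible without a server; the key name and TTL are illustrative:

```python
import time

class FakeRedis:
    """Minimal in-memory stand-in for the SETNX/GETSET commands (illustration only)."""
    def __init__(self):
        self.store = {}

    def setnx(self, key, value):
        # Set only if the key does not exist; return whether we set it.
        if key in self.store:
            return False
        self.store[key] = value
        return True

    def getset(self, key, value):
        # Atomically write the new value and return the old one.
        old = self.store.get(key)
        self.store[key] = value
        return old

    def get(self, key):
        return self.store.get(key)

def acquire(r, key, ttl, now=None):
    """One acquisition attempt; returns the expiry we own, or None on failure."""
    now = time.time() if now is None else now
    timeout_at = now + ttl + 1
    # Fast path: the key is free, SETNX wins the lock outright.
    if r.setnx(key, timeout_at):
        return timeout_at
    existing = float(r.get(key) or 1)
    if existing >= now:
        return None                      # lock is held and not yet expired
    # The old lock expired: race via GETSET. Only the client that reads
    # back a still-expired timestamp actually won the lock.
    old = float(r.getset(key, timeout_at) or 1)
    return timeout_at if old < now else None
```

A losing client in the GETSET race still overwrote the winner's timestamp, which is exactly the small skew the text describes.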
To make this lock algorithm more robust, a client holding the lock should check once more that its lock has not expired before issuing the DEL. Clients can fail in complicated ways: not only crashing, but also hanging on some time-consuming operation. By the time that operation finishes, the lock may already have expired and been acquired by someone else, in which case the client must not release it.
Analyzing this scheme carefully, we find a hole: the DEL command used to release the lock does not support a CAS delete (check-and-set: delete only if the current value equals the old value), which causes problems under high concurrency. A client confirms that the lock it holds has not expired and then issues DEL to release it, but because of the race, by the time the Redis server executes the command the lock may already have expired (either it "genuinely" expired at just that moment, or a competing client overwrote it with a smaller expiry) and may be held by another client. In that case the DEL (illegally) releases a lock held by someone else.
Solution: first confirm that the lock has not expired, then use the EVAL command (available since Redis 2.6) to run a Lua script: GET the lock's timestamp, confirm it matches your own, and only then DEL to release the lock.
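A sketch of this check-and-delete release: `RELEASE_SCRIPT` is the kind of Lua script the text describes (it runs atomically under EVAL), and `safe_release` emulates its semantics against a plain dict so the contract is testable without a server. The names are illustrative:

```python
# Lua script in the spirit of the text: delete the lock only if the stored
# timestamp still matches the one this client wrote. Under EVAL, the GET
# and DEL execute atomically on the server.
RELEASE_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""

def safe_release(store, key, my_timestamp):
    """Dict-based emulation of the script's check-and-delete semantics."""
    if store.get(key) == my_timestamp:
        del store[key]
        return 1          # we still owned the lock; released it
    return 0              # someone else holds the lock now; leave it alone
```

With a real client this would be invoked as something like `redis_client.eval(RELEASE_SCRIPT, 1, "lock.foo", str(timeout_at))`, so the comparison happens on the server rather than between a separate GET and DEL.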
Design flaw:
The solution above is not perfect: it only fixes the release of an expired lock. Because of a flaw inherent in this scheme, when clients race while acquiring the lock (the example where C4 overwrites C5's timestamp), the "timestamp" stored in lock.foo no longer matches the holder's local copy. The DEL is then never executed, and the holder has to wait for the lock to expire on its own, which is inefficient under high concurrency.
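The flaw can be made concrete with the C4/C5 race. The dict below stands in for Redis and the timestamps are illustrative; after the race, the winner's locally remembered timestamp no longer matches what is stored, so a check-and-delete release would refuse to delete:

```python
def getset(store, key, value):
    """GETSET semantics on a plain dict (stand-in for the Redis command)."""
    old = store.get(key)
    store[key] = value
    return old

store = {"lock.foo": 90.0}    # stale lock left behind by a crashed client
now = 100                     # both C4 and C5 see the old lock as expired

c5_expiry = float(now + 10 + 1)   # C5 wins the GETSET race...
assert getset(store, "lock.foo", c5_expiry) == 90.0

c4_expiry = float(now + 10 + 2)   # ...then C4 overwrites C5's timestamp
assert getset(store, "lock.foo", c4_expiry) == c5_expiry

# C5 holds the lock, but the stored timestamp is now C4's value. A
# compare-and-delete release by C5 (matching c5_expiry) will not fire,
# so C5's lock can only go away by expiring.
assert store["lock.foo"] != c5_expiry
```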
```python
class Lock(object):
    """
    A shared, distributed Lock that uses a Redis server to hold its state.
    This Lock can be shared across processes and/or machines. It works
    asynchronously and plays nice with the Tornado IOLoop.
    """

    LOCK_FOREVER = float(2 ** 31 + 1)  # 1 past max unix time

    def __init__(self, redis_client, lock_name, lock_ttl=None,
                 polling_interval=0.1):
        """
        Create a new Lock object using the Redis key ``lock_name`` for
        state, that behaves like a threading.Lock.

        This method is synchronous and returns immediately. It doesn't
        acquire the Lock or in fact trigger any sort of communication with
        the Redis server. This must be done using the Lock object itself.

        If specified, ``lock_ttl`` indicates the maximum life time for the
        lock. If none is specified, it will remain locked until release()
        is called.

        ``polling_interval`` indicates the time between acquire attempts
        (polling) when the lock is in blocking mode and another client is
        currently holding the lock.

        Note: If using ``lock_ttl``, you should make sure all the hosts
        that are running clients have their time synchronized with a
        network time service like ntp.
        """
        self.redis_client = redis_client
        self.lock_name = lock_name
        self.acquired_until = None
        self.lock_ttl = lock_ttl
        self.polling_interval = polling_interval
        if self.lock_ttl and self.polling_interval > self.lock_ttl:
            raise LockError("'polling_interval' must be less than 'lock_ttl'")

    @gen.engine
    def acquire(self, blocking=True, callback=None):
        """
        Acquire the lock. Returns True once the lock is acquired.

        If ``blocking`` is False, always return immediately. If the lock
        was acquired, return True, otherwise return False. Otherwise,
        block until the lock is acquired (or an error occurs).

        If ``callback`` is supplied, it is called with the result.
        """
        # Loop until we have a conclusive result
        while 1:
            # Get the current time
            unixtime = int(mod_time.time())

            # If the lock has a limited lifetime, create a timeout value
            if self.lock_ttl:
                timeout_at = unixtime + self.lock_ttl
            # Otherwise, set the timeout value at forever (dangerous)
            else:
                timeout_at = Lock.LOCK_FOREVER
            timeout_at = float(timeout_at)

            # Try and get the lock, setting the timeout value in the
            # appropriate key, but only if a previous value does not exist
            # in Redis
            result = yield gen.Task(self.redis_client.setnx,
                                    self.lock_name, timeout_at)

            # If we managed to get the lock
            if result:
                # We successfully acquired the lock!
                self.acquired_until = timeout_at
                if callback:
                    callback(True)
                return

            # We didn't get the lock, another value is already there.
            # Check to see if the current lock timeout value has already
            # expired
            result = yield gen.Task(self.redis_client.get, self.lock_name)
            existing = float(result or 1)

            # Has it expired?
            if existing < unixtime:
                # The previous lock is expired. We attempt to overwrite it,
                # getting the current value in the server, just in case
                # someone tried to get the lock at the same time
                result = yield gen.Task(self.redis_client.getset,
                                        self.lock_name, timeout_at)
                existing = float(result or 1)

                # If the value we read is older than our own current
                # timestamp, we managed to get the lock with no issues -
                # the timeout has indeed expired
                if existing < unixtime:
                    # We successfully acquired the lock!
                    self.acquired_until = timeout_at
                    if callback:
                        callback(True)
                    return

                # However, if we got here, then the value read from the
                # Redis server is newer than our own current timestamp -
                # meaning someone already got the lock before us.
                # We failed getting the lock.

            # If we are not signalled to block
            if not blocking:
                # We failed acquiring the lock...
                if callback:
                    callback(False)
                return

            # Otherwise, we "sleep" for an amount of time equal to the
            # polling interval, after which we will try getting the lock
            # again.
            yield gen.Task(self.redis_client._io_loop.add_timeout,
                           self.redis_client._io_loop.time() +
                           self.polling_interval)

    @gen.engine
    def release(self, callback=None):
        """
        Releases the already acquired lock.

        If ``callback`` is supplied, it is called with True when finished.
        """
        if self.acquired_until is None:
            raise ValueError("Cannot release an unlocked lock")

        # Get the current lock value
        result = yield gen.Task(self.redis_client.get, self.lock_name)
        existing = float(result or 1)

        # [Author's note] This implementation carries a constraint: the
        # lock_ttl used when acquiring must guarantee that the lock has not
        # expired by the time we release it. Otherwise, once our lock
        # expires, we would illegally release a lock held by another
        # client. If the execution time of the code holding the lock cannot
        # be estimated, an extra expiry check could be added here: skip the
        # DEL when self.acquired_until <= int(mod_time.time()). In most
        # applications this constraint is easy to satisfy, so the
        # implementation is not a big problem in practice.
        # Due to the gap between GET and DEL, and between issuing DEL and
        # its execution on the server, the expired-lock release problem
        # still exists under high concurrency; this is a flaw of the
        # algorithm itself. With low concurrency it matters little.
        #
        # Note: the check `existing >= self.acquired_until` implicitly
        # assumes the client code using the lock runs normally. Since
        # concurrent code acquires the lock with the same lock_ttl, a
        # client that loses the race will have set a *later* expiry on the
        # lock, which is why this comparison is meaningful.

        # If the lock time is in the future, delete the lock
        if existing >= self.acquired_until:
            yield gen.Task(self.redis_client.delete, self.lock_name)
        self.acquired_until = None
```