通常來講,在對數據進行「加鎖」時,程序首先須要經過獲取(acquire)
鎖來獲得對數據進行排他性訪問的能力,而後才能對數據執行一系列操做,最後還要釋放(release)
給其餘程序。對於可以被多個線程訪問的共享內存數據結構(shared-memory data structure)
來講,這種「先獲取鎖,而後執行操做,最後釋放鎖」的動做很是常見。Redis使用WATCH命令來代替對數據進行加鎖,由於WATCH只會在數據被其餘客戶端搶先修改了的狀況下通知執行了這個命令的客戶端,而不會阻止其餘客戶端對數據的修改,因此這個命令被稱爲樂觀鎖(optimistic locking)
。
分佈式鎖也有相似的「首先獲取鎖,而後執行操做,最後釋放鎖」動做,但這種鎖既不是給同一個進程中的多個線程使用,也不是給同一臺機器上的多個進程使用,而是由不一樣機器上的不一樣Redis客戶端進行獲取和釋放的。redis
爲了防止客戶端在取得鎖以後崩潰,並致使鎖一直處於「已被獲取」的狀態,最終版的鎖實現將帶有超時限制特性:若是得到鎖的進程未能在指定的時限內完成操做,那麼鎖將自動釋放。數據結構
致使鎖出現不正確行爲的緣由,以及鎖在不正確運行時的症狀:
持有鎖的進程由於操做時間過長而致使鎖被自動釋放,但進程自己並不知曉這一點,甚至還可能會錯誤地釋放掉了其餘進程持有的鎖。
一個持有鎖並打算執行長時間操做的進程已經崩潰,但其餘想要獲取鎖的進程不知道哪一個進程持有着鎖,也沒法檢測出持有鎖的進程已經崩潰,只能白白地浪費時間等待鎖被釋放。
在一個進程持有的鎖過時以後,其餘多個進程同時嘗試去獲取鎖,而且都得到了鎖。
上面第一種狀況和第三種狀況同時出現,致使有多個進程得到了鎖,而每一個進程都覺得本身是惟一一個得到鎖的進程。分佈式
簡易鎖
爲了對數據進行排他性訪問,程序首先要作的就是獲取鎖。SETNX命令天生就適合用來實現鎖的獲取
功能,這個命令只會在鍵不存在
的狀況下爲鍵賦值,而鎖要作的就是將一個隨機生成的128位UUID設置爲鍵的值,並使用這個值來防止鎖被其餘進程取得。
若是程序嘗試獲取鎖的時候失敗,那麼它將不斷地進行重試,直到成功地取得鎖或者超過給定的時限爲止。ide
def acquire_lock(conn, lockname, acquire_timeout=10): identifier = str(uuid.uuid4()) //128位隨機標識符 end = time.time() + acquire_timeout while time.time() < end: if conn.setnx('lock:' + lockname, identifier): //嘗試獲取鎖 return identifier time.sleep(.001) return False
下面代碼展現了使用鎖從新實現的商品購買操做:程序首先對市場進行加鎖,接着檢查商品的價格,並在確保買家有足夠的錢來購買商品以後,對錢和商品進行相應的轉移。當操做執行完以後,程序就會釋放鎖。函數
def purchase_item_with_lock(conn, buyerid, itemid, sellerid): buyer = "users:%s"%buyerid sellerid = "users:%s"%sellerid item = "%s.%s"%(itemid, sellerid) inventory = "inventory:%s"%buyerid locked = acquire_lock(conn, market) if not locked: return False pipe = conn.pipeline(True) try://檢查指定的商品是否仍在出售,以及買家是否有足夠的錢來購買該商品 pipe.zscore("market:", item) pipe.hget(buyer, 'funds') price, funds = pipe.execute() if price is None or price > funds: return None pipe.hincrby(seller, 'funds', int(price)) pipe.hincrby(buyer, 'funds', int(-price)) pipe.sadd(inventory, itemid) pipe.zrem("market:", item) pipe.execute() return True finally: release_lock(conn, market, locked) //釋放鎖
上面代碼的鎖彷佛是用來加鎖整個購買操做的,但實際上這把鎖是用來鎖住市場數據的,它之因此會包圍着執行購買操做的代碼,是由於程序在操做市場數據期間必須一直持有鎖。測試
接下面的代碼release_lock函數展現了鎖釋放操做的實現代碼:函數首先使用WATCH命令監視表明鎖的鍵,接着檢查鍵目前的值是否和加鎖時設置的值相同,而且確認值沒有變化以後刪除該鍵(這個檢查還能夠防止程序錯誤地釋放同一個鎖屢次)。ui
def release_lock(conn, lockname, identifier): pipe = conn.pipeline(True) lockname = 'lock:' + lockname while True: try: pipe.watch(lockname) if pipe.get(lockname) == identifier: pipe.multi() pipe.delete(lockname) pipe.execute() return True pipe.unwatch() break except redis.exceptions.WatchError: pass return False
通過測試,與以前WATCH實現相比,鎖實現的上架商品數量雖然有所減小,可是在買入商品時卻不須要進行重試,而且上架商品數量和買入商品數量之間的比率,也跟賣家數量和買家數量之間的比率接近。線程
帶有超時限制的鎖
目前的鎖實如今持有者崩潰的時候不會自動釋放,這將致使鎖一直處於已被獲取的狀態。爲了解決這個問題,咱們將爲鎖加上超時功能。code
爲了給鎖加上超時的限制特性,程序將在取得鎖以後,調用EXPIRE命令來爲鎖設置過時時間,使得Redis能夠自動刪除超時的鎖。爲了確保鎖在客戶端已經崩潰(客戶端在執行介於SETNX和EXPIRE之間的時候崩潰是最糟糕的)的狀況下仍然可以自動被釋放,客戶端會嘗試獲取鎖失敗以後,檢查鎖的超時時間,併爲未設置超時時間的鎖設置超時時間。由於鎖總會帶有超時時間,並最終由於超時而自動被釋放,使得其餘客戶端能夠繼續嘗試獲取已被釋放的鎖。進程
須要注意的一點是,由於多個客戶端在同一時間內設置的超時時間基本上都是相同的,因此即便有多個客戶端同時爲同一個鎖設置超時時間,鎖的超時時間也不會產生太大變化。
def acquire_lock_with_timeout(conn, lockname, acquire_timeout=10, lock_timeout=10): identifier = str(uuid.uuid4()) lockname = 'lock:' + lockname lock_timeout = int(math.ceil(lock_timeout)) end = time.time() + acquire_timeout while time.time() < end: if conn.setnx(lockname, identifier): conn.expire(lockname, lock_timeout) return identifier elif not conn.ttl(lockname): conn.expire(lockname, lock_timeout) time.sleep(.001) return False