僞代碼模型java
# get lock lock = 0 while lock != 1: timestamp = current Unix time + lock timeout + 1 lock = SETNX lock.foo timestamp if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)): break; else: sleep(10ms) # do your job do_job() # release if now() < GET lock.foo: DEL lock.foo
Redis爲單進程單線程模式,採用隊列模式將併發訪問變爲串行訪問。Redis自己沒有鎖的概念,Redis對於多個客戶端鏈接並不存在競爭,可是在Jedis客戶端對Redis進行併發訪問時會發生鏈接超時、數據轉換錯誤、阻塞、客戶端關閉鏈接等問題,這些問題均是因爲客戶端鏈接混亂形成。對此有2種解決方法:python
1.客戶端角度,爲保證每一個客戶端間正常有序與Redis進行通訊,對鏈接進行池化,同時對客戶端讀寫Redis操做採用內部鎖synchronized。redis
2.服務器角度,利用setnx實現鎖。如某客戶端要得到一個名字list的鎖,客戶端使用下面的命令進行獲取:算法
Setnx lock.list current time + lock timeout服務器
如返回1,則該客戶端得到鎖,把lock. list的鍵值設置爲時間值表示該鍵已被鎖定,該客戶端最後能夠經過DEL lock.list來釋放該鎖。併發
如返回0,代表該鎖已被其餘客戶端取得,等對方完成或等待鎖超時。分佈式
第二種須要用到Redis的setnx命令,可是須要注意一些問題。ide
語法:SETNX key valueui
功能:將 key 的值設爲 value ,當且僅當 key 不存在;若給定的 key 已經存在,則 SETNX 不作任何動做。
時間複雜度:O(1)
返回值:設置成功,返回 1 。設置失敗,返回 0 。
模式:將 SETNX 用於加鎖(locking),SETNX 能夠用做加鎖原語(locking primitive)。好比說,要對關鍵字(key) foo 加鎖,客戶端能夠嘗試如下方式:
SETNX lock.foo <current Unix time + lock timeout + 1>
若是 SETNX 返回 1 ,說明客戶端已經得到了鎖, key 設置的unix時間則指定了鎖失效的時間。以後客戶端能夠經過 DEL lock.foo 來釋放鎖。
若是 SETNX 返回 0 ,說明 key 已經被其餘客戶端上鎖了。若是鎖是非阻塞(non blocking lock)的,咱們能夠選擇返回調用,或者進入一個重試循環,直到成功得到鎖或重試超時(timeout)。
可是已經證明僅僅使用SETNX加鎖帶有競爭條件,在特定的狀況下會形成錯誤。
處理死鎖(deadlock)
上面的鎖算法有一個問題:若是由於客戶端失敗、崩潰或其餘緣由致使沒有辦法釋放鎖的話,怎麼辦?
這種情況能夠經過檢測發現——由於上鎖的 key 保存的是 unix 時間戳,假如 key 值的時間戳小於當前的時間戳,表示鎖已經再也不有效。
可是,當有多個客戶端同時檢測一個鎖是否過時並嘗試釋放它的時候,咱們不能簡單粗暴地刪除死鎖的 key ,再用 SETNX 上鎖,由於這時競爭條件(race condition)已經造成了:
C1 和 C2 讀取 lock.foo 並檢查時間戳, SETNX 都返回 0 ,由於它已經被 C3 鎖上了,但 C3 在上鎖以後就崩潰(crashed)了。
C1 向 lock.foo 發送 DEL 命令。
C1 向 lock.foo 發送 SETNX 併成功。
C2 向 lock.foo 發送 DEL 命令。
C2 向 lock.foo 發送 SETNX 併成功。
出錯:由於競爭條件的關係,C1 和 C2 兩個都得到了鎖。
幸虧,如下算法能夠避免以上問題。來看看咱們聰明的 C4 客戶端怎麼辦:
C4 向 lock.foo 發送 SETNX 命令。
由於崩潰掉的 C3 還鎖着 lock.foo ,因此 Redis 向 C4 返回 0 。
C4 向 lock.foo 發送 GET 命令,查看 lock.foo 的鎖是否過時。若是不,則休眠(sleep)一段時間,並在以後重試。
另外一方面,若是 lock.foo 內的 unix 時間戳比當前時間戳老,C4 執行如下命令:
GETSET lock.foo <current Unix timestamp + lock timeout + 1>
由於 GETSET 的做用,C4 能夠檢查看 GETSET 的返回值,肯定 lock.foo 以前儲存的舊值還是那個過時時間戳,若是是的話,那麼 C4 得到鎖。
若是其餘客戶端,好比 C5,比 C4 更快地執行了 GETSET 操做並得到鎖,那麼 C4 的 GETSET 操做返回的就是一個未過時的時間戳(C5 設置的時間戳)。C4 只好從第一步開始重試。注意,即使 C4 的 GETSET 操做對 key 進行了修改,這對將來也沒什麼影響。
這裏假設鎖key對應的value沒有實際業務意義,不然會有問題,並且其實其value也確實不該該用在業務中。spa
爲了讓這個加鎖算法更健壯,得到鎖的客戶端應該經常檢查過時時間以避免鎖因諸如 DEL 等命令的執行而被意外解開,由於客戶端失敗的狀況很是複雜,不只僅是崩潰這麼簡單,還多是客戶端由於某些操做被阻塞了至關長時間,緊接着 DEL 命令被嘗試執行(但這時鎖卻在另外的客戶端手上)。
語法:GETSET key value
功能:將給定 key 的值設爲 value ,並返回 key 的舊值(old value)。當 key 存在但不是字符串類型時,返回一個錯誤。
時間複雜度:O(1)
返回值:返回給定 key 的舊值;當 key 沒有舊值時,也便是, key 不存在時,返回 nil 。
Redis有一系列的命令,特色是以NX結尾,NX是Not eXists的縮寫,如SETNX命令就應該理解爲:SET if Not eXists。這系列的命令很是有用,這裏講使用SETNX來實現分佈式鎖。
用SETNX實現分佈式鎖
利用SETNX很是簡單地實現分佈式鎖。例如:某客戶端要得到一個名字foo的鎖,客戶端使用下面的命令進行獲取:
SETNX lock.foo <current Unix time + lock timeout + 1>
解決死鎖
上面的鎖定邏輯有一個問題:若是一個持有鎖的客戶端失敗或崩潰了不能釋放鎖,該怎麼解決?咱們能夠經過鎖的鍵對應的時間戳來判斷這種狀況是否發生了,若是當前的時間已經大於lock.foo的值,說明該鎖已失效,能夠被從新使用。
發生這種狀況時,可不能簡單的經過DEL來刪除鎖,而後再SETNX一次,當多個客戶端檢測到鎖超時後都會嘗試去釋放它,這裏就可能出現一個競態條件,讓咱們模擬一下這個場景:
C0操做超時了,但它還持有着鎖,C1和C2讀取lock.foo檢查時間戳,前後發現超時了。
C1 發送DEL lock.foo
C1 發送SETNX lock.foo 而且成功了。
C2 發送DEL lock.foo
C2 發送SETNX lock.foo 而且成功了。
這樣一來,C1,C2都拿到了鎖!問題大了!
幸虧這種問題是能夠避免的,讓咱們來看看C3這個客戶端是怎樣作的:
C3發送SETNX lock.foo 想要得到鎖,因爲C0還持有鎖,因此Redis返回給C3一個0
C3發送GET lock.foo 以檢查鎖是否超時了,若是沒超時,則等待或重試。
反之,若是已超時,C3經過下面的操做來嘗試得到鎖:
GETSET lock.foo <current Unix time + lock timeout + 1>
經過GETSET,C3拿到的時間戳若是仍然是超時的,那就說明,C3如願以償拿到鎖了。
若是在C3以前,有個叫C4的客戶端比C3快一步執行了上面的操做,那麼C3拿到的時間戳是個未超時的值,這時,C3沒有如期得到鎖,須要再次等待或重試。留意一下,儘管C3沒拿到鎖,但它改寫了C4設置的鎖的超時值,不過這一點很是微小的偏差帶來的影響能夠忽略不計。
注意:爲了讓分佈式鎖的算法更穩鍵些,持有鎖的客戶端在解鎖以前應該再檢查一次本身的鎖是否已經超時,再去作DEL操做,由於可能客戶端由於某個耗時的操做而掛起,操做完的時候鎖由於超時已經被別人得到,這時就沒必要解鎖了。
示例僞代碼
根據上面的代碼,我寫了一小段Fake代碼來描述使用分佈式鎖的全過程:
# get lock
lock = 0
while lock != 1:
timestamp = current Unix time + lock timeout + 1
lock = SETNX lock.foo timestamp
if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)):
break;
else:
sleep(10ms)
# do your job
do_job()
# release
if now() < GET lock.foo:
DEL lock.foo
是的,要想這段邏輯能夠重用,使用python的你立刻就想到了Decorator,而用Java的你是否是也想到了那誰?AOP + annotation?行,怎樣舒服怎樣用吧,別重複代碼就行。
java之jedis實現
expireMsecs 鎖持有超時,防止線程在入鎖之後,無限的執行下去,讓鎖沒法釋放
timeoutMsecs 鎖等待超時,防止線程飢餓,永遠沒有入鎖執行代碼的機會
/** * Acquire lock. * * @param jedis * @return true if lock is acquired, false acquire timeouted * @throws InterruptedException * in case of thread interruption */ public synchronized boolean acquire(Jedis jedis) throws InterruptedException { int timeout = timeoutMsecs; while (timeout >= 0) { long expires = System.currentTimeMillis() + expireMsecs + 1; String expiresStr = String.valueOf(expires); //鎖到期時間 if (jedis.setnx(lockKey, expiresStr) == 1) { // lock acquired locked = true; return true; } String currentValueStr = jedis.get(lockKey); //redis裏的時間 if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) { //判斷是否爲空,不爲空的狀況下,若是被其餘線程設置了值,則第二個條件判斷是過不去的 // lock is expired String oldValueStr = jedis.getSet(lockKey, expiresStr); //獲取上一個鎖到期時間,並設置如今的鎖到期時間, //只有一個線程才能獲取上一個線上的設置時間,由於jedis.getSet是同步的 if (oldValueStr != null && oldValueStr.equals(currentValueStr)) { //如過這個時候,多個線程剛好都到了這裏,可是隻有一個線程的設置值和當前值相同,他纔有權利獲取鎖 // lock acquired locked = true; return true; } } timeout -= 100; Thread.sleep(100); } return false; }
通常用法
其中不少繁瑣的邊緣代碼,包括:異常處理,釋放資源等等 。
JedisPool pool; JedisLock jedisLock = new JedisLock(pool.getResource(), lockKey, timeoutMsecs, expireMsecs); try { if (jedisLock.acquire()) { // 啓用鎖 //執行業務邏輯 } else { logger.info("The time wait for lock more than [{}] ms ", timeoutMsecs); } } catch (Throwable t) { // 分佈式鎖異常 logger.warn(t.getMessage(), t); } finally { if (jedisLock != null) { try { jedisLock.release();// 則解鎖 } catch (Exception e) { } } if (jedis != null) { try { pool.returnResource(jedis);// 還到鏈接池裏 } catch (Exception e) { } } }
犀利用法
用匿名類來實現,代碼很是簡潔 至於SimpleLock的實現
SimpleLock lock = new SimpleLock(key); lock.wrap(new Runnable() { @Override public void run() { //此處代碼是鎖上的 } });