【分佈式鎖】redis實現

轉載:https://www.jianshu.com/p/c970cc710

SETNX命令簡介redis

SETNX key value
將key的值設爲value,而且僅當key不存在。
若給定的key已經存在,則SETNX不作任何操做。
SETNX 是SET if Not eXists的簡寫。
返回整數,具體爲算法

  • 1,當 key 的值被設置
  • 0,當 key 的值沒被設置

使用SETNX實現分佈式鎖

多個進程執行如下Redis命令:網絡

SETNX lock.foo <current Unix time + lock timeout + 1>分佈式

若是 SETNX 返回1,說明該進程得到鎖,SETNX將鍵 lock.foo 的值設置爲鎖的超時時間(當前時間 + 鎖的有效時間)。
若是 SETNX 返回0,說明其餘進程已經得到了鎖,進程不能進入臨界區。進程能夠在一個循環中不斷地嘗試 SETNX 操做,以得到鎖。ide

 

解決死鎖

正常第一反應利用SETNX實現分佈式鎖多是這樣的ui

if(SETNX key value){//若是設置成功表示拿到了鎖
    return true;
}
return false;
View Code

而後釋放鎖的時候就直接 DEL掉;
簡單思路是這樣,可是這樣會有不少問題this

  • 若是一個進程得到鎖以後,斷開了與redis的鏈接(進程掛斷或者網絡中斷),那麼鎖一直的不斷釋放,其餘的進程就一直獲取不到鎖,就出現了 「死鎖」
  • 然而,鎖超時時,咱們不能簡單地使用 DEL 命令刪除鍵 lock.foo 以釋放鎖。考慮如下狀況,進程P1已經首先得到了鎖 lock.foo,而後進程P1掛掉了。進程P2,P3正在不斷地檢測鎖是否已釋放或者已超時,執行流程以下:
    1 . P2和P3進程讀取鍵 lock.foo 的值,檢測鎖是否已超時(經過比較當前時間和鍵 lock.foo 的值來判斷是否超時)
    2.P2和P3進程發現鎖 lock.foo 已超時
    3.P2執行 DEL lock.foo命令
    4.P2執行 SETNX lock.foo命令,並返回1,即P2得到鎖
    5.P3執行 DEL lock.foo命令將P2剛剛設置的鍵 lock.foo 刪除(這步是因爲P3剛纔已檢測到鎖已超時)
    6.P3執行 SETNX lock.foo命令,並返回1,即P3得到鎖
    7.P2和P3同時得到了鎖

從上面的狀況能夠得知,在檢測到鎖超時後,進程不能直接簡單地執行 DEL 刪除鍵的操做以得到鎖。spa

爲了解決上述算法可能出現的多個進程同時得到鎖的問題,咱們再來看如下的算法。
咱們一樣假設進程P1已經首先得到了鎖 lock.foo,而後進程P1掛掉了。接下來的狀況:線程

進程P4執行 SETNX lock.foo 以嘗試獲取鎖
因爲進程P1已得到了鎖,因此P4執行 SETNX lock.foo 返回0,即獲取鎖失敗
P4執行 GET lock.foo 來檢測鎖是否已超時,若是沒超時,則等待一段時間,再次檢測
若是P4檢測到鎖已超時,即當前的時間大於鍵 lock.foo 的值,P4會執行如下操做
GETSET lock.foo <current Unix timestamp + lock timeout + 1>
因爲 GETSET 操做在設置鍵的值的同時,還會返回鍵的舊值,經過比較鍵 lock.foo 的舊值是否小於當前時間,能夠判斷進程是否已得到鎖
假如另外一個進程P5也檢測到鎖已超時,並在P4以前執行了 GETSET 操做,那麼P4的 GETSET 操做返回的是一個大於當前時間的時間戳,這樣P4就不會得到鎖而繼續等待。注意到,即便P4接下來將鍵 lock.foo 的值設置了比P5設置的更大的值也沒影響。
另外,值得注意的是,在進程釋放鎖,即執行 DEL lock.foo 操做前,須要先判斷鎖是否已超時。若是鎖已超時,那麼鎖可能已由其餘進程得到,這時直接執行 DEL lock.foo 操做會致使把其餘進程已得到的鎖釋放掉。code

程序代碼

while (timeout >= 0) {
            long expires = System.currentTimeMillis() + expireMsecs + 1;
            String expiresStr = String.valueOf(expires); //鎖到期時間
            if (this.setNX(lockKey, expiresStr)) {
                // lock acquired
                locked = true;
                return true;
            }

            String currentValueStr = this.get(lockKey); //redis裏的時間
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判斷是否爲空,不爲空的狀況下,若是被其餘線程設置了值,則第二個條件判斷是過不去的
                // lock is expired

                String oldValueStr = this.getSet(lockKey, expiresStr);
                //獲取上一個鎖到期時間,並設置如今的鎖到期時間,
                //只有一個線程才能獲取上一個線上的設置時間,由於jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止誤刪(覆蓋,由於key是相同的)了他人的鎖——這裏達不到效果,這裏值會被覆蓋,可是由於什麼相差了不多的時間,因此能夠接受

                    //[分佈式的狀況下]:如過這個時候,多個線程剛好都到了這裏,可是隻有一個線程的設置值和當前值相同,他纔有權利獲取鎖
                    // lock acquired
                    locked = true;
                    return true;
                }
            }
            timeout -= DEFAULT_ACQUIRY_RESOLUTION_MILLIS;

            /*
                延遲100 毫秒,  這裏使用隨機時間可能會好一點,能夠防止飢餓進程的出現,即,當同時到達多個進程,
                只會有一個進程得到鎖,其餘的都用一樣的頻率進行嘗試,後面有來了一些進行,也以一樣的頻率申請鎖,這將可能致使前面來的鎖得不到知足.
                使用隨機的等待時間能夠必定程度上保證公平性
             */
            Thread.sleep(DEFAULT_ACQUIRY_RESOLUTION_MILLIS);

        }
View Code
相關文章
相關標籤/搜索