弄懂這些redis分佈式鎖知識點,明天就去跟老闆談漲薪!(建議收藏)

1 介紹

這篇博文講介紹如何一步步構建一個基於Redis的分佈式鎖。會從最原始的版本開始,而後根據問題進行調整,最後完成一個較爲合理的分佈式鎖。java

本篇文章會將分佈式鎖的實現分爲兩部分,一個是單機環境,另外一個是集羣環境下的Redis鎖實現。在介紹分佈式鎖的實現以前,先來了解下分佈式鎖的一些信息。面試

開始以前,記得點贊收藏加關注哦 ,須要下載PDF版本和獲取更多知識點、面試題的朋友能夠點一點下方連接免費領取算法

連接:點這裏!!! 799215493 暗號:CSDN數據庫

在這裏插入圖片描述

2 分佈式鎖

2.1 什麼是分佈式鎖?

分佈式鎖是控制分佈式系統或不一樣系統之間共同訪問共享資源的一種鎖實現,若是不一樣的系統或同一個系統的不一樣主機之間共享了某個資源時,每每須要互斥來防止彼此干擾來保證一致性。服務器

2.2 分佈式鎖須要具有哪些條件

  1. 互斥性:在任意一個時刻,只有一個客戶端持有鎖。
  2. 無死鎖:即使持有鎖的客戶端崩潰或者其餘意外事件,鎖仍然能夠被獲取。
  3. 容錯:只要大部分Redis節點都活着,客戶端就能夠獲取和釋放鎖

2.3 分佈式鎖的實現有哪些?

  1. 數據庫
  2. Memcached(add命令)
  3. Redis(setnx命令)
  4. Zookeeper(臨時節點)
  5. 等等

3 單機Redis的分佈式鎖

3.1 準備工做

3.1.1 定義常量類
public class LockConstants { 
    public static final String OK = "OK";

    /** NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key if it already exist. **/
    public static final String NOT_EXIST = "NX";
    public static final String EXIST = "XX";

    /** expx EX|PX, expire time units: EX = seconds; PX = milliseconds **/
    public static final String SECONDS = "EX";
    public static final String MILLISECONDS = "PX";

    private LockConstants() { }
}
3.1.2 定義鎖的抽象類

抽象類RedisLock實現java.util.concurrent包下的Lock接口,而後對一些方法提供默認實現,子類只需實現lock方法和unlock方法便可。代碼以下網絡

public abstract class RedisLock implements Lock { 

    protected Jedis jedis;
    protected String lockKey;

    public RedisLock(Jedis jedis,String lockKey) { 
        this(jedis, lockKey);
    }


    public void sleepBySencond(int sencond){ 
        try { 
            Thread.sleep(sencond*1000);
        } catch (InterruptedException e) { 
            e.printStackTrace();
        }
    }


    @Override
    public void lockInterruptibly(){ }

    @Override
    public Condition newCondition() { 
        return null;
    }

    @Override
    public boolean tryLock() { 
        return false;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit){ 
        return false;
    }

}

3.2 最基礎的版本1

先來一個最基礎的版本,代碼以下多線程

public class LockCase1 extends RedisLock { 

    public LockCase1(Jedis jedis, String name) { 
        super(jedis, name);
    }

    @Override
    public void lock() { 
        while(true){ 
            String result = jedis.set(lockKey, "value", NOT_EXIST);
            if(OK.equals(result)){ 
                System.out.println(Thread.currentThread().getId()+"加鎖成功!");
                break;
            }
        }
    }

    @Override
    public void unlock() { 
        jedis.del(lockKey);
    }
}

LockCase1類提供了lock和unlock方法。dom

其中lock方法也就是在reids客戶端執行以下命令分佈式

SET lockKey value NX

而unlock方法就是調用DEL命令將鍵刪除。ide

好了,方法介紹完了。如今來想一想這其中會有什麼問題?

假設有兩個客戶端A和B,A獲取到分佈式的鎖。A執行了一會,忽然A所在的服務器斷電了(或者其餘什麼的),也就是客戶端A掛了。這時出現一個問題,這個鎖一直存在,且不會被釋放,其餘客戶端永遠獲取不到鎖。以下示意圖

在這裏插入圖片描述
能夠經過設置過時時間來解決這個問題

3.3 版本2-設置鎖的過時時間

public void lock() { 
    while(true){ 
        String result = jedis.set(lockKey, "value", NOT_EXIST,SECONDS,30);
        if(OK.equals(result)){ 
            System.out.println(Thread.currentThread().getId()+"加鎖成功!");
            break;
        }
    }
}

相似的Redis命令以下

SET lockKey value NX EX 30

注:要保證設置過時時間和設置鎖具備原子性

這時又出現一個問題,問題出現的步驟以下

  1. 客戶端A獲取鎖成功,過時時間30秒。
  2. 客戶端A在某個操做上阻塞了50秒。
  3. 30秒時間到了,鎖自動釋放了。
  4. 客戶端B獲取到了對應同一個資源的鎖。
  5. 客戶端A從阻塞中恢復過來,釋放掉了客戶端B持有的鎖。

示意圖以下

在這裏插入圖片描述
這時會有兩個問題

  1. 過時時間如何保證大於業務執行時間?
  2. 如何保證鎖不會被誤刪除?

先來解決如何保證鎖不會被誤刪除這個問題。

這個問題能夠經過設置value爲當前客戶端生成的一個隨機字符串,且保證在足夠長的一段時間內在全部客戶端的全部獲取鎖的請求中都是惟一的。

3.4 版本3-設置鎖的value

抽象類RedisLock增長lockValue字段,lockValue字段的默認值爲UUID隨機值假設當前線程ID。

public abstract class RedisLock implements Lock { 

    //...
    protected String lockValue;

    public RedisLock(Jedis jedis,String lockKey) { 
        this(jedis, lockKey, UUID.randomUUID().toString()+Thread.currentThread().getId());
    }

    public RedisLock(Jedis jedis, String lockKey, String lockValue) { 
        this.jedis = jedis;
        this.lockKey = lockKey;
        this.lockValue = lockValue;
    }

    //...
}

加鎖代碼

public void lock() { 
    while(true){ 
        String result = jedis.set(lockKey, lockValue, NOT_EXIST,SECONDS,30);
        if(OK.equals(result)){ 
            System.out.println(Thread.currentThread().getId()+"加鎖成功!");
            break;
        }
    }
}

解鎖代碼

public void unlock() { 
    String lockValue = jedis.get(lockKey);
    if (lockValue.equals(lockValue)){ 
        jedis.del(lockKey);
    }
}

這時看看加鎖代碼,好像沒有什麼問題啊。

再來看看解鎖的代碼,這裏的解鎖操做包含三步操做:獲取值、判斷和刪除鎖。這時你有沒有想到在多線程環境下的i++操做?

3.4.1 i++問題

i++操做也可分爲三個步驟:讀i的值,進行i+1,設置i的值。

若是兩個線程同時對i進行i++操做,會出現以下狀況

  1. i設置值爲0
  2. 線程A讀到i的值爲0
  3. 線程B也讀到i的值爲0
  4. 線程A執行了+1操做,將結果值1寫入到內存
  5. 線程B執行了+1操做,將結果值1寫入到內存
  6. 此時i進行了兩次i++操做,可是結果卻爲1

在多線程環境下有什麼方式能夠避免這類狀況發生?

解決方式有不少種,例如用AtomicInteger、CAS、synchronized等等。

這些解決方式的目的都是要確保i++ 操做的原子性。那麼回過頭來看看解鎖,同理咱們也是要確保解鎖的原子性。咱們能夠利用Redis的lua腳原本實現解鎖操做的原子性。

3.5 版本4-具備原子性的釋放鎖

3.6 版本5-確保過時時間大於業務執行時間

3.7 測試

累了,不想寫了,須要完整版的朋友點一點下方連接本身領取

連接:點這裏!!! 799215493 暗號:CSDN

在這裏插入圖片描述

4 集羣Redis的分佈式鎖

在Redis的分佈式環境中,Redis 的做者提供了RedLock 的算法來實現一個分佈式鎖。

4.1 加鎖

RedLock算法加鎖步驟以下

  1. 獲取當前Unix時間,以毫秒爲單位。
  2. 依次嘗試從N個實例,使用相同的key和隨機值獲取鎖。在步驟2,當向Redis設置鎖時,客戶端應該設置一個網絡鏈接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間爲10秒,則超時時間應該在5-50毫秒之間。這樣能夠避免服務器端Redis已經掛掉的狀況下,客戶端還在死死地等待響應結果。若是服務器端沒有在規定時間內響應,客戶端應該儘快嘗試另一個Redis實例。
  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就獲得獲取鎖使用的時間。當且僅當從大多數(這裏是3個節點)的Redis節點都取到鎖,而且使用的時間小於鎖失效時間時,鎖纔算獲取成功。
  4. 若是取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)。
  5. 若是由於某些緣由,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在全部的Redis實例上進行解鎖(即使某些Redis實例根本就沒有加鎖成功)。

4.2 解鎖

向全部的Redis實例發送釋放鎖命令便可,不用關心以前有沒有從Redis實例成功獲取到鎖.

總結

我這裏準備了一線大廠面試資料和我原創的超硬核PDF技術文檔,以及我爲你們精心準備的多套簡歷模板(不斷更新中),但願你們都能找到心儀的工做!

有須要的朋友能夠點一點下方連接免費領取

連接:點這裏!!! 799215493 暗號:CSDN

在這裏插入圖片描述
在這裏插入圖片描述

相關文章
相關標籤/搜索