Redis如何實現高併發分佈式鎖?

衆所周知,分佈式鎖在微服務架構中是重頭戲,尤爲是在互聯網公司,基本上企業內部都會有本身的一套分佈式鎖開發框架。本文主要介紹使用Redis如何構建高併發分佈式鎖。java

假設 存在一個SpringBoot的控制器,其扣減庫存的業務邏輯以下:redis

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {

    // 將庫存取出來
    int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判斷庫存夠不夠減
    if (stock > 0) {
        // 將庫存回寫到redis
        int tmp = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", tmp.toString());
        logger.info("庫存扣減成功");
    } else {
        logger.info("庫存扣減失敗");
    }

    return "finished.";
}

不難看出,在應用服務器運行這段代碼的時候就會有線程安全性問題。由於多個線程同時去修改Redis服務中的數據。所以考慮給這段代碼加上一把鎖:api

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    synchronized (this) {
        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    }
    return "finished.";
}

這樣一來,當多個HTTP請求來請求數據的時候,多個線程去修改同一數據會有JVM本地鎖來進行合理的資源限制。雖然這樣解決了線程安全性問題,可是這僅僅是JVM級別的鎖,在分佈式的環境下,因爲像這樣的Web應用隨時會進行動態擴容,所以當多個應用的時候,一樣會有線程安全性問題,當上面這段代碼遇到相似下面的架構時仍是會有各類各樣的問題:安全

Redis如何實現高併發分佈式鎖?

對於上述的狀況,咱們可使用redis api提供的setnx方法解決:服務器

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {

    // 嘗試獲取鎖
    Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

    // 判斷是否得到鎖
    if (!flag) { return "error"; }

    int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

    // 判斷庫存夠不夠減
    if (stock > 0) {
        // 將庫存回寫到redis
        int tmp = stock - 1;
        stringRedisTemplate.opsForValue().set("stock", tmp.toString());
        logger.info("庫存扣減成功");
    } else {
        logger.info("庫存扣減失敗");
    }

    // 刪除鎖
    stringRedisTemplate.delete("Hello");

    return "finished.";
}

setnx key value是將key的值設置爲value,當且僅當key不存在的時候。若是設置成功就返回1,不然就返回0。架構

這樣的話,首先嚐試獲取鎖,而後當業務執行完成的時候再刪除鎖。可是仍是有問題的,當獲取鎖的時候拋出異常或者業務執行拋出異常怎麼辦,因此加入異常處理邏輯:併發

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    try {
        // 嘗試獲取鎖
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

        // 判斷是否得到鎖
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 刪除鎖
        stringRedisTemplate.delete("Hello");
    }
    return "finished.";
}

通過這樣的修改,看起來沒什麼問題了。可是當程序得到鎖而且開始執行業務邏輯的時候,忽然程序掛掉了或者被一些粗暴的運維工程師給kill,在finally中刪除鎖的邏輯就會得不到執行,所以就會產生死鎖。對於這種狀況,咱們能夠給這個鎖設置一個超時時間:app

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    try {
        // 嘗試獲取鎖
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World");

        // 設置超時時間, 根據業務場景估計超時時長
        stringRedisTmplate.expire("Hello", 10, TimeUnit.SECONDS);

        // 判斷是否得到鎖
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 刪除鎖
        stringRedisTemplate.delete("Hello");
    }
    return "finished.";
}

若是程序這麼來寫,相對來講安全一些了,可是仍是存在問題。試想一下,當獲取鎖成功時,正想給這把鎖設置超時的時候,程序掛掉了,仍是會出現死鎖的,所以在redis較高的版本中提供的setIfAbsent方法中能夠同時設置鎖的超時時間。框架

Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", "World", 10, TimeUnit.SECONDS);

這樣一來,嘗試獲取鎖和設置鎖的超時時間就具有原子性了。實際上通過咱們這一番改造,這在小型企業已經沒有太大的問題, 由於像這種代碼天天也就執行幾百次,並不算作高併發的場景。當這樣的代碼被暴露在超高併發場景下的時候,仍是會存在各類各樣的問題。試想一個場景,當一個HTTP請求請求到控制器的時候,應用獲取到鎖了,超時時間也設置成功了,可是應用的業務邏輯超過了超時時間,咱們這裏的超時時間設置的是10秒,當應用的業務邏輯執行15秒的時候,鎖就被redis服務刪除了。假設剛好此時又有一個HTTP請求來請求控制器,此時應用服務器會再啓動一個線程來獲取鎖,並且還獲取成功了,可是此次的HTTP請求對應的業務邏輯尚未執行完。新來的TTTP請求也在執行,因爲新來的HTTP請求也在執行,由於鎖超時後被刪除,新的HTTP請求也成功獲取鎖了。當原來的HTTP請求對應的業務邏輯執行完成之後,嘗試刪除鎖,這樣正好刪除的是新來的HTTP請求對應的鎖。這個時候redis中又沒有鎖了,這樣第三個HTTP請求又會得到鎖,因此狀況就不妙了。運維

爲了解決上面的問題,咱們能夠將代碼優化爲下面的樣子:

@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    String clientUuid = UUID.randomUUID().toString();
    try {
        // 嘗試獲取鎖,設置超時時間, 根據業務場景估計超時時長
        Boolean flag = stringRedisTmplate.opsForValue().setIfAbsent("Hello", clientUuid, 10, TimeUnit.SECONDS);

        // 判斷是否得到鎖
        if (!flag) { return "error"; }

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 刪除鎖的時候判斷是否是本身的鎖
        if (clientUuid.equals(stringRedisTemplate.opsForValue().get("Hello"))) {
            stringRedisTemplate.delete("Hello");   
        }
    }
    return "finished.";
}

可是因爲程序的不可預知性,誰也不能保證極端狀況下,同時會有多個線程同時執行這段業務邏輯。咱們能夠在當執行業務邏輯的時候同時開一個定時器線程,每隔幾秒就從新將這把鎖設置爲10秒,也就是給這把鎖進行「續命」。這樣就用擔憂業務邏輯到底執行多長時間了。可是這樣程序的複雜性就會增長,每一個業務邏輯都要寫好多的代碼,所以這裏推薦在分佈式環境下使用redisson。所以咱們使用redisson實現分支線程的代碼:

  • 引入依賴:
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.6.5</version>
</dependency>
  • 初始化Redisson的客戶端配置:
@Bean
public Redisson redisson () {
    Config cfg = new Config();
    cfg.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
    return (Redisson) Redisson.create(cfg);
}
  • 在程序中注入Redisson客戶端:
@Autowired
private Redisson redisson;
  • 對應的業務邏輯:
@Autowired
private StringRedisTemplate stringRedisTemplate;

@RequestMapping(value = "/deduct-stock")
public String deductSotck() throws Exception {
    // 獲取鎖對象
    RLock lock = redisson.getLock("Hello");
    try {
        // 嘗試加鎖, 默認30秒, 自動後臺開一個線程實現鎖的續命
        lock.tryLock();

        int i = Interger.parseInt(stringRedisTemplate.opsForValue().get("stock"));

        // 判斷庫存夠不夠減
        if (stock > 0) {
            // 將庫存回寫到redis
            int tmp = stock - 1;
            stringRedisTemplate.opsForValue().set("stock", tmp.toString());
            logger.info("庫存扣減成功");
        } else {
            logger.info("庫存扣減失敗");
        }
    } finally {
        // 釋放鎖
        lock.unlock();
    }
    return "finished.";
}

Redisson分佈式鎖的實現原理以下:

Redis如何實現高併發分佈式鎖?

可是這個架構仍是存在問題的,由於redis服務器是主從的架構,當在master節點設置鎖以後,slave節點會馬上同步。可是若是剛在master節點設置上了鎖,slave節點還沒來得及設置,master節點就掛掉了。仍是會產生上一樣的問題,新的線程得到鎖。

所以使用redis構建高併發的分佈式鎖,僅適合單機架構,當使用主從架構的redis時仍是會出現線程安全性問題。

相關文章
相關標籤/搜索