redis分佈式鎖實現

文章主旨

本文主要說明使用redis(codis)實現分佈式鎖的方法和要常見的問題及解決辦法java

主要原理

/**
   * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1
   * GB).
   * @param key
   * @param value
   * @param nxxx NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key
   *          if it already exist.
   * @param expx EX|PX, expire time units: EX = seconds; PX = milliseconds
   * @param time expire time in the units of <code>expx</code>
   * @return Status code reply
   */
  public String set(final String key, final String value, final String nxxx, final String expx,
      final long time) {
    checkIsInMultiOrPipeline();
    client.set(key, value, nxxx, expx, time);
    return client.getStatusCodeReply();
  }

參數nxxx爲NX時,當key不存在的時候纔會執行set操做,結合redis單線程執行命令的特色能夠實現操做互斥。git

常見問題及解決辦法

問題1: 在哪裏釋放鎖

正常過程分三部分:加鎖,執行任務,釋放鎖。正常執行沒有問題,可是若是執行任務階段拋出異常,就會致使鎖沒有主動釋放。github

解決辦法

正確的寫法是放到finally塊中釋放鎖,這樣能夠解決拋異常的問題。僞代碼大概是這樣:redis

try {
    lock();
    doTask();
} finally {
    releaseLock();
}

問題2: 鎖過時時間長短選擇問題

  • 鎖過時時間過短: 容易發生任務還沒執行完,鎖就自動釋放了,就會致使發生併發,分佈式鎖失效。
  • 鎖時間太長: 雖然絕大多數狀況下finally都能釋放鎖,可是也有例外,好比程序非正常終止,好比執行釋放操做時redis服務不響應了。就會致使鎖長期空置,沒法被任何程序得到,過時時間越長,影響時間越長,通常都須要人工處理,很煩。

解決辦法

要是能自動給鎖續過時時間就行了。因此方法是獲取鎖的時候,鎖的時間設置稍短一些,好比30秒,在獲取鎖成功後,起一個按期執行的後臺任務,每隔10秒,設置一下過時時間爲30秒。spring

問題3: 如何防止釋放了別人的鎖

假設碰到了問題2中鎖過時時間過短的問題了,A任務獲取了鎖,過時時間是5秒,在第10秒的時候,A任務還在執行中,可是顯然鎖已經失效了,這時候B任務獲取到了鎖,在第11秒的時候,A任務執行完成了,執行釋放鎖操做,若是隻是根據key判斷的話,兩個key是相同的,因此A會把B剛獲取到的鎖給釋放掉,會形成一連串的不良反應,鎖都亂套了。springboot

解決辦法

不能簡單根據key知道鎖的歸屬,因此在獲取鎖的時候,應該生成一個惟一的value,用來標識。能夠在set的時候,生成一個惟一的guid作爲vlaue,在釋放鎖的時候,先判斷獲取redis裏的value看看是否是和set的一致,只有一致的狀況下,才執行del操做釋放鎖。這樣就能夠解決。僞代碼以下:網絡

val = redis.get(key);
if val == setValue
   redis.del(key);
endif

可是僞代碼裏獲取值和刪除key並非原子,因此仍是可能產生問題。改用lua腳原本確保操做的原子性。併發

String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

問題4: 偶發性網絡抖動致使加鎖,或鎖釋放失敗

網絡抖動仍是比較常見的問題,解決辦法也很簡單,就是加鎖和釋放鎖操做加一個重試機制,合理的參數設置能夠達到很大程度的容錯目的。分佈式

其實到目前爲止,redis分佈式鎖依然不是徹底可靠的,由於redis服務的問題致使數據不一致的問題沒有被考慮到,不過要求不是那麼嚴格的場景夠用了。fetch

代碼實現

handy-lock是一個redis分佈式鎖java類庫,解決了上面提到的問題,簡單易用。

特性

  • 兼容codis
  • setnx原子加鎖,lua腳本原子釋放鎖
  • 每次加鎖value用guid,不會存在釋放別人的鎖的問題
  • 加鎖和釋放鎖自帶重試機制
  • 提供了自動延期的鎖,避免過時時間長度選擇的困境

引用

<dependency>
    <groupId>com.github.free-jungle</groupId>
    <artifactId>handy-lock-starter</artifactId>
    <version>1.0.0</version>
</dependency>

配置

redis操做使用springboot的RedisTemplate實現,因此配置和spring.redis配置是同樣的

spring:
  redis:
    host: 127.0.0.1
    port: 6379

使用舉例

例1: 使用RedisLockManager加鎖[推薦]

@Resource
private RedisLockManager redisLockManager;

public void lockUseLockManager(String id) {
    String lockKey = String.format("%s.%s#%s", this.getClass().getCanonicalName(), "lockUseLockManager", id);
    try (RedisLock redisLock = redisLockManager.fetchAndTryLock(lockKey, 5000, 1000, 2)) {
        TimeUnit.SECONDS.sleep(2L);
    } catch (LockFailException | IOException | InterruptedException ex) {
        LOGGER.error("error when lockUseLockManager", ex);
    }
}

例2: 註解方式加鎖-靜態key

@RedisDistributedLockable(key = "com.github.free.jungle.lock.examples.service.impl.lockUseAnnotation",
        expireInMilliseconds = 10000, waitInMilliseconds = 10, tryCount = 1)
public void lockUseAnnotation() {
    LOGGER.info("start lockUseAnnotation");
    try {
        TimeUnit.MILLISECONDS.sleep(100L);
    } catch (Exception ex) {
        LOGGER.error("error when lockUseAnnotation", ex);
    }
    LOGGER.info("end lockUseAnnotation");
}

雖然提供了註解的使用方式,仍是推薦直接用例1的方式,代碼易讀,易懂,用起來也很簡單,沒有比 註解麻煩。

例3: 註解方式加鎖-動態key

@RedisDistributedLockable(keySpel = "'com.github.free.jungle.lock.examples.service.impl.lockUserAnnotationWithSpel#'+#id",
            expireInMilliseconds = 10000, waitInMilliseconds = 1000, tryCount = 3)
public void lockUserAnnotationWithSpel(String id) {
    LOGGER.info("start lockUserAnnotationWithSpel:{}", id);
    try {
        TimeUnit.SECONDS.sleep(2L);
    } catch (Exception ex) {
        LOGGER.error("error when lockUseAnnotation", ex);
    }
    LOGGER.info("end lockUserAnnotationWithSpel:{}", id);
}

適用於key須要根據入參動態拼裝的狀況,其中keySpel是spel表達式

例4: 自動延長過時時間的分佈式鎖[推薦]

上面的用法有一個困難的問題,就是過時時間參數(expireInMilliseconds)的配置可能很難,由於:

  • 鎖有效時間過短,任務還沒執行完,redis就過時了,這樣分佈式執行就會產生併發的問題
  • 鎖有效期太長,極端狀況當程序異常退出,沒有正確釋放鎖,鎖長時間沒法獲取,致使任務沒法進行的問題

因此實現了一個可以自動延長鎖有效時間的加鎖方法,使用方法以下:

@Resource
private RedisLockManager redisLockManager;

public void lockWithScheduleUseLockManager(String id) {
    String lockKey = String.format("%s.%s#%s", this.getClass().getCanonicalName(), "lockUseLockManager", id);
    try (RedisLock redisLock = redisLockManager.fetchAndTryLockWithSchedule(lockKey)) {
        TimeUnit.SECONDS.sleep(30L);
    } catch (LockFailException | IOException | InterruptedException ex) {
        LOGGER.error("error when lockUseLockManager", ex);
    }
}

詳細方法說明

RedisLockManager 的方法有較爲詳盡的註釋,請直接查看源碼。

樣例項目

handy-lock-examples是專門的使用樣例項目,做爲參考使用

已知問題

只是實現了客戶端層面的簡易分佈式鎖,因此沒法處理因爲redis服務端故障形成數據不一致的問題,須要 更嚴謹的分佈式鎖的狀況建議用Redssion

但願對你有所幫助

相關文章
相關標籤/搜索