文章主旨
本文主要說明使用redis(codis)實現分佈式鎖的方法和要常見的問題及解決辦法java
主要原理
/** * Set the string value as value of the key. The string can't be longer than 1073741824 bytes (1 * GB). * @param key * @param value * @param nxxx NX|XX, NX -- Only set the key if it does not already exist. XX -- Only set the key * if it already exist. * @param expx EX|PX, expire time units: EX = seconds; PX = milliseconds * @param time expire time in the units of <code>expx</code> * @return Status code reply */ public String set(final String key, final String value, final String nxxx, final String expx, final long time) { checkIsInMultiOrPipeline(); client.set(key, value, nxxx, expx, time); return client.getStatusCodeReply(); }
參數nxxx爲NX時,當key不存在的時候纔會執行set操做,結合redis單線程執行命令的特色能夠實現操做互斥。git
常見問題及解決辦法
問題1: 在哪裏釋放鎖
正常過程分三部分:加鎖,執行任務,釋放鎖。正常執行沒有問題,可是若是執行任務階段拋出異常,就會致使鎖沒有主動釋放。github
解決辦法
正確的寫法是放到finally塊中釋放鎖,這樣能夠解決拋異常的問題。僞代碼大概是這樣:redis
try { lock(); doTask(); } finally { releaseLock(); }
問題2: 鎖過時時間長短選擇問題
- 鎖過時時間過短: 容易發生任務還沒執行完,鎖就自動釋放了,就會致使發生併發,分佈式鎖失效。
- 鎖時間太長: 雖然絕大多數狀況下finally都能釋放鎖,可是也有例外,好比程序非正常終止,好比執行釋放操做時redis服務不響應了。就會致使鎖長期空置,沒法被任何程序得到,過時時間越長,影響時間越長,通常都須要人工處理,很煩。
解決辦法
要是能自動給鎖續過時時間就行了。因此方法是獲取鎖的時候,鎖的時間設置稍短一些,好比30秒,在獲取鎖成功後,起一個按期執行的後臺任務,每隔10秒,設置一下過時時間爲30秒。spring
問題3: 如何防止釋放了別人的鎖
假設碰到了問題2中鎖過時時間過短的問題了,A任務獲取了鎖,過時時間是5秒,在第10秒的時候,A任務還在執行中,可是顯然鎖已經失效了,這時候B任務獲取到了鎖,在第11秒的時候,A任務執行完成了,執行釋放鎖操做,若是隻是根據key判斷的話,兩個key是相同的,因此A會把B剛獲取到的鎖給釋放掉,會形成一連串的不良反應,鎖都亂套了。springboot
解決辦法
不能簡單根據key知道鎖的歸屬,因此在獲取鎖的時候,應該生成一個惟一的value,用來標識。能夠在set的時候,生成一個惟一的guid作爲vlaue,在釋放鎖的時候,先判斷獲取redis裏的value看看是否是和set的一致,只有一致的狀況下,才執行del操做釋放鎖。這樣就能夠解決。僞代碼以下:網絡
val = redis.get(key); if val == setValue redis.del(key); endif
可是僞代碼裏獲取值和刪除key並非原子,因此仍是可能產生問題。改用lua腳原本確保操做的原子性。併發
String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
問題4: 偶發性網絡抖動致使加鎖,或鎖釋放失敗
網絡抖動仍是比較常見的問題,解決辦法也很簡單,就是加鎖和釋放鎖操做加一個重試機制,合理的參數設置能夠達到很大程度的容錯目的。分佈式
其實到目前爲止,redis分佈式鎖依然不是徹底可靠的,由於redis服務的問題致使數據不一致的問題沒有被考慮到,不過要求不是那麼嚴格的場景夠用了。fetch
代碼實現
handy-lock是一個redis分佈式鎖java類庫,解決了上面提到的問題,簡單易用。
特性
- 兼容codis
- setnx原子加鎖,lua腳本原子釋放鎖
- 每次加鎖value用guid,不會存在釋放別人的鎖的問題
- 加鎖和釋放鎖自帶重試機制
- 提供了自動延期的鎖,避免過時時間長度選擇的困境
引用
<dependency> <groupId>com.github.free-jungle</groupId> <artifactId>handy-lock-starter</artifactId> <version>1.0.0</version> </dependency>
配置
redis操做使用springboot的RedisTemplate實現,因此配置和spring.redis配置是同樣的
spring: redis: host: 127.0.0.1 port: 6379
使用舉例
例1: 使用RedisLockManager加鎖[推薦]
@Resource private RedisLockManager redisLockManager; public void lockUseLockManager(String id) { String lockKey = String.format("%s.%s#%s", this.getClass().getCanonicalName(), "lockUseLockManager", id); try (RedisLock redisLock = redisLockManager.fetchAndTryLock(lockKey, 5000, 1000, 2)) { TimeUnit.SECONDS.sleep(2L); } catch (LockFailException | IOException | InterruptedException ex) { LOGGER.error("error when lockUseLockManager", ex); } }
例2: 註解方式加鎖-靜態key
@RedisDistributedLockable(key = "com.github.free.jungle.lock.examples.service.impl.lockUseAnnotation", expireInMilliseconds = 10000, waitInMilliseconds = 10, tryCount = 1) public void lockUseAnnotation() { LOGGER.info("start lockUseAnnotation"); try { TimeUnit.MILLISECONDS.sleep(100L); } catch (Exception ex) { LOGGER.error("error when lockUseAnnotation", ex); } LOGGER.info("end lockUseAnnotation"); }
雖然提供了註解的使用方式,仍是推薦直接用例1的方式,代碼易讀,易懂,用起來也很簡單,沒有比 註解麻煩。
例3: 註解方式加鎖-動態key
@RedisDistributedLockable(keySpel = "'com.github.free.jungle.lock.examples.service.impl.lockUserAnnotationWithSpel#'+#id", expireInMilliseconds = 10000, waitInMilliseconds = 1000, tryCount = 3) public void lockUserAnnotationWithSpel(String id) { LOGGER.info("start lockUserAnnotationWithSpel:{}", id); try { TimeUnit.SECONDS.sleep(2L); } catch (Exception ex) { LOGGER.error("error when lockUseAnnotation", ex); } LOGGER.info("end lockUserAnnotationWithSpel:{}", id); }
適用於key須要根據入參動態拼裝的狀況,其中keySpel是spel表達式
例4: 自動延長過時時間的分佈式鎖[推薦]
上面的用法有一個困難的問題,就是過時時間參數(expireInMilliseconds)的配置可能很難,由於:
- 鎖有效時間過短,任務還沒執行完,redis就過時了,這樣分佈式執行就會產生併發的問題
- 鎖有效期太長,極端狀況當程序異常退出,沒有正確釋放鎖,鎖長時間沒法獲取,致使任務沒法進行的問題
因此實現了一個可以自動延長鎖有效時間的加鎖方法,使用方法以下:
@Resource private RedisLockManager redisLockManager; public void lockWithScheduleUseLockManager(String id) { String lockKey = String.format("%s.%s#%s", this.getClass().getCanonicalName(), "lockUseLockManager", id); try (RedisLock redisLock = redisLockManager.fetchAndTryLockWithSchedule(lockKey)) { TimeUnit.SECONDS.sleep(30L); } catch (LockFailException | IOException | InterruptedException ex) { LOGGER.error("error when lockUseLockManager", ex); } }
詳細方法說明
類RedisLockManager 的方法有較爲詳盡的註釋,請直接查看源碼。
樣例項目
handy-lock-examples是專門的使用樣例項目,做爲參考使用
已知問題
只是實現了客戶端層面的簡易分佈式鎖,因此沒法處理因爲redis服務端故障形成數據不一致的問題,須要 更嚴謹的分佈式鎖的狀況建議用Redssion