Redis分佈式鎖的基礎內容,咱們已經在Redis分佈式鎖:基於AOP和Redis實現的簡易版分佈式鎖這篇文章中講過了,也在文章中示範了正常的加鎖和解鎖方法。redis
分佈式鎖在以前的項目中一直運行良好,沒有辜負咱們的指望。bash
但在最近查線上日誌的時候偶然發現,有一個業務場景下,分佈式鎖偶爾會失效,致使有多個線程同時執行了相同的代碼。dom
咱們通過初步排查,定位到是由於在這段代碼中間調用了第三方的接口致使。分佈式
由於業務代碼耗時過長,超過了鎖的超時時間,形成鎖自動失效,而後另一個線程意外的持有了鎖。因而就出現了多個線程共同持有鎖的現象。ide
問題既然已經出現了,那麼接下來咱們就應該考慮解決方案了。函數
咱們也曾經想過,是否能夠經過合理地設置LockTime(鎖超時時間)來解決這個問題?post
但LockTime的設置本來就很不容易。LockTime設置太小,鎖自動超時的機率就會增長,鎖異常失效的機率也就會增長,而LockTime設置過大,萬一服務出現異常沒法正常釋放鎖,那麼出現這種異常鎖的時間也就越長。咱們只能經過經驗去配置,一個能夠接受的值,基本上是這個服務歷史上的平均耗時再增長必定的buff。ui
既然這條路走不通了,那麼還有其餘路能夠走麼?this
固然仍是有的,咱們能夠先給鎖設置一個LockTime,而後啓動一個守護線程,讓守護線程在一段時間後,從新去設置這個鎖的LockTime。spa
看起來很簡單是否是?
但在實際操做中,咱們要注意如下幾點:
一、和釋放鎖的狀況一致,咱們須要先判斷鎖的對象是否沒有變。不然會形成不管誰持有鎖,守護線程都會去從新設置鎖的LockTime。不該該續的不能瞎續。
二、守護線程要在合理的時間再去從新設置鎖的LockTime,不然會形成資源的浪費。不能動不動就去續。
三、若是持有鎖的線程已經處理完業務了,那麼守護線程也應該被銷燬。不能主人都掛了,守護者還在那裏繼續浪費資源。
咱們首先先生成一個內部類去實現Runnable,做爲守護線程的參數。
public class SurvivalClamProcessor implements Runnable {
private static final int REDIS_EXPIRE_SUCCESS = 1;
SurvivalClamProcessor(String field, String key, String value, int lockTime) {
this.field = field;
this.key = key;
this.value = value;
this.lockTime = lockTime;
this.signal = Boolean.TRUE;
}
private String field;
private String key;
private String value;
private int lockTime;
//線程關閉的標記
private volatile Boolean signal;
void stop() {
this.signal = Boolean.FALSE;
}
@Override
public void run() {
int waitTime = lockTime * 1000 * 2 / 3;
while (signal) {
try {
Thread.sleep(waitTime);
if (cacheUtils.expandLockTime(field, key, value, lockTime) == REDIS_EXPIRE_SUCCESS) {
if (logger.isInfoEnabled()) {
logger.info("expandLockTime 成功,本次等待{}ms,將重置鎖超時時間重置爲{}s,其中field爲{},key爲{}", waitTime, lockTime, field, key);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("expandLockTime 失敗,將致使SurvivalClamConsumer中斷");
}
this.stop();
}
} catch (InterruptedException e) {
if (logger.isInfoEnabled()) {
logger.info("SurvivalClamProcessor 處理線程被強制中斷");
}
} catch (Exception e) {
logger.error("SurvivalClamProcessor run error", e);
}
}
if (logger.isInfoEnabled()) {
logger.info("SurvivalClamProcessor 處理線程已中止");
}
}
}
複製代碼
其中expandLockTime是經過Lua腳本實現的。延長鎖超時的腳本語句和釋放鎖的Lua腳本相似。
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return '0' end";
複製代碼
在以上代碼中,咱們將waitTime設置爲Math.max(1, lockTime * 2 / 3),即守護線程許須要等待waitTime後才能夠去從新設置鎖的超時時間,避免了資源的浪費。
同時在expandLockTime時候也去判斷了當前持有鎖的對象是否一致,避免了胡亂重置鎖超時時間的狀況。
而後咱們在得到鎖的代碼以後,添加以下代碼:
SurvivalClamProcessor survivalClamProcessor
= new SurvivalClamProcessor(lockField, lockKey, randomValue, lockTime);
Thread survivalThread = new Thread(survivalClamProcessor);
survivalThread.setDaemon(Boolean.TRUE);
survivalThread.start();
Object returnObject = joinPoint.proceed(args);
survivalClamProcessor.stop();
survivalThread.interrupt();
return returnObject;
複製代碼
這段代碼會先初始化守護線程的內部參數,而後經過start函數啓動線程,最後在業務執行完以後,設置守護線程的關閉標記,最後經過interrupt()去中斷sleep狀態,保證線程及時銷燬。
本文講解了如何經過啓動一個守護線程去重置鎖超時時間,也同時介紹了在實現過程的注意點。隨帶着也科普了一下線程銷燬的正確方式。
那麼關於分佈式鎖還有下文麼?我也不知道,權當是有吧,可能下一期會講講如何經過其餘方式(除Redis以外的)去實現分佈式鎖,也多是講一下Redis分佈式鎖的其餘問題和解決方案。
好了,咱們下一期再見,歡迎你們一塊兒留言討論。同時也歡迎點贊,歡迎送小星星~