Redis分佈式鎖(二):支持鎖的續期,避免鎖超時後致使多個線程得到鎖

使用現狀

Redis分佈式鎖的基礎內容,咱們已經在Redis分佈式鎖:基於AOP和Redis實現的簡易版分佈式鎖這篇文章中講過了,也在文章中示範了正常的加鎖和解鎖方法。redis

分佈式鎖在以前的項目中一直運行良好,沒有辜負咱們的指望。bash

發現問題

但在最近查線上日誌的時候偶然發現,有一個業務場景下,分佈式鎖偶爾會失效,致使有多個線程同時執行了相同的代碼。dom

咱們通過初步排查,定位到是由於在這段代碼中間調用了第三方的接口致使。分佈式

由於業務代碼耗時過長,超過了鎖的超時時間,形成鎖自動失效,而後另一個線程意外的持有了鎖。因而就出現了多個線程共同持有鎖的現象。ide

解決方案

問題既然已經出現了,那麼接下來咱們就應該考慮解決方案了。函數

咱們也曾經想過,是否能夠經過合理地設置LockTime(鎖超時時間)來解決這個問題?post

但LockTime的設置本來就很不容易。LockTime設置太小,鎖自動超時的機率就會增長,鎖異常失效的機率也就會增長,而LockTime設置過大,萬一服務出現異常沒法正常釋放鎖,那麼出現這種異常鎖的時間也就越長。咱們只能經過經驗去配置,一個能夠接受的值,基本上是這個服務歷史上的平均耗時再增長必定的buff。ui

既然這條路走不通了,那麼還有其餘路能夠走麼?this

固然仍是有的,咱們能夠先給鎖設置一個LockTime,而後啓動一個守護線程,讓守護線程在一段時間後,從新去設置這個鎖的LockTime。spa

看起來很簡單是否是?

但在實際操做中,咱們要注意如下幾點:
一、和釋放鎖的狀況一致,咱們須要先判斷鎖的對象是否沒有變。不然會形成不管誰持有鎖,守護線程都會去從新設置鎖的LockTime。不該該續的不能瞎續。
二、守護線程要在合理的時間再去從新設置鎖的LockTime,不然會形成資源的浪費。不能動不動就去續。
三、若是持有鎖的線程已經處理完業務了,那麼守護線程也應該被銷燬。不能主人都掛了,守護者還在那裏繼續浪費資源。

代碼實現

咱們首先先生成一個內部類去實現Runnable,做爲守護線程的參數。

public class SurvivalClamProcessor implements Runnable {

    private static final int REDIS_EXPIRE_SUCCESS = 1;

    SurvivalClamProcessor(String field, String key, String value, int lockTime) {
        this.field = field;
        this.key = key;
        this.value = value;
        this.lockTime = lockTime;
        this.signal = Boolean.TRUE;
    }

    private String field;

    private String key;

    private String value;

    private int lockTime;

    //線程關閉的標記
    private volatile Boolean signal;

    void stop() {
        this.signal = Boolean.FALSE;
    }

    @Override
    public void run() {
        int waitTime = lockTime * 1000 * 2 / 3;
        while (signal) {
            try {
                Thread.sleep(waitTime);
                if (cacheUtils.expandLockTime(field, key, value, lockTime) == REDIS_EXPIRE_SUCCESS) {
                    if (logger.isInfoEnabled()) {
                        logger.info("expandLockTime 成功,本次等待{}ms,將重置鎖超時時間重置爲{}s,其中field爲{},key爲{}", waitTime, lockTime, field, key);
                    }
                } else {
                    if (logger.isInfoEnabled()) {
                        logger.info("expandLockTime 失敗,將致使SurvivalClamConsumer中斷");
                    }
                    this.stop();
                }
            } catch (InterruptedException e) {
                if (logger.isInfoEnabled()) {
                    logger.info("SurvivalClamProcessor 處理線程被強制中斷");
                }
            } catch (Exception e) {
                logger.error("SurvivalClamProcessor run error", e);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("SurvivalClamProcessor 處理線程已中止");
        }
    }
}
複製代碼

其中expandLockTime是經過Lua腳本實現的。延長鎖超時的腳本語句和釋放鎖的Lua腳本相似。

String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1],ARGV[2]) else return '0' end";

複製代碼

在以上代碼中,咱們將waitTime設置爲Math.max(1, lockTime * 2 / 3),即守護線程許須要等待waitTime後才能夠去從新設置鎖的超時時間,避免了資源的浪費。

同時在expandLockTime時候也去判斷了當前持有鎖的對象是否一致,避免了胡亂重置鎖超時時間的狀況。

而後咱們在得到鎖的代碼以後,添加以下代碼:

SurvivalClamProcessor survivalClamProcessor 
	= new SurvivalClamProcessor(lockField, lockKey, randomValue, lockTime);
Thread survivalThread = new Thread(survivalClamProcessor);
survivalThread.setDaemon(Boolean.TRUE);
survivalThread.start();
Object returnObject = joinPoint.proceed(args);
survivalClamProcessor.stop();
survivalThread.interrupt();
return returnObject;
複製代碼

這段代碼會先初始化守護線程的內部參數,而後經過start函數啓動線程,最後在業務執行完以後,設置守護線程的關閉標記,最後經過interrupt()去中斷sleep狀態,保證線程及時銷燬。

後續

本文講解了如何經過啓動一個守護線程去重置鎖超時時間,也同時介紹了在實現過程的注意點。隨帶着也科普了一下線程銷燬的正確方式。

那麼關於分佈式鎖還有下文麼?我也不知道,權當是有吧,可能下一期會講講如何經過其餘方式(除Redis以外的)去實現分佈式鎖,也多是講一下Redis分佈式鎖的其餘問題和解決方案。

好了,咱們下一期再見,歡迎你們一塊兒留言討論。同時也歡迎點贊,歡迎送小星星~

相關文章
相關標籤/搜索