Redis分佈式鎖(三):支持鎖可重入,避免鎖遞歸調用時死鎖

使用現狀

Redis分佈式鎖的基礎內容,咱們已經在Redis分佈式鎖:基於AOP和Redis實現的簡易版分佈式鎖這篇文章中講過了,也在文章中示範了正常的加鎖和解鎖方法。redis

Redis分佈式鎖爲什麼要支持續期,以及如何支持續期的方法,咱們也已經在Redis分佈式鎖(二):支持鎖的續期,避免鎖超時後致使多個線程得到鎖編程

從上次升級爲可續期的分佈式鎖後的半年時間內,這款自研的簡易版分佈式鎖依然運行良好。bash

發現問題

但在最近查線上日誌的時候偶然發現,有一個業務場景下,分佈式鎖會發生死鎖。dom

咱們通過初步排查,定位到是由於在一個已持有鎖的方法中調用了另外一個須要加鎖執行的方法致使的。分佈式

簡化後的函數以下圖所示:ide

@Service
public class Lock1ServiceImpl implements Lock1Service {
    @Override
    @LockAnnotation(lockField = "test", lockKey = "1")
    public void test() {
    
    }
}

@Service
public class Lock2ServiceImpl implements Lock2Service {

    @Autowired
    private Lock1Service lock1Service;

    @Override
    @LockAnnotation(lockField = "test", lockKey = "1")
    public void test(){
        lock1Service.test();
    }
}

複製代碼

死鎖的過程主要以下:函數

Lock2Service的test方法須要加鎖(鎖的key爲"test:1"),而後在方法內部調用了Lock1Service的test方法,而Lock1Service的test方法也須要加一樣的鎖。post

在執行到lock1Service.test()時,Lock2Service的test方法並無執行結束,因此不會釋放鎖,而lock1Service執行test方法又必須去拿到鎖,這時候就發生死鎖了。優化

這段代碼只能等lock1Service通過一段時間等待後主動放棄繼續拿鎖後才能繼續往下進行。而lock1Service的test方法將永遠也沒法執行。ui

解決方案

問題既然已經出現了,那麼接下來咱們該作的就是想辦法來儘可能避免這個狀況。

咱們很快就想到了jdk中自帶的ReentrantLock,咱們能夠按照一樣的原理實現可重入的分佈式鎖。

實現可重入的原理也和ReentrantLock的原理相似,和ReentrantLock的不一樣之處在於分佈式鎖在第一次去獲取鎖的時候須要採用redis去分佈式競爭。而當已持有鎖且須要重入的時候,分佈式鎖會降級爲本地鎖。只有同一個線程的資源纔有重入的概念。

如下是lock類的主要細節,加鎖過程和解鎖過程以下所示:

final Boolean tryLock(String lockValue, int waitTime) {
    long startTime = System.currentTimeMillis();
    long endTime = startTime + waitTime * 1000;
    try {
        do {
            final Thread current = Thread.currentThread();
            int c = this.getState();
            if (c == 0) {
                int lockTime = LOCK_TIME;
                if (lockRedisClient.setLock(lockKey, lockValue, lockTime)) {
                    lockOwnerThread = current;
                    this.setState(c + 1);
                    survivalClamProcessor = new SurvivalClamProcessor(lockKey, lockValue, lockTime, lockRedisClient);
                    (survivalThread = threadFactoryManager.getThreadFactory().newThread(survivalClamProcessor)).start();
                    log.info("線程獲取重入鎖成功,鎖的名稱爲{}", lockKey);
                    return Boolean.TRUE;
                }
            } else if (lockOwnerThread == Thread.currentThread()) {
                if (c + 1 < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                this.setState(c + 1);
                log.info("線程重入鎖成功,鎖的名稱爲{},當前LockCount爲{}", lockKey, state);
                return Boolean.TRUE;
            }
            int sleepTime = SLEEP_TIME_ONCE;
            if (waitTime > 0) {
                log.info("線程暫時沒法得到鎖,當前已等待{}ms,本次將再等待{}ms,鎖的名稱爲{}", System.currentTimeMillis() - startTime, sleepTime, lockKey);
                try {
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    log.info("線程等待過程當中被中斷,鎖的名稱爲{}", lockKey, e);
                }
            }
        } while (System.currentTimeMillis() <= endTime);
        if (waitTime == 0) {
            log.info("線程得到鎖失敗,將放棄獲取鎖,鎖的名稱爲{}", lockKey);
        } else {
            log.info("線程得到鎖失敗,以前共等待{}ms,將放棄等待獲取鎖,鎖的名稱爲{}", System.currentTimeMillis() - startTime, lockKey);
        }
        return Boolean.FALSE;
    } catch (Exception e) {
        return Boolean.FALSE;
    }
}
複製代碼
final void unLock(String lockValue) {
    if (lockOwnerThread == Thread.currentThread()) {
        int c = this.getState() - 1;
        if (c == 0) {
            this.setLockOwnerThread(null);
            survivalClamProcessor.stop();
            survivalThread.interrupt();
            this.setSurvivalClamProcessor(null);
            this.setSurvivalThread(null);
            this.setState(c);
            lockRedisClient.delLock(lockKey, lockValue);
            log.info("重入鎖LockCount-1,線程已成功釋放鎖,鎖的名稱爲{}", lockKey);
        } else {
            this.setState(c);
            log.info("重入鎖LockCount-1,鎖的名稱爲{},剩餘LockCount爲{}", lockKey, c);
        }
    }
}

複製代碼

而後爲了不使用者常常會忘記解鎖或解鎖不規範,因此加解鎖的方法都不對外暴露,只對外暴露execute方法:

public <T> T execute(Supplier<T> supplier, int waitTime) {
    String randomValue = UUID.randomUUID().toString();
    Boolean holdLock = Boolean.FALSE;
    try {
        if (holdLock = this.tryLock(randomValue, waitTime)) {
            return supplier.get();
        }
        return null;
    } catch (Exception e) {
        log.error("execute error", e);
        return null;
    } finally {
        if (holdLock) {
            this.unLock(randomValue);
        }
    }
}
複製代碼

另外在aop實現的時候,由於沒法從上下文中獲取到同一個lock對象,故須要經過lockManager.getLock(lockField, lockKey)去獲取到lock對象。若是lockPrefix和lockKey一致的話,將得到到同一個lock對象,從而實現可重入的功能。

public Lock getLock(String lockPrefix, String lockKey) {
    String finalLockKey = StringUtils.isEmpty(lockPrefix) ? lockKey : (lockPrefix.concat(":").concat(lockKey));
    if (lockMap.containsKey(finalLockKey)) {
        return lockMap.get(finalLockKey);
    } else {
        Lock lock = new Lock(finalLockKey, lockRedisClient, threadFactoryManager);
        Lock existLock = lockMap.putIfAbsent(finalLockKey, lock);
        return Objects.nonNull(existLock) ? existLock : lock;
    }
}
複製代碼

更新說明

本次更新,除了實現了分佈式鎖的可重入功能以外,另外還在聲明式分佈式鎖@LockAnnotation註解基礎上,實現了經過execute方法來實現的編程式分佈式鎖。

此外,因爲以前版本已實現可續期的功能,因此LockAnnotation上的lockTime標記爲已過時,鎖的過時時間統一改成30s。經過續期功能來實現須要長時間鎖定的功能。

後續計劃

目前實現的版本中,已可知足分佈式鎖的大部分場景(非公平+可自動續期+可重入的分佈式鎖),已可投入生產環境使用。但目前仍不支持公平鎖,並且在競爭鎖失敗時採用了自旋+線程等待的方式實現了線程阻塞,後續可能會往這兩個方向去優化。

好了,咱們下一期再見,歡迎你們一塊兒留言討論。同時也歡迎點贊,歡迎送小星星~

相關文章
相關標籤/搜索