Redisson實現分佈式鎖—RedissonLock

Redisson實現分佈式鎖—RedissonLock

有關Redisson實現分佈式鎖上一篇博客講了分佈式的鎖原理:Redisson實現分佈式鎖---原理html

這篇主要講RedissonLock和RLock。Redisson分佈式鎖的實現是基於RLock接口,RedissonLock實現RLock接口。java

1、RLock接口

一、概念

public interface RLock extends Lock, RExpirable, RLockAsync

很明顯RLock是繼承Lock鎖,因此他有Lock鎖的全部特性,好比lock、unlock、trylock等特性,同時它還有不少新特性:強制鎖釋放,帶有效期的鎖,。node

二、RLock鎖API

這裏針對上面作個整理,這裏列舉幾個經常使用的接口說明redis

public interface RRLock {
    //----------------------Lock接口方法-----------------------

    /**
     * 加鎖 鎖的有效期默認30秒
     */
    void lock();
    /**
     * tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,若是獲取成功,則返回true,若是獲取失敗(即鎖已被其餘線程獲取),則返回false .
     */
    boolean tryLock();
    /**
     * tryLock(long time, TimeUnit unit)方法和tryLock()方法是相似的,只不過區別在於這個方法在拿不到鎖時會等待必定的時間,
     * 在時間期限以內若是還拿不到鎖,就返回false。若是若是一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。
     *
     * @param time 等待時間
     * @param unit 時間單位 小時、分、秒、毫秒等
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    /**
     * 解鎖
     */
    void unlock();
    /**
     * 中斷鎖 表示該鎖能夠被中斷 假如A和B同時調這個方法,A獲取鎖,B爲獲取鎖,那麼B線程能夠經過
     * Thread.currentThread().interrupt(); 方法真正中斷該線程
     */
    void lockInterruptibly();

    //----------------------RLock接口方法-----------------------
    /**
     * 加鎖 上面是默認30秒這裏能夠手動設置鎖的有效時間
     *
     * @param leaseTime 鎖有效時間
     * @param unit      時間單位 小時、分、秒、毫秒等
     */
    void lock(long leaseTime, TimeUnit unit);
    /**
     * 這裏比上面多一個參數,多添加一個鎖的有效時間
     *
     * @param waitTime  等待時間
     * @param leaseTime 鎖有效時間
     * @param unit      時間單位 小時、分、秒、毫秒等
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    /**
     * 檢驗該鎖是否被線程使用,若是被使用返回True
     */
    boolean isLocked();
    /**
     * 檢查當前線程是否得到此鎖(這個和上面的區別就是該方法能夠判斷是否當前線程得到此鎖,而不是此鎖是否被線程佔有)
     * 這個比上面那個實用
     */
    boolean isHeldByCurrentThread();
    /**
     * 中斷鎖 和上面中斷鎖差很少,只是這裏若是得到鎖成功,添加鎖的有效時間
     * @param leaseTime  鎖有效時間
     * @param unit       時間單位 小時、分、秒、毫秒等
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit);  
}

RLock相關接口,主要是新添加了 leaseTime 屬性字段,主要是用來設置鎖的過時時間,避免死鎖。api


2、RedissonLock實現類

public class RedissonLock extends RedissonExpirable implements RLock

RedissonLock實現了RLock接口,因此實現了接口的具體方法。這裏我列舉幾個方法說明下併發

一、void lock()方法

@Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

發現lock鎖裏面進去其實用的是lockInterruptibly(中斷鎖,表示能夠被中斷),並且捕獲異常後用 Thread.currentThread().interrupt()來真正中斷當前線程,其實它們是搭配一塊兒使用的。框架

具體有關lockInterruptibly()方法講解推薦一個博客。博客Lock的lockInterruptibly()分佈式

接下來執行流程,這裏理下關鍵幾步ide

/**
     * 一、帶上默認值調另外一箇中斷鎖方法
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }
    /**
     * 二、另外一箇中斷鎖的方法
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException 
    /**
     * 三、這裏已經設置了鎖的有效時間默認爲30秒  (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30)
     */
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    /**
     * 四、最後經過lua腳本訪問Redis,保證操做的原子性
     */
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

那麼void lock(long leaseTime, TimeUnit unit)方法其實和上面很類似了,就是從上面第二步開始的。高併發

二、tryLock(long waitTime, long leaseTime, TimeUnit unit)

接口的參數和含義上面已經說過了,如今咱們開看下源碼,這裏只顯示一些重要邏輯。

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        //一、 獲取鎖同時獲取成功的狀況下,和lock(...)方法是同樣的 直接返回True,獲取鎖False再往下走
        if (ttl == null) {
            return true;
        }
        //二、若是超過了嘗試獲取鎖的等待時間,固然返回false 了。
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        // 三、訂閱監聽redis消息,而且建立RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象,用來控制本地的鎖請求的信號量同步,返回的是netty框架的Future實現。
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //  阻塞等待subscribe的future的結果對象,若是subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,再也不繼續申請鎖了。
        //  只有await返回true,才進入循環嘗試獲取鎖
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

       //四、若是沒有超過嘗試獲取鎖的等待時間,那麼經過While一直獲取鎖。最終只會有兩種結果
        //1)、在等待時間內獲取鎖成功 返回true。2)等待時間結束了尚未獲取到鎖那麼返回false。
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 獲取鎖成功
            if (ttl == null) {
                return true;
            }
           //   獲取鎖失敗
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    }

重點 tryLock通常用於特定知足需求的場合,但不建議做爲通常需求的分佈式鎖,通常分佈式鎖建議用void lock(long leaseTime, TimeUnit unit)。由於從性能上考慮,在高併發狀況下後者效率是前者的好幾倍

三、unlock()

解鎖的邏輯很簡單。

@Override
    public void unlock() {
        // 1.經過 Lua 腳本執行 Redis 命令釋放鎖
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                        "end; " +
                        "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()),
                LockPubSub.unlockMessage, internalLockLeaseTime,
                getLockName(Thread.currentThread().getId()));
        // 2.非鎖的持有者釋放鎖時拋出異常
        if (opStatus == null) {
            throw new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + Thread.currentThread().getId());
        }
        // 3.釋放鎖後取消刷新鎖失效時間的調度任務
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }

使用 EVAL 命令執行 Lua 腳原本釋放鎖:

  1. key 不存在,說明鎖已釋放,直接執行 publish 命令發佈釋放鎖消息並返回 1
  2. key 存在,可是 field 在 Hash 中不存在,說明本身不是鎖持有者,無權釋放鎖,返回 nil
  3. 由於鎖可重入,因此釋放鎖時不能把全部已獲取的鎖全都釋放掉,一次只能釋放一把鎖,所以執行 hincrby 對鎖的值減一
  4. 釋放一把鎖後,若是還有剩餘的鎖,則刷新鎖的失效時間並返回 0;若是剛纔釋放的已是最後一把鎖,則執行 del 命令刪除鎖的 key,併發布鎖釋放消息,返回 1

注意這裏有個實際開發過程當中,容易出現很容易出現上面第二步異常,非鎖的持有者釋放鎖時拋出異常。好比下面這種狀況

//設置鎖1秒過去
        redissonLock.lock("redisson", 1);
        /**
         * 業務邏輯須要諮詢2秒
         */
        redissonLock.release("redisson");
      /**
       * 線程1 進來得到鎖後,線程一切正常並無宕機,但它的業務邏輯須要執行2秒,這就會有個問題,在 線程1 執行1秒後,這個鎖就自動過時了,
       * 那麼這個時候 線程2 進來了。在線程1去解鎖就會拋上面這個異常(由於解鎖和當前鎖已經不是同一線程了)
       */




只要本身變優秀了,其餘的事情纔會跟着好起來(中將6)
相關文章
相關標籤/搜索