Redisson分佈式鎖深刻解析(二)

上一篇文章主要側重如何獲取鎖以及所獲取成功的場景,本文將着重對失敗以及解鎖的狀況進行分析,探尋Redisson分佈式鎖最具藝術的地方。node

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();

        // 訂閱監聽redis消息,而且建立RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象,用來控制本地的鎖請求的信號量同步,返回的是netty框架的Future實現。 
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
       // 阻塞等待subscribe的future的結果對象,若是subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,再也不繼續申請鎖了。
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 再次嘗試一次申請鎖
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 經過信號量(共享鎖)阻塞,等待解鎖消息(這一點設計的很是精妙:減小了其餘分佈式節點的等待或者空轉等無效鎖申請的操做,總體提升了性能)
                // 若是剩餘時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。 
                // 不然就在wait time 時間範圍內等待能夠經過信號量
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            //不管是否得到鎖,都要取消訂閱解鎖消息
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

咱們看到當獲取鎖的時長超過請求等待時間,直接進入acquireFailed(進一步調用acquireFailedAsync),並同步返回false,獲取鎖失敗。接下里咱們直接進入該異步(異步處理IO,提升系統吞吐量)方法,對其進行解析:redis

@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "redis.call('zrem', KEYS[2], ARGV[1]); " +
                "redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
                Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), getLockName(threadId));
}

能夠看到,這一步就是把該線程從獲取鎖操做的等待隊列中直接刪掉;網絡

接着往下看,若是未達到請求超時時間,則首先訂閱該鎖的信息。當其餘線程釋放鎖的時候,會同時根據鎖的惟一通道publish一條分佈式的解鎖信息,接收到分佈式消息後, 等待獲取鎖的Semaphore中的監聽隊列中的listenser線程可從新申請鎖,這個後面會深刻講解。下面是訂閱的具體細節:框架

public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    //根據channelName拿到信號量,channelName=UUID+":"+name,對應一個鎖。
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    final RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
        }
    };
    //把生成的監聽線程listenser加入到信號量的監聽集合中去,後面發佈解鎖消息的時候,會喚醒
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}

接着回到tryLock方法,看到finally裏面:不管是否得到鎖,都要取消訂閱解鎖消息,這裏不作贅述。異步

接着咱們一併分析一下解鎖的過程分佈式

public void unlock() {
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            // 解鎖成功以後取消更新鎖expire的時間任務,針對於沒有鎖過時時間的
            cancelExpirationRenewal();
        }

//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

 

解鎖的邏輯相對簡單,具體步驟以下:ide

  1. 若是lock鍵不存在,發消息說鎖已經可用性能

  2. 若是鎖不是被當前線程鎖定,則返回nilui

  3. 因爲支持可重入,在解鎖時將重入次數須要減1線程

  4. 若是計算後的重入次數>0,則從新設置過時時間

  5. 若是計算後的重入次數<=0,則發消息說鎖已經可用

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

下面咱們再看一下Redisson是如何處理解鎖消息的(LockPubSub.unlockMessage):

/**
 * 
 * @author Nikita Koksharov
 *
 */
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
           // 釋放一個許可,喚醒等待的entry.getLatch().tryAcquire去再次嘗試獲取鎖。
            value.getLatch().release();

            while (true) {
                Runnable runnableToExecute = null;
                 // 若是entry還有其餘Listeners回調,也喚醒執行。
                 synchronized (value) {
                    Runnable runnable = value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }
                
                if (runnableToExecute != null) {
                    runnableToExecute.run();
                } else {
                    return;
                }
            }
        }
    }

}

Redisson還有不少東西能夠挖掘,不只侷限分佈式鎖(對於分佈式鎖的一些細節,本文摘抄了網絡中比較靠譜的一些片斷,方便你們理解)。做者Nikita Koksharov 把原來Conrrent包下不少同步類(好比:CountDownLatch,Semaphore),用分佈式的方式實現了一遍,仍是很厲害的。這些加強的實現,之後在工做都將大有用處。這些點,之後有空的時候再慢慢研究。

相關文章
相關標籤/搜索