上一篇文章主要側重如何獲取鎖以及所獲取成功的場景,本文將着重對失敗以及解鎖的狀況進行分析,探尋Redisson分佈式鎖最具藝術的地方。node
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); // 訂閱監聽redis消息,而且建立RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象,用來控制本地的鎖請求的信號量同步,返回的是netty框架的Future實現。 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的結果對象,若是subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,再也不繼續申請鎖了。 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); // 再次嘗試一次申請鎖 ttl = tryAcquire(leaseTime, unit, threadId); // if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); // 經過信號量(共享鎖)阻塞,等待解鎖消息(這一點設計的很是精妙:減小了其餘分佈式節點的等待或者空轉等無效鎖申請的操做,總體提升了性能) // 若是剩餘時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。 // 不然就在wait time 時間範圍內等待能夠經過信號量 if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { //不管是否得到鎖,都要取消訂閱解鎖消息 unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
咱們看到當獲取鎖的時長超過請求等待時間,直接進入acquireFailed(進一步調用acquireFailedAsync),並同步返回false,獲取鎖失敗。接下里咱們直接進入該異步(異步處理IO,提升系統吞吐量)方法,對其進行解析:redis
@Override protected RFuture<Void> acquireFailedAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID, "redis.call('zrem', KEYS[2], ARGV[1]); " + "redis.call('lrem', KEYS[1], 0, ARGV[1]); ", Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), getLockName(threadId)); }
能夠看到,這一步就是把該線程從獲取鎖操做的等待隊列中直接刪掉;網絡
接着往下看,若是未達到請求超時時間,則首先訂閱該鎖的信息。當其餘線程釋放鎖的時候,會同時根據鎖的惟一通道publish一條分佈式的解鎖信息,接收到分佈式消息後, 等待獲取鎖的Semaphore中的監聽隊列中的listenser線程可從新申請鎖,這個後面會深刻講解。下面是訂閱的具體細節:框架
public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) { final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); //根據channelName拿到信號量,channelName=UUID+":"+name,對應一個鎖。 final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName); final RPromise<E> newPromise = new RedissonPromise<E>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return semaphore.remove(listenerHolder.get()); } }; Runnable listener = new Runnable() { @Override public void run() { E entry = entries.get(entryName); if (entry != null) { entry.aquire(); semaphore.release(); entry.getPromise().addListener(new TransferListener<E>(newPromise)); return; } E value = createEntry(newPromise); value.aquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.aquire(); semaphore.release(); oldValue.getPromise().addListener(new TransferListener<E>(newPromise)); return; } RedisPubSubListener<Object> listener = createListener(channelName, value); connectionManager.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); } }; //把生成的監聽線程listenser加入到信號量的監聽集合中去,後面發佈解鎖消息的時候,會喚醒 semaphore.acquire(listener); listenerHolder.set(listener); return newPromise; }
接着回到tryLock方法,看到finally裏面:不管是否得到鎖,都要取消訂閱解鎖消息,這裏不作贅述。異步
接着咱們一併分析一下解鎖的過程分佈式
public void unlock() { Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { // 解鎖成功以後取消更新鎖expire的時間任務,針對於沒有鎖過時時間的 cancelExpirationRenewal(); } // Future<Void> future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { // return; // } // if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause(); // } // throw commandExecutor.convertException(future); }
解鎖的邏輯相對簡單,具體步驟以下:ide
若是lock鍵不存在,發消息說鎖已經可用性能
若是鎖不是被當前線程鎖定,則返回nilui
因爲支持可重入,在解鎖時將重入次數須要減1線程
若是計算後的重入次數>0,則從新設置過時時間
若是計算後的重入次數<=0,則發消息說鎖已經可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
下面咱們再看一下Redisson是如何處理解鎖消息的(LockPubSub.unlockMessage):
/** * * @author Nikita Koksharov * */ public class LockPubSub extends PublishSubscribe<RedissonLockEntry> { public static final Long unlockMessage = 0L; @Override protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) { return new RedissonLockEntry(newPromise); } @Override protected void onMessage(RedissonLockEntry value, Long message) { if (message.equals(unlockMessage)) { // 釋放一個許可,喚醒等待的entry.getLatch().tryAcquire去再次嘗試獲取鎖。 value.getLatch().release(); while (true) { Runnable runnableToExecute = null; // 若是entry還有其餘Listeners回調,也喚醒執行。 synchronized (value) { Runnable runnable = value.getListeners().poll(); if (runnable != null) { if (value.getLatch().tryAcquire()) { runnableToExecute = runnable; } else { value.addListener(runnable); } } } if (runnableToExecute != null) { runnableToExecute.run(); } else { return; } } } } }
Redisson還有不少東西能夠挖掘,不只侷限分佈式鎖(對於分佈式鎖的一些細節,本文摘抄了網絡中比較靠譜的一些片斷,方便你們理解)。做者Nikita Koksharov 把原來Conrrent包下不少同步類(好比:CountDownLatch,Semaphore),用分佈式的方式實現了一遍,仍是很厲害的。這些加強的實現,之後在工做都將大有用處。這些點,之後有空的時候再慢慢研究。