Redission鎖繼承Implements Reentrant Lock,因此具有 Reentrant Lock 鎖中的一些特性:超時,重試,可中斷等。加上Redission中Redis具有分佈式的特性,因此很是適合用來作Java中的分佈式鎖。 下面咱們對其加鎖、解鎖過程當中的源碼細節進行一一分析。redis
鎖的接口定義了一下方法:分佈式
分佈式鎖當中加鎖,咱們經常使用的加鎖接口:ide
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
下面咱們來看一下方法的具體實現:ui
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } try { time -= (System.currentTimeMillis() - current); if (time <= 0) { acquireFailed(threadId); return false; } while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } // waiting for message currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } time -= (System.currentTimeMillis() - currentTime); if (time <= 0) { acquireFailed(threadId); return false; } } } finally { unsubscribe(subscribeFuture, threadId); } // return get(tryLockAsync(waitTime, leaseTime, unit)); }
首先咱們看到調用tryAcquire嘗試獲取鎖,在這裏是否能獲取到鎖,是根據鎖名稱的過時時間TTL來斷定的(TTL<=0:則說明該鎖不存在或者已經超時,此時獲取鎖成功。TTL>0:則說明該鎖被其餘現成持有,此時獲取鎖失敗);線程
下面咱們接着看一下tryAcquire的實現:blog
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }
能夠看到真正獲取鎖的操做通過一層get操做裏面執行的,這裏爲什麼要這麼操做,本人也不是太理解,若有理解錯誤,歡迎指正。繼承
get 是由CommandAsyncExecutor(一個線程Executor)封裝的一個Executor
設置一個單線程的同步控制器CountDownLatch,用於控制單個線程的中斷信息。我的理解通過中間的這麼一步:主要是爲了支持線程可中斷操做。接口
public <V> V get(RFuture<V> future) { if (!future.isDone()) { final CountDownLatch l = new CountDownLatch(1); future.addListener(new FutureListener<V>() { @Override public void operationComplete(Future<V> future) throws Exception { l.countDown(); } }); boolean interrupted = false; while (!future.isDone()) { try { l.await(); } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } // commented out due to blocking issues up to 200 ms per minute for each thread:因爲每一個線程的阻塞問題,每分鐘高達200毫秒 // future.awaitUninterruptibly(); if (future.isSuccess()) { return future.getNow(); } throw convertException(future); }
咱們進一步往下看:get
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; }
首先判斷鎖是否有超時時間,有過時時間的話,會在後面獲取鎖的時候設置進去。沒有過時時間的話,則會用默認的同步
private long lockWatchdogTimeout = 30 * 1000;
下面咱們在進一步往下分析真正獲取鎖的操做:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
我把裏面的重點信息作了如下三點總結:
1:真正執行的是一段具備原子性的Lua腳本,而且最終也是由CommandAsynExecutor去執行。
2:鎖真正持久化到Redis時,用的hash類型key field value
3:獲取鎖的三個參數:getName()是邏輯鎖名稱,例如:分佈式鎖要鎖住的methodName+params;internalLockLeaseTime是毫秒單位的鎖過時時間;getLockName則是鎖對應的線程級別的名稱,由於支持相同線程可重入,不一樣線程不可重入,因此這裏的鎖的生成方式是:UUID+":"threadId。有的同窗可能會問,這樣不是很縝密:不一樣的JVM可能會生成相同的threadId,因此Redission這裏加了一個區分度很高的UUID;
Lua腳本中的執行分爲如下三步:
1:exists檢查redis中是否存在鎖名稱;若是不存在,則獲取成功;同時把邏輯鎖名稱KEYS[1],線程級別的鎖名稱[ARGV[2],value=1,設置到redis。並設置邏輯鎖名稱的過時時間ARGV[2],返回;
2:若是檢查到存在KEYS[1],[ARGV[2],則說明獲取成功,此時會自增對應的value值,記錄重入次數;並更新鎖的過時時間
3:key不存,直接返回key的剩餘過時時間(-2)
鎖獲取失敗、解鎖過程後在後面的文章繼續補充