有關Redisson實現分佈式鎖上一篇博客講了分佈式的鎖原理:Redisson實現分佈式鎖---原理html
這篇主要講RedissonLock和RLock。Redisson分佈式鎖的實現是基於RLock接口,RedissonLock實現RLock接口。java
public interface RLock extends Lock, RExpirable, RLockAsync
很明顯RLock是繼承Lock鎖,因此他有Lock鎖的全部特性,好比lock、unlock、trylock等特性,同時它還有不少新特性:強制鎖釋放,帶有效期的鎖,。node
這裏針對上面作個整理,這裏列舉幾個經常使用的接口說明redis
public interface RRLock { //----------------------Lock接口方法----------------------- /** * 加鎖 鎖的有效期默認30秒 */ void lock(); /** * tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,若是獲取成功,則返回true,若是獲取失敗(即鎖已被其餘線程獲取),則返回false . */ boolean tryLock(); /** * tryLock(long time, TimeUnit unit)方法和tryLock()方法是相似的,只不過區別在於這個方法在拿不到鎖時會等待必定的時間, * 在時間期限以內若是還拿不到鎖,就返回false。若是若是一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。 * * @param time 等待時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 解鎖 */ void unlock(); /** * 中斷鎖 表示該鎖能夠被中斷 假如A和B同時調這個方法,A獲取鎖,B爲獲取鎖,那麼B線程能夠經過 * Thread.currentThread().interrupt(); 方法真正中斷該線程 */ void lockInterruptibly(); //----------------------RLock接口方法----------------------- /** * 加鎖 上面是默認30秒這裏能夠手動設置鎖的有效時間 * * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lock(long leaseTime, TimeUnit unit); /** * 這裏比上面多一個參數,多添加一個鎖的有效時間 * * @param waitTime 等待時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /** * 檢驗該鎖是否被線程使用,若是被使用返回True */ boolean isLocked(); /** * 檢查當前線程是否得到此鎖(這個和上面的區別就是該方法能夠判斷是否當前線程得到此鎖,而不是此鎖是否被線程佔有) * 這個比上面那個實用 */ boolean isHeldByCurrentThread(); /** * 中斷鎖 和上面中斷鎖差很少,只是這裏若是得到鎖成功,添加鎖的有效時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lockInterruptibly(long leaseTime, TimeUnit unit); }
RLock相關接口,主要是新添加了 leaseTime
屬性字段,主要是用來設置鎖的過時時間,避免死鎖。api
public class RedissonLock extends RedissonExpirable implements RLock
RedissonLock實現了RLock接口,因此實現了接口的具體方法。這裏我列舉幾個方法說明下併發
@Override public void lock() { try { lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
發現lock鎖裏面進去其實用的是lockInterruptibly
(中斷鎖,表示能夠被中斷),並且捕獲異常後用 Thread.currentThread().interrupt()來真正中斷當前線程,其實它們是搭配一塊兒使用的。框架
具體有關lockInterruptibly()方法講解推薦一個博客。博客
:Lock的lockInterruptibly()分佈式
接下來執行流程,這裏理下關鍵幾步ide
/** * 一、帶上默認值調另外一箇中斷鎖方法 */ @Override public void lockInterruptibly() throws InterruptedException { lockInterruptibly(-1, null); } /** * 二、另外一箇中斷鎖的方法 */ void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException /** * 三、這裏已經設置了鎖的有效時間默認爲30秒 (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30) */ RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); /** * 四、最後經過lua腳本訪問Redis,保證操做的原子性 */ <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
那麼void lock(long leaseTime, TimeUnit unit)方法其實和上面很類似了,就是從上面第二步開始的。高併發
接口的參數和含義上面已經說過了,如今咱們開看下源碼,這裏只顯示一些重要邏輯。
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); //一、 獲取鎖同時獲取成功的狀況下,和lock(...)方法是同樣的 直接返回True,獲取鎖False再往下走 if (ttl == null) { return true; } //二、若是超過了嘗試獲取鎖的等待時間,固然返回false 了。 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } // 三、訂閱監聽redis消息,而且建立RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象,用來控制本地的鎖請求的信號量同步,返回的是netty框架的Future實現。 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的結果對象,若是subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,再也不繼續申請鎖了。 // 只有await返回true,才進入循環嘗試獲取鎖 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } //四、若是沒有超過嘗試獲取鎖的等待時間,那麼經過While一直獲取鎖。最終只會有兩種結果 //1)、在等待時間內獲取鎖成功 返回true。2)等待時間結束了尚未獲取到鎖那麼返回false。 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // 獲取鎖成功 if (ttl == null) { return true; } // 獲取鎖失敗 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } }
重點
tryLock通常用於特定知足需求的場合,但不建議做爲通常需求的分佈式鎖,通常分佈式鎖建議用void lock(long leaseTime, TimeUnit unit)。由於從性能上考慮,在高併發狀況下後者效率是前者的好幾倍
解鎖的邏輯很簡單。
@Override public void unlock() { // 1.經過 Lua 腳本執行 Redis 命令釋放鎖 Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); // 2.非鎖的持有者釋放鎖時拋出異常 if (opStatus == null) { throw new IllegalMonitorStateException( "attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } // 3.釋放鎖後取消刷新鎖失效時間的調度任務 if (opStatus) { cancelExpirationRenewal(); } }
使用 EVAL 命令執行 Lua 腳原本釋放鎖:
publish
命令發佈釋放鎖消息並返回 1
。nil
。hincrby
對鎖的值減一。0
;若是剛纔釋放的已是最後一把鎖,則執行 del
命令刪除鎖的 key,併發布鎖釋放消息,返回 1
。注意
這裏有個實際開發過程當中,容易出現很容易出現上面第二步異常,非鎖的持有者釋放鎖時拋出異常。好比下面這種狀況
//設置鎖1秒過去 redissonLock.lock("redisson", 1); /** * 業務邏輯須要諮詢2秒 */ redissonLock.release("redisson"); /** * 線程1 進來得到鎖後,線程一切正常並無宕機,但它的業務邏輯須要執行2秒,這就會有個問題,在 線程1 執行1秒後,這個鎖就自動過時了, * 那麼這個時候 線程2 進來了。在線程1去解鎖就會拋上面這個異常(由於解鎖和當前鎖已經不是同一線程了) */
只要本身變優秀了,其餘的事情纔會跟着好起來(中將6)