問題:假設設置失效時間10秒,若是因爲某些緣由致使10秒還沒執行完任務,這時候鎖自動失效,致使其餘線程也會拿到分佈式鎖,怎麼處理? 答:Redisson內部提供了一個監控鎖的看門狗,它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期。github
具體使用能夠參考Redisson官方文檔 這裏貼上我簡單使用的例子:redis
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.4</version> </dependency>
# common spring boot settings #spring.redis.database= #spring.redis.host= #spring.redis.port= spring.redis.password=XXXXXXXXXX #spring.redis.ssl= #spring.redis.timeout= spring.redis.cluster.nodes=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #spring.redis.sentinel.master= #spring.redis.sentinel.nodes= # Redisson settings #path to redisson.yaml or redisson.json #spring.redis.redisson.config=classpath:redisson.yaml
public void contextLoads() { //簡單使用測試 // RBucket<String> bucket = redisson.getBucket("bucket"); // bucket.set("test"); // String obj = bucket.get(); // System.out.println(obj); // 得到鎖對象實例 RLock lock = redisson.getLock("lock"); // 獲取分佈式鎖,採用默認超時時間30秒 // 若是負責儲存這個分佈式鎖的Redisson節點宕機之後, // 並且這個鎖正好處於鎖住的狀態時, // 這個鎖會出現鎖死的狀態。 // 爲了不這種狀況的發生,Redisson內部提供了一個監控鎖的看門狗, // 它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期。 // 默認狀況下,看門狗的檢查鎖的超時時間是30秒鐘, // 也能夠經過修改Config.lockWatchdogTimeout來另行指定 lock.lock(); try { Thread.sleep(80000); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放鎖 lock.unlock(); } // 加鎖之後10秒鐘自動解鎖 // 無需調用unlock方法手動解鎖 // 這種指定了超時時間的鎖不會走看門狗邏輯, // 即會發生任務沒有執行完成時,鎖超時了,其餘進程會獲取到這個分佈式鎖。 // 儘可能使用第一種方式,走看門狗邏輯。 // lock.lock(40, TimeUnit.SECONDS); // try { // Thread.sleep(80000); // } catch (InterruptedException e) { // e.printStackTrace(); // } }
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
注意這裏第一個入參爲-1。 進入lock方法:springboot
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); //獲取鎖邏輯 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { //獲取成功,返回 return; } //訂閱鎖 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { //持續獲取鎖 while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { getEntry(threadId).getLatch().acquire(); } else { getEntry(threadId).getLatch().acquireUninterruptibly(); } } } } finally { //最終取消訂閱獲取鎖 unsubscribe(future, threadId); } }
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //獲取鎖 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //看門狗邏輯 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
由於leaseTime爲-1,因此首先異步的獲取鎖,以後會走看門狗邏輯。 先看獲取鎖的操做:tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " "redis.call('hset', KEYS[1], ARGV[2], 1); " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return nil; " "end; " "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " "redis.call('hincrby', KEYS[1], ARGV[2], 1); " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return nil; " "end; " "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
Redisson 使用 EVAL 命令執行上面的 Lua 腳原本完成獲取鎖的操做:
@Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { if (e != null) { cancelExpirationRenewal(threadId); result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " id " thread-id: " threadId); result.tryFailure(cause); return; } cancelExpirationRenewal(threadId); result.trySuccess(null); }); return result; }
上面opStatus爲null時,會拋出異常,必須由加鎖的線程釋放鎖。 再來看核心方法:unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " "return nil;" "end; " "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " "if (counter > 0) then " "redis.call('pexpire', KEYS[1], ARGV[2]); " "return 0; " "else " "redis.call('del', KEYS[1]); " "redis.call('publish', KEYS[2], ARGV[1]); " "return 1; " "end; " "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
依然使用 EVAL 命令執行 Lua 腳原本釋放鎖:
Redis做者antirez基於分佈式環境下提出了一種更高級的分佈式鎖的實現方式:Redlock。 Redisson中也實現了這種算法,具體能夠參考看8.4章節 這裏簡單描述一下這種算法: 假設有5個互不鏈接的Redis集羣
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同時加鎖:lock1 lock2 lock3 // 紅鎖在大部分節點上加鎖成功就算成功。 lock.lock(); ... lock.unlock();
Zookeeper是一個一致性的文件系統,保證了其每一個節點的惟一性。 有4種節點類型:
