分佈式部署的應用集羣中保證數據更新的互斥性,且程序出現異常時,鎖可以自動釋放,避免死鎖發生。java
爲了保證分佈式部署的應用集羣中同一時間只有一個客戶端對共享資源進行操做。根據鎖的用途再細分:node
這也是分佈式鎖的關鍵技術。git
問題:假設設置失效時間10秒,若是因爲某些緣由致使10秒還沒執行完任務,這時候鎖自動失效,致使其餘線程也會拿到分佈式鎖,怎麼處理? 答:Redisson內部提供了一個監控鎖的看門狗,它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期。github
具體使用能夠參考Redisson官方文檔 這裏貼上我簡單使用的例子:redis
我使用的是springboot,因此直接用了redisson提供的集成包。算法
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.11.4</version> </dependency>
我用的redis是官方cluster的3主3從。spring
# common spring boot settings #spring.redis.database= #spring.redis.host= #spring.redis.port= spring.redis.password=XXXXXXXXXX #spring.redis.ssl= #spring.redis.timeout= spring.redis.cluster.nodes=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx #spring.redis.sentinel.master= #spring.redis.sentinel.nodes= # Redisson settings #path to redisson.yaml or redisson.json #spring.redis.redisson.config=classpath:redisson.yaml
public void contextLoads() { //簡單使用測試 // RBucket<String> bucket = redisson.getBucket("bucket"); // bucket.set("test"); // String obj = bucket.get(); // System.out.println(obj); // 得到鎖對象實例 RLock lock = redisson.getLock("lock"); // 獲取分佈式鎖,採用默認超時時間30秒 // 若是負責儲存這個分佈式鎖的Redisson節點宕機之後, // 並且這個鎖正好處於鎖住的狀態時, // 這個鎖會出現鎖死的狀態。 // 爲了不這種狀況的發生,Redisson內部提供了一個監控鎖的看門狗, // 它的做用是在Redisson實例被關閉前,不斷的延長鎖的有效期。 // 默認狀況下,看門狗的檢查鎖的超時時間是30秒鐘, // 也能夠經過修改Config.lockWatchdogTimeout來另行指定 lock.lock(); try { Thread.sleep(80000); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放鎖 lock.unlock(); } // 加鎖之後10秒鐘自動解鎖 // 無需調用unlock方法手動解鎖 // 這種指定了超時時間的鎖不會走看門狗邏輯, // 即會發生任務沒有執行完成時,鎖超時了,其餘進程會獲取到這個分佈式鎖。 // 儘可能使用第一種方式,走看門狗邏輯。 // lock.lock(40, TimeUnit.SECONDS); // try { // Thread.sleep(80000); // } catch (InterruptedException e) { // e.printStackTrace(); // } }
這裏再看一看具體獲取鎖和釋放鎖的核心邏輯:json
首先,調用了RedissonLock中的Lock方法:安全
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } }
注意這裏第一個入參爲-1。 進入lock方法:springboot
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); //獲取鎖邏輯 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { //獲取成功,返回 return; } //訂閱鎖 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { //持續獲取鎖 while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { try { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { getEntry(threadId).getLatch().acquire(); } else { getEntry(threadId).getLatch().acquireUninterruptibly(); } } } } finally { //最終取消訂閱獲取鎖 unsubscribe(future, threadId); } }
再看tryAcquire方法:
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //獲取鎖 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); //看門狗邏輯 ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
由於leaseTime爲-1,因此首先異步的獲取鎖,以後會走看門狗邏輯。 先看獲取鎖的操做:tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " "redis.call('hset', KEYS[1], ARGV[2], 1); " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return nil; " "end; " "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " "redis.call('hincrby', KEYS[1], ARGV[2], 1); " "redis.call('pexpire', KEYS[1], ARGV[1]); " "return nil; " "end; " "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
Redisson 使用 EVAL 命令執行上面的 Lua 腳原本完成獲取鎖的操做:
@Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { if (e != null) { cancelExpirationRenewal(threadId); result.tryFailure(e); return; } if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " id " thread-id: " threadId); result.tryFailure(cause); return; } cancelExpirationRenewal(threadId); result.trySuccess(null); }); return result; }
上面opStatus爲null時,會拋出異常,必須由加鎖的線程釋放鎖。 再來看核心方法:unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " "return nil;" "end; " "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " "if (counter > 0) then " "redis.call('pexpire', KEYS[1], ARGV[2]); " "return 0; " "else " "redis.call('del', KEYS[1]); " "redis.call('publish', KEYS[2], ARGV[1]); " "return 1; " "end; " "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
依然使用 EVAL 命令執行 Lua 腳原本釋放鎖:
上面的代碼解析文本源自:Redisson 分佈式鎖實現分析(一)
Redis做者antirez基於分佈式環境下提出了一種更高級的分佈式鎖的實現方式:Redlock。 Redisson中也實現了這種算法,具體能夠參考看8.4章節 這裏簡單描述一下這種算法: 假設有5個互不鏈接的Redis集羣
貼一段Redisson的小例子:
RLock lock1 = redissonInstance1.getLock("lock1"); RLock lock2 = redissonInstance2.getLock("lock2"); RLock lock3 = redissonInstance3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); // 同時加鎖:lock1 lock2 lock3 // 紅鎖在大部分節點上加鎖成功就算成功。 lock.lock(); ... lock.unlock();
RedissonRedLock繼承自RedissonMultiLock,具體源碼就再也不繼續分析了。
Zookeeper是一個一致性的文件系統,保證了其每一個節點的惟一性。 有4種節點類型:
順序號是單調遞增的計數器,由父節點維護。
分佈式鎖就是利用了Zookeeper的臨時順序標號目錄節點的原理來實現。Locks主節點下面的ID最小的節點得到鎖的權限,其餘客戶端來獲取鎖時,發現本身不是最靠前的,會監視他的前一個鎖節點,當鎖釋放時,相應的節點被刪除,會通知這個等待的客戶端,讓其獲取鎖的權利,至關於造成了一個等待隊列。
Zookeeper分佈式鎖的優勢就是有現成的框架能夠拿來就用,由於有等待隊列,槍鎖的效率也會高。缺點是由於Zookeeper是相似文件系統的數據結構,因此刪除和新增節點的效率會比較低。
《Redis官方文檔》用Redis構建分佈式鎖 8. 分佈式鎖和同步器 Redisson 分佈式鎖實現分析(一) Redis 分佈式鎖的前世此生 Redlock:Redis分佈式鎖最牛逼的實現