以前筆者已經寫過關於分佈式鎖的內容,但囿於彼時對於分佈式鎖的研究還不算太深刻,現在讀來發現仍是存在一些問題,故而寫做本文,對Redis分佈式鎖的實現作一個更加全面、進階的闡述和總結,幫助讀者對Redis分佈式鎖有一個更加深刻客觀的瞭解。關於更多分佈式鎖的其餘實現,在後續的文章中也會陸續展開。html
咱們仍是經過經典的WWH(what why how)三段論方式進行行文。首先再次從宏觀上了解什麼是分佈式鎖以及分佈式鎖的約束條件和常見實現方式。java
這部分主要對分佈式鎖再次作一次較爲完整的回顧與總結。node
引用度孃的詞條,對於分佈式鎖的解釋以下:git
這段話歸納的仍是不錯的,根據概述以及對單機鎖的瞭解,咱們可以提煉並類比得出分佈式鎖的幾個主要約束條件:github
特色 | 描述 |
---|---|
互斥性 | 即:在任意時刻,只有一個客戶端能持有鎖 |
安全性 | 即:不會出現死鎖的狀況,當一個客戶端在持有鎖期間內,因爲意外崩潰而致使鎖未能主動解鎖,其持有的鎖也可以被正確釋放,並保證後續其它客戶端也能加鎖; |
可用性 | 即:分佈式鎖須要有必定的故障恢復能力,經過高可用機制可以保證故障發生的狀況下可以最大限度對外提供服務,無單點風險。如:經過Redis的集羣模式、哨兵模式;ETCD/zookeeper的集羣選主能力等保證HA |
對稱性 | 對於任意一個鎖,其加鎖和解鎖必須是同一個客戶端,即客戶端 A 不能把客戶端 B 加的鎖給解了。這又稱爲鎖的可重入性。 |
基於上述特色,這裏直接給出常見的實現方式,筆者以前的文章也有對這些常見實現方式的詳述,此處只是做爲歸納,再也不展開,感興趣的同窗能夠自行查閱博客的歷史記錄。redis
類別 | 舉例 |
---|---|
經過數據庫方式實現 | 如:採用樂觀鎖、悲觀鎖或者基於主鍵惟一約束實現 |
基於分佈式緩存實現的鎖服務 | 如: Redis 和基於 Redis 的 RedLock(Redisson提供了參考實現) |
基於分佈式一致性算法實現的鎖服務 | 如:ZooKeeper、Chubby(google閉源實現) 和 Etcd |
簡單對分佈式鎖的概念作了一個總結整理後,咱們進入本文的正題,對Redis實現分佈式鎖的機理展開論述。算法
這部分對Redis實現分佈式鎖的原理進行展開論述。數據庫
既然是鎖,核心操做無外乎加鎖、解鎖,首先來看一下經過Redis的哪一個指令進行加鎖操做。緩存
SET lock_name my_random_value NX PX 30000安全
這個指令的含義是在鍵「lock_name」不存在時,設置鍵的值,到期時間爲30秒。咱們經過該命令就能實現加鎖功能。
這裏對該命令作一個較爲詳細的講解。
命令格式:
SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
咱們的目的在於使鎖具備互斥性,所以採用NX參數, 僅在鎖不存在時才能設置鎖成功。
咱們回過頭接着看下加鎖的完整實例:
SET lock_name my_random_value NX PX 30000
解鎖經過del命令便可觸發,完整指令以下:
del lock_name
對該指令作一個解釋:
以上即是基於Redis實現分佈式鎖能力的核心指令,咱們接着看一個常見的錯誤實現案例。
首先看一段java代碼:
Jedis jedis = jedisPool.getResource();
// 若是鎖不存在則進行加鎖
Long lockResult = jedis.setnx(lockName, myRandomValue);
if (lockResult == 1) {
// 設置鎖過時時間,加鎖和設置過時時間是兩步完成的,非原子操做
jedis.expire(lockName, expireTime);
}
複製代碼
setnx() 方法的做用就是 SET IF NOT EXIST,expire() 方法就是給鎖加一個過時時間。 乍看以爲這段代碼沒什麼問題,但仔細推敲一下就能看出,其實這裏是有問題的:加鎖實際上使用了兩條 Redis 命令,這個組合操做是非原子性的。
若是執行setNx成功後,接着執行expire時發生異常致使鎖的過時時間未能設置,便會形成鎖無過時時間。後續若是執行的過程當中出現業務執行異常或者出現FullGC等狀況,將會致使鎖一致沒法釋放,從而形成死鎖。
網上不少博客中採用的就是這種較爲初級的實現方式,不建議仿效。
究其緣由,仍是由於setNx自己雖然可以保證設置值的原子性,但它與expire組合使用,整個操做(加鎖並設置過時時間)便不是原子的,隱藏了死鎖風險。
說完加鎖,咱們接着說說如何進行優雅的可靠解鎖。
這裏共有兩種方案:
咱們看下官網對腳本原子性的解釋:
咱們看一段Lua腳本實現的解鎖代碼;
String script = "if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else return 0
end";
複製代碼
可能有些讀者朋友對Lua腳本瞭解很少,這裏簡單介紹下這段腳本的含義:
咱們經過 Redis 的 eval() 函數執行 Lua 腳本,其中入參 lockName 賦值給參數 KEYS[1],鎖的具體值賦值給 ARGV[1],eval() 函數將 Lua 腳本交給 Redis 服務端執行。 從上面Redis官網文檔截圖可以看出,經過 eval() 執行 Lua 代碼時,Lua 代碼將被當成一個命令去執行(可保證原子性),而且直到 eval 命令執行完成,Redis 纔會執行其餘命令。所以,經過 Lua 腳本結合eval函數,能夠科學得實現解鎖操做的原子性,避免誤解鎖。
利用Jedis實現的Java版本代碼以下:
Long unlock = 1L;
Jedis jedis = null;
// Lua腳本,用於校驗並釋放鎖
String script = "if redis.call('get', KEYS[1]) == ARGV[1]
then return redis.call('del', KEYS[1])
else
return 0 end";
try {
jedis = jedisPool.getResource();
// 經過 Redis 的 eval() 函數執行 Lua 腳本,
// 入參 lockName 賦值給參數 KEYS[1],myRandomValue 賦值給 ARGV[1],
// eval() 函數將 Lua 腳本交給 Redis 服務端執行。
Object result =
jedis.eval(script,
Collections.singletonList(lockName),
Collections.singletonList(myRandomValue));
// 注意:若是腳本順利執行將返回1,
// 若是執行腳本時,其它的客戶端對這個lockName對應的值進行了更改
// 則返回0
if (unlock.equals(result) {
return true;
}
}
catch (Exception e) {
throw e;
}
finally {
if (null != jedis) {
jedis.close();
}
}
return false;
複製代碼
首先看一下利用Redis事務實現解鎖的代碼實現:
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
// 監控鎖對應的Key,若是其它的客戶端對這個Key進行了更改,那麼本次事務會被取消。
jedis.watch(lockName);
// 成功獲取鎖,則操做公共資源執行自定義流程
// ...自定義流程代碼省略...
// 校驗是否持有鎖
if (lockValue.equals(jedis.get(lockName))) {
// 開啓事務功能,
Transaction multi = jedis.multi();
// 釋放鎖
multi.del(lockName);
// 執行事務(若是其它的客戶端對這個Key進行了更改,那麼本次事務會被取消,不會執行)
// 若是正常執行,因爲只有一個刪除操做,返回的list將只有一個對象。
List<Object> result = multi.exec();
if (RELEASE_SUCCESS.equals(result.size())) {
return true;
}
}
}
catch (Exception e) {
throw e;
}
finally {
if (null != jedis) {
jedis.unwatch();
jedis.close();
}
}
複製代碼
根據代碼實現,咱們總結下經過Redis的事務功能監控並釋放鎖的步驟:
這裏再重點介紹一種常見的錯誤解鎖方式,以便進行警示。
首先看下代碼實現:
Jedis jedis = jedisPool.getResource();
jedis.del(lockName);
複製代碼
該方式直接使用了 jedis.del() 方法刪除鎖且沒有進行校驗。這種不校驗鎖的擁有者而直接執行解鎖的粗暴方式,會致使已經存在的鎖被錯誤的釋放,從而破壞互斥性(如:一個進程直接經過該方是unlock掉另外一個進程的鎖)
那麼如何進行優化呢?一種方式就是在解鎖以前進行校驗,判斷加鎖與解鎖的是否爲同一個客戶端。代碼以下:
Jedis jedis = jedisPool.getResource();
if (lockValue.equals(jedis.get(lockName))) {
jedis.del(lockName);
}
複製代碼
這種解鎖方式相較於上文中粗暴的方式已經有了明顯進步,在解鎖以前進行了校驗。可是問題並無獲得解決,整個解鎖過程仍然是獨立的兩條命令,並不是原子操做。
更爲關鍵之處在於,若是在執行解鎖操做的時候,由於異常(如:業務代碼異常、FullGC致使的stop the world現象等)而出現了客戶端阻塞的現象,致使鎖過時自動釋放,則當前客戶端已經再也不持有鎖。
當進程恢復執行後,未進行鎖持有校驗(即進程認爲本身還持有鎖)而直接調用 del(lockName) 直接對當前存在的鎖進行解鎖操做,從而致使其餘進程持有的鎖被跨進程解鎖的異常現象,這種狀況是不被容許的,它違反了互斥性的原則。
上文中咱們瞭解了基於Redis實現分佈式鎖的原理,也瞭解了實現一個Redis分佈式鎖須要解決的問題。
咱們能夠感覺到實現一個可靠的分佈式鎖並非一件容易的事情。
除了上文提到的現象,就算咱們代碼實現的很健壯,當採用主從架構的Redis集羣,仍會出現異常現象:
對於主從異步複製的架構模式,當出現主節點down機時,從節點的數據還沒有獲得及時同步,此時進程訪問到從機,斷定爲可以加鎖,因而獲取到鎖,從而致使多個進程拿到一把鎖的異常現象。
那麼有沒有一種更加可靠健壯且易用性更好的Redis鎖實現方式呢?答案是顯而易見的,它就是接下來重點講解的Redisson分佈式鎖實現。
關於如何基於Redisson封裝一個開箱即用的分佈式鎖組件能夠移步個人另外一篇文章:《本身寫分佈式鎖-基於redission》,本文中我只對Redisson的分佈式鎖實現進行深度解析,具體的使用及封裝過程還請讀者自行閱讀個人博文。
關於Redisson的分佈式鎖,在github上有較爲詳細的官方文檔,分佈式鎖和同步器,咱們這裏挑重點進行講解。
下文中的部分代碼引自官方文檔,此處作統一聲明。
這部分對Redisson分佈式鎖進行較爲全面的介紹。
基於Redis的Redisson分佈式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標準的接口。
一種常見的使用方式以下:
RLock lock = redisson.getLock("anyLock");
// 最多見的使用方法
lock.lock();
複製代碼
當儲存這個分佈式鎖的Redisson節點宕機之後,且這個鎖恰好是鎖住的狀態時,會出現鎖死的狀況。爲了不這種死鎖狀況的發生,Redisson內部提供了一個監控鎖的看門狗,它的做用是在Redisson實例被關閉前,提供鎖續約能力,不斷的延長鎖的有效期。
默認狀況下,看門狗的檢查鎖的超時時間是30秒鐘,這個具體的值能夠經過修改Config.lockWatchdogTimeout來另行指定。
Redisson還提供了顯式進行鎖過時時間制定的接口,超過該時間便會對鎖進行自動解鎖,代碼以下:
// 顯式制定解鎖時間,無需調用unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
複製代碼
Redisson還提供了異步方式的分佈式鎖執行方法,因爲用的很少,此處再也不贅述,感興趣的同窗能夠自行查看官方文檔。
這裏還要補充一下,Redisson的分佈式鎖實現的優勢之一,在於它的RLock對象徹底符合Java的Lock規範,RLock實現了JUC的Lock接口,之因此稱之爲可重入鎖在於只有擁有鎖的進程才能解鎖,當其餘進程解鎖則會拋出IllegalMonitorStateException錯誤。
這能夠從RLock源碼的聲明出看出端倪
public interface RLock extends Lock, RLockAsync {
......
複製代碼
後文中我會帶領讀者對RLock的源碼實現作一個較爲詳細的解讀。咱們先接着瞭解一下其他的鎖實現。
基於Redis的Redisson分佈式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。同時還提供了異步(Async)、反射式(Reactive)和RxJava2標準的接口。它保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。全部請求線程會在一個隊列中排隊,當某個線程出現宕機時,Redisson會等待5秒後繼續下一個線程,也就是說若是前面有5個線程都處於等待狀態,那麼後面的線程會等待至少25秒。
一種常見的Redisson公平鎖使用方式以下:
RLock fairLock = redisson.getFairLock("anyLock");
// 最多見的使用方法
fairLock.lock();
複製代碼
公平鎖實現一樣具備自動續約的能力,該能力也是經過看門狗實現,與上文提到的重入鎖RLock原理徹底相同。下文中提到的鎖類型也具備該能力,所以再也不贅述,讀者只要記住,這些類型的鎖都能經過看門狗實現鎖自動續約,且看門狗檢查鎖超時時間默認爲30s,該參數能夠經過修改Config.lockWatchdogTimeout自行配置。
公平鎖也能夠顯式制定鎖的加鎖時長:
// 10秒鐘之後自動解鎖
// 無需調用unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
複製代碼
基於Redis的Redisson分佈式聯鎖RedissonMultiLock對象能夠將多個RLock對象關聯爲一個聯鎖,每一個RLock對象實例能夠來自於不一樣的Redisson實例。
這種鎖類型挺有意思的,它爲咱們提供了多重鎖機制,當全部的鎖均加鎖成功,才認爲成功,調用的代碼以下,(我的認爲使用場景並不算多,所以做爲了解便可)
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 全部的鎖都上鎖成功纔算成功。
lock.lock();
...
lock.unlock();
複製代碼
紅鎖是Redisson實現的一種高可用的分佈式鎖實現,所以此處對紅鎖作一個較爲詳細的展開。
基於Redis的Redisson紅鎖RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也能夠用來將多個RLock對象關聯爲一個紅鎖,每一個RLock對象實例能夠來自於不一樣的Redisson實例。
基於上文對紅鎖的概述,咱們能夠得知,紅鎖是一個複合鎖,且每個鎖的實例是位於不一樣的Redisson實例上的。
看一段紅鎖的使用樣例:
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 紅鎖在大部分節點上加鎖成功就算成功。
lock.lock();
...
lock.unlock();
複製代碼
紅鎖一樣可以顯示制定加鎖時間:
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 給lock1,lock2,lock3加鎖,若是沒有手動解開的話,10秒鐘後將會自動解開
lock.lock(10, TimeUnit.SECONDS);
// 爲加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
複製代碼
這裏引用一下官網對紅鎖算法實現的舉例截圖:
咱們能夠從中提取出紅鎖實現的關鍵點:半數以上節點獲取鎖成功,才認爲加鎖成功,某個節點超時就去下一個繼續獲取。
這裏體現出分佈式領域解決一致性的一種經常使用思路:多數派思想。這種思想在Raft算法、Zab算法、Paxos算法中都有所體現。
Redisson一樣實現了java.util.concurrent.locks.ReadWriteLock接口,使得其具備了讀寫鎖能力。其中,讀鎖和寫鎖都繼承了RLock接口。
同上述的鎖同樣,讀寫鎖一樣是分佈式的。
分佈式可重入讀寫鎖容許同時有多個讀鎖和一個寫鎖處於加鎖狀態。
一種常見的使用方式以下:
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最多見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
複製代碼
按照慣例,咱們接着看下顯式方式指定加鎖時長的讀寫鎖的調用方式:
// 10秒鐘之後自動解鎖
// 無需調用unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖之後10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
複製代碼
Redisson同時還實現了分佈式AQS同步器組件,如:分佈式信號量(RSemaphore)、可過時行分佈式信號量(RPermitExpirableSemaphore)、分佈式閉鎖(RCountDownLatch)等,因爲本文主要講解鎖相關的內容,所以再也不進行展開介紹,感興趣的同窗能夠自行查看官方文檔及源碼。
這一章節我將重點對Redisson中的重入鎖(RLock)實現機制進行源碼級別的討論。
咱們從Redisson的github官方倉庫下載最新的Redisson代碼,導入IDEA中進行查看,源碼結構以下:
圖中紅框圈住的模塊即爲Redisson的內核模塊,也是咱們閱讀源碼的重點。分佈式鎖部分的源碼實如今以下路徑
redisson-master
|-redisson
|-src
|-main
|-java
|-org.redisson
複製代碼
咱們逐級展開便可查看關鍵源碼,那麼廢話很少說,直接看代碼。
筆者看源碼的方式應當也是貼近的主流的方式,我通常會從一個demo開始,從代碼的入口逐層深刻進行閱讀,咱們首先找一段重入鎖的demo。
RLock lock = redisson.getLock(lockName);
boolean getLock = false;
try {
getLock = rLock.tryLock(0, expireSeconds, TimeUnit.SECONDS);
if (getLock) {
LOGGER.info("獲取Redisson分佈式鎖[成功],lockName={}", lockName);
} else {
LOGGER.info("獲取Redisson分佈式鎖[失敗],lockName={}", lockName);
}
} catch (InterruptedException e) {
LOGGER.error("獲取Redisson分佈式鎖[異常],lockName=" + lockName, e);
e.printStackTrace();
return false;
}
return getLock;
複製代碼
這段代碼截取自筆者封裝的分佈式鎖組件,目前star數爲92,源碼地址 ,感興趣的能夠幫我點個star,哈哈。
首先,經過 redisson.getLock(lockName); 獲取RLock鎖實例,lockName通常爲具備業務標識的分佈式鎖key。
先看下如何獲取RLock實例:
進入Redisson.java類,找到以下代碼:
@Override
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name, id);
}
複製代碼
此處的id爲UUID。
protected final UUID id = UUID.randomUUID();
複製代碼
能夠看到是調用了重載方法,點進去,跳入RedissonLock.java,經過類聲明能夠看到該類實現了RLock接口,聲明及構造方法以下:
public class RedissonLock extends RedissonExpirable implements RLock {
...省略部分代碼...
protected static final LockPubSub PUBSUB = new LockPubSub();
final CommandAsyncExecutor commandExecutor;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = id;
// 看門狗鎖續約檢查時間週期,默認30s
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
複製代碼
經過該構造方法構造了RedissonLock實例,其中internalLockLeaseTime即爲看門狗的檢查鎖的超時時間,默認爲30s。該參數可經過修改Config.lockWatchdogTimeout來指定新值。
當獲取獲取了鎖實例成功後,進行嘗試加鎖操做,代碼以下:
boolean getLock = rLock.tryLock(0, expireSeconds, TimeUnit.SECONDS);
複製代碼
進入RedissonLock.java查看實現。
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
// 申請鎖,返回還剩餘的鎖過時時間
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 若是ttl爲空,表示鎖申請成功
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 訂閱監聽redis的消息,並建立RedissonLockEntry
// 其中,RedissonLockEntry中比較關鍵的是一個Semaphore
// 屬性對象用來控制本地的鎖的請求的信號量同步,返回Netty框架的Future
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待subscribe的future的結果對象,若是subscribe方法調用超過了time,
// 說明已經超過了客戶端設置的最大的wait time,直接返回false,取消訂閱,而且不會再繼續申請鎖
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
@Override
public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
// 再次嘗試申請一次鎖
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
// 得到鎖並返回
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
// 不等待申請鎖並返回
acquireFailed(threadId);
return false;
}
// waiting for message
// 阻塞等待鎖
currentTime = System.currentTimeMillis();
// 經過信號量(共享鎖)進行阻塞,等待解鎖消息
// 若是剩餘時間 TTL 小於wait time,就在ttl時間內
// 從Entry的信號量獲取一個許可(除非發生中斷或者一直不存在可用的許可)
// 不然就在wait time時間範圍內等待能夠經過的信號量
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新等待時間,(最大等待時間-已經消耗的阻塞時間)
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
// 等待時間小於等於0,不等待申請鎖並返回
acquireFailed(threadId);
return false;
}
}
} finally {
// 不管最終獲取鎖是否成功,都須要取消訂閱解鎖消息,防止死鎖發生。
unsubscribe(subscribeFuture, threadId);
}
}
複製代碼
上面這段核心代碼邏輯中,咱們重點關注下tryAcquire(long leaseTime, TimeUnit unit),調用加鎖邏輯主要就在這段代碼邏輯中
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
複製代碼
點進去看一下 get(tryAcquireAsync(leaseTime, unit, threadId))
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
...省略部分邏輯...
}
複製代碼
ryAcquire(long leaseTime, TimeUnit unit)只針對leaseTime的不一樣參數進行對應的轉發處理邏輯。
trylock的無參方法就是直接調用了 get(tryLockInnerAsync(Thread.currentThread().getId()));
咱們接着看一下核心的tryLockInnerAsyn,它返回的是一個future對象,是爲了經過異步方式對IO進行處理從而提升系統吞吐量。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 檢查key是否已被佔用,若是沒有則設置超時時間及惟一標識,初始化value=1
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 鎖重入的狀況,判斷鎖的key field,一致的話,value加1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 返回剩餘的過時時間
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
複製代碼
這裏解釋下這段加鎖的Lua腳本具體的參數:
執行這段Lua腳本當返回空,說明獲取到鎖;若是返回一個long數值(pttl 命令的返回值),則代表鎖已被佔用,經過返回剩餘時間,外部能夠作一些等待時間的判斷及調整的邏輯。
tryLock(long waitTime, long leaseTime, TimeUnit unit) 有leaseTime參數的申請鎖方法會按照leaseTime時間來自動釋放鎖。
對於沒有leaseTime參數的狀況,好比tryLock()或者tryLock(long waitTime, TimeUnit unit)以及lock()是會一直持有鎖的。
解鎖的核心邏輯也是經過Lua腳本實現的,能夠看出Redisson也是經過腳原本保證加鎖、解鎖的原子性,這與咱們在文章開頭時候的講解也是保持一致的。
咱們接着看一下unlock()方法的核心邏輯。
@Override
public void unlock() {
// 解鎖核心邏輯
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
// 解鎖返回空,拋出異常
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
// 解鎖成功以後取消更新鎖expire的時間的任務
cancelExpirationRenewal();
}
}
複製代碼
當解鎖成功以後,調用cancelExpirationRenewal(),移除更新鎖expire時間的任務,也就是鎖都不存在了,也就不必再進行鎖過時時間續約了。簡單看下它的代碼實現:
void cancelExpirationRenewal() {
Timeout task = expirationRenewalMap.remove(getEntryName());
if (task != null) {
task.cancel();
}
}
複製代碼
進入unlockInnerAsync方法。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 若是鎖的key已經不存在,代表鎖已經被解鎖,直接發佈redis消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// key和field不匹配,說明當前的客戶端線程並無持有鎖,不能進行主動解鎖操做。
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 將value減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 若是counter>0代表鎖進行了重入,不能刪除key,也就是不進行解鎖操做
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
// 不然刪除key併發布解鎖消息進行解鎖
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
複製代碼
能夠看到這裏是經過Lua腳本執行的解鎖,那麼咱們來分析下這段腳本的具體含義。
從代碼的註釋應當可以較爲清楚的把握解鎖的核心脈絡。
額外提一下,咱們能夠看到在lua解鎖腳本中使用了publish命令,它的做用爲:
經過在鎖的惟一通道發佈解鎖消息,可以減小其餘分佈式節點的等待或者空轉,總體上能提升加鎖效率。
咱們在看下Redisson如何處理unlock消息,此處的消息的內容即:unlockMessage = 0L。它和unlock方法中publish的內容是對應的。
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
// 解鎖消息
public static final Long UNLOCK_MESSAGE = 0L;
public static final Long READ_UNLOCK_MESSAGE = 1L;
...省略部分邏輯...
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
// 若是訂閱的消息爲解鎖消息,UNLOCK_MESSAGE = 0L
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 釋放一個許可,並喚醒等待entry.
value.getLatch().release();
}
......
}
}
複製代碼
除了tryLock方式可以獲取鎖外,Redisson還提供了lock方法直接獲取鎖,咱們再看下它是如何進行鎖獲取操做的。
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
複製代碼
看下lockInterruptibly的具體邏輯。
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
複製代碼
點擊去看下lockInterruptibly(long leaseTime, TimeUnit unit)這個重載。
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 鎖獲取成功
if (ttl == null) {
return;
}
// 經過異步方式訂閱Redis的channel,阻塞方式獲取訂閱結果
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 經過循環判斷,直到鎖獲取成功,經典寫法。
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// 鎖獲取成功,跳出循環
if (ttl == null) {
break;
}
// 若是剩餘時間 TTL 大於0,從Entry的信號量獲取一個許可(除非發生中斷或者一直不存在可用的許可)
// 不然就在wait time時間範圍內等待能夠經過的信號量
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
// 不管最終獲取鎖是否成功,都須要取消訂閱解鎖消息,防止死鎖發生。
unsubscribe(future, threadId);
}
}
複製代碼
這段邏輯是否是有種很熟悉的感受,它和咱們上文中講到的tryLock邏輯很像。具體的邏輯在註釋中已經寫得比較清晰了就再也不贅述。
到此就是Redisson重入鎖加解鎖核心邏輯的源碼解析,相信會爲聰明的你一些幫助。
本文,咱們從分佈式鎖的概述入手,對Redis實現分佈式鎖的原理進行了較爲全面的剖析。而且重點對Redisson的分佈式鎖實現進行了詳細的講解,從筆者對Redisson的封裝類庫的調用實例入手,對Redisson的重入鎖進行了深刻的源碼解析。通過這一系列的學習,深刻淺出了Redis/Redisson分佈式鎖的實現機理,相信以後遇到的相似問題,咱們必定能夠成竹在胸。
更多分佈式鎖的實現及源碼解析,將會陸續發佈,請拭目以待。