在很早很早以前,寫過一篇文章介紹過Redis
中的red lock
的實現,可是在生產環境中,筆者所負責的項目使用的分佈式鎖組件一直是Redisson
。Redisson
是具有多種內存數據網格特性的基於Java
編寫的Redis
客戶端框架(Redis Java Client with features of In-Memory Data Grid
),基於Redis
的基本數據類型擴展出不少種實現的高級數據結構,具體見其官方的簡介圖:java
本文要分析的R(ed)Lock
實現,只是其中一個很小的模塊,其餘高級特性能夠按需選用。下面會從基本原理、源碼分析和基於Jedis
仿實現等內容進行展開。本文分析的Redisson
源碼是2020-01
左右Redisson
項目的main
分支源碼,對應版本是3.14.1
。node
red lock
的基本原理其實就"光明正大地"展現在Redis
官網的首頁文檔中(具體連接是https://redis.io/topics/distlock
):redis
摘錄一下簡介進行翻譯:在許多環境中不一樣進程必須以互斥方式使用共享資源進行操做時,分佈式鎖是一個很是有用的原語。此試圖提供一種更規範的算法來實現Redis的分佈式鎖。咱們提出了一種稱爲Redlock
的算法,它實現了DLM
(猜想是Distributed Lock Manager
的縮寫,分佈式鎖管理器),咱們認爲它比普通的單實例方法更安全。算法
算法的三個核心特徵(三大最低保證):spring
Safety property
(安全性):互斥。確保在任何給定時刻下,只有一個客戶端能夠持有鎖Liveness property A
(活性A
):無死鎖。即便存在曾經鎖定資源的客戶端崩潰或者出現網絡分區異常,確保鎖老是可以成功獲取Liveness property B
(活性B
):容錯性。只要大多數Redis
節點處於正常運行狀態,客戶端就能夠獲取和釋放鎖文檔中還指出了目前算法對於故障轉移的實現還存在明顯的競態條件問題(描述的應該是Redis
主從架構下的問題):shell
A
獲取Redis
主節點中的鎖(假設鎖定的資源爲X
)Redis
主節點把KEY
同步到Redis
從節點以前,Redis
主節點崩潰Redis
從節點由於故障晉升爲主節點B
獲取資源X
的鎖成功,問題是資源X
的鎖在前面已經被客戶端A
獲取過,這樣就出現了併發問題算法的實現很簡單,單個Redis
實例下加鎖命令以下:編程
SET $resource_name $random_value NX PX $ttl
這裏的Nx
和PX
是SET
命令的加強參數,自從Redis
的2.6.12
版本起,SET
命令已經提供了可選的複合操做符:安全
EX
:設置超時時間,單位是秒PX
:設置超時時間,單位是毫秒NX
:IF NOT EXIST
的縮寫,只有KEY
不存在的前提下才會設置K-V
,設置成功返回1
,不然返回0
XX
:IF EXIST
的縮寫,只有在KEY
存在的前提下才會設置K-V
,設置成功返回1
,不然返回0
單個Redis
實例下解鎖命令以下:網絡
# KEYS[1] = $resource_name # ARGV[1] = $random_value if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
使用RLock
要先實例化Redisson
,Redisson
已經適配了Redis
的哨兵、集羣、普通主從和單機模式,由於筆者本地只安裝了單機Redis
,因此這裏使用單機模式配置進行演示。實例化RedissonClient
:數據結構
static RedissonClient REDISSON; @BeforeClass public static void beforeClass() throws Exception { Config config = new Config(); // 單機 config.useSingleServer() .setTimeout(10000) .setAddress("redis://127.0.0.1:6379"); REDISSON = Redisson.create(config); // // 主從 // config.useMasterSlaveServers() // .setMasterAddress("主節點鏈接地址") // .setSlaveAddresses(Sets.newHashSet("從節點鏈接地址")); // REDISSON = Redisson.create(config); // // 哨兵 // config.useSentinelServers() // .setMasterName("Master名稱") // .addSentinelAddress(new String[]{"哨兵鏈接地址"}); // REDISSON = Redisson.create(config); // // 集羣 // config.useClusterServers() // .addNodeAddress(new String[]{"集羣節點鏈接地址"}); // REDISSON = Redisson.create(config); }
加鎖和解鎖:
@Test public void testLockAndUnLock() throws Exception { String resourceName = "resource:x"; RLock lock = REDISSON.getLock(resourceName); Thread threadA = new Thread(() -> { try { lock.lock(); process(resourceName); } finally { lock.unlock(); System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName)); } }, "threadA"); Thread threadB = new Thread(() -> { try { lock.lock(); process(resourceName); } finally { lock.unlock(); System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName)); } }, "threadB"); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private void process(String resourceName) { String threadName = Thread.currentThread().getName(); System.out.println(String.format("線程%s獲取到資源%s的鎖", threadName, resourceName)); try { Thread.sleep(1000); } catch (InterruptedException ignore) { } } // 某次執行的輸出結果 線程threadB獲取到資源resource:x的鎖 線程threadB釋放資源resource:x的鎖 線程threadA獲取到資源resource:x的鎖 線程threadA釋放資源resource:x的鎖
更多的時候,咱們會選用帶等待時間週期和鎖最大持有時間的API
:
@Test public void testTryLockAndUnLock() throws Exception { String resourceName = "resource:x"; int waitTime = 500; int leaseTime = 1000; Thread threadA = new Thread(() -> { process(resourceName, waitTime, leaseTime); }, "threadA"); Thread threadB = new Thread(() -> { process(resourceName, waitTime, leaseTime); }, "threadB"); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private void process(String resourceName, int waitTime, int leaseTime) { RLock lock = REDISSON.getLock(resourceName); try { String threadName = Thread.currentThread().getName(); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { System.out.println(String.format("線程%s獲取到資源%s的鎖", threadName, resourceName)); Thread.sleep(800); } finally { lock.unlock(); System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName)); } } else { System.out.println(String.format("線程%s獲取資源%s的鎖失敗,等待時間:%d ms", threadName, resourceName, waitTime)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 某次執行的輸出結果 線程threadA獲取到資源resource:x的鎖 線程threadB獲取資源resource:x的鎖失敗,等待時間:500 ms 線程threadA釋放資源resource:x的鎖
爲了使用的時候更加簡單,能夠參考spring-tx
中的編程式事務那樣進行輕度封裝:
@RequiredArgsConstructor private static class RedissonLockProvider { private final RedissonClient redissonClient; public <T> T executeInLock(String resourceName, LockAction lockAction) { RLock lock = redissonClient.getLock(resourceName); try { lock.lock(); lockAction.onAcquire(resourceName); return lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } public <T> T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException { RLock lock = redissonClient.getLock(resourceName); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { lockAction.onAcquire(resourceName); return lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } return null; } public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException { RLock lock = redissonClient.getLock(resourceName); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { lockAction.onAcquire(resourceName); lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } } public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) { RLock lock = redissonClient.getLock(resourceName); try { lock.lock(); lockAction.onAcquire(resourceName); lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } } @FunctionalInterface interface LockAction { default void onAcquire(String resourceName) { } <T> T doInLock(String resourceName); default void onExit(String resourceName) { } } @FunctionalInterface interface LockActionWithoutResult { default void onAcquire(String resourceName) { } void doInLock(String resourceName); default void onExit(String resourceName) { } }
使用RedissonLockProvider
(僅供參考):
@Test public void testRedissonLockProvider() throws Exception { RedissonLockProvider provider = new RedissonLockProvider(REDISSON); String resourceName = "resource:x"; Thread threadA = new Thread(() -> { provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() { @Override public void onAcquire(String resourceName) { System.out.println(String.format("線程%s獲取到資源%s的鎖", Thread.currentThread().getName(), resourceName)); } @Override public void doInLock(String resourceName) { try { Thread.sleep(800); } catch (InterruptedException ignore) { } } @Override public void onExit(String resourceName) { System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName)); } }); }, "threadA"); Thread threadB = new Thread(() -> { provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() { @Override public void onAcquire(String resourceName) { System.out.println(String.format("線程%s獲取到資源%s的鎖", Thread.currentThread().getName(), resourceName)); } @Override public void doInLock(String resourceName) { try { Thread.sleep(800); } catch (InterruptedException ignore) { } } @Override public void onExit(String resourceName) { System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName)); } }); }, "threadB"); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } // 某次執行結果 線程threadA獲取到資源resource:x的鎖 線程threadA釋放資源resource:x的鎖 線程threadB獲取到資源resource:x的鎖 線程threadB釋放資源resource:x的鎖
Redisson
中RLock
的實現是基本參照了Redis
的red lock
算法進行實現,不過在原始的red lock
算法下進行了改良,主要包括下面的特性:
ReentrantLock
,同一個線程能夠重複獲取同一個資源的鎖(通常使用計數器實現),鎖的重入特性通常狀況下有利於提升資源的利用率X
永久鎖定,那麼並非直接對KEY
生存週期設置爲-1
,而是經過一個守護線程每隔固定週期延長KEY
的過時時間,這樣就能實現在守護線程不被殺掉的前提下,避免客戶端崩潰致使鎖沒法釋放長期佔用資源的問題org.redisson.pubsub.LockPubSub
,用於訂閱和通知鎖釋放事件red lock
算法的實現,數據類型選用了HASH
,配合Lua
腳本完成多個命令的原子性續期或者說延長KEY
的過時時間在Redisson
使用watch dog
實現,理解爲用於續期的守護線程,底層依賴於Netty
的時間輪HashedWheelTimer
和任務io.netty.util.Timeout
實現,俗稱看門狗,下面會詳細分析。
先看RLock
的類圖:
這裏有一個疑惑點,RedissonRedLock(RedissonMultiLock的子類)的註釋中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但從直觀上看,RedissonLock纔是整個鎖體系的核心,裏面的實現思路也是聽從red lock算法的。
RedissonLock
就是RLock
的直接實現,也是分佈式鎖實現的核心類,從源碼中看到Redisson#getLock()
就是直接實例化RedissonLock
public class Redisson implements RedissonClient { // ...... 省略其餘代碼 @Override public RLock getLock(String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name); } // ...... 省略其餘代碼 }
所以只須要圍繞RedissonLock
的源碼進行分析便可。RedissonLock
的類繼承圖以下:
這裏須要有幾點認知:
RedissonLock
實現了java.util.concurrent.locks.Lock
接口中除了newCondition()
方法外的全部方法,也就是能夠基本無縫適配Lock
接口,對於習慣Lock
接口的API
的使用者來講是一個福音RedissonLock
基本全部同步API
都依賴於異步API
的實現,也就是RLock
的實現依賴於RLockAsync
的實現,底層依賴的是Netty
的io.netty.util.concurrent.Promise
,具體見RedissonPromise
,若是用過JUC
中的Future
的開發者應該比較熟悉Future#get()
,這裏的作法相似RObjectAsync
:全部Redisson
對象的基礎接口,提供一些內存測量、對象拷貝、移動等的異步方法RObject
:RObjectAsync
的同步版本RExpirableAsync
:提供對象TTL
相關的異步方法RExpirable
:RExpirableAsync
的同步版本RedissonObject
:直接實現類RObject
接口中的方法RedissonExpirable
:主要是實現了RExpirable
接口中的方法接着先看RedissonLock
的構造函數和核心屬性:
// 存放entryName -> ExpirationEntry,用於獲取當前entryName的線程重入計數器和續期任務 private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>(); // 內部的鎖持有的最大時間,來源於參數Config#lockWatchdogTimeout,用於控制續期的週期 protected long internalLockLeaseTime; // ID,惟一標識,是一個UUID final String id; // final String entryName; // 鎖釋放事件訂閱發佈相關 protected final LockPubSub pubSub; // 命令異步執行器實例 final CommandAsyncExecutor commandExecutor; /** * CommandAsyncExecutor是命令的異步執行器,裏面的方法是相對底層的面向通信框架的方法,包括異步寫、異步讀和同步結果獲取等 * name參數就是getLock()時候傳入的參數,其實就是最終同步到Redis中的KEY */ public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; // 這裏的ID爲外部初始化的UUID實例,調用toString() this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); // 這裏的entryName = uuid值 + : + 外部傳進來的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x this.entryName = id + ":" + name; // 初始化LockPubSub實例,用於訂閱和發佈鎖釋放的事件 this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } // RedissonLock內部類ExpirationEntry,存放着線程重入的計數器和續期的Timeout任務 public static class ExpirationEntry { // 線程ID -> 線程重入的次數 private final Map<Long, Integer> threadIds = new LinkedHashMap<>(); private volatile Timeout timeout; public ExpirationEntry() { super(); } // 這個方法主要記錄線程重入的計數 public void addThreadId(long threadId) { Integer counter = threadIds.get(threadId); if (counter == null) { counter = 1; } else { counter++; } threadIds.put(threadId, counter); } public boolean hasNoThreads() { return threadIds.isEmpty(); } public Long getFirstThreadId() { if (threadIds.isEmpty()) { return null; } return threadIds.keySet().iterator().next(); } public void removeThreadId(long threadId) { Integer counter = threadIds.get(threadId); if (counter == null) { return; } counter--; if (counter == 0) { threadIds.remove(threadId); } else { threadIds.put(threadId, counter); } } public void setTimeout(Timeout timeout) { this.timeout = timeout; } public Timeout getTimeout() { return timeout; } }
這裏須要關注一下Config
中的lockWatchdogTimeout
參數:
翻譯一下大意:lockWatchdogTimeout
參數只有在沒有使用leaseTimeout
參數定義的成功獲取到鎖的場景(簡單來講就是不設置時限的加鎖)下生效,若是看門狗在下一個lockWatchdogTimeout
週期內不進行續期,那麼鎖就會過時釋放(從源碼上看,每三分之一lockWatchdogTimeout
就會執行一次續期任務,每次經過pexpire
把KEY
的存活週期延長lockWatchdogTimeout
),lockWatchdogTimeout
的默認值爲30000
,也就是30
秒。
這裏先列舉一下RedissonLock
中獲取名稱的方法,以便後面分析這些名稱做爲K-V
結構的KEY
時候使用:
id
:由配置實例化時候實例化的UUID
實例生成,從源碼上分析每一個鏈接方式的Redisson
實例有惟一的UUID
,ConnectionManager
初始化的時候會調用UUID id = UUID.randomUUID()
,筆者認爲能夠理解爲Redisson
實例在某個應用程序進程中的惟一標識,畢竟通常狀況下,一個應用程序應該只會應用一種Redisson
的鏈接方式getEntryName()
:返回的是UUID + : + $KEY
,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
getName()
:返回的是$KEY
,例如resource:x
getChannelName()
:返回的是redisson_lock__channel:{$KEY}
,例如redisson_lock__channel:{resource:x}
getLockName(long threadId)
:返回的是UUID + : + $threadId
,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
接着看加鎖的方法,核心實現主要是:
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException
:lock
方法體系public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException
:tryLock
方法體系先看只包含鎖最大持有時間的lock()
方法體系:
/** * 獲取鎖,不指定等待時間,只指定鎖的最大持有時間 * 經過interruptibly參數配置支持中斷 */ private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 嘗試獲取鎖,返回的ttl爲空表明獲取鎖成功,返回的ttl表明已經存在的KEY的剩餘存活時間 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 訂閱redisson_lock__channel:{$KEY},其實本質的目的是爲了客戶端經過Redis的訂閱發佈,感知到解鎖的事件 // 這個方法會在LockPubSub中註冊一個entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry實例中存放着RPromise<RedissonLockEntry>結果,一個信號量形式的鎖和訂閱方法重入計數器 // 下面的死循環中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是從這個映射中獲取的 RFuture<RedissonLockEntry> future = subscribe(threadId); // 同步訂閱執行,獲取註冊訂閱Channel的響應,區分是否支持中斷 if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 走到下面的for循環說明返回的ttl不爲空,也就是Redis已經存在對應的KEY,有其餘客戶端已經獲取到鎖,此客戶端線程的調用須要阻塞等待獲取鎖 try { while (true) { // 死循環中嘗試獲取鎖,這個是後面會分析的方法 ttl = tryAcquire(leaseTime, unit, threadId); // 返回的ttl爲空,說明獲取到鎖,跳出死循環,這個死循環或者拋出中斷異常,或者獲取到鎖成功break跳出,沒有其餘方式 if (ttl == null) { break; } // 這個ttl來源於等待存在的鎖的KEY的存活時間,直接使用許可爲0的信號量進行阻塞等待,下面的幾個分支判斷都是大同小異,只是有的支持超時時間,有的支持中斷 // 有的是永久阻塞直到鎖釋放事件訂閱LockPubSub的onMessage()方法回調激活getLatch().release()進行解鎖纔會往下走 // 這裏能夠學到一個特殊的技巧,Semaphore(0),信號量的許可設置爲0,首個調用acquire()的線程會被阻塞,直到其餘線程調用此信號量的release()方法纔會解除阻塞,相似於一個CountDownLatch(1)的效果 if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { // 獲取到鎖或者拋出中斷異常,退訂redisson_lock__channel:{$KEY},再也不關注解鎖事件 unsubscribe(future, threadId); } } // 這是一個異步轉同步的方法,相似於FutureTask#get(),關鍵看調用的tryAcquireAsync()方法 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } /** * 經過傳入鎖持有的最大時間和線程ID異步獲取鎖 */ private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { // 鎖持有最大時間不爲-1,也就是明確鎖的持有時間,不是永久持有的場景 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 走到這裏說明是leaseTime == -1,KEY不設置過時時間的分支,須要啓動看門狗機制。嘗試內部異步獲取鎖,注意這裏的lockWatchdogTimeout是從配置中獲取傳進去,不是內部的internalLockLeaseTime屬性,這裏的默認值仍是30000毫秒 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { // 執行異常場景直接返回 if (e != null) { return; } // 成功獲取到鎖的場景,須要基於線程ID啓用看門狗,經過時間輪指定定時任務進行續期 if (ttlRemaining == null) { // 定時調度進行續期操做 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } /** * 轉換鎖持有最大時間,經過參數進行加鎖的LUA腳本調用 * getName()就是傳入的KEY,如resource:x getLockName()就是鎖的名稱,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout */ <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { // 時間轉換爲毫秒,注意一點這裏的internalLockLeaseTime是類內的屬性,被從新賦值了 internalLockLeaseTime = unit.toMillis(leaseTime); // 底層向Redis服務執行LUA腳本 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
先留意一下屬性internalLockLeaseTime
,它在tryLockInnerAsync()
方法內被從新賦值,在leaseTime == -1L
的前提下,它被賦值爲lockWatchdogTimeout
,這個細節很重要,決定了後面續期方法(看門狗)的調度頻率。另外,leaseTime != -1L
不會進行續期,也就是不會啓動看門狗機制。
接着須要仔細分析一下tryLockInnerAsync()
中執行的LUA
腳本,筆者把它提取出來經過註釋進行描述:
-- KEYS[1] == getName() --> $KEY --> resource:x -- ARGV[1] == internalLockLeaseTime --> 30000 -- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 -- 第一段代碼是判斷鎖定的資源KEY不存在的時候進行相應值的設置,表明資源沒有被鎖定,首次獲取鎖成功 if (redis.call('exists', KEYS[1]) == 0) then -- 這裏是設置調用次數,能夠理解爲延長KEY過時時間的調用次數 redis.call('hset', KEYS[1], ARGV[2], 1); -- 設置KEY的過時時間 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 第二段代碼是判斷HASH的field是否存在,若是存在說明是同一個線程重入的狀況,這個時候須要延長KEY的TTL,而且HASH的field對應的value加1,記錄延長ttl的次數 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 這裏是增長調用次數,能夠理解爲增長延長KEY過時時間的調用次數 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 延長KEY的過時時間 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 第三段代碼是兜底的,走到這裏說明當前線程獲取鎖失敗,鎖已經被其餘(進程中的)線程佔有,返回當前KEY被佔用資源的ttl,用來肯定須要休眠的最大時間 return redis.call('pttl', KEYS[1]);
這裏畫一個圖演示一下這個Lua
腳本中三段代碼出現的邏輯:
剩下一個scheduleExpirationRenewal(threadId)
方法尚未分析,裏面的邏輯就是看門狗的按期續期邏輯:
// 基於線程ID定時調度和續期 private void scheduleExpirationRenewal(long threadId) { // 若是須要的話新建一個ExpirationEntry記錄線程重入計數,同時把續期的任務Timeout對象保存在屬性中 ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { // 當前進行的當前線程重入加鎖 oldEntry.addThreadId(threadId); } else { // 當前進行的當前線程首次加鎖 entry.addThreadId(threadId); // 首次新建ExpirationEntry須要觸發續期方法,記錄續期的任務句柄 renewExpiration(); } } // 處理續期 private void renewExpiration() { // 根據entryName獲取ExpirationEntry實例,若是爲空,說明在cancelExpirationRenewal()方法已經被移除,通常是解鎖的時候觸發 ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 新建一個定時任務,這個就是看門狗的實現,io.netty.util.Timeout是Netty結合時間輪使用的定時任務實例 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { // 這裏是重複外面的那個邏輯, ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } // 獲取ExpirationEntry中首個線程ID,若是爲空說明調用過cancelExpirationRenewal()方法清空持有的線程重入計數,通常是鎖已經釋放的場景 Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 向Redis異步發送續期的命令 RFuture<Boolean> future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { // 拋出異常,續期失敗,只打印日誌和直接終止任務 if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } // 返回true證實續期成功,則遞歸調用續期方法(從新調度本身),續期失敗說明對應的鎖已經不存在,直接返回,再也不遞歸 if (res) { // reschedule itself renewExpiration(); } }); } }, // 這裏的執行頻率爲leaseTime轉換爲ms單位下的三分之一,因爲leaseTime初始值爲-1的狀況下才會進入續期邏輯,那麼這裏的執行頻率爲lockWatchdogTimeout的三分之一 internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // ExpirationEntry實例持有調度任務實例 ee.setTimeout(task); } // 調用Redis,執行Lua腳本,進行異步續期 protected RFuture<Boolean> renewExpirationAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), // 這裏根據前面的分析,internalLockLeaseTime在leaseTime的值爲-1的前提下,對應值爲lockWatchdogTimeout internalLockLeaseTime, getLockName(threadId)); }
基於源碼推斷出續期的機制由入參leaseTime
決定:
leaseTime == -1
的前提下(通常是lock()
和lockInterruptibly()
這類方法調用),續期任務的調度週期爲lockWatchdogTimeout / 3
,鎖的最大持有時間(KEY
的過時時間)被刷新爲lockWatchdogTimeout
leaseTime != -1
的前提下(通常是lock(long leaseTime, TimeUnit unit)
和lockInterruptibly(long leaseTime, TimeUnit unit)
這類方法調用指定leaseTime
不爲-1
),這種狀況下會直接設置鎖的過時時間爲輸入值轉換爲ms
單位的時間量,不會啓動續期機制提取續期的Lua
腳本以下:
-- KEYS[1] == getName() --> $KEY --> resource:x -- ARGV[1] == internalLockLeaseTime --> 30000 -- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;
到此爲止,不帶waitTime
參數的加鎖和續期邏輯基本分析完畢,而帶waitTime
參數的tryLock(long waitTime, long leaseTime, TimeUnit unit)
實現其實和只存在leaseTime
參數的lock(long leaseTime, TimeUnit unit, boolean interruptibly)
實現底層調用的方法是一致的,最大的區別是會在嘗試獲取鎖操做以後基於先後的System.currentTimeMillis()
計算出時間差和waitTime
作對比,決定須要阻塞等待仍是直接超時獲取鎖失敗返回,處理阻塞等待的邏輯是客戶端自己的邏輯,這裏就不作詳細展開,由於源碼實現也不是十分優雅(太多long currentTime = System.currentTimeMillis()
的代碼段了)。接着花點功夫分析一下解鎖的實現,包括通常狀況下的解鎖unlock()
和強制解鎖forceUnlockAsync()
:
// 通常狀況下的解鎖 @Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { // IllegalMonitorStateException通常是A線程加鎖,B線程解鎖,內部判斷線程狀態不一致拋出的 if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture<Void> unlockAsync() { // 獲取當前調用解鎖操做的線程ID long threadId = Thread.currentThread().getId(); return unlockAsync(threadId); } @Override public RFuture<Void> unlockAsync(long threadId) { // 構建一個結果RedissonPromise RPromise<Void> result = new RedissonPromise<Void>(); // 返回的RFuture若是持有的結果爲true,說明解鎖成功,返回NULL說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個線程 RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 這是內部的異常,說明解鎖異常,須要取消看門狗的續期任務 if (e != null) { cancelExpirationRenewal(threadId); result.tryFailure(e); return; } // 這種狀況說明線程ID異常,加鎖和解鎖的客戶端線程不是同一個線程,拋出IllegalMonitorStateException異常 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } // 走到這裏說明正常解鎖,取消看門狗的續期任務 cancelExpirationRenewal(threadId); result.trySuccess(null); }); return result; } // 真正的內部解鎖的方法,執行解鎖的Lua腳本 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); } // 取消續期任務 void cancelExpirationRenewal(Long threadId) { // 這裏說明ExpirationEntry已經被移除,通常是基於同一個線程ID屢次調用解鎖方法致使的(併發解鎖) ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (task == null) { return; } // 傳入的線程ID不爲NULL,從ExpirationEntry中移除線程ID,若是持有的線程ID對應的線程重入計數不爲0,會先遞減到0,等於0的前提下才會進行刪除 if (threadId != null) { task.removeThreadId(threadId); } // 這裏threadId == null的狀況是爲了知足強制解鎖的場景,強制解鎖須要直接刪除鎖所在的KEY,不須要理會傳入的線程ID(傳入的線程ID直接爲NULL) // 後者task.hasNoThreads()是爲了說明當前的鎖沒有被任何線程持有,對於單線程也肯定在移除線程ID以後重入計數器已經爲0,從ExpirationEntry中移除,這個時候獲取ExpirationEntry的任務實例進行取消便可 if (threadId == null || task.hasNoThreads()) { Timeout timeout = task.getTimeout(); if (timeout != null) { timeout.cancel(); } // EntryName -> ExpirationEntry映射中移除當前鎖的相關實例ExpirationEntry EXPIRATION_RENEWAL_MAP.remove(getEntryName()); } } // 強制解鎖 @Override public boolean forceUnlock() { return get(forceUnlockAsync()); } @Override public RFuture<Boolean> forceUnlockAsync() { // 線程ID傳入爲NULL,取消當前的EntryName對應的續期任務 cancelExpirationRenewal(null); // 執行Lua腳本強制刪除鎖所在的KEY而且發佈解鎖消息 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('del', KEYS[1]) == 1) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1 " + "else " + "return 0 " + "end", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE); }
這裏列出通常狀況下解鎖和強制解鎖的Lua
腳本,分析以下:
-- unlockInnerAsync方法的lua腳本 -- KEYS[1] == getName() --> $KEY --> resource:x -- KEYS[2] == getChannelName() --> 訂閱鎖的Channel --> redisson_lock__channel:{resource:x} -- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量數值0 -- ARGV[2] == internalLockLeaseTime --> 30000或者具體的鎖最大持有時間 -- ARGV[3] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 -- 第一個IF分支判斷若是鎖所在的哈希的field不存在,說明當前線程ID不曾獲取過對應的鎖,返回NULL表示解鎖失敗 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; -- 走到這裏經過hincrby進行線程重入計數-1,返回計數值 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); -- 計數值大於0,說明線程重入加鎖,這個時候基於internalLockLeaseTime對鎖所在KEY進行續期 if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else -- 計數值小於或等於0,說明能夠解鎖,刪除鎖所在的KEY,而且向redisson_lock__channel:{$KEY}發佈消息,內容是0(常量數值) redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; -- 最後的return nil;在IDEA中提示是不會到達的語句,估計這裏是開發者筆誤寫上去的,前面的if-else都有返回語句,這裏應該是不可達的 return nil; -------------------------------------------------- 不怎麼華麗的分割線 ------------------------------------------------- -- forceUnlockAsync方法的lua腳本 -- KEYS[1] == getName() --> $KEY --> resource:x -- KEYS[2] == getChannelName() --> 訂閱鎖的Channel --> redisson_lock__channel:{resource:x} -- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量數值0 -- 強制刪除鎖所在的KEY,若是刪除成功向redisson_lock__channel:{$KEY}發佈消息,內容是0(常量數值) if (redis.call('del', KEYS[1]) == 1) then redis.call('publish', KEYS[2], ARGV[1]); return 1 else return 0 end
其餘輔助方法都相對簡單,這裏弄個簡單的"流水帳"記錄一番:
isLocked()
:基於getName()
調用Redis
的EXISTS $KEY
命令判斷是否加鎖isHeldByThread(long threadId)
和isHeldByCurrentThread()
:基於getName()
和getLockName(threadId)
調用Redis
的HEXISTS $KEY $LOCK_NAME
命令判斷HASH
中對應的field-value
是否存在,存在則說明鎖被對應線程ID
的線程持有getHoldCount()
:基於getName()
和getLockName(threadId)
調用Redis
的HGET $KEY $LOCK_NAME
命令,用於獲取線程對於某一個鎖的持有量(註釋叫holds
,其實就是同一個線程對某一個鎖的KEY
的續期次數)訂閱和發佈部分設計到大量Netty
組件使用相關的源碼,這裏不詳細展開,這部分的邏輯簡單附加到後面這個流程圖中。最後,經過一個比較詳細的圖分析一下Redisson
的加鎖和解鎖流程。
waitTime
參數的加鎖流程:waitTime
參數的加鎖流程(圖右邊的流程基本不變,主要是左邊的流程每一步都要計算時間間隔):假設不一樣進程的兩個不一樣的線程X
和Y
去競爭資源RESOURCE
的鎖,那麼可能的流程以下:
最後再歸納一下Redisson
中實現red lock
算法使用的HASH
數據類型:
KEY
表明的就是資源或者鎖,建立、存在性判斷,延長生存週期和刪除操做老是針對KEY
進行的FIELD
表明的是鎖名稱lockName()
,可是其實它由Redisson
鏈接管理器實例的初始化UUID
拼接客戶端線程ID
組成,嚴格來講應該是獲取鎖的客戶端線程惟一標識VALUE
表明的是客戶端線程對於鎖的持有量,從源碼上看應該是KEY
被續期的次數前面的章節已經比較詳細分析了Redisson
中分佈式鎖的實現原理,這裏使用Jedis
和多線程技巧作一個相似的實現。爲了簡單起見,這裏只實現一個無入參的lock()
方法(相似於Redisson
中leaseTime == -1
的場景)和unlock()
方法。定義接口RedLock
:
public interface RedLock { void lock(String resource) throws InterruptedException; void unlock(String resource); }
爲了簡單起見,筆者把全部實現邏輯都寫在實現類RedisRedLock
中:
@RequiredArgsConstructor public class RedisRedLock implements RedLock { private final JedisPool jedisPool; private final String uuid; private static final String WATCH_DOG_TIMEOUT_STRING = "30000"; private static final long WATCH_DOG_TASK_DURATION = 10000L; private static final String CHANNEL_PREFIX = "__red__lock:"; private static final String UNLOCK_STATUS_STRING = "0"; private static final String LOCK_LUA = "if (redis.call('exists', KEYS[1]) == 0) then\n" + " redis.call('hset', KEYS[1], ARGV[2], 1);\n" + " redis.call('pexpire', KEYS[1], ARGV[1]);\n" + " return nil;\n" + "end;\n" + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\n" + " redis.call('hincrby', KEYS[1], ARGV[2], 1);\n" + " redis.call('pexpire', KEYS[1], ARGV[1]);\n" + " return nil;\n" + "end;\n" + "return redis.call('pttl', KEYS[1]);"; private static final String UNLOCK_LUA = "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\n" + " return nil;\n" + "end;\n" + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\n" + "if (counter > 0) then\n" + " redis.call('pexpire', KEYS[1], ARGV[2]);\n" + " return 0;\n" + "else\n" + " redis.call('del', KEYS[1]);\n" + " redis.call('publish', KEYS[2], ARGV[1]);\n" + " return 1;\n" + "end;"; private static final String RENEW_LUA = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;"; private static final ExecutorService SUB_PUB_POOL = Executors.newCachedThreadPool(); private static final ScheduledExecutorService WATCH_DOG_POOL = new ScheduledThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2 ); private static class ThreadEntry { private final ConcurrentMap<Long, Integer> threadCounter = Maps.newConcurrentMap(); private volatile WatchDogTask watchDogTask; public synchronized void addThreadId(long threadId) { Integer counter = threadCounter.get(threadId); if (counter == null) { counter = 1; } else { counter++; } threadCounter.put(threadId, counter); } public synchronized boolean hasNoThreads() { return threadCounter.isEmpty(); } public synchronized Long getFirstThreadId() { if (threadCounter.isEmpty()) { return null; } return threadCounter.keySet().iterator().next(); } public synchronized void removeThreadId(long threadId) { Integer counter = threadCounter.get(threadId); if (counter == null) { return; } counter--; if (counter == 0) { threadCounter.remove(threadId); } else { threadCounter.put(threadId, counter); } } public void setWatchDogTask(WatchDogTask watchDogTask) { this.watchDogTask = watchDogTask; } public WatchDogTask getWatchDogTask() { return watchDogTask; } } @Getter private static class SubPubEntry { private final String key; private final Semaphore latch; private final SubscribeListener subscribeListener; public SubPubEntry(String key) { this.key = key; this.latch = new Semaphore(0); this.subscribeListener = new SubscribeListener(key, latch); } } private static final ConcurrentMap<String, ThreadEntry> THREAD_ENTRY_MAP = Maps.newConcurrentMap(); @Override public void lock(String resource) throws InterruptedException { long threadId = Thread.currentThread().getId(); String lockName = uuid + ":" + threadId; String entryName = uuid + ":" + resource; // 獲取鎖 Long ttl = acquire(resource, lockName, threadId, entryName); // 加鎖成功直接返回 if (Objects.isNull(ttl)) { return; } // 訂閱 SubPubEntry subPubEntry = subscribeAsync(resource); try { for (; ; ) { ttl = acquire(resource, lockName, threadId, entryName); // 加鎖成功直接返回 if (Objects.isNull(ttl)) { return; } if (ttl > 0L) { subPubEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } } finally { unsubscribeSync(subPubEntry); } } private Long acquire(String key, String lockName, long threadId, String entryName) { Object result = execute0(jedis -> jedis.eval(LOCK_LUA, Lists.newArrayList(key), Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName))); if (Objects.nonNull(result)) { return Long.parseLong(String.valueOf(result)); } // 啓動看門狗 ThreadEntry entry = new ThreadEntry(); ThreadEntry oldEntry = THREAD_ENTRY_MAP.putIfAbsent(entryName, entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); Runnable renewAction = () -> executeWithoutResult(jedis -> jedis.eval(RENEW_LUA, Lists.newArrayList(key), Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName))); WatchDogTask watchDogTask = new WatchDogTask(new AtomicReference<>(renewAction)); entry.setWatchDogTask(watchDogTask); WATCH_DOG_POOL.scheduleWithFixedDelay(watchDogTask, 0, WATCH_DOG_TASK_DURATION, TimeUnit.MILLISECONDS); } return null; } private SubPubEntry subscribeAsync(String key) { SubPubEntry subPubEntry = new SubPubEntry(key); SUB_PUB_POOL.submit(() -> { SubscribeListener subscribeListener = subPubEntry.getSubscribeListener(); executeWithoutResult(jedis -> jedis.subscribe(subscribeListener, subscribeListener.getChannelName())); return null; }); return subPubEntry; } private void unsubscribeSync(SubPubEntry subPubEntry) { SubscribeListener subscribeListener = subPubEntry.getSubscribeListener(); subscribeListener.unsubscribe(subscribeListener.getChannelName()); } @Override public void unlock(String resource) { long threadId = Thread.currentThread().getId(); String entryName = uuid + ":" + resource; String lockName = uuid + ":" + threadId; String channelName = CHANNEL_PREFIX + resource; Object result = execute0(jedis -> jedis.eval(UNLOCK_LUA, Lists.newArrayList(resource, channelName), Lists.newArrayList(UNLOCK_STATUS_STRING, WATCH_DOG_TIMEOUT_STRING, lockName))); ThreadEntry threadEntry = THREAD_ENTRY_MAP.get(entryName); if (Objects.nonNull(threadEntry)) { threadEntry.removeThreadId(threadId); if (threadEntry.hasNoThreads() && Objects.nonNull(threadEntry.getWatchDogTask())) { threadEntry.getWatchDogTask().cancel(); } } if (Objects.isNull(result)) { throw new IllegalMonitorStateException(); } } private static class SubscribeListener extends JedisPubSub { @Getter private final String key; @Getter private final String channelName; @Getter private final Semaphore latch; public SubscribeListener(String key, Semaphore latch) { this.key = key; this.channelName = CHANNEL_PREFIX + key; this.latch = latch; } @Override public void onMessage(String channel, String message) { if (Objects.equals(channelName, channel) && Objects.equals(UNLOCK_STATUS_STRING, message)) { latch.release(); } } } @RequiredArgsConstructor private static class WatchDogTask implements Runnable { private final AtomicBoolean running = new AtomicBoolean(true); private final AtomicReference<Runnable> actionReference; @Override public void run() { if (running.get() && Objects.nonNull(actionReference.get())) { actionReference.get().run(); } else { throw new WatchDogTaskStopException("watch dog cancel"); } } public void cancel() { actionReference.set(null); running.set(false); } } private <T> T execute0(Function<Jedis, T> function) { try (Jedis jedis = jedisPool.getResource()) { return function.apply(jedis); } } interface Action { void apply(Jedis jedis); } private void executeWithoutResult(Action action) { try (Jedis jedis = jedisPool.getResource()) { action.apply(jedis); } } private static class WatchDogTaskStopException extends RuntimeException { @Override public synchronized Throwable fillInStackTrace() { return this; } } public static void main(String[] args) throws Exception { String resourceName = "resource:x"; RedLock redLock = new RedisRedLock(new JedisPool(new GenericObjectPoolConfig()), UUID.randomUUID().toString()); Thread threadA = new Thread(() -> { try { redLock.lock(resourceName); process(resourceName); } catch (InterruptedException e) { e.printStackTrace(); } finally { redLock.unlock(resourceName); System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName)); } }, "threadA"); Thread threadB = new Thread(() -> { try { redLock.lock(resourceName); process(resourceName); } catch (InterruptedException e) { e.printStackTrace(); } finally { redLock.unlock(resourceName); System.out.println(String.format("線程%s釋放資源%s的鎖", Thread.currentThread().getName(), resourceName)); } }, "threadB"); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private static void process(String resourceName) { String threadName = Thread.currentThread().getName(); System.out.println(String.format("線程%s獲取到資源%s的鎖", threadName, resourceName)); try { Thread.sleep(1000); } catch (InterruptedException ignore) { } } }
上面的實現短期內編寫完,沒有作詳細的DEBUG
,可能會有紕漏。某次執行結果以下:
線程threadB獲取到資源resource:x的鎖 線程threadB釋放資源resource:x的鎖 線程threadA獲取到資源resource:x的鎖 線程threadA釋放資源resource:x的鎖
Redisson
中的red lock
實現,應用到下面的核心技術:
Redis
的基本數據類型HASH
Redis
的訂閱發佈Lua
腳本的原子性Netty
中的Promise
實現Netty
中的時間輪HashedWheelTimer
和對應的定時任務(HashedWheel)Timeout
Semaphore
進行帶期限、永久或者可中斷的阻塞以及喚醒,替代CountDownLatch
中的無等待期限阻塞上面的核心技術相對合理地應用,才能實現一個高效並且容錯能力相對比較高的分佈式鎖方案,可是從目前來看,Redisson
仍未解決red lock
算法中的故障轉移缺陷,筆者認爲這個有多是Redis
實現分佈式鎖方案的一個底層缺陷,此方案在Redis
單實例中是相對完善,一旦應用在Redis
集羣(普通主從、哨兵或者Cluster
),有概率會出現前文提到的節點角色切換致使多個不一樣客戶端獲取到同一個資源對應的鎖的問題。暫時無解。
參考資料:
Redisson
開源版本源碼Redis
官方文檔畫圖用的是ProcessOn
:https://www.processon.com/view/link/5ffc540de0b34d2060d2d715
(c-2-w e-a-20210110 2021年的第一篇文章,但願這一年不要這麼鴿,這個系列的下一篇是《冷飯新炒:理解JDK中UUID的底層實現》)