寫這篇文章的目的主要是爲了記錄下本身在zookeeper 鎖上踩過的坑,以及踩坑以後本身的一點認識;java
從zk分佈式鎖原理提及,原理很簡單,你們也應該都知道,簡單的說就是zookeeper實現分佈式鎖是經過在zk集羣上的路徑實現的,在獲取分佈式鎖的時候在zk服務器集羣節點上建立臨時順序節點,釋放鎖的時候刪除該臨時節點.
多麼簡單的一句話,可是當你實現起來,想去作點優化的時候每每會變得很難,難的咱們後續說;node
再從需求提及,需求就是加鎖,可是因爲原來吞吐量不是很大,只是配置了一個固定的鎖路徑,可是卻不是每次都會去根據這個鎖路徑建立鎖,而是將這個鎖路徑存放在一個本地的HashMap中,這樣的話,我就沒有必要每次都去重複的建立這個鎖對象,簡單高效的利用;程序員
變動後的需求是這樣的,爲了下降鎖的力度,每次我要動態的生成一個path去zk上進行建立,而後再根據這個path生成鎖對象,可是,一開始我依舊是沿用老的思惟,想避免重複建立這個path的鎖對象,因而,我想弄個三方緩存來存儲這個鎖對象,這時候坑就來了;redis
接下來,咱們開始分析個人踩坑之旅:緩存
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> { ... }
這是curator裏面重入鎖對象的結構,InterProcessLock
這個是curator通用的鎖接口,定義的跟jdk自己的也差很少,也是curator留給開發者本身去定製實現符合本身業務需求的鎖對象的;Revocable
接口是用來執行取消動做時觸發動做用到的,若是你自定義鎖對象的時候在釋放鎖對象時想觸發一些動做,你能夠實現它的方法,以上即是InterProcessLock結構的介紹;服務器
看到這個代碼結構咱們還看出什麼東西沒?它並無實現Serializable,致使其沒法被序列化,也就是上面我本身想改進我業務中鎖的場景就不支持了,由於相似於redis這種緩存,無法去存放一個對象,它頂多支持字符串以及byte[]
,因此個人想法就被loss掉了;session
即便與業務無關了,可是咱們做爲可愛的程序員仍是有必要去研究一下這個玩意的內部實現,由於咱們不知道下次咱們還會遇到什麼場景,因此有必要讓本身刻骨銘心一次;app
接下來,咱們看其內部實現,也就是咱們高大上的源碼之旅:分佈式
private final LockInternals internals; private final String basePath; private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
internals:這個是全部申請鎖與釋放鎖的核心實現,待會咱們再來說內部實現;
basePath:鎖定的路徑;
threadData:內部緩存鎖的容器;oop
實現流程主要是這樣的:每次初始化InterProcessMutex
對象的時候都會初始化一個StandardLockInternalsDriver
對象,這個對象咱們後面再講它的使用,同時也會初始化一個LockInternals對象,
接下來,咱們來看獲取鎖的代碼:
public void acquire() throws Exception{ if ( !internalLock(-1, null) ) { throw new IOException("Lost connection while trying to acquire lock: " + basePath); } }
private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; }
邏輯以下:
每次獲取鎖時會直接從本地緩存中先獲取鎖的元數據,若是存在,則在原有的計數器基礎上+1,直接返回;
不然,嘗試去獲取鎖,邏輯以下,
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); //等待時間 final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { ourPath = driver.createsTheLock(client, path, localLockNodeBytes); hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; }
首先設置一個是否有鎖的標誌hasTheLock = false
,而後
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
這個地方主要是經過StandardLockInternalsDriver
在鎖目錄下建立EPHEMERAL_SEQUENTIAL節點,
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
這裏主要是循環獲取鎖的過程,代碼看下面,首先是判斷是否實現了revocable接口,若是實現了那麼就對這個path設置監聽,不然的話經過StandardLockInternalsDriver嘗試獲得PredicateResults(主要是否獲得鎖及須要監視的目錄的兩個屬性);
private boolean internalLockLoop(long startMillis,Long millisToWait, String ourPath) throws Exception{ boolean haveTheLock = false; boolean doDelete = false; try{ if ( revocable.get() != null ){ client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){ List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak client.getData().usingWatcher(watcher).forPath(previousSequencePath); if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } catch ( KeeperException.NoNodeException e ) { // it has been deleted (i.e. lock released). Try to acquire again } } } } } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); doDelete = true; throw e; } finally { if ( doDelete ) { deleteOurPath(ourPath); } } return haveTheLock; }
而後判斷PredicateResults中的pathToWatch(主要保存sequenceNode)是不是最小的節點,若是是,則獲得鎖,getsTheLock爲true,不然獲得該序列的前一個節點,設爲pathToWatch,並監控起來;再判斷獲取鎖的時間是否超時,超時則刪除節點,不競爭下次鎖,不然,睡眠等待獲取鎖;最後把獲取的鎖對象的鎖路徑等信息封裝成LockData存儲在本地緩存中.
獲取鎖的邏輯主要就是這些,有興趣的同窗能夠打斷點跟蹤學習下,
下面是釋放鎖的過程;
public void release() throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData == null ) { throw new IllegalMonitorStateException("You do not own the lock: " + basePath); } int newLockCount = lockData.lockCount.decrementAndGet(); if ( newLockCount > 0 ) { return; } if ( newLockCount < 0 ) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); } try { internals.releaseLock(lockData.lockPath); } finally { threadData.remove(currentThread); } }
代碼很簡單,從本地緩存中拿到鎖對象,計數器-1,只有到那個計數器=0的時候纔會去執internals.releaseLock(lockData.lockPath);
final void releaseLock(String lockPath) throws Exception { client.removeWatchers(); revocable.set(null); deleteOurPath(lockPath); }
只要邏輯見名知意,首先移除watcher監聽,這個監聽多是在循環獲取鎖的時候建立的,而後取消動做時觸發動做時間置空,最後就是刪除path;
最後作個小總結吧