前言
前面已經講解了Redis的客戶端Redission是怎麼實現分佈式鎖的,大多都深刻到源碼級別。html
在分佈式系統中,常見的分佈式鎖實現方案還有Zookeeper,接下來會深刻研究Zookeeper是如何來實現分佈式鎖的。node
Zookeeper初識
文件系統
Zookeeper維護一個相似文件系統的數據結構apache
image.pngsession
每一個子目錄項如NameService都被稱爲znoed,和文件系統同樣,咱們可以自由的增長、刪除znode,在znode下增長、刪除子znode,惟一不一樣的在於znode是能夠存儲數據的。數據結構
有4種類型的znodeapp
-
PERSISTENT--持久化目錄節點客戶端與zookeeper斷開鏈接後,該節點依舊存在框架
-
PERSISTENT_SEQUENTIAL-持久化順序編號目錄節點客戶端與zookeeper斷開鏈接後,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號分佈式
-
EPHEMERAL-臨時目錄節點客戶端與zookeeper斷開鏈接後,該節點被刪除ide
-
EPHEMERAL_SEQUENTIAL-臨時順序編號目錄節點客戶端與zookeeper斷開鏈接後,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號oop
通知機制
客戶端註冊監聽它關心的目錄節點,當目錄節點發生變化(數據改變、被刪除、子目錄節點增長刪除)等,zookeeper會通知客戶端。
分佈式鎖
有了zookeeper的一致性文件系統,鎖的問題變得容易。鎖服務能夠分爲兩類,一個是保持獨佔,另外一個是控制時序。
-
對於第一類,咱們將zookeeper上的一個znode看做是一把鎖,經過create znode的方式來實現。全部客戶端都去建立 /distribute_lock 節點,最終成功建立的那個客戶端也即擁有了這把鎖。廁全部言:來也沖沖,去也沖沖,用完刪除掉本身建立的distribute_lock 節點就釋放出鎖。
-
對於第二類, /distribute_lock 已經預先存在,全部客戶端在它下面建立臨時順序編號目錄節點,和選master同樣,編號最小的得到鎖,用完刪除本身建立的znode節點。
image.png
註明:以上內容參考 https://www.cnblogs.com/dream-to-pku/p/9513188.html
Curator框架初識
Curator是Netflix公司開源的一套Zookeeper客戶端框架。目前已經做爲Apache的頂級項目出現,是最流行的Zookeeper客戶端之一。
咱們看下Apache Curator官網的介紹:
image.png
接着看下quick start中關於分佈式鎖相關的內容
地址爲:http://curator.apache.org/getting-started.html
InterProcessMutex lock = new InterProcessMutex(client, lockPath); if ( lock.acquire(maxWait, waitUnit) ) { try { // do some work inside of the critical section here } finally { lock.release(); } }
使用很簡單,使用InterProcessMutex
類,使用其中的acquire()
方法,就能夠獲取一個分佈式鎖了。
Curator分佈式鎖使用示例
啓動兩個線程t1和t2去爭奪鎖,拿到鎖的線程會佔用5秒。運行屢次能夠觀察到,有時是t1先拿到鎖而t2等待,有時又會反過來。Curator會用咱們提供的lock路徑的結點做爲全局鎖,這個結點的數據相似這種格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-00000000001],每次得到鎖時會生成這種串,釋放鎖時清空數據。
接下來看看加鎖的示例:
public class Application { private static final String ZK_ADDRESS = "192.20.38.58:2181"; private static final String ZK_LOCK_PATH = "/locks/lock_01"; public static void main(String[] args) throws InterruptedException { CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
運行結果:
image.png
Curator 加鎖實現原理
直接看Curator加鎖的代碼:
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> { private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); private static class LockData { final Thread owningThread; final String lockPath; final AtomicInteger lockCount = new AtomicInteger(1); private LockData(Thread owningThread, String lockPath) { this.owningThread = owningThread; this.lockPath = lockPath; } } @Override public boolean acquire(long time, TimeUnit unit) throws Exception { return internalLock(time, unit); } private boolean internalLock(long time, TimeUnit unit) throws Exception { /* Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */ Thread currentThread = Thread.currentThread(); LockData lockData = threadData.get(currentThread); if ( lockData != null ) { // re-entering lockData.lockCount.incrementAndGet(); return true; } String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); if ( lockPath != null ) { LockData newLockData = new LockData(currentThread, lockPath); threadData.put(currentThread, newLockData); return true; } return false; } }
直接看internalLock()
方法,首先是獲取當前線程,而後查看當前線程是否在一個concurrentHashMap中,這裏是重入鎖
的實現,若是當前已經已經獲取了鎖,那麼這個線程獲取鎖的次數再+1
若是沒有獲取鎖,那麼就是用attemptLock()
方法去嘗試獲取鎖,若是lockPath
不爲空,說明獲取鎖成功,並將當前線程放入到map中。
接下來看看核心的加鎖邏輯attemptLock()
方法:
入參:time
: 獲取鎖等待的時間unit
:時間單位lockNodeBytes
:默認爲null
public class LockInternals { String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { final long startMillis = System.currentTimeMillis(); final Long millisToWait = (unit != null) ? unit.toMillis(time) : null; final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while ( !isDone ) { isDone = true; try { if ( localLockNodeBytes != null ) { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, localLockNodeBytes); } else { ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); } hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); } catch ( KeeperException.NoNodeException e ) { // gets thrown by StandardLockInternalsDriver when it can't find the lock node // this can happen when the session expires, etc. So, if the retry allows, just try it all again if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) { isDone = false; } else { throw e; } } } if ( hasTheLock ) { return ourPath; } return null; } }
ourPath = client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
使用的臨時順序節點,首先他是臨時節點,若是當前這臺機器若是本身宕機的話,他建立的這個臨時節點就會自動消失,若是有獲取鎖的客戶端宕機了,zk能夠保證鎖會自動釋放的
建立的數據結構相似於:
客戶端A獲取鎖的代碼,生成的ourPath=xxxx01
客戶端B獲取鎖的代碼,生成的ourPath=xxxx02
查看Zookeeper中/locks/lock_01下全部臨時節點數據:
PS:01/02的圖沒有截到,這裏又跑了一次截圖所示 03/04 的順序節點在ZK中的顯示
接着重點看interalLockLoop()
的邏輯:
public class LockInternals { private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if ( revocable.get() != null ) { client.getData().usingWatcher(revocableWatcher).forPath(ourPath); } while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) { List<String> children = getSortedChildren(); String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); if ( predicateResults.getsTheLock() ) { haveTheLock = true; } else { String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath); if ( stat != null ) { if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } } // else it may have been deleted (i.e. lock released). Try to acquire again } } } // 省略部分代碼 return haveTheLock; } }
重點看while循環中的邏輯
首先是獲取鎖的邏輯:
- 獲取
/locks/lock_01
下排好序的znode節點,上面看圖已經知道,會有xxx01
和xxx02
兩個節點 - 調用
getsTheLock()
方法獲取鎖,其中maxLeases
爲1,默認只能一個線程獲取鎖 - 定位到
StandardLockInternalsDriver.getsTheLock()
方法,其中代碼核心以下:int ourIndex = children.indexOf(sequenceNodeName);
boolean getsTheLock = ourIndex < maxLeases;
- 上面
sequenceNodeName
參數爲xxx01
的全路徑名,而後查看在排好序的children列表中是否爲第一個元素,若是是第一個元素,返回的ourIndex=0,此時則認爲獲取鎖成功 - 若是爲有序列表中的第一個元素,那麼
predicateResults.getsTheLock()
爲true,獲取鎖的標誌位havaTheLock
爲true,直接返回獲取鎖成功
而後是獲取鎖失敗的邏輯:
獲取鎖失敗的核心代碼:
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { Stat stat = client.checkExists().usingWatcher(watcher).forPath(previousSequencePath); if ( stat != null ) { if ( millisToWait != null ) { millisToWait -= (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if ( millisToWait <= 0 ) { doDelete = true; // timed out - delete our node break; } wait(millisToWait); } else { wait(); } } }
- 針對上一個節點添加監聽器
- 若是加鎖有過時時間,到了過時時間後直接break退出循環
- 當前線程處於wait()狀態,等待上一個線程釋放鎖
Curator 釋放鎖實現原理
釋放鎖其實很簡單,直接刪除當前臨時節點,由於下一個節點監聽了上一個節點信息,因此上一個節點刪除後,當前節點就會被喚醒從新獲取鎖。
private void deleteOurPath(String ourPath) throws Exception { try { client.delete().guaranteed().forPath(ourPath); } catch ( KeeperException.NoNodeException e ) { // ignore - already deleted (possibly expired session, etc.) } }
總結
一張圖總結:
04_Zookeeper分佈式鎖實現原理.jpg
原圖可查看個人分享:https://www.processon.com/view/link/5e80508de4b06b85300175d2