實現分佈式鎖目前有三種流行方案,分別爲基於數據庫、Redis、Zookeeper的方案,其中前兩種方案網絡上有不少資料能夠參考,本文不作展開。咱們來看下使用Zookeeper如何實現分佈式鎖。node
什麼是Zookeeper?算法
Zookeeper(業界簡稱zk)是一種提供配置管理、分佈式協同以及命名的中心化服務,這些提供的功能都是分佈式系統中很是底層且必不可少的基本功能,可是若是本身實現這些功能並且要達到高吞吐、低延遲同時還要保持一致性和可用性,實際上很是困難。所以zookeeper提供了這些功能,開發者在zookeeper之上構建本身的各類分佈式系統。數據庫
雖然zookeeper的實現比較複雜,可是它提供的模型抽象倒是很是簡單的。Zookeeper提供一個多層級的節點命名空間(節點稱爲znode),每一個節點都用一個以斜槓(/)分隔的路徑表示,並且每一個節點都有父節點(根節點除外),很是相似於文件系統。例如,/foo/doo這個表示一個znode,它的父節點爲/foo,父父節點爲/,而/爲根節點沒有父節點。與文件系統不一樣的是,這些節點均可以設置關聯的數據,而文件系統中只有文件節點能夠存放數據而目錄節點不行。Zookeeper爲了保證高吞吐和低延遲,在內存中維護了這個樹狀的目錄結構,這種特性使得Zookeeper不能用於存放大量的數據,每一個節點的存放數據上限爲1M。apache
而爲了保證高可用,zookeeper須要以集羣形態來部署,這樣只要集羣中大部分機器是可用的(可以容忍必定的機器故障),那麼zookeeper自己仍然是可用的。客戶端在使用zookeeper時,須要知道集羣機器列表,經過與集羣中的某一臺機器創建TCP鏈接來使用服務,客戶端使用這個TCP連接來發送請求、獲取結果、獲取監聽事件以及發送心跳包。若是這個鏈接異常斷開了,客戶端能夠鏈接到另外的機器上。網絡
架構簡圖以下所示:session
zookeeper數據結構
客戶端的讀請求能夠被集羣中的任意一臺機器處理,若是讀請求在節點上註冊了監聽器,這個監聽器也是由所鏈接的zookeeper機器來處理。對於寫請求,這些請求會同時發給其餘zookeeper機器而且達成一致後,請求才會返回成功。所以,隨着zookeeper的集羣機器增多,讀請求的吞吐會提升可是寫請求的吞吐會降低。架構
有序性是zookeeper中很是重要的一個特性,全部的更新都是全局有序的,每一個更新都有一個惟一的時間戳,這個時間戳稱爲zxid(Zookeeper Transaction Id)。而讀請求只會相對於更新有序,也就是讀請求的返回結果中會帶有這個zookeeper最新的zxid。maven
如何使用zookeeper實現分佈式鎖?分佈式
在描述算法流程以前,先看下zookeeper中幾個關於節點的有趣的性質:
有序節點:假如當前有一個父節點爲/lock,咱們能夠在這個父節點下面建立子節點;zookeeper提供了一個可選的有序特性,例如咱們能夠建立子節點「/lock/node-」而且指明有序,那麼zookeeper在生成子節點時會根據當前的子節點數量自動添加整數序號,也就是說若是是第一個建立的子節點,那麼生成的子節點爲/lock/node-0000000000,下一個節點則爲/lock/node-0000000001,依次類推。
臨時節點:客戶端能夠創建一個臨時節點,在會話結束或者會話超時後,zookeeper會自動刪除該節點。
事件監聽:在讀取數據時,咱們能夠同時對節點設置事件監聽,當節點數據或結構變化時,zookeeper會通知客戶端。當前zookeeper有以下四種事件:1)節點建立;2)節點刪除;3)節點數據修改;4)子節點變動。
下面描述使用zookeeper實現分佈式鎖的算法流程,假設鎖空間的根節點爲/lock:
客戶端鏈接zookeeper,並在/lock下建立臨時的且有序的子節點,第一個客戶端對應的子節點爲/lock/lock-0000000000,第二個爲/lock/lock-0000000001,以此類推。
客戶端獲取/lock下的子節點列表,判斷本身建立的子節點是否爲當前子節點列表中序號最小的子節點,若是是則認爲得到鎖,不然監聽/lock的子節點變動消息,得到子節點變動通知後重復此步驟直至得到鎖;
執行業務代碼;
完成業務流程後,刪除對應的子節點釋放鎖。
步驟1中建立的臨時節點可以保證在故障的狀況下鎖也能被釋放,考慮這麼個場景:假如客戶端a當前建立的子節點爲序號最小的節點,得到鎖以後客戶端所在機器宕機了,客戶端沒有主動刪除子節點;若是建立的是永久的節點,那麼這個鎖永遠不會釋放,致使死鎖;因爲建立的是臨時節點,客戶端宕機後,過了必定時間zookeeper沒有收到客戶端的心跳包判斷會話失效,將臨時節點刪除從而釋放鎖。
另外細心的朋友可能會想到,在步驟2中獲取子節點列表與設置監聽這兩步操做的原子性問題,考慮這麼個場景:客戶端a對應子節點爲/lock/lock-0000000000,客戶端b對應子節點爲/lock/lock-0000000001,客戶端b獲取子節點列表時發現本身不是序號最小的,可是在設置監聽器前客戶端a完成業務流程刪除了子節點/lock/lock-0000000000,客戶端b設置的監聽器豈不是丟失了這個事件從而致使永遠等待了?這個問題不存在的。由於zookeeper提供的API中設置監聽器的操做與讀操做是原子執行的,也就是說在讀子節點列表時同時設置監聽器,保證不會丟失事件。
最後,對於這個算法有個極大的優化點:假如當前有1000個節點在等待鎖,若是得到鎖的客戶端釋放鎖時,這1000個客戶端都會被喚醒,這種狀況稱爲「羊羣效應」;在這種羊羣效應中,zookeeper須要通知1000個客戶端,這會阻塞其餘的操做,最好的狀況應該只喚醒新的最小節點對應的客戶端。應該怎麼作呢?在設置事件監聽時,每一個客戶端應該對恰好在它以前的子節點設置事件監聽,例如子節點列表爲/lock/lock-0000000000、/lock/lock-000000000一、/lock/lock-0000000002,序號爲1的客戶端監聽序號爲0的子節點刪除消息,序號爲2的監聽序號爲1的子節點刪除消息。
zookeeper學習中
因此調整後的分佈式鎖算法流程以下:
客戶端鏈接zookeeper,並在/lock下建立臨時的且有序的子節點,第一個客戶端對應的子節點爲/lock/lock-0000000000,第二個爲/lock/lock-0000000001,以此類推;
客戶端獲取/lock下的子節點列表,判斷本身建立的子節點是否爲當前子節點列表中序號最小的子節點,若是是則認爲得到鎖,不然監聽恰好在本身以前一位的子節點刪除消息,得到子節點變動通知後重復此步驟直至得到鎖;
執行業務代碼;
完成業務流程後,刪除對應的子節點釋放鎖。
Curator的源碼分析
雖然zookeeper原生客戶端暴露的API已經很是簡潔了,可是實現一個分佈式鎖仍是比較麻煩的…咱們能夠直接使用curator這個開源項目提供的zookeeper分佈式鎖實現。
咱們只須要引入下面這個包(基於maven):
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
而後就能夠用啦!代碼以下:
public static void main(String[] args) throws Exception {
//建立zookeeper的客戶端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);
client.start();
//建立分佈式鎖, 鎖空間的根節點路徑爲/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
mutex.acquire();
//得到了鎖, 進行業務流程
System.out.println("Enter mutex");
//完成業務流程, 釋放鎖
mutex.release();
//關閉客戶端
client.close();
}
能夠看到關鍵的核心操做就只有mutex.acquire()和mutex.release(),簡直太方便了!
下面來分析下獲取鎖的源碼實現。acquire的方法以下:
/*
* 獲取鎖,當鎖被佔用時會阻塞等待,這個操做支持同線程的可重入(也就是重複獲取鎖),acquire的次數須要與release的次數相同。
* @throws Exception ZK errors, connection interruptions
*/
@Override
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
這裏有個地方須要注意,當與zookeeper通訊存在異常時,acquire會直接拋出異常,須要使用者自身作重試策略。代碼中調用了internalLock(-1, null),參數代表在鎖被佔用時永久阻塞等待。internalLock的代碼以下:
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
//這裏處理同線程的可重入性,若是已經得到鎖,那麼只是在對應的數據結構中增長acquire的次數統計,直接返回成功
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if ( lockData != null )
{
// re-entering
lockData.lockCount.incrementAndGet();
return true;
}
//這裏才真正去zookeeper中獲取鎖
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
//得到鎖以後,記錄當前的線程得到鎖的信息,在重入時只需在LockData中增長次數統計便可
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
//在阻塞返回時仍然獲取不到鎖,這裏上下文的處理隱含的意思爲zookeeper通訊異常
return false;
}
代碼中增長了具體註釋,不作展開。看下zookeeper獲取鎖的具體實現:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
//參數初始化,此處省略
//...
//自旋獲取鎖
while ( !isDone )
{
isDone = true;
try
{
//在鎖空間下建立臨時且有序的子節點
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//判斷是否得到鎖(子節點序號最小),得到鎖則直接返回,不然阻塞等待前一個子節點刪除通知
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
//對於NoNodeException,代碼中確保了只有發生session過時纔會在這裏拋出NoNodeException,所以這裏根據重試策略進行重試
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
//若是得到鎖則返回該子節點的路徑
if ( hasTheLock )
{
return ourPath;
}
return null;
}
上面代碼中主要有兩步操做:
driver.createsTheLock:建立臨時且有序的子節點,裏面實現比較簡單不作展開,主要關注幾種節點的模式:1)PERSISTENT(永久);2)PERSISTENT_SEQUENTIAL(永久且有序);3)EPHEMERAL(臨時);4)EPHEMERAL_SEQUENTIAL(臨時且有序)。
internalLockLoop:阻塞等待直到得到鎖。
看下internalLockLoop是怎麼判斷鎖以及阻塞等待的,這裏刪除了一些無關代碼,只保留主流程:
//自旋直至得到鎖
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//獲取全部的子節點列表,而且按序號從小到大排序
List<String> children = getSortedChildren();
//根據序號判斷當前子節點是否爲最小子節點
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
//若是爲最小子節點則認爲得到鎖
haveTheLock = true;
}
else
{
//不然獲取前一個子節點
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
//這裏使用對象監視器作線程同步,當獲取不到鎖時監聽前一個子節點刪除消息而且進行wait(),當前一個子節點刪除(也就是鎖釋放)時,回調會經過notifyAll喚醒此線程,此線程繼續自旋判斷是否得到鎖
synchronized(this)
{
try
{
//這裏使用getData()接口而不是checkExists()是由於,若是前一個子節點已經被刪除了那麼會拋出異常並且不會設置事件監聽器,而checkExists雖然也能夠獲取到節點是否存在的信息可是同時設置了監聽器,這個監聽器其實永遠不會觸發,對於zookeeper來講屬於資源泄露
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//若是設置了阻塞等待的時間
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // 等待時間到達,刪除對應的子節點
break;
}
//等待相應的時間
wait(millisToWait);
}
else
{
//永遠等待
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
//上面使用getData來設置監聽器時,若是前一個子節點已經被刪除那麼會拋出NoNodeException,只須要自旋一次便可,無需額外處理
}
}
}
}
具體邏輯見註釋,再也不贅述。代碼中設置的事件監聽器,在事件發生回調時只是簡單的notifyAll喚醒當前線程以從新自旋判斷,比較簡單再也不展開。