[TOC]
Following up on the previous article, Distributed Lock Implementation (Part 1): Redis, this time we design and implement a distributed lock with Zookeeper, and then study the distributed-lock source code of the open-source client Curator.
1. Create an ephemeral sequential node under a parent node.
2. Check whether the created node has the smallest sequence number among all children of that parent.
3. If it is the smallest, the lock is acquired; otherwise watch the node whose sequence number is just below ours, and when that node is deleted we are notified and try to acquire the lock again.
4. To unlock, delete the current node.
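The decisive check in steps 2 and 3 can be sketched locally, without a Zookeeper server: sort the children, and a node either holds the lock (index 0) or watches its immediate predecessor. The class and method names below are illustrative, not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LockOrderSketch {
    // Given all child node names under the lock parent and our own node name,
    // return null if we hold the lock, else the predecessor node to watch.
    static String nodeToWatch(List<String> children, String ours) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted); // zero-padded sequence suffixes sort lexicographically
        int index = sorted.indexOf(ours);
        return index == 0 ? null : sorted.get(index - 1);
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList("0000000002", "0000000000", "0000000001");
        System.out.println(nodeToWatch(children, "0000000000")); // smallest: holds the lock
        System.out.println(nodeToWatch(children, "0000000002")); // watches its predecessor
    }
}
```

Because each waiter watches only its immediate predecessor rather than the parent, a released lock wakes exactly one waiter and avoids a herd effect.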
The key to a Zookeeper distributed lock is its ephemeral sequential nodes. Zookeeper has four node types:
1. PERSISTENT — persistent; stays until deleted explicitly.
2. PERSISTENT_SEQUENTIAL — persistent sequential; Zookeeper appends a monotonically increasing sequence number to the node name.
3. EPHEMERAL — ephemeral; deleted automatically when the client session ends.
4. EPHEMERAL_SEQUENTIAL — ephemeral sequential; ephemeral, plus the sequence number.
Zookeeper also provides an event-listening mechanism: watches can be placed on a node, on its data, and on its children. We use this watcher mechanism to implement the waiting part of the lock.
We implement this on top of the ZkClient client library (the raw Zookeeper API would look much the same). The Maven coordinates:
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>
The implementation:
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

public class MyDistributedLock {

    private static final String PARENT_LOCK_PATH = "/distribute_lock";

    private ZkClient zkClient;
    private String name;
    private String currentLockPath;
    private CountDownLatch countDownLatch;

    public MyDistributedLock(ZkClient zkClient, String name) {
        this.zkClient = zkClient;
        this.name = name;
    }

    // Acquire the lock
    public void lock() {
        // Create the parent node if it does not exist yet
        if (!zkClient.exists(PARENT_LOCK_PATH)) {
            try {
                // Among concurrent threads, only one create call succeeds
                zkClient.createPersistent(PARENT_LOCK_PATH);
            } catch (Exception ignored) {
            }
        }
        // Create an ephemeral sequential node under the parent
        currentLockPath = zkClient.createEphemeralSequential(PARENT_LOCK_PATH + "/", System.currentTimeMillis());
        // Check whether our node has the smallest sequence number
        checkMinNode(currentLockPath);
    }

    // Release the lock
    public void unlock() {
        System.out.println("delete : " + currentLockPath);
        zkClient.delete(currentLockPath);
    }

    private boolean checkMinNode(String lockPath) {
        // List all children of the parent node
        List<String> children = zkClient.getChildren(PARENT_LOCK_PATH);
        Collections.sort(children);
        int index = children.indexOf(lockPath.substring(PARENT_LOCK_PATH.length() + 1));
        if (index == 0) {
            System.out.println(name + ":success");
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            return true;
        } else {
            // Watch the node immediately before ours and wait for its release
            String waitPath = PARENT_LOCK_PATH + "/" + children.get(index - 1);
            waitForLock(waitPath);
            return false;
        }
    }

    private void waitForLock(String prev) {
        System.out.println(name + " current path :" + currentLockPath + ":fail add listener" + " wait path :" + prev);
        countDownLatch = new CountDownLatch(1);
        zkClient.subscribeDataChanges(prev, new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println("prev node is done");
                checkMinNode(currentLockPath);
            }
        });
        // The previous node may already be gone; re-check before blocking
        if (!zkClient.exists(prev)) {
            return;
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        countDownLatch = null;
    }
}
zkClient.exists first checks whether the parent node exists, creating it if not; Zookeeper guarantees the create succeeds only once.
zkClient.createEphemeralSequential then creates the ephemeral sequential node under that parent. If our node has the smallest sequence number, the lock is acquired; otherwise we take the node just below ours and watch it.
waitForLock waits on that smaller node: subscribeDataChanges watches it for changes, and handleDataDeleted runs checkMinNode again.
After subscribing we check once more whether the previous node still exists, because it may have released the lock while the watch was being set up; if it is already gone there is nothing to wait for. The waiting itself is implemented with a CountDownLatch.
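The CountDownLatch handoff can be reduced to a minimal local sketch (no Zookeeper involved; the class and method names are mine): the lock-waiting thread blocks on await(), and the watcher callback releases it with countDown().

```java
import java.util.concurrent.CountDownLatch;

public class LatchWaitSketch {
    // Stand-in for waitForLock: block until the "node deleted" callback fires.
    static String acquireAfterSignal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        Thread watcher = new Thread(() -> {
            // simulates handleDataDeleted firing when the previous node is removed
            latch.countDown();
        });
        watcher.start();
        latch.await(); // the lock-waiting thread parks here until countDown()
        return "lock acquired";
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(acquireAfterSignal());
    }
}
```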
Unlocking is just deleting the current node via zkClient's delete.
We test the lock/unlock flow by starting multiple threads and checking that acquisition is ordered:
import org.I0Itec.zkclient.ZkClient;

public class MyDistributedLockTest {
    public static void main(String[] args) {
        ZkClient zk = new ZkClient("127.0.0.1:2181", 5 * 10000);
        for (int i = 0; i < 20; i++) {
            String name = "thread" + i;
            Thread thread = new Thread(() -> {
                MyDistributedLock myDistributedLock = new MyDistributedLock(zk, name);
                myDistributedLock.lock();
                // try {
                //     Thread.sleep(1 * 1000);
                // } catch (InterruptedException e) {
                //     e.printStackTrace();
                // }
                myDistributedLock.unlock();
            });
            thread.start();
        }
    }
}
The output below shows that with multiple threads, lock/unlock and the watches all behave as expected:
thread1 current path :/distribute_lock2/0000000007:fail add listener wait path :/distribute_lock2/0000000006
thread6 current path :/distribute_lock2/0000000006:fail add listener wait path :/distribute_lock2/0000000005
thread3:success
delete : /distribute_lock2/0000000000
thread2 current path :/distribute_lock2/0000000005:fail add listener wait path :/distribute_lock2/0000000004
thread7 current path :/distribute_lock2/0000000004:fail add listener wait path :/distribute_lock2/0000000003
thread9 current path :/distribute_lock2/0000000009:fail add listener wait path :/distribute_lock2/0000000008
thread5 current path :/distribute_lock2/0000000008:fail add listener wait path :/distribute_lock2/0000000007
thread0 current path :/distribute_lock2/0000000001:fail add listener wait path :/distribute_lock2/0000000000
thread8 current path :/distribute_lock2/0000000002:fail add listener wait path :/distribute_lock2/0000000001
thread4 current path :/distribute_lock2/0000000003:fail add listener wait path :/distribute_lock2/0000000002
delete : /distribute_lock2/0000000001
prev node is done
thread8:success
delete : /distribute_lock2/0000000002
prev node is done
thread4:success
delete : /distribute_lock2/0000000003
prev node is done
thread7:success
delete : /distribute_lock2/0000000004
prev node is done
thread2:success
delete : /distribute_lock2/0000000005
prev node is done
thread6:success
delete : /distribute_lock2/0000000006
prev node is done
thread1:success
delete : /distribute_lock2/0000000007
prev node is done
thread5:success
delete : /distribute_lock2/0000000008
prev node is done
thread9:success
delete : /distribute_lock2/0000000009
Now let's turn to Curator. Basic usage looks like this:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
client.start();
InterProcessMutex lock = new InterProcessMutex(client, "/test");
try {
    lock.acquire();
    // business logic
} catch (Exception e) {
    e.printStackTrace();
} finally {
    lock.release();
}
CuratorFrameworkFactory.newClient creates the Zookeeper client, retryPolicy sets the retry strategy, and start() opens the client.
Curator ships several lock implementations; here we take the reentrant InterProcessMutex as the example. lock.acquire() acquires the lock and lock.release() releases it; acquire also has an overload that takes a wait timeout.
Internally, acquire just calls internalLock with a wait time of -1:
public void acquire() throws Exception {
    if (!this.internalLock(-1L, (TimeUnit) null)) {
        throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
    }
}
internalLock first checks for reentrancy: a ConcurrentMap keeps an atomic counter per thread. If the current thread does not already hold the lock, attemptLock is called to acquire it.
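The per-thread bookkeeping can be sketched locally with a ConcurrentMap and an AtomicInteger. This is a simplified sketch, not Curator's actual classes; the Zookeeper call is elided as a comment.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ReentrancySketch {
    private final ConcurrentMap<Thread, AtomicInteger> threadData = new ConcurrentHashMap<>();

    public boolean lock() {
        Thread t = Thread.currentThread();
        AtomicInteger count = threadData.get(t);
        if (count != null) {
            // re-entering: just bump the counter, no second lock node is created
            count.incrementAndGet();
            return true;
        }
        // first acquisition: this is where attemptLock would go to Zookeeper
        threadData.put(t, new AtomicInteger(1));
        return true;
    }

    public void unlock() {
        Thread t = Thread.currentThread();
        AtomicInteger count = threadData.get(t);
        if (count == null) {
            throw new IllegalMonitorStateException("not the owner");
        }
        // when the count reaches 0 the lock node would be deleted
        if (count.decrementAndGet() <= 0) {
            threadData.remove(t);
        }
    }

    public static void main(String[] args) {
        ReentrancySketch lock = new ReentrancySketch();
        lock.lock();
        lock.lock();   // re-entrant acquire
        lock.unlock();
        lock.unlock(); // counter hits 0, mapping removed
        System.out.println("released");
    }
}
```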
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }

    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}
attemptLock runs the retry loop: createsTheLock creates the node, and internalLockLoop decides whether the current node is the smallest one.
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long startMillis = System.currentTimeMillis();
    final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int retryCount = 0;

    String ourPath = null;
    boolean hasTheLock = false;
    boolean isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}
createsTheLock simply calls Curator's fluent API to create the ephemeral sequential node:
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}
internalLockLoop does the lock check: driver.getsTheLock decides whether our node is the smallest under the parent. If so, the lock is acquired; otherwise a watch is set on previousSequencePath, and each time the watch fires, the remaining wait time is recalculated.
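The wait-budget arithmetic around millisToWait can be isolated into a tiny sketch (the class and method names are mine, not Curator's): each wakeup subtracts the elapsed time from the remaining budget, and a non-positive budget means the acquire has timed out and the node must be deleted.

```java
public class WaitBudgetSketch {
    // Remaining wait budget after some time has elapsed since startMillis.
    static long remaining(long millisToWait, long startMillis, long nowMillis) {
        return millisToWait - (nowMillis - startMillis);
    }

    public static void main(String[] args) {
        long budget = 1000;          // acquire(1, TimeUnit.SECONDS)-style budget
        long start = 0;
        long afterWakeup = 400;      // the watch fired 400 ms in
        budget = remaining(budget, start, afterWakeup);
        System.out.println(budget);  // 600 ms left; wait(600) next

        // a second wakeup 700 ms later exhausts the budget: timed out
        System.out.println(remaining(budget, afterWakeup, afterWakeup + 700) <= 0);
    }
}
```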
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean haveTheLock = false;
    boolean doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            List<String> children = getSortedChildren();
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash

            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true; // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        }
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }

    return haveTheLock;
}
release is comparatively simple: it looks up the current thread's lock count in the map, throwing if it is absent, then decrements the atomic counter. A count still above 0 means the thread re-entered and still holds the lock; a negative count signals a mismatched release and throws; at exactly 0, releaseLock deletes the node and the thread's entry is removed from the map.
public void release() throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */
    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}
Redis and Zookeeper are currently the mainstream ways to implement distributed locks. Compared with hand-rolled versions, the designs of Redisson and Curator are considerably more polished, and well worth studying and borrowing from.
A journey of a thousand miles is made of single steps; a voyage of ten thousand miles is steered by its compass. Let's keep at it.