上一篇給你們介紹了基於redis的分佈式鎖不知道有沒有給你解釋清楚,此次介紹一種基於zooKeeper的實現方式,本文只會介紹相關的zooKeeper知識,有興趣的同窗能夠自行學習。node
zookeeper的知識點在這裏就不詳細介紹了,下面列出一些跟實現分佈式鎖相關的概念redis
基於zookeeper的臨時節點方案,主要利用了zookeeper的建立節點的原子性、臨時節點、監聽器等功能,大體上的思路以下:算法
部分代碼實現以下:緩存
/**
* 加鎖代碼實現
**/
public void lock(String path) throws Exception {
boolean hasLock = false;
while (!hasLock) {
try {
this.createTemporaryNode(path, "data");
hasLock = true;
log.info("{}獲取鎖成功", Thread.currentThread().getName());
} catch (Exception e) {
synchronized (this) {
try {
zooKeeperClient.getData(path, event -> {
if (SyncConnected.equals(event.getState()) && NodeDeleted.equals(event.getType())) {
notifyWait();
}
}, null);
wait();
} catch (KeeperException.NoNodeException ex) {
log.info("節點已不存在");
}
}
}
}
}
/**
* 喚醒等待鎖的線程
**/
public synchronized void notifyWait() {
notifyAll();
}
複製代碼
這裏我是使用的是ZooKeeper的Java原生API實現,這段實現代碼並不嚴謹,我只是爲了爲了描述相關邏輯;ZooKeeper的Java原生API存在一些問題如:客戶端斷開鏈接時須要手動去從新鏈接;監聽器只能使用一次,想要繼續使用須要重複註冊;上述代碼實現中若是監聽器被節點的數據改變事件觸發了,那麼就沒法再一次監聽節點刪除事件。推薦你們使用第三方開源框架Curatorbash
臨時順序節點方案和上述方案的不一樣點在於:框架
在這裏咱們使用Curator已有的輪子來實現這個方案,並跟着源碼來分析一下主要思路分佈式
InterProcessMutex lock = curatorLock.getCuratorLock(path);
/**
* curator獲取鎖
*/
public InterProcessMutex getCuratorLock(String path) {
return new InterProcessMutex(curatorClient, path);
}
/**
* curator方式加鎖
* @param lock 鎖
*/
public void curatorLock(InterProcessMutex lock) {
try {
lock.acquire();
log.info("{}獲取鎖成功", Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* curator方式釋放鎖
* @param lock 鎖
*/
public void curatorReleaseLock(InterProcessMutex lock) {
if (null != lock && lock.isAcquiredInThisProcess()) {
try {
lock.release();
log.info("{}釋放鎖成功", Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}
}
複製代碼
這種實現是否是很是方便呢,實際上主要邏輯就是以前講過的那些,都封裝在內部了,這裏簡單調用一下API便可實現。下面咱們來看看acquire()和release()方法的源碼分析實現方式。ide
// 首先咱們看acquire()方法的對象InterProcessMutex
// 從它的構造方法咱們看下來能夠得知這個鎖的基礎路徑就是咱們傳入的path,鎖的名字暫時是lock-開頭
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, LOCK_NAME(lock-), 1, driver);
}
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
basePath = PathUtils.validatePath(path);
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
複製代碼
@Override
public void acquire() throws Exception {
if ( !internalLock(-1, null) ) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
// 獲取當前加鎖線程
Thread currentThread = Thread.currentThread();
// 從一個ConcurrentMap緩存中嘗試獲取當前線程信息
LockData lockData = threadData.get(currentThread);
// 若是map中存在這個線程則說明當前線程已加鎖成功,加鎖次數加一,返回加鎖成功
if ( lockData != null ) {
lockData.lockCount.incrementAndGet();
return true;
}
// 嘗試加鎖並返回加鎖路徑
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
// 加鎖成功
if ( lockPath != null ) {
LockData newLockData = new LockData(currentThread, lockPath);
// 構造一個加鎖數據並加入緩存map
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
private static class LockData {
// 當前加鎖線程
final Thread owningThread;
// 加鎖path
final String lockPath;
// 加鎖次數
final AtomicInteger lockCount = new AtomicInteger(1);
}
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try {
// 在加鎖路徑下建立臨時順序節點並返回路徑
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 獲取鎖
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e ) {
// 會話超時會致使找不到鎖定節點,從新嘗試鏈接(容許重試)
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
// 鏈接成功從新嘗試加鎖
isDone = false;
} else {
// 鏈接失敗拋出異常
throw e;
}
}
}
// 獲取鎖成功返回加鎖路徑
if ( hasTheLock ) {
return ourPath;
}
return null;
}
// 在加鎖路徑下建立一個臨時順序節點並返回路徑
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
if ( lockNodeBytes != null ) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if ( revocable.get() != null ) {
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
// 客戶端已啓動而且沒有獲取鎖則循環重試
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
// 獲取從小到大排序的加鎖路徑下的子節點列表
List<String> children = getSortedChildren();
// 獲取當前節點的序列號
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 判斷是否能獲取鎖,返回是否成功和須要監聽的路徑
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() ) {
haveTheLock = true;
} else {
// 須要監聽節點的完整路徑
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try {
// 監聽節點
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
// 設置了超時等待時間
if ( millisToWait != null ) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
// 等待時間到了退出獲取
if ( millisToWait <= 0 ) {
doDelete = true; // timed out - delete our node
break;
}
// 超時等待
wait(millisToWait);
} else {
// 進入等待
wait();
}
} catch ( KeeperException.NoNodeException e ) {
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e ) {
ThreadUtils.checkInterrupted(e);
doDelete = true;
throw e;
} finally {
if ( doDelete ) {
// 等待時間到了沒有獲取鎖或則拋出異常則刪除本身的節點
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
// 獲取當前的index
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
// 若是當前的index比上一個小則得到鎖
boolean getsTheLock = ourIndex < maxLeases;
// 若是沒有得到鎖則獲取前一個節點的路徑
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
// 監聽器,事件觸發時喚醒等待的線程
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
notifyFromWatcher();
}
};
@Override
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
// 根據當前線程從緩存map中獲取加鎖信息
LockData lockData = threadData.get(currentThread);
// 若是沒有信息則說明沒有得到到鎖
if ( lockData == null ) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
// 減小加鎖次數一
int newLockCount = lockData.lockCount.decrementAndGet();
// 若是還有加鎖次數則返回
if ( newLockCount > 0 ) {
return;
}
if ( newLockCount < 0 ) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
// 釋放鎖
internals.releaseLock(lockData.lockPath);
} finally {
// 從緩存map中移除加鎖信息
threadData.remove(currentThread);
}
}
final void releaseLock(String lockPath) throws Exception {
// 移除監聽
client.removeWatchers();
revocable.set(null);
// 刪除節點
deleteOurPath(lockPath);
}
複製代碼
總體流程:加鎖時對某個路徑建立臨時順序節點,若是當前已經獲取了鎖,那麼加鎖次數加一;不然若是建立的臨時節點是當前路徑下第一個節點那麼加鎖成功;不然找到當前加鎖路徑下的子節點列表,找到本身的前一個節點並監聽而後進入等待,若是前一個節點釋放了鎖或者當前會話失效那麼節點刪除觸發監聽事件,註冊監聽的線程喚醒從新獲取鎖。oop