聊聊分佈式鎖的實現(二)

上一篇給你們介紹了基於redis的分佈式鎖不知道有沒有給你解釋清楚,此次介紹一種基於zooKeeper的實現方式,本文只會介紹相關的zooKeeper知識,有興趣的同窗能夠自行學習。node

基於zooKeeper實現的分佈式鎖

1、相關概念

zookeeper的知識點在這裏就不詳細介紹了,下面列出一些跟實現分佈式鎖相關的概念redis

  • 臨時節點:臨時節點區別於持久節點的就是它只存在於會話期間,會話結束或者超時會被自動刪除;
  • 有序節點:顧名思義就是有順序的節點,zookeeper會根據現有節點作一個序號順延,如第一個建立的節點是/xiamu/lock-00001,下一個節點就是/xiamu/lock-00002;
  • 監聽器:監聽器的做用就是監聽一些事件的發生,好比節點數據變化、節點的子節點變化、節點的刪除;

2、臨時節點方案

基於zookeeper的臨時節點方案,主要利用了zookeeper的建立節點的原子性、臨時節點、監聽器等功能,大體上的思路以下:算法

  1. 客戶端加鎖時建立一個臨時節點,建立成功則加鎖成功。
  2. 加鎖失敗則建立一個監聽器用於監聽這個節點的變化,而後當前線程進入等待。
  3. 持有鎖的客戶端解鎖時會刪除這個節點,或者會話結束自動被刪除。
  4. 監聽器監聽到節點的刪除通知等待的客戶端去從新獲取鎖。

圖一:臨時節點實現分佈式鎖

部分代碼實現以下:緩存

/**
* 加鎖代碼實現
**/
 public void lock(String path) throws Exception {
    boolean hasLock = false;
    while (!hasLock) {
        try {
            this.createTemporaryNode(path, "data");
            hasLock = true;
            log.info("{}獲取鎖成功", Thread.currentThread().getName());
        } catch (Exception e) {
            synchronized (this) {
                try {
                    zooKeeperClient.getData(path, event -> {
                        if (SyncConnected.equals(event.getState()) && NodeDeleted.equals(event.getType())) {
                            notifyWait();
                        }
                    }, null);
                    wait();
                } catch (KeeperException.NoNodeException ex) {
                    log.info("節點已不存在");
                }
            }
        }
    }
}

/**
* 喚醒等待鎖的線程
**/
public synchronized void notifyWait() {
    notifyAll();
}
複製代碼

這裏我是使用的是ZooKeeper的Java原生API實現,這段實現代碼並不嚴謹,我只是爲了爲了描述相關邏輯;ZooKeeper的Java原生API存在一些問題如:客戶端斷開鏈接時須要手動去從新鏈接;監聽器只能使用一次,想要繼續使用須要重複註冊;上述代碼實現中若是監聽器被節點的數據改變事件觸發了,那麼就沒法再一次監聽節點刪除事件。推薦你們使用第三方開源框架Curatorbash

3、臨時順序節點方案

臨時順序節點方案和上述方案的不一樣點在於:框架

  1. 這裏全部的客戶端都能建立臨時順序節點,只有加鎖路徑下第一個節點才能獲取鎖;
  2. 獲取鎖失敗的客戶端並不監聽獲取鎖的客戶端的節點,而是監聽本身的前一個節點;
  3. 具備可重入性;

在這裏咱們使用Curator已有的輪子來實現這個方案,並跟着源碼來分析一下主要思路分佈式

InterProcessMutex lock = curatorLock.getCuratorLock(path);

/**
 * curator獲取鎖
 */
public InterProcessMutex getCuratorLock(String path) {
    return new InterProcessMutex(curatorClient, path);
}
    
/**
 * curator方式加鎖
 * @param lock 鎖
 */
public void curatorLock(InterProcessMutex lock) {
    try {
        lock.acquire();
        log.info("{}獲取鎖成功", Thread.currentThread().getName());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * curator方式釋放鎖
 * @param lock 鎖
 */
public void curatorReleaseLock(InterProcessMutex lock) {
    if (null != lock && lock.isAcquiredInThisProcess()) {
        try {
            lock.release();
            log.info("{}釋放鎖成功", Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
複製代碼

這種實現是否是很是方便呢,實際上主要邏輯就是以前講過的那些,都封裝在內部了,這裏簡單調用一下API便可實現。下面咱們來看看acquire()和release()方法的源碼分析實現方式。ide

源碼分析

// 首先咱們看acquire()方法的對象InterProcessMutex
// 從它的構造方法咱們看下來能夠得知這個鎖的基礎路徑就是咱們傳入的path,鎖的名字暫時是lock-開頭
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
    this(client, path, LOCK_NAME(lock-), 1, driver);
}

InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
複製代碼
@Override
public void acquire() throws Exception {
    if ( !internalLock(-1, null) ) {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

private boolean internalLock(long time, TimeUnit unit) throws Exception {
    // 獲取當前加鎖線程
    Thread currentThread = Thread.currentThread();
    // 從一個ConcurrentMap緩存中嘗試獲取當前線程信息
    LockData lockData = threadData.get(currentThread);
    // 若是map中存在這個線程則說明當前線程已加鎖成功,加鎖次數加一,返回加鎖成功
    if ( lockData != null ) {
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // 嘗試加鎖並返回加鎖路徑
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    // 加鎖成功
    if ( lockPath != null ) {
        LockData newLockData = new LockData(currentThread, lockPath);
        // 構造一個加鎖數據並加入緩存map
        threadData.put(currentThread, newLockData);
        return true;
    }
    return false;
}

private static class LockData {
    // 當前加鎖線程
    final Thread owningThread;
    // 加鎖path
    final String lockPath;
    // 加鎖次數
    final AtomicInteger lockCount = new AtomicInteger(1);
}

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;
    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while ( !isDone )
    {
        isDone = true;
        try {
            // 在加鎖路徑下建立臨時順序節點並返回路徑
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 獲取鎖
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e ) {
            // 會話超時會致使找不到鎖定節點,從新嘗試鏈接(容許重試)
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                // 鏈接成功從新嘗試加鎖
                isDone = false;
            } else {
                // 鏈接失敗拋出異常
                throw e;
            }
        }
    }
    // 獲取鎖成功返回加鎖路徑
    if ( hasTheLock ) {
        return ourPath;
    }
    return null;
}

// 在加鎖路徑下建立一個臨時順序節點並返回路徑
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
    String ourPath;
    if ( lockNodeBytes != null ) {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    } else {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    boolean     haveTheLock = false;
    boolean     doDelete = false;
    try {
        if ( revocable.get() != null ) {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
        // 客戶端已啓動而且沒有獲取鎖則循環重試
        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) {
            // 獲取從小到大排序的加鎖路徑下的子節點列表
            List<String> children = getSortedChildren();
            // 獲取當前節點的序列號
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); 
            // 判斷是否能獲取鎖,返回是否成功和須要監聽的路徑
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() ) {
                haveTheLock = true;
            } else {
                // 須要監聽節點的完整路徑
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this)
                {
                    try {
                        // 監聽節點                       
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        // 設置了超時等待時間
                        if ( millisToWait != null ) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            // 等待時間到了退出獲取
                            if ( millisToWait <= 0 ) {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }
                            // 超時等待
                            wait(millisToWait);
                        } else {
                            // 進入等待
                            wait();
                        }
                    } catch ( KeeperException.NoNodeException e ) {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e ) {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    } finally {
        if ( doDelete )  {
            // 等待時間到了沒有獲取鎖或則拋出異常則刪除本身的節點
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    // 獲取當前的index
    int ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
    // 若是當前的index比上一個小則得到鎖
    boolean getsTheLock = ourIndex < maxLeases;
    // 若是沒有得到鎖則獲取前一個節點的路徑
    String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
    return new PredicateResults(pathToWatch, getsTheLock);
}
// 監聽器,事件觸發時喚醒等待的線程
private final Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        notifyFromWatcher();
    }
};

@Override
public void release() throws Exception {
    Thread currentThread = Thread.currentThread();
    // 根據當前線程從緩存map中獲取加鎖信息
    LockData lockData = threadData.get(currentThread);
    // 若是沒有信息則說明沒有得到到鎖
    if ( lockData == null ) {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }
    // 減小加鎖次數一
    int newLockCount = lockData.lockCount.decrementAndGet();
    // 若是還有加鎖次數則返回
    if ( newLockCount > 0 ) {
        return;
    }
    if ( newLockCount < 0 ) {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try {
        // 釋放鎖
        internals.releaseLock(lockData.lockPath);
    } finally {
        // 從緩存map中移除加鎖信息
        threadData.remove(currentThread);
    }
}

final void releaseLock(String lockPath) throws Exception {
    // 移除監聽
    client.removeWatchers();
    revocable.set(null);
    // 刪除節點
    deleteOurPath(lockPath);
}
複製代碼

總體流程:加鎖時對某個路徑建立臨時順序節點,若是當前已經獲取了鎖,那麼加鎖次數加一;不然若是建立的臨時節點是當前路徑下第一個節點那麼加鎖成功;不然找到當前加鎖路徑下的子節點列表,找到本身的前一個節點並監聽而後進入等待,若是前一個節點釋放了鎖或者當前會話失效那麼節點刪除觸發監聽事件,註冊監聽的線程喚醒從新獲取鎖。oop

總結

其實分佈式鎖還存在着其餘的問題,我這兩篇文章中沒有講到;其中一個是獲取鎖的線程進入等待釋放了鎖喚醒以後如何保證不會重複執行由期間獲取鎖的線程執行的操做。若是你若是你的應用只須要高性能的分佈式鎖不要求多高的正確性,那麼單節點 Redis 夠了;若是你的應用想要保住正確性,那麼不建議使用集羣模式下的 Redlock算法,建議使用ZooKeeper且保證存在fencing token(即上述問題的解決方案遞增版本號)。
相關文章
相關標籤/搜索