ZooKeeper Master Election

Master election

Master election is a very common scenario in distributed systems: in a cluster there is exactly one master doing the main work while the other nodes act as slaves (backups); when the master goes down, the remaining slaves compete to become the new master. This scenario has two requirements:

  • Only one server can become the master
  • When the master goes down, the other slaves must find out and compete for mastership

Several properties of ZooKeeper make it a good fit for master election (a minimal raw-ZooKeeper sketch follows the list):

  • Strong consistency: guarantees that only one server can become the master
  • Ephemeral nodes: when the master goes down, its ephemeral node disappears as soon as the session closes
  • Watchers: other nodes can watch for changes; when the master node is deleted, the watchers are notified
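
The same idea can be expressed directly against the raw ZooKeeper API: every participant tries to create the same ephemeral znode, the winner is the master, and the losers watch that znode and compete again once it disappears. The following is only a minimal sketch; the class name, znode path and connect string are made up for illustration.

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class ElectionSketch implements Watcher {
    private static final String MASTER_PATH = "/master";   // illustrative path
    private final ZooKeeper zk;

    public ElectionSketch(String connectString) throws Exception {
        // the watcher passed here also receives the node-deleted events registered below
        this.zk = new ZooKeeper(connectString, 15000, this);
    }

    public void elect() throws KeeperException, InterruptedException {
        try {
            // ephemeral node: removed automatically when our session closes
            zk.create(MASTER_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("I am the master");
        } catch (KeeperException.NodeExistsException e) {
            // someone else is the master: watch the node so we hear when it goes away
            Stat stat = zk.exists(MASTER_PATH, this);
            if (stat == null) {
                elect();   // the master died between create() and exists(): try again
            }
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            try {
                elect();   // the old master is gone: compete again
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}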

Curator flow

  1. Create a node
  2. If creation succeeds, check whether it is the first node; the first node is the leader
  3. If it is not the master, keep watching the previous node, retrying according to the retry policy

Curator example code

MasterSelect

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;

import java.util.concurrent.TimeUnit;

public class MasterSelect {
    // CuratorConnect is the author's helper that returns an already-started client (not shown here)
    CuratorFramework client = CuratorConnect.getCuratorClient2();
    private static final String path = "/master";

    public void masterSelect() {
        LeaderSelector leaderSelector = new LeaderSelector(client, path, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                // this instance is the leader for as long as this method does not return
                System.out.println("Get master");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("Release master");
            }
        });
        // re-enter the election queue automatically every time leadership is relinquished
        leaderSelector.autoRequeue();
        leaderSelector.start();
        try {
            // keep the process alive so it can keep participating in the election
            TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new MasterSelect().masterSelect();
    }
}
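
The example depends on a CuratorConnect helper that is not shown in the post. A minimal sketch of such a factory, assuming a local ZooKeeper at 127.0.0.1:2181 (the connect string and retry settings are illustrative), could look like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorConnect {
    // returns an already-started client, which is what MasterSelect expects
    public static CuratorFramework getCuratorClient2() {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181",                          // illustrative connect string
                new ExponentialBackoffRetry(1000, 3));     // base sleep 1s, at most 3 retries
        client.start();
        return client;
    }
}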

Run result: the console should repeatedly print "Get master" followed three seconds later by "Release master", since autoRequeue() re-enters the election each time takeLeadership() returns.

LeaderSelector source code analysis

start

public void start()
{
    // CAS on the state: anything other than a LATENT -> STARTED transition throws
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    // the executor service must not already be shut down
    Preconditions.checkState(!executorService.isShutdown(), "Already started");
    // we must not already hold leadership
    Preconditions.checkState(!hasLeadership, "Already has leadership");

    // register the connection state listener
    client.getConnectionStateListenable().addListener(listener);
    // enter the election queue
    requeue();
}

public boolean requeue()
{
    // the current state must be STARTED
    Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
    // enter the queue
    return internalRequeue();
}

private synchronized boolean internalRequeue()
{
    // only if we are not already queued and the selector is STARTED
    if ( !isQueued && (state.get() == State.STARTED) )
    {
        // mark ourselves as queued
        isQueued = true;
        Future<Void> task = executorService.submit(new Callable<Void>()
        {
            @Override
            public Void call() throws Exception
            {
                try
                {
                    doWorkLoop();
                }
                finally
                {
                    // leave the queue
                    clearIsQueued();
                    // automatically re-enter the queue if autoRequeue() was called
                    if ( autoRequeue.get() )
                    {
                        internalRequeue();
                    }
                }
                return null;
            }
        });
        ourTask.set(task);

        return true;
    }
    return false;
}
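
Putting the pieces together: start() registers the connection-state listener and queues the first election attempt, and internalRequeue() keeps the instance in the election for as long as autoRequeue is set. A rough lifecycle sketch (client and listener are assumed to exist already):

LeaderSelector selector = new LeaderSelector(client, "/master", listener);
selector.autoRequeue();   // re-queue every time takeLeadership() returns
selector.start();         // LATENT -> STARTED, listener registered, first requeue()

// ... the application runs; takeLeadership() is invoked whenever this instance wins ...

selector.close();         // STARTED -> CLOSED: stop participating in the election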

doWorkLoop

private void doWorkLoop() throws Exception
{
    KeeperException exception = null;
    try
    {
        doWork();
    }
    catch ( KeeperException.ConnectionLossException e )
    {
        exception = e;
    }
    catch ( KeeperException.SessionExpiredException e )
    {
        exception = e;
    }
    catch ( InterruptedException ignore )
    {
        // the thread was interrupted: restore the interrupt flag and carry on
        Thread.currentThread().interrupt();
    }
    if ( (exception != null) && !autoRequeue.get() )   // autoRequeue should ignore connection loss or session expired and just keep trying
    {
        throw exception;
    }
}
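
Note that doWorkLoop() only rethrows connection-loss and session-expiry exceptions when autoRequeue is off; with autoRequeue on, the selector simply queues itself again. Connection state is otherwise handled by the listener: LeaderSelectorListenerAdapter throws CancelLeadershipException on SUSPENDED or LOST, which interrupts takeLeadership(). A sketch of a listener that keeps that default behaviour but adds logging (the class name is made up):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;

import java.util.concurrent.TimeUnit;

public class LoggingLeaderListener extends LeaderSelectorListenerAdapter {
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        System.out.println("Became leader");
        TimeUnit.SECONDS.sleep(3);   // hold leadership briefly, then give it up by returning
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        System.out.println("Connection state changed: " + newState);
        // the adapter throws CancelLeadershipException on SUSPENDED/LOST,
        // which interrupts takeLeadership()
        super.stateChanged(client, newState);
    }
}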

void doWork() throws Exception
{
    // not the leader yet
    hasLeadership = false;
    try
    {
        // block until the distributed lock (the election) is acquired
        mutex.acquire();
        // we hold the lock, so we are now the leader
        hasLeadership = true;
        try
        {
            if ( debugLeadershipLatch != null )
            {
                debugLeadershipLatch.countDown();
            }
            if ( debugLeadershipWaitLatch != null )
            {
                debugLeadershipWaitLatch.await();
            }
            // hand control to the user's takeLeadership() callback
            listener.takeLeadership(client);
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            throw e;
        }
        catch ( Throwable e )
        {
            ThreadUtils.checkInterrupted(e);
        }
        finally
        {
            // leave the queue
            clearIsQueued();
        }
    }
    catch ( InterruptedException e )
    {
        Thread.currentThread().interrupt();
        throw e;
    }
    finally
    {
        // if we did become the leader
        if ( hasLeadership )
        {
            // no longer the leader
            hasLeadership = false;
            // remember whether we were interrupted
            boolean wasInterrupted = Thread.interrupted();  // clear any interrupted status so that mutex.release() works immediately
            try
            {
                // release the lock (deletes our znode)
                mutex.release();
            }
            catch ( Exception e )
            {
                if ( failedMutexReleaseCount != null )
                {
                    failedMutexReleaseCount.incrementAndGet();
                }

                ThreadUtils.checkInterrupted(e);
                log.error("The leader threw an exception", e);
                // ignore errors - this is just a safety
            }
            finally
            {
                if ( wasInterrupted )
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

mutex.acquire

public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
       Note on concurrency: a given lockData instance
       can be only acted on by a single thread so locking isn't necessary
    */

    Thread currentThread = Thread.currentThread();

    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // attempt to create the lock node and acquire the lock
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        // cache the lock data per thread so the lock is re-entrant
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}
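
The threadData map together with lockCount is what makes InterProcessMutex re-entrant within a single thread; release(), shown further down, undoes the count step by step. A small illustration (client is assumed to be an already-started CuratorFramework):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

public class ReentrancyDemo {
    static void demo(CuratorFramework client) throws Exception {
        InterProcessMutex mutex = new InterProcessMutex(client, "/master");

        mutex.acquire();   // creates our lock znode, lockCount = 1
        mutex.acquire();   // same thread re-enters: lockCount = 2, no new znode

        mutex.release();   // lockCount 2 -> 1, znode kept
        mutex.release();   // lockCount 1 -> 0, znode deleted, the next waiter is woken up
    }
}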

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;

    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while ( !isDone )
    {
        isDone = true;

        try
        {
            // create the ephemeral sequential lock node; if this fails with NoNodeException, the catch below retries via the retry policy
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean     haveTheLock = false;
    boolean     doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }

        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
        {
            // get the children, sorted by sequence number
            List<String>        children = getSortedChildren();
            // our own node name, relative to the base path
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            // predicateResults carries two things: whether we got the lock, and which node to watch if not
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            // we got the lock
            if ( predicateResults.getsTheLock() )
            {
                haveTheLock = true;
            }
            else
            {
                // did not get the lock: watch the node just ahead of us
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                synchronized(this)
                {
                    try
                    {
                        // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                        // set the watch and wait until woken up; the watcher calls notifyAll()
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }

                            wait(millisToWait);
                        }
                        else
                        {
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
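
The wait() above is paired with a watcher inside LockInternals: the watch registered in internalLockLoop() fires when the predecessor node changes, and the notification wakes the waiting thread so the loop can re-check the children. In the Curator versions this walkthrough appears to be based on, that watcher is roughly the following (simplified sketch):

private final Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        notifyFromWatcher();   // wake up whoever is wait()-ing in internalLockLoop()
    }
};

private synchronized void notifyFromWatcher() {
    notifyAll();
}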

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{   
    // index of our node in the sorted children list
    int             ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
    // maxLeases is 1 for a mutex, so only the node at index 0 gets the lock
    boolean         getsTheLock = ourIndex < maxLeases;
    String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

    return new PredicateResults(pathToWatch, getsTheLock);
}
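
A quick worked example of that calculation, using simplified child names (real lock nodes typically also carry a protection prefix such as _c_<uuid>-lock-<sequence>):

import java.util.Arrays;
import java.util.List;

public class GetsTheLockExample {
    public static void main(String[] args) {
        List<String> children = Arrays.asList("lock-0000000000", "lock-0000000001", "lock-0000000002");
        String ours = "lock-0000000001";
        int maxLeases = 1;

        int ourIndex = children.indexOf(ours);              // 1
        boolean getsTheLock = ourIndex < maxLeases;         // false: only index 0 holds the lock
        String pathToWatch = getsTheLock ? null
                : children.get(ourIndex - maxLeases);       // lock-0000000000, our predecessor

        System.out.println(getsTheLock + ", watch " + pathToWatch);
    }
}

Each waiter therefore watches only the node directly ahead of it, so releasing the lock wakes exactly one waiter instead of the whole herd.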

mutex.release

public void release() throws Exception
{
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
     */

    Thread currentThread = Thread.currentThread();
    LockData lockData = threadData.get(currentThread);
    if ( lockData == null )
    {
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }

    int newLockCount = lockData.lockCount.decrementAndGet();
    if ( newLockCount > 0 )
    {
        return;
    }
    if ( newLockCount < 0 )
    {
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try
    {
        internals.releaseLock(lockData.lockPath);
    }
    finally
    {
        threadData.remove(currentThread);
    }
}

final void releaseLock(String lockPath) throws Exception
{
    // remove the watchers this client registered
    client.removeWatchers();
    revocable.set(null);
    // delete our lock node; this triggers the watch of the next waiter in line
    deleteOurPath(lockPath);
}