A Look at LeaderLatch in Curator Recipes

This article walks through how LeaderLatch in Curator recipes works.

Example

@Test
    public void testCuratorLeaderLatch() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        String leaderLockPath = "/leader-lock2";

        List<LeaderLatch> latchList = IntStream.rangeClosed(1,10)
                .parallel()
                .mapToObj(i -> new LeaderLatch(client,leaderLockPath,"client"+i))
                .collect(Collectors.toList());

        latchList.parallelStream()
                .forEach(latch -> {
                    try {
                        latch.start();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });

        // Give the background create/getChildren callbacks time to finish so a leader can emerge
        TimeUnit.SECONDS.sleep(5);

        Iterator<LeaderLatch> iterator = latchList.iterator();
        while (iterator.hasNext()){
            LeaderLatch latch = iterator.next();
            if(latch.hasLeadership()){
                System.out.println(latch.getId() + " hasLeadership");
                try {
                    latch.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                iterator.remove();
            }
        }


        // Wait for the next latch in sequence to notice the deletion and take over
        TimeUnit.SECONDS.sleep(5);

        latchList.stream()
                .filter(latch -> latch.hasLeadership())
                .forEach(latch -> System.out.println(latch.getId() + " hasLeadership"));

        Participant participant = latchList.get(0).getLeader();
        System.out.println(participant);


        // Keep the session open so the ephemeral latch nodes can be inspected with zkCli
        TimeUnit.MINUTES.sleep(15);

        latchList.stream()
                .forEach(latch -> {
                    try {
                        latch.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
        client.close();
    }
  • Querying with zkCli
[zk: localhost:2181(CONNECTED) 17] ls /
[leader-lock1, leader-lock2, zookeeper, leader-lock]
[zk: localhost:2181(CONNECTED) 18] ls /leader-lock2
[_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]

LeaderLatch.start

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

/**
     * Add this instance to the leadership election and attempt to acquire leadership.
     *
     * @throws Exception errors
     */
    public void start() throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
        {
            @Override
            public void run()
            {
                try
                {
                    internalStart();
                }
                finally
                {
                    startTask.set(null);
                }
            }
        }));
    }

    private synchronized void internalStart()
    {
        if ( state.get() == State.STARTED )
        {
            client.getConnectionStateListenable().addListener(listener);
            try
            {
                reset();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                log.error("An error occurred checking resetting leadership.", e);
            }
        }
    }

    @VisibleForTesting
    void reset() throws Exception
    {
        setLeadership(false);
        setNode(null);

        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }

                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
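  • The start method joins the election; reset then creates this latch's child node via forPath
  • ZKPaths.makePath(latchPath, LOCK_NAME) returns the latch path followed by /latch-, e.g. /leader-lock2/latch- in the example above
  • The create runs in the background; when it succeeds, its callback records the created node path with setNode and then calls getChildren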
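
The start-once guard in start() (and the matching guard in close(), shown later) can be modeled without Curator. This is a minimal sketch, not Curator's code: the class LatchLifecycle and the helper secondStartFails are hypothetical names, and only the compareAndSet transitions and the error messages are taken from the source shown in this article.

```java
import java.util.concurrent.atomic.AtomicReference;

final class LatchLifecycle {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    // Only the LATENT -> STARTED transition is allowed, and only once.
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    // Only the STARTED -> CLOSED transition is allowed.
    void close() {
        if (!state.compareAndSet(State.STARTED, State.CLOSED)) {
            throw new IllegalStateException("Already closed or has not been started");
        }
    }

    State current() {
        return state.get();
    }

    // Hypothetical helper: true when a second start() is rejected.
    static boolean secondStartFails() {
        LatchLifecycle latch = new LatchLifecycle();
        latch.start();
        try {
            latch.start();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start rejected: " + secondStartFails());
    }
}
```

Because the transition is a single atomic compare-and-set, two threads racing to start the same latch cannot both succeed, and a closed latch cannot be restarted.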

CreateBuilderImpl.forPath

curator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java

@VisibleForTesting
    static final String PROTECTED_PREFIX = "_c_";

    @Override
    public String forPath(final String givenPath, byte[] data) throws Exception
    {
        if ( compress )
        {
            data = client.getCompressionProvider().compress(givenPath, data);
        }

        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

        String returnPath = null;
        if ( backgrounding.inBackground() )
        {
            pathInBackground(adjustedPath, data, givenPath);
        }
        else
        {
            String path = protectedPathInForeground(adjustedPath, data, aclList);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }

    @VisibleForTesting
    String adjustPath(String path) throws Exception
    {
        if ( doProtected )
        {
            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
            String name = getProtectedPrefix(protectedId) + pathAndNode.getNode();
            path = ZKPaths.makePath(pathAndNode.getPath(), name);
        }
        return path;
    }

    private static String getProtectedPrefix(String protectedId)
    {
        return PROTECTED_PREFIX + protectedId + "-";
    }
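  • If the CuratorFramework was created without a namespace, client.fixForNamespace returns the path unchanged
  • When doProtected is set, adjustPath prepends PROTECTED_PREFIX, the protectedId (a UUID), and a hyphen to the node name; for example, latch- becomes _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-
  • Because the node is created as EPHEMERAL_SEQUENTIAL, ZooKeeper then appends a sequence number, giving for example /leader-lock2/_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045; the node's data is the id passed to the LeaderLatch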
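
The naming steps above can be reproduced with plain string handling. The following is a simplified model rather than Curator's implementation: the class ProtectedNodeName and the method withSequence are hypothetical, and the 10-digit zero-padded suffix stands in for the sequence number that the ZooKeeper server assigns.

```java
import java.util.UUID;

final class ProtectedNodeName {
    static final String PROTECTED_PREFIX = "_c_"; // same constant as in CreateBuilderImpl
    static final String LOCK_NAME = "latch-";     // node name prefix used by LeaderLatch

    // Models adjustPath: prefix the last path segment with "_c_<protectedId>-".
    static String adjustPath(String path, String protectedId) {
        int slash = path.lastIndexOf('/');
        String parent = path.substring(0, slash);
        String node = path.substring(slash + 1);
        return parent + "/" + PROTECTED_PREFIX + protectedId + "-" + node;
    }

    // Models the server-side suffix of a sequential node (assumption: 10 digits, zero padded).
    static String withSequence(String path, int sequence) {
        return path + String.format("%010d", sequence);
    }

    public static void main(String[] args) {
        String protectedId = UUID.randomUUID().toString();
        String adjusted = adjustPath("/leader-lock2/" + LOCK_NAME, protectedId);
        System.out.println(withSequence(adjusted, 45));
    }
}
```

The UUID is what lets Curator recognize a node it created even when the create response was lost, since the sequence number alone is only known to the server.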

LeaderLatch.getChildren

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

private void getChildren() throws Exception
    {
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

    private void checkLeadership(List<String> children) throws Exception
    {
        final String localOurPath = ourPath.get();
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if ( ourIndex < 0 )
        {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        }
        else if ( ourIndex == 0 )
        {
            setLeadership(true);
        }
        else
        {
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher()
            {
                @Override
                public void process(WatchedEvent event)
                {
                    if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
                    {
                        try
                        {
                            getChildren();
                        }
                        catch ( Exception ex )
                        {
                            ThreadUtils.checkInterrupted(ex);
                            log.error("An error occurred checking the leadership.", ex);
                        }
                    }
                }
            };

            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
                    {
                        // previous node is gone - reset
                        reset();
                    }
                }
            };
            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
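  • The getChildren callback calls checkLeadership, which sorts the children and marks the latch at index 0 as leader. A latch at an index greater than 0 sets a watch on the node immediately before it; when that node is deleted, getChildren is triggered again. If the latch cannot find its own node (index below 0), it calls reset
  • The getData call that sets the watch also has a callback: if it returns NONODE, meaning the preceding node was already gone before the watch could be set, reset is called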
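
The decision made in checkLeadership can be sketched in plain Java. This is an illustrative model, not Curator's code: the class LeadershipCheck, its methods, and the string results are hypothetical, and sorting by the text after latch- is a simplification of what LockInternals.getSortedChildren does with its sorter. The node names come from the zkCli listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class LeadershipCheck {
    static final String LOCK_NAME = "latch-";

    // Sort key: the text after "latch-", i.e. the zero-padded sequence number.
    static String sequenceOf(String nodeName) {
        int idx = nodeName.lastIndexOf(LOCK_NAME);
        return (idx >= 0) ? nodeName.substring(idx + LOCK_NAME.length()) : nodeName;
    }

    static List<String> sortedChildren(List<String> children) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparing(LeadershipCheck::sequenceOf));
        return sorted;
    }

    // Returns "LEADER", "RESET", or "WATCH <predecessor node name>".
    static String decide(List<String> children, String ourNode) {
        List<String> sorted = sortedChildren(children);
        int ourIndex = (ourNode != null) ? sorted.indexOf(ourNode) : -1;
        if (ourIndex < 0) {
            return "RESET";   // our node is missing: recreate it
        }
        if (ourIndex == 0) {
            return "LEADER";  // lowest sequence number wins
        }
        return "WATCH " + sorted.get(ourIndex - 1); // watch only the node just ahead of us
    }

    public static void main(String[] args) {
        List<String> children = Arrays.asList(
                "_c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042",
                "_c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041",
                "_c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040");
        for (String child : children) {
            System.out.println(child + " -> " + decide(children, child));
        }
    }
}
```

Watching only the immediate predecessor means a deletion wakes exactly one latch instead of every participant, which avoids a herd of getChildren calls each time leadership changes.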

LeaderLatch.close

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

/**
     * Remove this instance from the leadership election. If this instance is the leader, leadership
     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
     * instances must eventually be closed.
     *
     * @throws IOException errors
     */
    @Override
    public void close() throws IOException
    {
        close(closeMode);
    }

    /**
     * Remove this instance from the leadership election. If this instance is the leader, leadership
     * is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
     * instances must eventually be closed.
     *
     * @param closeMode allows the default close mode to be overridden at the time the latch is closed.
     * @throws IOException errors
     */
    public synchronized void close(CloseMode closeMode) throws IOException
    {
        Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");
        Preconditions.checkNotNull(closeMode, "closeMode cannot be null");

        cancelStartTask();

        try
        {
            setNode(null);
            client.removeWatchers();
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            throw new IOException(e);
        }
        finally
        {
            client.getConnectionStateListenable().removeListener(listener);

            switch ( closeMode )
            {
            case NOTIFY_LEADER:
            {
                setLeadership(false);
                listeners.clear();
                break;
            }

            default:
            {
                listeners.clear();
                setLeadership(false);
                break;
            }
            }
        }
    }

    private synchronized void setLeadership(boolean newValue)
    {
        boolean oldValue = hasLeadership.getAndSet(newValue);

        if ( oldValue && !newValue )
        { // Lost leadership, was true, now false
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener listener)
                    {
                        listener.notLeader();
                        return null;
                    }
                });
        }
        else if ( !oldValue && newValue )
        { // Gained leadership, was false, now true
            listeners.forEach(new Function<LeaderLatchListener, Void>()
                {
                    @Override
                    public Void apply(LeaderLatchListener input)
                    {
                        input.isLeader();
                        return null;
                    }
                });
        }

        notifyAll();
    }
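  • close removes this LeaderLatch from the election; if the latch is the leader, leadership is released
  • close first cancels the start task and calls setNode(null), which clears the stored node path (in Curator's implementation this also deletes the latch's znode). It then removes the watchers and the ConnectionStateListener, and finally sets leadership to false, which notifies any listeners that are still registered
  • Note the ordering: with closeMode NOTIFY_LEADER, leadership is set to false first, so listeners are notified before they are cleared; otherwise the listeners are cleared first and leadership is set to false afterwards, so no listener is notified
  • setLeadership compares the old and new values and calls notLeader() on each listener when leadership is lost, or isLeader() when it is gained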
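
The effect of the two close modes on listeners can be shown with a small stand-alone model. This is a sketch under stated assumptions, not Curator's code: CloseModeDemo and notificationsOnClose are hypothetical, listeners are reduced to plain Runnable callbacks for notLeader(), and the latch is assumed to hold leadership when close is called.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CloseModeDemo {
    enum CloseMode { SILENT, NOTIFY_LEADER }

    // Assumption for the demo: the latch is currently the leader.
    private final AtomicBoolean hasLeadership = new AtomicBoolean(true);
    private final List<Runnable> notLeaderListeners = new CopyOnWriteArrayList<>();

    void addListener(Runnable onNotLeader) {
        notLeaderListeners.add(onNotLeader);
    }

    // Notify listeners only on a true -> false transition.
    private void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            notLeaderListeners.forEach(Runnable::run);
        }
    }

    void close(CloseMode mode) {
        if (mode == CloseMode.NOTIFY_LEADER) {
            setLeadership(false);        // listeners still registered: they are notified
            notLeaderListeners.clear();
        } else {
            notLeaderListeners.clear();  // listeners removed first: nobody is notified
            setLeadership(false);
        }
    }

    // Hypothetical helper: how many notLeader callbacks fire when closing in the given mode.
    static int notificationsOnClose(CloseMode mode) {
        CloseModeDemo latch = new CloseModeDemo();
        AtomicInteger calls = new AtomicInteger();
        latch.addListener(calls::incrementAndGet);
        latch.close(mode);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("NOTIFY_LEADER -> " + notificationsOnClose(CloseMode.NOTIFY_LEADER));
        System.out.println("SILENT -> " + notificationsOnClose(CloseMode.SILENT));
    }
}
```

In this model NOTIFY_LEADER yields one callback and SILENT yields none, which matches the ordering of the two switch branches in close.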

ConnectionStateListener

curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java

private final ConnectionStateListener listener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            handleStateChange(newState);
        }
    };

    private void handleStateChange(ConnectionState newState)
    {
        switch ( newState )
        {
            default:
            {
                // NOP
                break;
            }

            case RECONNECTED:
            {
                try
                {
                    if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
                    {
                        reset();
                    }
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("Could not reset leader latch", e);
                    setLeadership(false);
                }
                break;
            }

            case SUSPENDED:
            {
                if ( client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
                {
                    setLeadership(false);
                }
                break;
            }

            case LOST:
            {
                setLeadership(false);
                break;
            }
        }
    }
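  • LeaderLatch registers its own ConnectionStateListener, which handles the RECONNECTED, SUSPENDED, and LOST states
  • setLeadership(false) notifies listeners based on the old and new values: if the latch was the leader, listener.notLeader() is called
  • On RECONNECTED, if the error policy treats SUSPENDED as an error state or the latch is not currently the leader, reset is called, which recreates the node and repeats the steps that follow start. On SUSPENDED, leadership is dropped only if the error policy treats SUSPENDED as an error state; on LOST it is always dropped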
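
The branching in handleStateChange can be condensed into a small decision function. This is a simplified model and not part of Curator: StateChangeModel, the Action enum, and the local ConnectionState enum are stand-ins, and the boolean suspendedIsError represents the result of client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED).

```java
final class StateChangeModel {
    // Local stand-in for Curator's ConnectionState enum.
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    // Hypothetical labels for what the latch does in response.
    enum Action { NOP, RESET, DROP_LEADERSHIP }

    static Action onStateChange(ConnectionState newState, boolean suspendedIsError, boolean hasLeadership) {
        switch (newState) {
            case RECONNECTED:
                // Re-register unless we kept leadership through a tolerated suspension.
                return (suspendedIsError || !hasLeadership) ? Action.RESET : Action.NOP;
            case SUSPENDED:
                return suspendedIsError ? Action.DROP_LEADERSHIP : Action.NOP;
            case LOST:
                return Action.DROP_LEADERSHIP;
            default:
                return Action.NOP;
        }
    }

    public static void main(String[] args) {
        for (ConnectionState s : ConnectionState.values()) {
            System.out.println(s + " (suspendedIsError=true, leader) -> " + onStateChange(s, true, true));
            System.out.println(s + " (suspendedIsError=false, leader) -> " + onStateChange(s, false, true));
        }
    }
}
```

The only case where a leader keeps its node across a reconnect is when the error policy tolerates SUSPENDED and the latch still holds leadership; in every other RECONNECTED case the latch starts over with a fresh node.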

Summary

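  • LeaderLatch in Curator recipes gives us a convenient way to run leader election and provides LeaderLatchListener as the hook for custom handling
  • LeaderLatch uses ZooKeeper EPHEMERAL_SEQUENTIAL nodes, so a sequence number is appended to the node name automatically. The default LOCK_NAME is latch-, and protected creates also prepend PROTECTED_PREFIX (_c_) and the protectedId (a UUID). The final node name therefore has the form PROTECTED_PREFIX + UUID + "-" + LOCK_NAME + sequence number, for example _c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045
  • LeaderLatch uses a ConnectionStateListener to react to connection state changes. The node at index 0 among the sorted children is the leader. Each non-leader watches the node immediately before it; when that node is deleted, the latch fetches the children and runs checkLeadership again to see whether it is now first. If it is, it becomes leader and notifies its listeners; if not, it watches its new predecessor. This chain, in which each node watches only the one ahead of it, is an elegant design.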
