A look at storm nimbus's LeaderElector

This article mainly looks into storm nimbus's LeaderElector.

Nimbus

org/apache/storm/daemon/nimbus/Nimbus.java

public static void main(String[] args) throws Exception {
        Utils.setupDefaultUncaughtExceptionHandler();
        launch(new StandaloneINimbus());
    }

    public static Nimbus launch(INimbus inimbus) throws Exception {
        Map<String, Object> conf = Utils.merge(ConfigUtils.readStormConfig(),
                                               ConfigUtils.readYamlConfig("storm-cluster-auth.yaml", false));
        boolean fixupAcl = (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_FIXUP);
        boolean checkAcl = fixupAcl || (boolean) conf.get(DaemonConfig.STORM_NIMBUS_ZOOKEEPER_ACLS_CHECK);
        if (checkAcl) {
            AclEnforcement.verifyAcls(conf, fixupAcl);
        }
        return launchServer(conf, inimbus);
    }

    private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
        StormCommon.validateDistributedMode(conf);
        validatePortAvailable(conf);
        StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
        final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
        nimbus.launchServer();
        final ThriftServer server = new ThriftServer(conf, new Processor<>(nimbus), ThriftConnectionType.NIMBUS);
        metricsRegistry.startMetricsReporters(conf);
        Utils.addShutdownHookWithDelayedForceKill(() -> {
            metricsRegistry.stopMetricsReporters();
            nimbus.shutdown();
            server.stop();
        }, 10);
        if (ClientAuthUtils.areWorkerTokensEnabledServer(server, conf)) {
            nimbus.initWorkerTokenManager();
        }
        LOG.info("Starting nimbus server for storm version '{}'", STORM_VERSION);
        server.serve();
        return nimbus;
    }

    public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stormClusterState, NimbusInfo hostPortInfo,
                  BlobStore blobStore, TopoCache topoCache, ILeaderElector leaderElector, IGroupMappingServiceProvider groupMapper,
                  StormMetricsRegistry metricsRegistry)
        throws Exception {
        //......

        if (blobStore == null) {
            blobStore = ServerUtils.getNimbusBlobStore(conf, this.nimbusHostPortInfo, null);
        }
        this.blobStore = blobStore;

        if (topoCache == null) {
            topoCache = new TopoCache(blobStore, conf);
        }
        if (leaderElector == null) {
            leaderElector = Zookeeper.zkLeaderElector(conf, zkClient, blobStore, topoCache, stormClusterState, getNimbusAcls(conf),
                metricsRegistry);
        }
        this.leaderElector = leaderElector;
        this.blobStore.setLeaderElector(this.leaderElector);

        //......
    }

    public void launchServer() throws Exception {
        try {
            BlobStore store = blobStore;
            IStormClusterState state = stormClusterState;
            NimbusInfo hpi = nimbusHostPortInfo;

            LOG.info("Starting Nimbus with conf {}", ConfigUtils.maskPasswords(conf));
            validator.prepare(conf);

            //add to nimbuses
            state.addNimbusHost(hpi.getHost(),
                                new NimbusSummary(hpi.getHost(), hpi.getPort(), Time.currentTimeSecs(), false, STORM_VERSION));
            leaderElector.addToLeaderLockQueue();
            this.blobStore.startSyncBlobs();
            
            for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) {
                exec.prepare();
            }

            if (isLeader()) {
                for (String topoId : state.activeStorms()) {
                    transition(topoId, TopologyActions.STARTUP, null);
                }
                clusterMetricSet.setActive(true);
            }

            //......
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                throw e;
            }

            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, e)) {
                throw e;
            }
            LOG.error("Error on initialization of nimbus", e);
            Utils.exitProcess(13, "Error on initialization of nimbus");
        }
    }
  • Nimbus calls Zookeeper.zkLeaderElector in its constructor to create the leaderElector
  • The launchServer method calls leaderElector.addToLeaderLockQueue() to join the leader election; a standalone sketch of this pattern follows below
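
For reference, here is a minimal, self-contained sketch of the same participation pattern using Curator's LeaderLatch directly. The connect string 127.0.0.1:2181, the lock path /storm/leader-lock and the id nimbus-1:6627 are made-up values for illustration, not taken from Storm:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        // assumes a ZooKeeper server reachable at 127.0.0.1:2181
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        // same pattern as Nimbus: one latch per candidate under a shared
        // lock path, identified by a host:port string
        LeaderLatch latch = new LeaderLatch(zk, "/storm/leader-lock", "nimbus-1:6627");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("gained leadership");
            }

            @Override
            public void notLeader() {
                System.out.println("lost leadership");
            }
        });
        latch.start();          // the equivalent of addToLeaderLockQueue: join the election
        Thread.sleep(10_000);   // stay in the election for a while
        latch.close();          // leave the queue, releasing leadership if held
        zk.close();
    }
}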

Zookeeper.zkLeaderElector

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java (this excerpt is from the 1.1.0 sources, where zkLeaderElector takes fewer parameters than the call shown in the Nimbus constructor above)

public static ILeaderElector zkLeaderElector(Map conf, BlobStore blobStore) throws UnknownHostException {
        return _instance.zkLeaderElectorImpl(conf, blobStore);
    }

    protected ILeaderElector zkLeaderElectorImpl(Map conf, BlobStore blobStore) throws UnknownHostException {
        List<String> servers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        Object port = conf.get(Config.STORM_ZOOKEEPER_PORT);
        CuratorFramework zk = mkClientImpl(conf, servers, port, "", conf);
        String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock";
        String id = NimbusInfo.fromConf(conf).toHostPortString();
        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id));
        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference =
                new AtomicReference<>(leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference,
            leaderLatchListenerAtomicReference, blobStore);
    }
  • Here a LeaderLatch is created on the /leader-lock path, and leaderLatchListenerImpl is used to create a LeaderLatchListener
  • Finally LeaderElectorImp is used to create the ILeaderElector; the znodes behind this election can be inspected as sketched below
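
Under the hood, each started LeaderLatch registers an ephemeral sequential znode under the lock path, and the participant with the lowest sequence number holds leadership. A quick way to look at the queue is to list the children of the lock path (a sketch, assuming STORM_ZOOKEEPER_ROOT is /storm and a ZooKeeper server runs locally):

import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLockInspect {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();
        // each election participant appears as an ephemeral sequential child;
        // the child with the lowest sequence number is the current leader
        List<String> children = zk.getChildren().forPath("/storm/leader-lock");
        children.forEach(System.out::println);
        zk.close();
    }
}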

leaderLatchListenerImpl

storm-core-1.1.0-sources.jar!/org/apache/storm/zookeeper/Zookeeper.java

// Leader latch listener that will be invoked when we either gain or lose leadership
    public static LeaderLatchListener leaderLatchListenerImpl(final Map conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        return new LeaderLatchListener() {
            final String STORM_JAR_SUFFIX = "-stormjar.jar";
            final String STORM_CODE_SUFFIX = "-stormcode.ser";
            final String STORM_CONF_SUFFIX = "-stormconf.ser";

            @Override
            public void isLeader() {
                Set<String> activeTopologyIds = new TreeSet<>(Zookeeper.getChildren(zk, conf.get(Config.STORM_ZOOKEEPER_ROOT) + ClusterUtils.STORMS_SUBTREE, false));

                Set<String> activeTopologyBlobKeys = populateTopologyBlobKeys(activeTopologyIds);
                Set<String> activeTopologyCodeKeys = filterTopologyCodeKeys(activeTopologyBlobKeys);
                Set<String> allLocalBlobKeys = Sets.newHashSet(blobStore.listKeys());
                Set<String> allLocalTopologyBlobKeys = filterTopologyBlobKeys(allLocalBlobKeys);

                // this finds all active topologies blob keys from all local topology blob keys
                Sets.SetView<String> diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]",
                        generateJoinedString(activeTopologyIds), generateJoinedString(allLocalTopologyBlobKeys),
                        generateJoinedString(diffTopology));

                if (diffTopology.isEmpty()) {
                    Set<String> activeTopologyDependencies = getTopologyDependencyKeys(activeTopologyCodeKeys);

                    // this finds all dependency blob keys from active topologies from all local blob keys
                    Sets.SetView<String> diffDependencies = Sets.difference(activeTopologyDependencies, allLocalBlobKeys);
                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]",
                            generateJoinedString(activeTopologyDependencies), generateJoinedString(allLocalBlobKeys),
                            generateJoinedString(diffDependencies));

                    if (diffDependencies.isEmpty()) {
                        LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                    } else {
                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                        closeLatch();
                    }
                } else {
                    LOG.info("code for all active topologies not available locally, giving up leadership.");
                    closeLatch();
                }
            }

            @Override
            public void notLeader() {
                LOG.info("{} lost leadership.", hostName);
            }

            //......

            private void closeLatch() {
                try {
                    leaderLatch.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }
  • leaderLatchListenerImpl returns an implementation of the LeaderLatchListener interface
  • The isLeader callback performs validation: when this node is elected leader by zookeeper, if it does not have all active topologies or all of their dependencies locally, it calls leaderLatch.close() to give up leadership (see the Sets.difference sketch after this list)
  • The notLeader callback mainly just logs the event
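
The direction of the Guava Sets.difference calls matters: the result is the set of keys that active topologies require but that are missing from the local blob store. A tiny sketch with made-up blob keys:

import java.util.Set;

import com.google.common.collect.Sets;

public class BlobDiffDemo {
    public static void main(String[] args) {
        // blob keys required by active topologies (read from zookeeper)
        Set<String> active = Sets.newHashSet("topo-1-stormjar.jar", "topo-2-stormjar.jar");
        // blob keys present in this nimbus's local blob store
        Set<String> local = Sets.newHashSet("topo-1-stormjar.jar");
        // required-but-missing keys; non-empty means this node gives up leadership
        System.out.println(Sets.difference(active, local)); // prints [topo-2-stormjar.jar]
    }
}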

LeaderElectorImp

org/apache/storm/zookeeper/LeaderElectorImp.java

public class LeaderElectorImp implements ILeaderElector {
    private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class);
    private final Map<String, Object> conf;
    private final List<String> servers;
    private final CuratorFramework zk;
    private final String leaderlockPath;
    private final String id;
    private final AtomicReference<LeaderLatch> leaderLatch;
    private final AtomicReference<LeaderLatchListener> leaderLatchListener;
    private final BlobStore blobStore;
    private final TopoCache tc;
    private final IStormClusterState clusterState;
    private final List<ACL> acls;
    private final StormMetricsRegistry metricsRegistry;

    public LeaderElectorImp(Map<String, Object> conf, List<String> servers, CuratorFramework zk, String leaderlockPath, String id,
                            AtomicReference<LeaderLatch> leaderLatch, AtomicReference<LeaderLatchListener> leaderLatchListener,
                            BlobStore blobStore, final TopoCache tc, IStormClusterState clusterState, List<ACL> acls,
                            StormMetricsRegistry metricsRegistry) {
        this.conf = conf;
        this.servers = servers;
        this.zk = zk;
        this.leaderlockPath = leaderlockPath;
        this.id = id;
        this.leaderLatch = leaderLatch;
        this.leaderLatchListener = leaderLatchListener;
        this.blobStore = blobStore;
        this.tc = tc;
        this.clusterState = clusterState;
        this.acls = acls;
        this.metricsRegistry = metricsRegistry;
    }

    @Override
    public void prepare(Map<String, Object> conf) {
        // no-op for zookeeper implementation
    }

    @Override
    public void addToLeaderLockQueue() throws Exception {
        // if this latch is already closed, we need to create new instance.
        if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) {
            leaderLatch.set(new LeaderLatch(zk, leaderlockPath));
            LeaderListenerCallback callback = new LeaderListenerCallback(conf, zk, leaderLatch.get(), blobStore, tc, clusterState, acls,
                metricsRegistry);
            leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(callback));
            LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners.");
        }
        // Only if the latch is not already started we invoke start
        if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) {
            leaderLatch.get().addListener(leaderLatchListener.get());
            leaderLatch.get().start();
            LOG.info("Queued up for leader lock.");
        } else {
            LOG.info("Node already in queue for leader lock.");
        }
    }

    @Override
    // Only started latches can be closed.
    public void removeFromLeaderLockQueue() throws Exception {
        if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) {
            leaderLatch.get().close();
            LOG.info("Removed from leader lock queue.");
        } else {
            LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed.");
        }
    }

    @Override
    public boolean isLeader() throws Exception {
        return leaderLatch.get().hasLeadership();
    }

    @Override
    public NimbusInfo getLeader() {
        try {
            return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader());
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    @Override
    public List<NimbusInfo> getAllNimbuses() throws Exception {
        List<NimbusInfo> nimbusInfos = new ArrayList<>();
        Collection<Participant> participants = leaderLatch.get().getParticipants();
        for (Participant participant : participants) {
            nimbusInfos.add(Zookeeper.toNimbusInfo(participant));
        }
        return nimbusInfos;
    }

    @Override
    public void close() {
        //Do nothing now.
    }
}
  • LeaderElectorImp implements the ILeaderElector interface
  • addToLeaderLockQueue checks whether the latch is already closed and, if so, creates a new one; it then checks the latch state and, if it has not been started yet, calls start to join the election
  • A closed latch is replaced with a new instance for two reasons: calling methods on an already closed latch throws an exception, and a node that zk elects leader may fail storm's leadership checks and give up leadership, i.e. close the latch (see the sketch below)
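
The first point can be checked directly against the Curator API: LeaderLatch.start() guards its LATENT-to-STARTED state transition, so a closed latch cannot rejoin the election and a fresh instance is needed. A minimal sketch, again assuming a local ZooKeeper and made-up path/id values:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LatchReuseDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework zk = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        zk.start();

        LeaderLatch latch = new LeaderLatch(zk, "/storm/leader-lock", "nimbus-1:6627");
        latch.start();
        latch.close();
        System.out.println(latch.getState()); // CLOSED
        // latch.start(); // would throw IllegalStateException: cannot be started more than once

        // so, like addToLeaderLockQueue, create a fresh latch to re-enter the election
        latch = new LeaderLatch(zk, "/storm/leader-lock", "nimbus-1:6627");
        latch.start();
        latch.close();
        zk.close();
    }
}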

Summary

  • storm nimbus's LeaderElector is built mainly on the LeaderLatch recipe from the Curator recipes for zookeeper
  • storm nimbus supplies a custom LeaderLatchListener that validates a nimbus once it becomes leader: it must have all active topologies and all of their dependencies locally, otherwise it gives up leadership
