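ZooKeeper (zk) is used in so many places that almost every one of them expects you to understand how it works.
To get set up, follow these steps:
1. Open the zk git repository, https://github.com/apache/zookeeper , and confirm it is the project you are looking for.
2. Download the source code, and download the ant tool as well if you do not have it yet: http://ant.apache.org/
3. Run the ant script to generate the required environment: ant eclipse (this may take a few minutes).
4. Open the source in IDEA and import the required libraries.
5. Run the main() method in the source to start the zk server; remember to pass the configuration file as a runtime argument.
6. Read through the source and learn as you go.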
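Step 5 needs a configuration file to hand to main(). As a minimal sketch, a standalone configuration could look like the following; the three keys are ones that parseProperties() (quoted later) reads, while the values, and the dataDir path in particular, are only placeholders:

```properties
# zoo.cfg: minimal standalone configuration (illustrative values)
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181
```

With no server.N entries present, QuorumPeerMain falls back to standalone mode, which is the path this walkthrough follows.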
First, start from the entry point, the startup script (zkServer.sh):
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# If this scripted is run out of /usr/bin or some other system bin directory
# it should be linked to and not copied. Things like java jar files are found
# relative to the canonical path of this script.
#

# use POSTIX interface, symlink is followed automatically
ZOOBIN="${BASH_SOURCE-$0}"
ZOOBIN="$(dirname "${ZOOBIN}")"
ZOOBINDIR="$(cd "${ZOOBIN}"; pwd)"

if [ -e "$ZOOBIN/../libexec/zkEnv.sh" ]; then
    . "$ZOOBINDIR/../libexec/zkEnv.sh"
else
    . "$ZOOBINDIR/zkEnv.sh"
fi

# See the following page for extensive details on setting
# up the JVM to accept JMX remote management:
# http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
# by default we allow local JMX connections
if [ "x$JMXLOCALONLY" = "x" ]
then
    JMXLOCALONLY=false
fi

if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
    echo "ZooKeeper JMX enabled by default" >&2
    if [ "x$JMXPORT" = "x" ]
    then
        # for some reason these two options are necessary on jdk6 on Ubuntu
        #   accord to the docs they are not necessary, but otw jconsole cannot
        #   do a local attach
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
    else
        if [ "x$JMXAUTH" = "x" ]
        then
            JMXAUTH=false
        fi
        if [ "x$JMXSSL" = "x" ]
        then
            JMXSSL=false
        fi
        if [ "x$JMXLOG4J" = "x" ]
        then
            JMXLOG4J=true
        fi
        echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
        echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
        echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
        echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
        ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi
else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

if [ "x$SERVER_JVMFLAGS" != "x" ]
then
    JVMFLAGS="$SERVER_JVMFLAGS $JVMFLAGS"
fi

if [ "x$2" != "x" ]
then
    ZOOCFG="$ZOOCFGDIR/$2"
fi

# if we give a more complicated path to the config, don't screw around in $ZOOCFGDIR
if [ "x$(dirname "$ZOOCFG")" != "x$ZOOCFGDIR" ]
then
    ZOOCFG="$2"
fi

if $cygwin
then
    ZOOCFG=`cygpath -wp "$ZOOCFG"`
    # cygwin has a "kill" in the shell itself, gets confused
    KILL=/bin/kill
else
    KILL=kill
fi

echo "Using config: $ZOOCFG" >&2

case "$OSTYPE" in
*solaris*)
    GREP=/usr/xpg4/bin/grep
    ;;
*)
    GREP=grep
    ;;
esac
if [ -z "$ZOOPIDFILE" ]; then
    ZOO_DATADIR="$($GREP "^[[:space:]]*dataDir" "$ZOOCFG" | sed -e 's/.*=//')"
    if [ ! -d "$ZOO_DATADIR" ]; then
        mkdir -p "$ZOO_DATADIR"
    fi
    ZOOPIDFILE="$ZOO_DATADIR/zookeeper_server.pid"
else
    # ensure it exists, otw stop will fail
    mkdir -p "$(dirname "$ZOOPIDFILE")"
fi

if [ ! -w "$ZOO_LOG_DIR" ] ; then
    mkdir -p "$ZOO_LOG_DIR"
fi

_ZOO_DAEMON_OUT="$ZOO_LOG_DIR/zookeeper.out"

case $1 in
start)
    echo -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
        if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
            echo $command already running as process `cat "$ZOOPIDFILE"`.
            exit 0
        fi
    fi
    nohup "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    then
        case "$OSTYPE" in
        *solaris*)
            /bin/echo "${!}\\c" > "$ZOOPIDFILE"
            ;;
        *)
            /bin/echo -n $! > "$ZOOPIDFILE"
            ;;
        esac
        if [ $? -eq 0 ];
        then
            sleep 1
            echo STARTED
        else
            echo FAILED TO WRITE PID
            exit 1
        fi
    else
        echo SERVER DID NOT START
        exit 1
    fi
    ;;
start-foreground)
    ZOO_CMD=(exec "$JAVA")
    if [ "${ZOO_NOEXEC}" != "" ]; then
        ZOO_CMD=("$JAVA")
    fi
    "${ZOO_CMD[@]}" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG"
    ;;
print-cmd)
    echo "\"$JAVA\" -Dzookeeper.log.dir=\"${ZOO_LOG_DIR}\" -Dzookeeper.root.logger=\"${ZOO_LOG4J_PROP}\" -cp \"$CLASSPATH\" $JVMFLAGS $ZOOMAIN \"$ZOOCFG\" > \"$_ZOO_DAEMON_OUT\" 2>&1 < /dev/null"
    ;;
stop)
    echo -n "Stopping zookeeper ... "
    if [ ! -f "$ZOOPIDFILE" ]
    then
        echo "no zookeeper to stop (could not find file $ZOOPIDFILE)"
    else
        $KILL -9 $(cat "$ZOOPIDFILE")
        rm "$ZOOPIDFILE"
        echo STOPPED
    fi
    exit 0
    ;;
upgrade)
    shift
    echo "upgrading the servers to 3.*"
    "$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -cp "$CLASSPATH" $JVMFLAGS org.apache.zookeeper.server.upgrade.UpgradeMain ${@}
    echo "Upgrading ... "
    ;;
restart)
    shift
    "$0" stop ${@}
    sleep 3
    "$0" start ${@}
    ;;
status)
    # -q is necessary on some versions of linux where nc returns too quickly, and no stat result is output
    clientPortAddress=`$GREP "^[[:space:]]*clientPortAddress[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
    if ! [ $clientPortAddress ]
    then
        clientPortAddress="localhost"
    fi
    clientPort=`$GREP "^[[:space:]]*clientPort[^[:alpha:]]" "$ZOOCFG" | sed -e 's/.*=//'`
    STAT=`"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
             -cp "$CLASSPATH" $JVMFLAGS org.apache.zookeeper.client.FourLetterWordMain \
             $clientPortAddress $clientPort srvr 2> /dev/null \
          | $GREP Mode`
    if [ "x$STAT" = "x" ]
    then
        echo "Error contacting service. It is probably not running."
        exit 1
    else
        echo $STAT
        exit 0
    fi
    ;;
*)
    echo "Usage: $0 {start|start-foreground|stop|restart|status|upgrade|print-cmd}" >&2
esac
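The status branch of the script sends the four-letter command srvr to the client port and keeps only the line containing Mode. A rough sketch of that exchange in plain Java is shown here. FourLetterSketch and its methods are hypothetical names made up for illustration (they are not ZooKeeper classes), and the plain-text request/reply handling is an assumption based on how the script uses FourLetterWordMain.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of ZooKeeper: imitates `zkServer.sh status`.
class FourLetterSketch {

    // Sends a four-letter command such as "srvr" and returns the raw reply.
    static String send(String host, int port, String cmd, int timeoutMs) throws IOException {
        try (Socket sock = new Socket()) {
            sock.connect(new InetSocketAddress(host, port), timeoutMs);
            sock.setSoTimeout(timeoutMs);
            OutputStream out = sock.getOutputStream();
            out.write(cmd.getBytes(StandardCharsets.US_ASCII));
            out.flush();
            sock.shutdownOutput(); // tell the server the request is complete
            InputStream in = sock.getInputStream();
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = in.read(chunk)) != -1) { // read until the server closes
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // Rough equivalent of `| grep Mode`: value of the "Mode:" line, or null.
    static String extractMode(String reply) {
        for (String line : reply.split("\\r?\\n")) {
            if (line.startsWith("Mode:")) {
                return line.substring("Mode:".length()).trim();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2181;
        try {
            String mode = extractMode(send(host, port, "srvr", 3000));
            System.out.println(mode == null ? "No Mode line in reply" : "Mode: " + mode);
        } catch (IOException e) {
            System.out.println("Error contacting service. It is probably not running.");
        }
    }
}
```

Run against a local server it should report the same mode that zkServer.sh status prints; with nothing listening on the port it falls into the error branch, just like the script.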
The part that matters is the actual launch command:
nohup java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /etc/zookeeper-3.4.13/bin/../build/classes:/etc/zookeeper-3.4.13/bin/../build/lib/*.jar:/etc/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/etc/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/etc/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/etc/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/etc/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/etc/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/etc/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/etc/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/etc/zookeeper-3.4.13/bin/../conf: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /opt/zookeeper/zoo1.cfg > ./zookeeper.out 2>&1 < /dev/null &
As the command shows, org.apache.zookeeper.server.quorum.QuorumPeerMain is the startup class, so let's find its main() method:
/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
    LOG.info("Exiting normally");
    System.exit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    // Start the background cleanup thread
    // org.apache.zookeeper.server.DatadirCleanupManager
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        // Cluster mode takes this branch
        runFromConfig(config);
    } else {
        // Standalone mode; we look at standalone first and dig into cluster mode later
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
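The code above mainly performs initialization and then catches the various exceptions, including invalid command-line arguments and invalid configuration.
Standalone mode is simply forwarded to ZooKeeperServerMain.main(); how the configuration file gets parsed is covered further down.
In other words, running in standalone mode just means calling ZooKeeperServerMain directly: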
// org.apache.zookeeper.server.ZooKeeperServerMain
/*
 * Start up the ZooKeeper server.
 *
 * @param args the configfile or the port datadir [ticktime]
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(ExitCode.INVALID_INVOCATION.getValue());
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
    }
    LOG.info("Exiting normally");
    System.exit(ExitCode.EXECUTION_FINISHED.getValue());
}

// Initialization for standalone mode
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    // This call is the part to follow
    runFromConfig(config);
}
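Judging from the two startup skeletons above, standalone and cluster mode follow roughly the same flow: parse the configuration file first, then start their own logic. Note also that the arguments already parsed on the cluster path are parsed once more in standalone mode, which keeps the two startup modes compatible.
Cluster mode parses its configuration with QuorumPeerConfig, while standalone mode uses ServerConfig.
That said, standalone parsing still delegates to the cluster parser, as ServerConfig.parse() shows: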
// org.apache.zookeeper.server.ServerConfig
/**
 * Parse a ZooKeeper configuration file
 * @param path the patch of the configuration file
 * @return ServerConfig configured wrt arguments
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    // Hand the file straight to QuorumPeerConfig for parsing
    QuorumPeerConfig config = new QuorumPeerConfig();
    config.parse(path);

    // let qpconfig parse the file and then pull the stuff we are
    // interested in
    // Then read only the parameters we need; this is where standalone and cluster configuration differ
    readFrom(config);
}

/**
 * Read attributes from a QuorumPeerConfig.
 * @param config
 */
public void readFrom(QuorumPeerConfig config) {
    clientPortAddress = config.getClientPortAddress();
    secureClientPortAddress = config.getSecureClientPortAddress();
    dataDir = config.getDataDir();
    dataLogDir = config.getDataLogDir();
    tickTime = config.getTickTime();
    maxClientCnxns = config.getMaxClientCnxns();
    minSessionTimeout = config.getMinSessionTimeout();
    maxSessionTimeout = config.getMaxSessionTimeout();
    metricsProviderClassName = config.getMetricsProviderClassName();
    metricsProviderConfiguration = config.getMetricsProviderConfiguration();
}
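The delegation in ServerConfig.parse() can be reduced to a small, self-contained sketch: one class understands every key, and a narrower class lets it do the parsing and then copies only what a standalone server needs. FullConfig, StandaloneConfig and load() are hypothetical names invented for this illustration; they stand in for QuorumPeerConfig and ServerConfig and handle only a handful of keys. The derivation of the session timeouts from tickTime follows the parseProperties() code quoted later in this article.

```java
import java.util.Properties;

// Hypothetical classes for illustration only, not ZooKeeper's own.
class ConfigDelegationSketch {

    // Plays the role of QuorumPeerConfig: knows every key, including cluster-only ones.
    static class FullConfig {
        String dataDir;
        int tickTime;
        int initLimit;              // cluster-only setting
        int minSessionTimeout = -1; // -1 means "derive from tickTime"
        int maxSessionTimeout = -1;

        void parse(Properties props) {
            for (String key : props.stringPropertyNames()) {
                String value = props.getProperty(key).trim();
                switch (key.trim()) {
                    case "dataDir": dataDir = value; break;
                    case "tickTime": tickTime = Integer.parseInt(value); break;
                    case "initLimit": initLimit = Integer.parseInt(value); break;
                    case "minSessionTimeout": minSessionTimeout = Integer.parseInt(value); break;
                    case "maxSessionTimeout": maxSessionTimeout = Integer.parseInt(value); break;
                    default: break; // the real parser handles many more keys
                }
            }
            if (dataDir == null) {
                throw new IllegalArgumentException("dataDir is not set");
            }
            if (tickTime == 0) {
                throw new IllegalArgumentException("tickTime is not set");
            }
            if (minSessionTimeout == -1) minSessionTimeout = tickTime * 2;
            if (maxSessionTimeout == -1) maxSessionTimeout = tickTime * 20;
        }
    }

    // Plays the role of ServerConfig: delegates parsing, keeps a subset.
    static class StandaloneConfig {
        String dataDir;
        int tickTime;
        int minSessionTimeout;
        int maxSessionTimeout;

        void parse(Properties props) {
            FullConfig full = new FullConfig();
            full.parse(props);
            readFrom(full);
        }

        void readFrom(FullConfig full) {
            dataDir = full.dataDir;
            tickTime = full.tickTime;
            minSessionTimeout = full.minSessionTimeout;
            maxSessionTimeout = full.maxSessionTimeout;
            // initLimit is deliberately not copied: standalone mode has no use for it
        }
    }

    // Convenience entry point used by main() below.
    static StandaloneConfig load(String dataDir, int tickTime) {
        Properties props = new Properties();
        props.setProperty("dataDir", dataDir);
        props.setProperty("tickTime", Integer.toString(tickTime));
        props.setProperty("initLimit", "10");
        StandaloneConfig cfg = new StandaloneConfig();
        cfg.parse(props);
        return cfg;
    }

    public static void main(String[] args) {
        StandaloneConfig cfg = load("/tmp/zookeeper", 2000);
        // prints "4000 40000"
        System.out.println(cfg.minSessionTimeout + " " + cfg.maxSessionTimeout);
    }
}
```

The benefit of this arrangement is that validation and defaulting live in one place, so a file that works in cluster mode is interpreted identically when the server falls back to standalone mode.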
So let's take a quick look at how the configuration file is parsed on the cluster path.
// org.apache.zookeeper.server.quorum.QuorumPeerConfig
/**
 * Parse a ZooKeeper configuration file
 * @param path the patch of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        // Use the builder pattern to create the config File object
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        // Parse the properties into fields; this is also where every option zk supports can be seen (see below)
        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    if (dynamicConfigFileStr!=null) {
        try {
            // For cluster mode, initialize the cluster (dynamic) configuration
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            try {
                dynamicCfg.load(inConfig);
                if (dynamicCfg.getProperty("version") != null) {
                    throw new ConfigException("dynamic file shouldn't have version inside");
                }

                String version = getVersionFromFilename(dynamicConfigFileStr);
                // If there isn't any version associated with the filename,
                // the default version is 0.
                if (version != null) {
                    dynamicCfg.setProperty("version", version);
                }
            } finally {
                inConfig.close();
            }
            setupQuorumPeerConfig(dynamicCfg, false);

        } catch (IOException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        }
        File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
        if (nextDynamicConfigFile.exists()) {
            try {
                Properties dynamicConfigNextCfg = new Properties();
                FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);
                try {
                    dynamicConfigNextCfg.load(inConfigNext);
                } finally {
                    inConfigNext.close();
                }
                boolean isHierarchical = false;
                for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                    String key = entry.getKey().toString().trim();
                    if (key.startsWith("group") || key.startsWith("weight")) {
                        isHierarchical = true;
                        break;
                    }
                }
                lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
            } catch (IOException e) {
                LOG.warn("NextQuorumVerifier is initiated to null");
            }
        }
    }
}

// Parsing of the individual properties; this shows exactly which settings zk supports
/**
 * Parse config from a Properties.
 * @param zkProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public void parseProperties(Properties zkProp)
        throws IOException, ConfigException {
    int clientPort = 0;
    int secureClientPort = 0;
    String clientPortAddress = null;
    String secureClientPortAddress = null;
    VerifyingFileFactory vff = new VerifyingFileFactory.Builder(LOG).warnForRelativePath().build();
    for (Entry<Object, Object> entry : zkProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        String value = entry.getValue().toString().trim();
        if (key.equals("dataDir")) {
            dataDir = vff.create(value);
        } else if (key.equals("dataLogDir")) {
            dataLogDir = vff.create(value);
        } else if (key.equals("clientPort")) {
            clientPort = Integer.parseInt(value);
        } else if (key.equals("localSessionsEnabled")) {
            localSessionsEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("localSessionsUpgradingEnabled")) {
            localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("clientPortAddress")) {
            clientPortAddress = value.trim();
        } else if (key.equals("secureClientPort")) {
            secureClientPort = Integer.parseInt(value);
        } else if (key.equals("secureClientPortAddress")){
            secureClientPortAddress = value.trim();
        } else if (key.equals("tickTime")) {
            tickTime = Integer.parseInt(value);
        } else if (key.equals("maxClientCnxns")) {
            maxClientCnxns = Integer.parseInt(value);
        } else if (key.equals("minSessionTimeout")) {
            minSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("maxSessionTimeout")) {
            maxSessionTimeout = Integer.parseInt(value);
        } else if (key.equals("initLimit")) {
            initLimit = Integer.parseInt(value);
        } else if (key.equals("syncLimit")) {
            syncLimit = Integer.parseInt(value);
        } else if (key.equals("electionAlg")) {
            electionAlg = Integer.parseInt(value);
            if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) {
                throw new ConfigException("Invalid electionAlg value. Only 1, 2, 3 are supported.");
            }
        } else if (key.equals("quorumListenOnAllIPs")) {
            quorumListenOnAllIPs = Boolean.parseBoolean(value);
        } else if (key.equals("peerType")) {
            if (value.toLowerCase().equals("observer")) {
                peerType = LearnerType.OBSERVER;
            } else if (value.toLowerCase().equals("participant")) {
                peerType = LearnerType.PARTICIPANT;
            } else {
                throw new ConfigException("Unrecognised peertype: " + value);
            }
        } else if (key.equals( "syncEnabled" )) {
            syncEnabled = Boolean.parseBoolean(value);
        } else if (key.equals("dynamicConfigFile")){
            dynamicConfigFileStr = value;
        } else if (key.equals("autopurge.snapRetainCount")) {
            snapRetainCount = Integer.parseInt(value);
        } else if (key.equals("autopurge.purgeInterval")) {
            purgeInterval = Integer.parseInt(value);
        } else if (key.equals("standaloneEnabled")) {
            if (value.toLowerCase().equals("true")) {
                setStandaloneEnabled(true);
            } else if (value.toLowerCase().equals("false")) {
                setStandaloneEnabled(false);
            } else {
                throw new ConfigException("Invalid option " + value + " for standalone mode. Choose 'true' or 'false.'");
            }
        } else if (key.equals("reconfigEnabled")) {
            if (value.toLowerCase().equals("true")) {
                setReconfigEnabled(true);
            } else if (value.toLowerCase().equals("false")) {
                setReconfigEnabled(false);
            } else {
                throw new ConfigException("Invalid option " + value + " for reconfigEnabled flag. Choose 'true' or 'false.'");
            }
        } else if (key.equals("sslQuorum")){
            sslQuorum = Boolean.parseBoolean(value);
        // TODO: UnifiedServerSocket is currently buggy, will be fixed when @ivmaykov's PRs are merged. Disable port unification until then.
        // } else if (key.equals("portUnification")){
        //     shouldUsePortUnification = Boolean.parseBoolean(value);
        } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.containsKey("dynamicConfigFile")) {
            throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
        } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTH_ENABLED)) {
            quorumEnableSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED)) {
            quorumServerRequireSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED)) {
            quorumLearnerRequireSasl = Boolean.parseBoolean(value);
        } else if (key.equals(QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT)) {
            quorumLearnerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT)) {
            quorumServerLoginContext = value;
        } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
            quorumServicePrincipal = value;
        } else if (key.equals("quorum.cnxn.threads.size")) {
            quorumCnxnThreadsSize = Integer.parseInt(value);
        } else if (key.equals("metricsProvider.className")) {
            metricsProviderClassName = value;
        } else if (key.startsWith("metricsProvider.")) {
            String keyForMetricsProvider = key.substring(16);
            metricsProviderConfiguration.put(keyForMetricsProvider, value);
        } else {
            System.setProperty("zookeeper." + key, value);
        }
    }

    if (!quorumEnableSasl && quorumServerRequireSasl) {
        throw new IllegalArgumentException(
                QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                        + " is disabled, so cannot enable "
                        + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
    }
    if (!quorumEnableSasl && quorumLearnerRequireSasl) {
        throw new IllegalArgumentException(
                QuorumAuth.QUORUM_SASL_AUTH_ENABLED
                        + " is disabled, so cannot enable "
                        + QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED);
    }
    // If quorumpeer learner is not auth enabled then self won't be able to
    // join quorum. So this condition is ensuring that the quorumpeer learner
    // is also auth enabled while enabling quorum server require sasl.
    if (!quorumLearnerRequireSasl && quorumServerRequireSasl) {
        throw new IllegalArgumentException(
                QuorumAuth.QUORUM_LEARNER_SASL_AUTH_REQUIRED
                        + " is disabled, so cannot enable "
                        + QuorumAuth.QUORUM_SERVER_SASL_AUTH_REQUIRED);
    }

    // Reset to MIN_SNAP_RETAIN_COUNT if invalid (less than 3)
    // PurgeTxnLog.purge(File, File, int) will not allow to purge less
    // than 3.
    if (snapRetainCount < MIN_SNAP_RETAIN_COUNT) {
        LOG.warn("Invalid autopurge.snapRetainCount: " + snapRetainCount
                + ". Defaulting to " + MIN_SNAP_RETAIN_COUNT);
        snapRetainCount = MIN_SNAP_RETAIN_COUNT;
    }

    if (dataDir == null) {
        throw new IllegalArgumentException("dataDir is not set");
    }
    if (dataLogDir == null) {
        dataLogDir = dataDir;
    }

    if (clientPort == 0) {
        LOG.info("clientPort is not set");
        if (clientPortAddress != null) {
            throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set");
        }
    } else if (clientPortAddress != null) {
        this.clientPortAddress = new InetSocketAddress(
                InetAddress.getByName(clientPortAddress), clientPort);
        LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
    } else {
        this.clientPortAddress = new InetSocketAddress(clientPort);
        LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress));
    }

    if (secureClientPort == 0) {
        LOG.info("secureClientPort is not set");
        if (secureClientPortAddress != null) {
            throw new IllegalArgumentException("secureClientPortAddress is set but secureClientPort is not set");
        }
    } else if (secureClientPortAddress != null) {
        this.secureClientPortAddress = new InetSocketAddress(
                InetAddress.getByName(secureClientPortAddress), secureClientPort);
        LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
    } else {
        this.secureClientPortAddress = new InetSocketAddress(secureClientPort);
        LOG.info("secureClientPortAddress is {}", formatInetAddr(this.secureClientPortAddress));
    }
    if (this.secureClientPortAddress != null) {
        configureSSLAuth();
    }

    if (tickTime == 0) {
        throw new IllegalArgumentException("tickTime is not set");
    }

    minSessionTimeout = minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
    maxSessionTimeout = maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;

    if (minSessionTimeout > maxSessionTimeout) {
        throw new IllegalArgumentException(
                "minSessionTimeout must not be larger than maxSessionTimeout");
    }

    LOG.info("metricsProvider.className is {}", metricsProviderClassName);
    try {
        Class.forName(metricsProviderClassName, false, Thread.currentThread().getContextClassLoader());
    } catch (ClassNotFoundException error) {
        throw new IllegalArgumentException("metrics provider class was not found", error);
    }

    // backward compatibility - dynamic configuration in the same file as
    // static configuration params see writeDynamicConfig()
    if (dynamicConfigFileStr == null) {
        // Initialize the cluster configuration. Entries must look like
        // server.1=172.19.2.2:2181:3181, otherwise an exception is thrown.
        // The first port (2181 here) is the quorum port that peers use to talk to the leader,
        // and the second (3181) is the port used for leader election.
        setupQuorumPeerConfig(zkProp, true);
        if (isDistributed() && isReconfigEnabled()) {
            // we don't backup static config for standalone mode.
            // we also don't backup if reconfig feature is disabled.
            backupOldConfig();
        }
    }
}

/**
 * Parse dynamic configuration file and return
 * quorumVerifier for new configuration.
 * @param dynamicConfigProp Properties to parse from.
 * @throws IOException
 * @throws ConfigException
 */
public static QuorumVerifier parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings,
        boolean configBackwardCompatibilityMode) throws IOException, ConfigException {
    boolean isHierarchical = false;
    for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
        String key = entry.getKey().toString().trim();
        if (key.startsWith("group") || key.startsWith("weight")) {
            isHierarchical = true;
        } else if (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")){
            LOG.info(dynamicConfigProp.toString());
            throw new ConfigException("Unrecognised parameter: " + key);
        }
    }

    QuorumVerifier qv = createQuorumVerifier(dynamicConfigProp, isHierarchical);

    int numParticipators = qv.getVotingMembers().size();
    int numObservers = qv.getObservingMembers().size();
    if (numParticipators == 0) {
        if (!standaloneEnabled) {
            throw new IllegalArgumentException("standaloneEnabled = false then " +
                    "number of participants should be >0");
        }
        if (numObservers > 0) {
            throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
        }
    } else if (numParticipators == 1 && standaloneEnabled) {
        // HBase currently adds a single server line to the config, for
        // b/w compatibility reasons we need to keep this here. If standaloneEnabled
        // is true, the QuorumPeerMain script will create a standalone server instead
        // of a quorum configuration
        // Only one server is listed but quorum (election) mode is requested: treated as a misconfiguration
        LOG.error("Invalid configuration, only one server specified (ignoring)");
        if (numObservers > 0) {
            throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
        }
    } else {
        if (warnings) {
            if (numParticipators <= 2) {
                LOG.warn("No server failure will be tolerated. " +
                        "You need at least 3 servers.");
            } else if (numParticipators % 2 == 0) {
                LOG.warn("Non-optimial configuration, consider an odd number of servers.");
            }
        }

        for (QuorumServer s : qv.getVotingMembers().values()) {
            if (s.electionAddr == null)
                throw new IllegalArgumentException(
                        "Missing election port for server: " + s.id);
        }
    }
    return qv;
}

private static QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException{
    if(isHierarchical){
        return new QuorumHierarchical(dynamicConfigProp);
    } else {
        /*
         * The default QuorumVerifier is QuorumMaj
         */
        //LOG.info("Defaulting to majority quorums");
        return new QuorumMaj(dynamicConfigProp);
    }
}

// org.apache.zookeeper.server.quorum.flexible.QuorumMaj
public QuorumMaj(Properties props) throws ConfigException {
    for (Entry<Object, Object> entry : props.entrySet()) {
        String key = entry.getKey().toString();
        String value = entry.getValue().toString();

        if (key.startsWith("server.")) {
            int dot = key.indexOf('.');
            long sid = Long.parseLong(key.substring(dot + 1));
            QuorumServer qs = new QuorumServer(sid, value);
            allMembers.put(Long.valueOf(sid), qs);
            if (qs.type == LearnerType.PARTICIPANT)
                // Participants are added to the list of voters
                votingMembers.put(Long.valueOf(sid), qs);
            else {
                observingMembers.put(Long.valueOf(sid), qs);
            }
        } else if (key.equals("version")) {
            version = Long.parseLong(value, 16);
        }
    }
    // Finally compute half of the voters; a vote succeeds once more than half agree
    half = votingMembers.size() / 2;
}

// org.apache.zookeeper.server.quorum.QuorumPeer$QuorumServer
public QuorumServer(long sid, String addressStr) throws ConfigException {
    // LOG.warn("sid = " + sid + " addressStr = " + addressStr);
    this.id = sid;
    String serverClientParts[] = addressStr.split(";");
    String serverParts[] = ConfigUtils.getHostAndPort(serverClientParts[0]);
    if ((serverClientParts.length > 2) || (serverParts.length < 3)
            || (serverParts.length > 4)) {
        throw new ConfigException(addressStr + wrongFormat);
    }

    if (serverClientParts.length == 2) {
        //LOG.warn("ClientParts: " + serverClientParts[1]);
        String clientParts[] = ConfigUtils.getHostAndPort(serverClientParts[1]);
        if (clientParts.length > 2) {
            throw new ConfigException(addressStr + wrongFormat);
        }

        // is client_config a host:port or just a port
        hostname = (clientParts.length == 2) ? clientParts[0] : "0.0.0.0";
        try {
            clientAddr = new InetSocketAddress(hostname,
                    Integer.parseInt(clientParts[clientParts.length - 1]));
            //LOG.warn("Set clientAddr to " + clientAddr);
        } catch (NumberFormatException e) {
            throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
        }
    }

    // server_config should be either host:port:port or host:port:port:type
    try {
        addr = new InetSocketAddress(serverParts[0],
                Integer.parseInt(serverParts[1]));
    } catch (NumberFormatException e) {
        throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]);
    }
    try {
        electionAddr = new InetSocketAddress(serverParts[0],
                Integer.parseInt(serverParts[2]));
    } catch (NumberFormatException e) {
        throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]);
    }

    if(addr.getPort() == electionAddr.getPort()) {
        throw new ConfigException(
                "Client and election port must be different! Please update the configuration file on server."
                        + sid);
    }

    if (serverParts.length == 4) {
        setType(serverParts[3]);
    }

    this.hostname = serverParts[0];

    setMyAddrs();
}

private void setMyAddrs() {
    this.myAddrs = new ArrayList<InetSocketAddress>();
    this.myAddrs.add(this.addr);
    this.myAddrs.add(this.clientAddr);
    this.myAddrs.add(this.electionAddr);
    // Filter out special addresses such as 127.0.0.1
    this.myAddrs = excludedSpecialAddresses(this.myAddrs);
}

// org.apache.zookeeper.server.quorum.QuorumPeerConfig
void setupQuorumPeerConfig(Properties prop, boolean configBackwardCompatibilityMode)
        throws IOException, ConfigException {
    quorumVerifier = parseDynamicConfig(prop, electionAlg, true, configBackwardCompatibilityMode);
    // Read this server's id from the myid file under dataDir
    setupMyId();
    // Set up the client port
    setupClientPort();
    // Set the peer type: PARTICIPANT or OBSERVER
    setupPeerType();
    checkValidity();
}
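To make the server.N format and the majority rule concrete, here is a simplified sketch of the two pieces of logic above. QuorumSketch, ServerSpec, parse() and containsQuorum() are hypothetical names for this illustration. The sketch splits on ':' only, so it ignores the optional ;clientPort suffix and IPv6 addresses that the real QuorumServer constructor handles, and it treats any type other than "observer" as a participant instead of rejecting it. The acks > half comparison is my understanding of how QuorumMaj uses the half value computed in its constructor; only the computation of half itself appears in the code quoted here.

```java
// Hypothetical, simplified illustration; not ZooKeeper's own classes.
class QuorumSketch {

    // Parsed form of "host:quorumPort:electionPort[:type]".
    static class ServerSpec {
        final String host;
        final int quorumPort;   // used by peers to talk to the leader
        final int electionPort; // used for leader election
        final boolean observer;

        ServerSpec(String host, int quorumPort, int electionPort, boolean observer) {
            this.host = host;
            this.quorumPort = quorumPort;
            this.electionPort = electionPort;
            this.observer = observer;
        }
    }

    // Parses the value part of a "server.N=..." entry.
    static ServerSpec parse(String value) {
        String[] parts = value.split(":");
        if (parts.length < 3 || parts.length > 4) {
            throw new IllegalArgumentException(
                    value + " does not have the form host:port:port[:type]");
        }
        int quorumPort = Integer.parseInt(parts[1]);
        int electionPort = Integer.parseInt(parts[2]);
        if (quorumPort == electionPort) {
            throw new IllegalArgumentException("the two ports must be different: " + value);
        }
        // Simplification: anything that is not "observer" counts as a participant.
        boolean observer = parts.length == 4 && parts[3].equalsIgnoreCase("observer");
        return new ServerSpec(parts[0], quorumPort, electionPort, observer);
    }

    // Majority rule: with half = voters / 2, a quorum needs strictly more than half.
    static boolean containsQuorum(int votingMembers, int acks) {
        int half = votingMembers / 2;
        return acks > half;
    }

    public static void main(String[] args) {
        ServerSpec s = parse("172.19.2.2:2181:3181");
        System.out.println(s.host + " quorum=" + s.quorumPort + " election=" + s.electionPort);
        System.out.println(containsQuorum(3, 2)); // prints "true": 2 of 3 is a majority
        System.out.println(containsQuorum(4, 2)); // prints "false": 2 of 4 is not
    }
}
```

The last two lines also show why the parser warns about an even number of servers: four voters tolerate only one failure, exactly like three.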
Once the configuration has been parsed, the zk server logic runs. Standalone mode is used as the example here.
// org.apache.zookeeper.server.ZooKeeperServerMain
/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 * @throws AdminServerException
 */
public void runFromConfig(ServerConfig config)
        throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        try {
            // Start the metrics provider first
            metricsProvider = MetricsProviderBootstrap
                    .startMetricsProvider(config.getMetricsProviderClassName(),
                                          config.getMetricsProviderConfiguration());
        } catch (MetricsProviderLifeCycleException error) {
            throw new IOException("Cannot boot MetricsProvider "+config.getMetricsProviderClassName(),
                    error);
        }

        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        // Create the transaction log and snapshot files and validate them
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        // Create the ZooKeeperServer instance; zkDb is passed as null here
        final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
        zkServer.setRootMetricsContext(metricsProvider.getRootContext());
        txnLog.setServerStats(zkServer.serverStats());

        // Registers shutdown handler which will be used to know the
        // server error or shutdown state changes.
        // Latch used for shutdown; register the shutdown handler
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));

        // Start Admin server
        // Creates a Jetty server instance that backs the admin console
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            // Create the connection factory, NIOServerCnxnFactory by default
            cnxnFactory = ServerCnxnFactory.createFactory();
            // Configure the server, including authentication setup
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000)
        );
        containerManager.start();

        // Watch status of ZooKeeper server. It will do a graceful shutdown
        // if the server is not running or hits an internal error.
        // Block here waiting for the shutdown signal; at this point the server is fully started
        shutdownLatch.await();

        // Graceful shutdown
        shutdown();

        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
        if (metricsProvider != null) {
            try {
                metricsProvider.stop();
            } catch (Throwable error) {
                LOG.warn("Error while stopping metrics", error);
            }
        }
    }
}
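As shown above, startup breaks down into the following steps:
1. Start the metrics provider so that metrics can be monitored;
2. Create the FileTxnSnapLog, which is later used to restore data from disk at startup;
3. Create the ZooKeeperServer, ready for use;
4. Register the shutdown handler ZooKeeperServerShutdownHandler;
5. Start the admin server created by AdminServerFactory;
6. Start zkServer (through cnxnFactory.startup(zkServer));
7. Start the ContainerManager;
8. The server is now up; block and wait for the shutdown signal;
That is essentially the basic flow of any server-side program.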
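The "block until a shutdown signal arrives" step (steps 4 and 8) can be sketched with a plain CountDownLatch. This is only a minimal illustration of the pattern, not ZooKeeper code: the class LatchShutdownDemo and its methods are hypothetical names, and the cleanup is reduced to a flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo: the main thread parks on a latch until a handler signals shutdown. */
final class LatchShutdownDemo {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile boolean cleanedUp = false;

    /** Plays the role of the shutdown handler: called when the server stops or fails. */
    void signalShutdown() {
        shutdownLatch.countDown();
    }

    /** Blocks until signalled or until the timeout expires, then runs cleanup. */
    boolean runUntilShutdown(long timeoutMs) {
        boolean signalled = false;
        try {
            signalled = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            cleanedUp = true; // stand-in for closing connection factories and logs
        }
        return signalled;
    }

    boolean isCleanedUp() {
        return cleanedUp;
    }

    public static void main(String[] args) {
        LatchShutdownDemo demo = new LatchShutdownDemo();
        new Thread(demo::signalShutdown).start();
        System.out.println("signalled = " + demo.runUntilShutdown(5_000));
    }
}
```

The design point is that the thread that starts everything does no further work of its own; it simply waits, so a state change reported from any other thread is enough to trigger an orderly shutdown.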
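As an aside, here is how the code decides whether it is running in cluster mode:
// Check whether the server runs in cluster (quorum) mode
public boolean isDistributed() {
    return quorumVerifier!=null && (!standaloneEnabled || quorumVerifier.getVotingMembers().size() > 1);
}
Next, let's look at how ZooKeeperServer is initialized.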
// org.apache.zookeeper.server.ZooKeeperServer
/**
 * Creates a ZooKeeperServer instance. It sets everything up, but doesn't
 * actually start listening for clients until run() is invoked.
 *
 * @param dataDir the directory to put the data
 */
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
        int minSessionTimeout, int maxSessionTimeout, ZKDatabase zkDb) {
    serverStats = new ServerStats(this);
    this.txnLogFactory = txnLogFactory;
    this.txnLogFactory.setServerStats(this.serverStats);
    // zkDb is the most important structure holding ZooKeeper's data, but it is null here
    this.zkDb = zkDb;
    this.tickTime = tickTime;
    // Default minimum timeout: 4000, maximum timeout: 40000
    setMinSessionTimeout(minSessionTimeout);
    setMaxSessionTimeout(maxSessionTimeout);
    // Create the server listener
    listener = new ZooKeeperServerListenerImpl(this);
    LOG.info("Created server with tickTime " + tickTime
            + " minSessionTimeout " + getMinSessionTimeout()
            + " maxSessionTimeout " + getMaxSessionTimeout()
            + " datadir " + txnLogFactory.getDataDir()
            + " snapdir " + txnLogFactory.getSnapDir());
}

// The server has four states; each state represents a stage of its lifecycle
protected enum State {
    INITIAL, RUNNING, SHUTDOWN, ERROR
}

// org.apache.zookeeper.server.NIOServerCnxnFactory, configure the server
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    // Set up the SASL login if configured; when a zk connection cannot be established,
    // the client may sometimes report a SASL permission error
    configureSaslLogin();

    // Maximum number of client connections, 60 by default
    maxClientCnxns = maxcc;
    sessionlessCnxnTimeout = Integer.getInteger(
        ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue =
        new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    // Thread that cleans up the expiry queue; examined in detail later
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores/2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    // Number of worker threads, twice the number of CPU cores by default
    numWorkerThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(
        ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    LOG.info("Configuring NIO connection handler with "
             + (sessionlessCnxnTimeout/1000) + "s sessionless connection"
             + " timeout, " + numSelectorThreads + " selector thread(s), "
             + (numWorkerThreads > 0 ? numWorkerThreads : "no")
             + " worker threads, and "
             + (directBufferBytes == 0 ? "gathered writes." :
                ("" + (directBufferBytes/1024) + " kB direct buffers.")));
    // Add the configured number of selector threads to the set so that acceptThread can use them
    for(int i=0; i<numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    // Open an NIO server channel and bind the socket port. From here on external requests
    // can reach the port, but nothing handles them yet
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    // Pass the channel to the inner class AcceptThread; subsequent connection
    // accepts are handled directly by AcceptThread
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

// org.apache.zookeeper.server.NIOServerCnxnFactory$AcceptThread
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
        Set<SelectorThread> selectorThreads) throws IOException {
    super("NIOServerCxnFactory.AcceptThread:" + addr);
    this.acceptSocket = ss;
    this.acceptKey =
        acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
    this.selectorThreads = Collections.unmodifiableList(
        new ArrayList<SelectorThread>(selectorThreads));
    selectorIterator = this.selectorThreads.iterator();
}

// org.apache.zookeeper.server.ServerCnxnFactory
/**
 * Initialize the server SASL if specified.
 *
 * If the user has specified a "ZooKeeperServer.LOGIN_CONTEXT_NAME_KEY"
 * or a jaas.conf using "java.security.auth.login.config"
 * the authentication is required and an exception is raised.
 * Otherwise no authentication is configured and no exception is raised.
 *
 * @throws IOException if jaas.conf is missing or there's an error in it.
 */
protected void configureSaslLogin() throws IOException {
    String serverSection = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY,
                                              ZooKeeperSaslServer.DEFAULT_LOGIN_CONTEXT_NAME);

    // Note that 'Configuration' here refers to javax.security.auth.login.Configuration.
    AppConfigurationEntry entries[] = null;
    SecurityException securityException = null;
    try {
        entries = Configuration.getConfiguration().getAppConfigurationEntry(serverSection);
    } catch (SecurityException e) {
        // handle below: might be harmless if the user doesn't intend to use JAAS authentication.
        securityException = e;
    }

    // No entries in jaas.conf
    // If there's a configuration exception fetching the jaas section and
    // the user has required sasl by specifying a LOGIN_CONTEXT_NAME_KEY or a jaas file
    // we throw an exception otherwise we continue without authentication.
    if (entries == null) {
        String jaasFile = System.getProperty(Environment.JAAS_CONF_KEY);
        String loginContextName = System.getProperty(ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY);
        if (securityException != null && (loginContextName != null || jaasFile != null)) {
            String errorMessage = "No JAAS configuration section named '" + serverSection + "' was found";
            if (jaasFile != null) {
                errorMessage += "in '" + jaasFile + "'.";
            }
            if (loginContextName != null) {
                errorMessage += " But " + ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY + " was set.";
            }
            LOG.error(errorMessage);
            throw new IOException(errorMessage);
        }
        return;
    }

    // jaas.conf entry available
    try {
        saslServerCallbackHandler = new SaslServerCallbackHandler(Configuration.getConfiguration());
        login = new Login(serverSection, saslServerCallbackHandler, new ZKConfig() );
        login.startThreadIfNeeded();
    } catch (LoginException e) {
        throw new IOException("Could not configure server because SASL configuration did not allow the "
          + " ZooKeeper server to authenticate itself properly: " + e);
    }
}
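The default thread counts computed in configure() are easier to judge with concrete numbers. The sketch below copies the two expressions from the listing into a hypothetical helper class (NioThreadDefaults is not a ZooKeeper class) and evaluates them for a few core counts; it ignores the system properties that can override these defaults.

```java
/** Hypothetical helper reproducing the default thread-count arithmetic from configure(). */
final class NioThreadDefaults {

    /** Same expression as in the listing: roughly sqrt(cores / 2), never below 1. */
    static int selectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    /** Same expression as in the listing: twice the number of cores. */
    static int workerThreads(int numCores) {
        return 2 * numCores;
    }

    public static void main(String[] args) {
        for (int cores : new int[] {1, 4, 8, 32}) {
            System.out.println(cores + " cores -> "
                    + selectorThreads(cores) + " selector thread(s), "
                    + workerThreads(cores) + " worker threads");
        }
    }
}
```

For 32 cores this gives 4 selector threads, which matches the "sweet spot" remark in the source comment.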
Next comes the startup of the connection side: cnxnFactory.startup(zkServer);
// org.apache.zookeeper.server.ServerCnxnFactory
public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
    startup(zkServer, true);
}

@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    // Call start() of NIOServerCnxnFactory
    start();
    // Bind the server to this factory
    setZooKeeperServer(zks);
    if (startServer) {
        // Load the initial data from disk
        zks.startdata();
        // Start zkServer
        zks.startup();
    }
}

// org.apache.zookeeper.server.NIOServerCnxnFactory
// Start the various threads: acceptor, selectors, expirerThread...
@Override
public void start() {
    stopped = false;
    if (workerPool == null) {
        // A custom thread pool; underneath it still uses the Executors factory
        // to create a ThreadPoolExecutor
        // The pool starts itself
        workerPool = new WorkerService(
            "NIOWorker", numWorkerThreads, false);
    }
    for(SelectorThread thread : selectorThreads) {
        // Start any thread that has not been started yet;
        // a SelectorThread is built around two key queues
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

// org.apache.zookeeper.server.WorkerService, how the worker pool is implemented: a fixed thread pool
public WorkerService(String name, int numThreads,
                     boolean useAssignableThreads) {
    this.threadNamePrefix = (name == null ? "" : name) + "Thread";
    this.numWorkerThreads = numThreads;
    this.threadsAreAssignable = useAssignableThreads;
    // Once constructed it starts itself, which really just initializes the thread pool
    start();
}

public void start() {
    if (numWorkerThreads > 0) {
        if (threadsAreAssignable) {
            for(int i = 1; i <= numWorkerThreads; ++i) {
                // Workers are based on fixed thread pools
                workers.add(Executors.newFixedThreadPool(
                    1, new DaemonThreadFactory(threadNamePrefix, i)));
            }
        } else {
            workers.add(Executors.newFixedThreadPool(
                numWorkerThreads, new DaemonThreadFactory(threadNamePrefix)));
        }
    }
    stopped = false;
}

// SelectorThread is an inner class of org.apache.zookeeper.server.NIOServerCnxnFactory;
// it mainly maintains two queues, acceptedQueue and updateQueue
public SelectorThread(int id) throws IOException {
    super("NIOServerCxnFactory.SelectorThread-" + id);
    this.id = id;
    acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
    updateQueue = new LinkedBlockingQueue<SelectionKey>();
}
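As shown above, the start() step performed while starting zkServer (it is NIOServerCnxnFactory.start()) is really just the startup of several threads and thread pools. All client-facing work of zkServer is carried out by these threads. The main flow is:
1. The AcceptThread accepts incoming connections and hands each one to a SelectorThread, which places it in its acceptedQueue;
2. The selector thread builds an IOWorkRequest and submits it to workerPool for later scheduling;
3. workerPool schedules IOWorkRequest.doWork() to handle the request;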
4. ....
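The hand-off from a queue of accepted connections to a worker pool can be sketched as follows. This is a simplified illustration under stated assumptions, not the NIO code itself: connections are plain strings, the class QueueToWorkerDemo is hypothetical, and the lambda stands in for IOWorkRequest.doWork().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo of "accepted queue -> selector loop -> worker pool". */
final class QueueToWorkerDemo {

    /** Drains the accepted queue and schedules one task per item; returns results in submit order. */
    static List<String> process(List<String> connections, int workerThreads) {
        // Filled by the "accept" side in the real server; pre-filled here for simplicity.
        BlockingQueue<String> acceptedQueue = new LinkedBlockingQueue<>(connections);
        ExecutorService workerPool = Executors.newFixedThreadPool(workerThreads);
        List<Future<String>> pending = new ArrayList<>();
        try {
            String conn;
            // The "selector" side: take each accepted item and schedule work for it.
            while ((conn = acceptedQueue.poll()) != null) {
                final String c = conn;
                pending.add(workerPool.submit(() -> "handled:" + c)); // stand-in for doWork()
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : pending) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            workerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("c1", "c2", "c3"), 2));
    }
}
```

Separating the thread that accepts connections from the threads that do I/O work keeps a slow request from blocking new connections.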
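When zkServer starts up, it first loads its initial data from disk;
zks.startdata(); loads the on-disk data, that is, it initializes zkDb. All data used at runtime is kept in that data structure.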
// 咱們先看下 FileTxnSnapLog 的構造器,其實作了不少事,如檢查目錄權限,建立目錄等, 這將爲後續的數據恢復打下基礎 // org.apache.zookeeper.server.persistence.FileTxnSnapLog /** * the constructor which takes the datadir and * snapdir. * @param dataDir the transaction directory * @param snapDir the snapshot directory */ public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir); this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); // by default create snap.log dirs, but otherwise complain instead // See ZOOKEEPER-1161 for more details boolean enableAutocreate = Boolean.valueOf( System.getProperty(ZOOKEEPER_DATADIR_AUTOCREATE, ZOOKEEPER_DATADIR_AUTOCREATE_DEFAULT)); if (!this.dataDir.exists()) { if (!enableAutocreate) { throw new DatadirException("Missing data directory " + this.dataDir + ", automatic data directory creation is disabled (" + ZOOKEEPER_DATADIR_AUTOCREATE + " is false). Please create this directory manually."); } if (!this.dataDir.mkdirs()) { throw new DatadirException("Unable to create data directory " + this.dataDir); } } if (!this.dataDir.canWrite()) { throw new DatadirException("Cannot write to data directory " + this.dataDir); } if (!this.snapDir.exists()) { // by default create this directory, but otherwise complain instead // See ZOOKEEPER-1161 for more details if (!enableAutocreate) { throw new DatadirException("Missing snap directory " + this.snapDir + ", automatic data directory creation is disabled (" + ZOOKEEPER_DATADIR_AUTOCREATE + " is false). 
Please create this directory manually."); } if (!this.snapDir.mkdirs()) { throw new DatadirException("Unable to create snap directory " + this.snapDir); } } if (!this.snapDir.canWrite()) { throw new DatadirException("Cannot write to snap directory " + this.snapDir); } // check content of transaction log and snapshot dirs if they are two different directories // See ZOOKEEPER-2967 for more details if(!this.dataDir.getPath().equals(this.snapDir.getPath())){ checkLogDir(); checkSnapDir(); } txnLog = new FileTxnLog(this.dataDir); snapLog = new FileSnap(this.snapDir); autoCreateDB = Boolean.parseBoolean(System.getProperty(ZOOKEEPER_DB_AUTOCREATE, ZOOKEEPER_DB_AUTOCREATE_DEFAULT)); } // org.apache.zookeeper.server.ZooKeeperServer, 這裏是開始初始化 zkDb public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null // zkDb 主要使用 鏈表 和 map 來保存數據, DataTree() 保存數據, 而 DataTree 中又以 ConcurrentHashMap() 做爲存儲方式 // 我們先重點看一下 if (zkDb == null) { zkDb = new ZKDatabase(this.txnLogFactory); } // 未初始化過 zkDb 則從磁盤加載一次數據 if (!zkDb.isInitialized()) { loadData(); } } // org.apache.zookeeper.server.ZKDatabase, 最重要的一個數據結構之一 /** * the filetxnsnaplog that this zk database * maps to. There is a one to one relationship * between a filetxnsnaplog and zkdatabase. 
* @param snapLog the FileTxnSnapLog mapping this zkdatabase */ public ZKDatabase(FileTxnSnapLog snapLog) { // 使用 DataTree() dataTree = createDataTree(); // 有過時時間的 session 使用 ConcurrentHashMap 保存, zxid -> timeout sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>(); this.snapLog = snapLog; try { snapshotSizeFactor = Double.parseDouble( System.getProperty(SNAPSHOT_SIZE_FACTOR, Double.toString(DEFAULT_SNAPSHOT_SIZE_FACTOR))); if (snapshotSizeFactor > 1) { snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR; LOG.warn("The configured {} is invalid, going to use " + "the default {}", SNAPSHOT_SIZE_FACTOR, DEFAULT_SNAPSHOT_SIZE_FACTOR); } } catch (NumberFormatException e) { LOG.error("Error parsing {}, using default value {}", SNAPSHOT_SIZE_FACTOR, DEFAULT_SNAPSHOT_SIZE_FACTOR); snapshotSizeFactor = DEFAULT_SNAPSHOT_SIZE_FACTOR; } LOG.info("{} = {}", SNAPSHOT_SIZE_FACTOR, snapshotSizeFactor); } // org.apache.zookeeper.server.DataTree 構造函數以下 public DataTree() { /* Rather than fight it, let root have an alias */ nodes.put("", root); nodes.put(rootZookeeper, root); /** add the proc node and quota node */ root.addChild(procChildZookeeper); nodes.put(procZookeeper, procDataNode); procDataNode.addChild(quotaChildZookeeper); nodes.put(quotaZookeeper, quotaDataNode); addConfigNode(); nodeDataSize.set(approximateDataSize()); try { // 建立 watchManager, 方便 watch dataWatches = WatchManagerFactory.createWatchManager(); childWatches = WatchManagerFactory.createWatchManager(); } catch (Exception e) { LOG.error("Unexpected exception when creating WatchManager, " + "exiting abnormally", e); System.exit(ExitCode.UNEXPECTED_ERROR.getValue()); } } // org.apache.zookeeper.server.watch.WatchManagerFactory, 儘可能支持動態設置, 默認爲 WatchManager ; public static IWatchManager createWatchManager() throws IOException { String watchManagerName = System.getProperty(ZOOKEEPER_WATCH_MANAGER_NAME); if (watchManagerName == null) { watchManagerName = WatchManager.class.getName(); } try { IWatchManager 
watchManager = (IWatchManager) Class.forName(watchManagerName).newInstance(); LOG.info("Using {} as watch manager", watchManagerName); return watchManager; } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + watchManagerName); ioe.initCause(e); throw ioe; } } // org.apache.zookeeper.server.ZooKeeperServer // 從磁盤加載初始化數據, /** * Restore sessions and data */ public void loadData() throws IOException, InterruptedException { /* * When a new leader starts executing Leader#lead, it * invokes this method. The database, however, has been * initialized before running leader election so that * the server could pick its zxid for its initial vote. * It does it by invoking QuorumPeer#getLastLoggedZxid. * Consequently, we don't need to initialize it once more * and avoid the penalty of loading it a second time. Not * reloading it is particularly important for applications * that host a large database. * * The following if block checks whether the database has * been initialized or not. Note that this method is * invoked by at least one other method: * ZooKeeperServer#startdata. * * See ZOOKEEPER-1642 for more detail. */ if(zkDb.isInitialized()){ setZxid(zkDb.getDataTreeLastProcessedZxid()); } else { // 首次加載 DataBase, 會從新 loadDataBase(), 並獲取最大 zxid setZxid(zkDb.loadDataBase()); } // Clean up dead sessions List<Long> deadSessions = new LinkedList<Long>(); for (Long session : zkDb.getSessions()) { if (zkDb.getSessionWithTimeOuts().get(session) == null) { deadSessions.add(session); } } for (long session : deadSessions) { // XXX: Is lastProcessedZxid really the best thing to use? killSession(session, zkDb.getDataTreeLastProcessedZxid()); } // Make a clean snapshot takeSnapshot(); } // org.apache.zookeeper.server.ZKDatabase /** * load the database from the disk onto memory and also add * the transactions to the committedlog in memory. 
* @return the last valid zxid on disk * @throws IOException */ public long loadDataBase() throws IOException { long startTime = Time.currentElapsedTime(); // 使用 snapLog 進行數據的恢復 long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener); initialized = true; long loadTime = Time.currentElapsedTime() - startTime; ServerMetrics.DB_INIT_TIME.add(loadTime); LOG.info("Snapshot loaded in " + loadTime + " ms"); return zxid; } // org.apache.zookeeper.server.persistence.FileSnap /** * this function restores the server * database after reading from the * snapshots and transaction logs * @param dt the datatree to be restored * @param sessions the sessions to be restored * @param listener the playback listener to run on the * database restoration * @return the highest zxid restored * @throws IOException */ public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { long deserializeResult = snapLog.deserialize(dt, sessions); FileTxnLog txnLog = new FileTxnLog(dataDir); boolean trustEmptyDB; // 若是存在初始化文件標識存在,則刪除 File initFile = new File(dataDir.getParent(), "initialize"); if (Files.deleteIfExists(initFile.toPath())) { LOG.info("Initialize file found, an empty database will not block voting participation"); trustEmptyDB = true; } else { trustEmptyDB = autoCreateDB; } if (-1L == deserializeResult) { /* this means that we couldn't find any snapshot, so we need to * initialize an empty database (reported in ZOOKEEPER-2325) */ if (txnLog.getLastLoggedZxid() != -1) { throw new IOException( "No snapshot found, but there are log entries. 
" + "Something is broken!"); } if (trustEmptyDB) { /* TODO: (br33d) we should either put a ConcurrentHashMap on restore() * or use Map on save() */ save(dt, (ConcurrentHashMap<Long, Integer>)sessions, false); /* return a zxid of 0, since we know the database is empty */ return 0L; } else { /* return a zxid of -1, since we are possibly missing data */ LOG.warn("Unexpected empty data tree, setting zxid to -1"); dt.lastProcessedZxid = -1L; return -1L; } } // 從副本中恢復數據 return fastForwardFromEdits(dt, sessions, listener); } /** * deserialize a data tree from the most recent snapshot * @return the zxid of the snapshot */ public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException { // we run through 100 snapshots (not all of them) // if we cannot get it running within 100 snapshots // we should give up List<File> snapList = findNValidSnapshots(100); if (snapList.size() == 0) { return -1L; } File snap = null; boolean foundValid = false; for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) { snap = snapList.get(i); LOG.info("Reading snapshot " + snap); try (InputStream snapIS = new BufferedInputStream(new FileInputStream(snap)); CheckedInputStream crcIn = new CheckedInputStream(snapIS, new Adler32())) { InputArchive ia = BinaryInputArchive.getArchive(crcIn); deserialize(dt, sessions, ia); long checkSum = crcIn.getChecksum().getValue(); long val = ia.readLong("val"); // 檢查數據checksum,確認是否有被損壞 if (val != checkSum) { throw new IOException("CRC corruption in snapshot : " + snap); } foundValid = true; break; } catch (IOException e) { LOG.warn("problem reading snap file " + snap, e); } } if (!foundValid) { throw new IOException("Not able to find valid snapshots in " + snapDir); } // 快照便是帶了 zxid 的, 因此 以名字就能夠解析出最大的 zxid dt.lastProcessedZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX); return dt.lastProcessedZxid; } /** * This function will fast forward the server database to have the latest * transactions in it. 
This is the same as restore, but only reads from * the transaction logs and not restores from a snapshot. * @param dt the datatree to write transactions to. * @param sessions the sessions to be restored. * @param listener the playback listener to run on the * database transactions. * @return the highest zxid restored. * @throws IOException */ public long fastForwardFromEdits(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException { TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1); long highestZxid = dt.lastProcessedZxid; TxnHeader hdr; try { while (true) { // iterator points to // the first valid txn when initialized hdr = itr.getHeader(); if (hdr == null) { //empty logs return dt.lastProcessedZxid; } // 只處理最後幾個連續未被提交的事務數據 if (hdr.getZxid() < highestZxid && highestZxid != 0) { LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType()); } else { highestZxid = hdr.getZxid(); } try { processTransaction(hdr,dt,sessions, itr.getTxn()); } catch(KeeperException.NoNodeException e) { throw new IOException("Failed to process transaction type: " + hdr.getType() + " error: " + e.getMessage(), e); } listener.onTxnLoaded(hdr, itr.getTxn()); if (!itr.next()) break; } } finally { if (itr != null) { itr.close(); } } return highestZxid; } // org.apache.zookeeper.server.ZKDatabase // 反序列化時,會調用 DataTree.deserialize() 方法; public void deserialize(InputArchive ia, String tag) throws IOException { aclCache.deserialize(ia); nodes.clear(); pTrie.clear(); nodeDataSize.set(0); String path = ia.readString("path"); while (!"/".equals(path)) { // 數據一條條讀入 node 中 DataNode node = new DataNode(); ia.readRecord(node, "node"); nodes.put(path, node); synchronized (node) { aclCache.addUsage(node.acl); } int lastSlash = path.lastIndexOf('/'); if (lastSlash == -1) { root = node; } else { String parentPath = path.substring(0, lastSlash); DataNode parent = nodes.get(parentPath); if (parent == null) { throw new IOException("Invalid 
Datatree, unable to find " + "parent " + parentPath + " of path " + path); } parent.addChild(path.substring(lastSlash + 1)); long eowner = node.stat.getEphemeralOwner(); EphemeralType ephemeralType = EphemeralType.get(eowner); if (ephemeralType == EphemeralType.CONTAINER) { containers.add(path); } else if (ephemeralType == EphemeralType.TTL) { ttls.add(path); } else if (eowner != 0) { HashSet<String> list = ephemerals.get(eowner); if (list == null) { list = new HashSet<String>(); ephemerals.put(eowner, list); } list.add(path); } } path = ia.readString("path"); } // 最後,放入 / 根節點 nodes.put("/", root); nodeDataSize.set(approximateDataSize()); // we are done with deserializing the // the datatree // update the quotas - create path trie // and also update the stat nodes // 更新集羣節點信息,若有必要的話 setupQuota(); // 清除無用數據, 即 refers 中 小於0 的節點數據 aclCache.purgeUnused(); } /** * this method sets up the path trie and sets up stats for quota nodes */ private void setupQuota() { String quotaPath = Quotas.quotaZookeeper; DataNode node = getNode(quotaPath); if (node == null) { return; } traverseNode(quotaPath); } // 從磁盤加載完數據後,當即作一次新的快照 // org.apache.zookeeper.server.persistence.FileTxnSnapLog public void takeSnapshot() { takeSnapshot(false); } public void takeSnapshot(boolean syncSnap){ long start = Time.currentElapsedTime(); try { txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap); } catch (IOException e) { LOG.error("Severe unrecoverable error, exiting", e); // This is a severe error that we cannot recover from, // so we need to exit System.exit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue()); } long elapsed = Time.currentElapsedTime() - start; LOG.info("Snapshot taken in " + elapsed + " ms"); ServerMetrics.SNAPSHOT_TIME.add(elapsed); } /** * save the datatree and the sessions into a snapshot * @param dataTree the datatree to be serialized onto disk * @param sessionsWithTimeouts the session timeouts to be * serialized onto disk * @param syncSnap sync the 
snapshot immediately after write * @throws IOException */ public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap) throws IOException { long lastZxid = dataTree.lastProcessedZxid; // 快照的命名方式就是前綴+zxid // FileSnap.SNAPSHOT_FILE_PREFIX + "." + Long.toHexString(zxid); File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid)); LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid), snapshotFile); try { // 按照必定規則序列化後存儲,在讀取時反向操做便可,此處爲同步操做 snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap); } catch (IOException e) { if (snapshotFile.length() == 0) { /* This may be caused by a full disk. In such a case, the server * will get stuck in a loop where it tries to write a snapshot * out to disk, and ends up creating an empty file instead. * Doing so will eventually result in valid snapshots being * removed during cleanup. */ if (snapshotFile.delete()) { LOG.info("Deleted empty snapshot file: " + snapshotFile.getAbsolutePath()); } else { LOG.warn("Could not delete empty snapshot file: " + snapshotFile.getAbsolutePath()); } } else { /* Something else went wrong when writing the snapshot out to * disk. If this snapshot file is invalid, when restarting, * ZooKeeper will skip it, and find the last known good snapshot * instead. */ } throw e; } } // org.apache.zookeeper.server.persistence.FileSnap /** * serialize the datatree and session into the file snapshot * @param dt the datatree to be serialized * @param sessions the sessions to be serialized * @param snapShot the file to store snapshot into * @param fsync sync the file immediately after write */ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync) throws IOException { if (!close) { try (CheckedOutputStream crcOut = new CheckedOutputStream(new BufferedOutputStream(fsync ? 
new AtomicFileOutputStream(snapShot) : new FileOutputStream(snapShot)), new Adler32())) { //CheckedOutputStream cout = new CheckedOutputStream() OutputArchive oa = BinaryOutputArchive.getArchive(crcOut); FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId); serialize(dt, sessions, oa, header); long val = crcOut.getChecksum().getValue(); oa.writeLong(val, "val"); oa.writeString("/", "path"); crcOut.flush(); } } }
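The snapshot naming rule noted in save() (prefix + "." + hexadecimal zxid) is what lets deserialize() recover the zxid from the file name alone. The sketch below shows that round trip. SnapshotNames is a hypothetical class, the literal prefix "snapshot" is an assumption about the value of FileSnap.SNAPSHOT_FILE_PREFIX, and the parser is a simplification that treats anything it cannot parse as -1.

```java
/** Hypothetical helper illustrating the "prefix.hexZxid" snapshot naming rule. */
final class SnapshotNames {
    // Assumed value of FileSnap.SNAPSHOT_FILE_PREFIX.
    static final String SNAPSHOT_FILE_PREFIX = "snapshot";

    /** Builds a file name such as "snapshot.100000002" from a zxid. */
    static String makeSnapshotName(long zxid) {
        return SNAPSHOT_FILE_PREFIX + "." + Long.toHexString(zxid);
    }

    /** Parses the zxid back out of a file name; returns -1 if the name does not match. */
    static long zxidFromName(String name) {
        String[] parts = name.split("\\.");
        if (parts.length == 2 && parts[0].equals(SNAPSHOT_FILE_PREFIX)) {
            try {
                return Long.parseLong(parts[1], 16);
            } catch (NumberFormatException e) {
                // fall through: not a valid hexadecimal zxid
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        String name = makeSnapshotName(0x100000002L);
        System.out.println(name + " -> " + zxidFromName(name));
    }
}
```

Because the name carries the zxid, the newest snapshot can be found by sorting file names, without opening any file.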
With that, loadData() is complete. Next comes the actual startup:
zks.startup();
// org.apache.zookeeper.server.ZooKeeperServer public synchronized void startup() { if (sessionTracker == null) { // 首先建立一個 sessionTracker, 它是一個異步線程,主要處理session的過時處理問題 createSessionTracker(); } // 開啓處理線程 startSessionTracker(); // 設置處理器鏈,相當重要 setupRequestProcessors(); // 註冊 JMX registerJMX(); // 最後標識啓動完成,運行中 setState(State.RUNNING); // 喚醒被阻塞的全部對象 notifyAll(); } // org.apache.zookeeper.server.SessionTrackerImpl public SessionTrackerImpl(SessionExpirer expirer, ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime, long serverId, ZooKeeperServerListener listener) { super("SessionTracker", listener); this.expirer = expirer; this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime); this.sessionsWithTimeout = sessionsWithTimeout; this.nextSessionId.set(initializeNextSession(serverId)); for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) { trackSession(e.getKey(), e.getValue()); } EphemeralType.validateServerId(serverId); } @Override public void run() { try { while (running) { long waitTime = sessionExpiryQueue.getWaitTime(); if (waitTime > 0) { Thread.sleep(waitTime); continue; } // 主要任務就是將過時的session 關閉掉 for (SessionImpl s : sessionExpiryQueue.poll()) { setSessionClosing(s.sessionId); expirer.expire(s); } } } catch (InterruptedException e) { handleException(this.getName(), e); } LOG.info("SessionTrackerImpl exited loop!"); } // org.apache.zookeeper.server.ZooKeeperServer // 構建處理器鏈,由此組合請求進來後的處理方式 protected void setupRequestProcessors() { // 這裏使用一個責任鏈模式進行包裝 多個 processer // PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor // 先準備 request, 而後落盤數據, 最後處理 RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); // SyncRequestProcessor 是一個異步線程, 主要處理請求數據的實時落盤操做 ((SyncRequestProcessor)syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor)firstProcessor).start(); } // 
org.apache.zookeeper.server.PrepRequestProcessor, 做爲第一個請求處理的線程 @Override public void run() { try { while (true) { // LinkedBlockingQueue<Request>, 阻塞隊列 Request request = submittedRequests.take(); long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK; if (request.type == OpCode.ping) { traceMask = ZooTrace.CLIENT_PING_TRACE_MASK; } if (LOG.isTraceEnabled()) { ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); } if (Request.requestOfDeath == request) { break; } // 處理請求 pRequest(request); } } catch (RequestProcessorException e) { if (e.getCause() instanceof XidRolloverException) { LOG.info(e.getCause().getMessage()); } handleException(this.getName(), e); } catch (Exception e) { handleException(this.getName(), e); } LOG.info("PrepRequestProcessor exited loop!"); } // org.apache.zookeeper.server.SyncRequestProcessor // 當有請求須要進行數據落盤時,僅僅是將數據插入到 queuedRequests 中,便可,該後臺線程會及時把數據刷入磁盤的 @Override public void run() { try { int logCount = 0; // we do this in an attempt to ensure that not all of the servers // in the ensemble take a snapshot at the same time int randRoll = r.nextInt(snapCount/2); while (true) { Request si = null; // 阻塞獲取隊列 if (toFlush.isEmpty()) { si = queuedRequests.take(); } else { si = queuedRequests.poll(); if (si == null) { flush(toFlush); continue; } } // 若是是關閉請求, Request.requestOfDeath , 則直接退出 if (si == requestOfDeath) { break; } if (si != null) { // track the number of records written to the log if (zks.getZKDatabase().append(si)) { logCount++; if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log zks.getZKDatabase().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); } else { snapInProcess = new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; snapInProcess.start(); } logCount = 0; } } else if (toFlush.isEmpty()) { // optimization for read 
                        // heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            // Because this processor runs in its own thread, it must invoke the next processor itself
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

    // PrepRequestProcessor: handles every kind of request, dispatching on the request type
    /**
     * This method will be called inside the ProcessRequestThread, which is a
     * singleton, so there will be a single thread calling this code.
     *
     * @param request
     */
    protected void pRequest(Request request) throws RequestProcessorException {
        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
        request.setHdr(null);
        request.setTxn(null);

        try {
            switch (request.type) {
            case OpCode.createContainer:
            case OpCode.create:
            case OpCode.create2:
                CreateRequest create2Request = new CreateRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
                break;
            case OpCode.createTTL:
                CreateTTLRequest createTtlRequest = new CreateTTLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
                break;
            case OpCode.deleteContainer:
            case OpCode.delete:
                DeleteRequest deleteRequest = new DeleteRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
                break;
            case OpCode.setData:
                SetDataRequest setDataRequest = new SetDataRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
                break;
            case OpCode.reconfig:
                ReconfigRequest reconfigRequest = new ReconfigRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                break;
            case OpCode.setACL:
                SetACLRequest setAclRequest = new SetACLRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
                break;
            case OpCode.check:
                CheckVersionRequest checkRequest = new CheckVersionRequest();
                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
                break;
            case OpCode.multi:
                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
                try {
                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
                } catch(IOException e) {
                    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                            Time.currentWallTime(), OpCode.multi));
                    throw e;
                }
                List<Txn> txns = new ArrayList<Txn>();
                //Each op in a multi-op must have the same zxid!
                long zxid = zks.getNextZxid();
                KeeperException ke = null;

                //Store off current pending change records in case we need to rollback
                Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

                for(Op op: multiRequest) {
                    Record subrequest = op.toRequestRecord();
                    int type;
                    Record txn;

                    /* If we've already failed one of the ops, don't bother
                     * trying the rest as we know it's going to fail and it
                     * would be confusing in the logfiles.
                     */
                    if (ke != null) {
                        type = OpCode.error;
                        txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
                    }

                    /* Prep the request and convert to a Txn */
                    else {
                        try {
                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                            type = request.getHdr().getType();
                            txn = request.getTxn();
                        } catch (KeeperException e) {
                            ke = e;
                            type = OpCode.error;
                            txn = new ErrorTxn(e.code().intValue());

                            if (e.code().intValue() > Code.APIERROR.intValue()) {
                                LOG.info("Got user-level KeeperException when processing {} aborting" +
                                        " remaining multi ops. Error Path:{} Error:{}",
                                        request.toString(), e.getPath(), e.getMessage());
                            }

                            request.setException(e);

                            /* Rollback change records from failed multi-op */
                            rollbackPendingChanges(zxid, pendingChanges);
                        }
                    }

                    //FIXME: I don't want to have to serialize it here and then
                    //       immediately deserialize in next processor. But I'm
                    //       not sure how else to get the txn stored into our list.
                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                    txn.serialize(boa, "request") ;
                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

                    txns.add(new Txn(type, bb.array()));
                }

                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                        Time.currentWallTime(), request.type));
                request.setTxn(new MultiTxn(txns));

                break;

            //create/close session don't require request record
            case OpCode.createSession:
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                }
                break;

            //All the rest don't need to create a Txn - just verify session
            case OpCode.sync:
            case OpCode.exists:
            case OpCode.getData:
            case OpCode.getACL:
            case OpCode.getChildren:
            case OpCode.getChildren2:
            case OpCode.ping:
            case OpCode.setWatches:
            case OpCode.checkWatches:
            case OpCode.removeWatches:
                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                break;
            default:
                LOG.warn("unknown type " + request.type);
                break;
            }
        } catch (KeeperException e) {
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(e.code().intValue()));
            }

            if (e.code().intValue() > Code.APIERROR.intValue()) {
                LOG.info("Got user-level KeeperException when processing {} Error Path:{} Error:{}",
                        request.toString(), e.getPath(), e.getMessage());
            }
            request.setException(e);
        } catch (Exception e) {
            // log at error level as we are returning a marshalling
            // error to the user
            LOG.error("Failed to process " + request, e);

            StringBuilder sb = new StringBuilder();
            ByteBuffer bb = request.request;
            if(bb != null){
                bb.rewind();
                while (bb.hasRemaining()) {
                    sb.append(Integer.toHexString(bb.get() & 0xff));
                }
            } else {
                sb.append("request buffer is null");
            }

            LOG.error("Dumping request buffer: 0x" + sb.toString());
            if (request.getHdr() != null) {
                request.getHdr().setType(OpCode.error);
                request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
            }
        }
        request.zxid = zks.getZxid();
        // This thread is the first processor, so it has to hand control to the next
        // processor, i.e. SyncRequestProcessor, which is followed by FinalRequestProcessor.
        // Of course, all SyncRequestProcessor does at this point is put the request
        // on a queue: queuedRequests.add(request);
        nextProcessor.processRequest(request);
    }
Once the processors have been started, JMX registration comes next:
    // org.apache.zookeeper.server.ZooKeeperServer
    protected void registerJMX() {
        // register with JMX
        try {
            jmxServerBean = new ZooKeeperServerBean(this);
            MBeanRegistry.getInstance().register(jmxServerBean, null);

            try {
                jmxDataTreeBean = new DataTreeBean(zkDb.getDataTree());
                MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
            } catch (Exception e) {
                LOG.warn("Failed to register with JMX", e);
                jmxDataTreeBean = null;
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxServerBean = null;
        }
    }
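To get a feel for what "registering with JMX" amounts to, here is a minimal sketch that uses only the standard javax.management API instead of ZooKeeper's own MBeanRegistry. The names JmxRegistrationSketch, DemoServerMXBean, DemoServer and the object name demo:type=Server are all hypothetical and exist only for this illustration. As in registerJMX() above, a failed registration is just logged, so it does not stop the server from starting.

```java
import java.lang.management.ManagementFactory;
import javax.management.ObjectName;

class JmxRegistrationSketch {

    // Hypothetical management interface. JMX requires it to be public, and the
    // "MXBean" suffix is what marks it as an MXBean interface.
    public interface DemoServerMXBean {
        int getClientPort();
    }

    // Hypothetical bean, loosely playing the role of ZooKeeperServerBean.
    static class DemoServer implements DemoServerMXBean {
        private final int clientPort;

        DemoServer(int clientPort) {
            this.clientPort = clientPort;
        }

        @Override
        public int getClientPort() {
            return clientPort;
        }
    }

    // Registers the bean with the platform MBean server. On failure it logs
    // and returns null instead of propagating the exception.
    static ObjectName register(Object bean, String name) {
        try {
            ObjectName objectName = new ObjectName(name);
            ManagementFactory.getPlatformMBeanServer().registerMBean(bean, objectName);
            return objectName;
        } catch (Exception e) {
            System.err.println("Failed to register with JMX: " + e);
            return null;
        }
    }

    // Reads one attribute back through the MBean server, or null on failure.
    static Object readAttribute(ObjectName name, String attribute) {
        try {
            return ManagementFactory.getPlatformMBeanServer().getAttribute(name, attribute);
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ObjectName name = register(new DemoServer(2181), "demo:type=Server");
        System.out.println("ClientPort = " + readAttribute(name, "ClientPort"));
    }
}
```

Once a bean is registered this way, a tool such as jconsole can browse the same attribute.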
Finally, zk blocks on shutdownLatch.await(), waiting for the shutdown signal so that it can shut down gracefully!
zkServer startup is now complete!
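The idea of blocking on a latch until someone asks for shutdown can be sketched with a plain java.util.concurrent.CountDownLatch. This is only an illustration, not ZooKeeper's code: ShutdownLatchSketch, requestShutdown and awaitShutdown are hypothetical names, and the sketch waits with a timeout (so the demo always terminates) whereas the call described above is the untimed await().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ShutdownLatchSketch {
    // One count: a single shutdown request releases every waiter.
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    // Called by whoever decides the server should stop.
    void requestShutdown() {
        shutdownLatch.countDown();
    }

    // Blocks until shutdown is requested or the timeout elapses.
    // Returns true only if shutdown was actually requested.
    boolean awaitShutdown(long timeoutMillis) {
        try {
            return shutdownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ShutdownLatchSketch server = new ShutdownLatchSketch();

        // Hypothetical trigger: in this demo a helper thread requests the
        // shutdown; in a real server it could be any component.
        Thread trigger = new Thread(server::requestShutdown);
        trigger.start();

        System.out.println("server started, waiting for shutdown signal");
        if (server.awaitShutdown(5_000)) {
            System.out.println("shutdown signal received, cleaning up");
        }
    }
}
```

The main thread does no work while it waits, which is why the latch is a convenient place to park it: all real work happens on the other threads created during startup.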
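To sum up the startup logic:
1. Cluster mode and standalone mode share a single startup entry point;
2. Only after the configuration file has been parsed does zk find out that it is running in standalone mode, at which point it calls the standalone startup method and starts over;
3. The startup phase mostly consists of creating threads such as the selector threads and the workerPool;
4. On startup, zk performs a one-time data initialization or data recovery;
5. ZKDatabase is the key storage structure that runs through all of zk's data storage;
6. zkServer finally blocks, waiting for the shutdown signal;
7. Requests are handled step by step using the chain-of-responsibility pattern;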
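The chain-of-responsibility idea from the last point can be sketched in a few lines. This is a simplified, single-threaded illustration under stated assumptions, not ZooKeeper's implementation: ProcessorChainSketch, DemoProcessor and DemoStage are hypothetical names, requests are plain strings, and each stage calls the next one directly, whereas the real processors hand requests over through queues and separate threads. The stage order prep, sync, final follows the order described in the code comments above.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessorChainSketch {

    // Hypothetical stand-in for a request processor interface.
    interface DemoProcessor {
        void processRequest(String request);
    }

    // A stage that records its own work, then hands the request to the next stage.
    static class DemoStage implements DemoProcessor {
        private final String name;
        private final DemoProcessor next; // null for the last stage in the chain
        private final List<String> trace;

        DemoStage(String name, DemoProcessor next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void processRequest(String request) {
            trace.add(name + ":" + request);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    // Builds the chain back to front (each stage needs a reference to its
    // successor), then pushes one request in at the head.
    static List<String> runChain(String request) {
        List<String> trace = new ArrayList<>();
        DemoProcessor finalStage = new DemoStage("final", null, trace);
        DemoProcessor syncStage = new DemoStage("sync", finalStage, trace);
        DemoProcessor prepStage = new DemoStage("prep", syncStage, trace);
        prepStage.processRequest(request);
        return trace;
    }

    public static void main(String[] args) {
        // prints [prep:create /demo, sync:create /demo, final:create /demo]
        System.out.println(runChain("create /demo"));
    }
}
```

Because every stage only knows its successor, a stage can be inserted or swapped without touching the others, which is what makes this pattern attractive for a request pipeline.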
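What we skimmed here is only the startup process; the actual request-handling logic has not been covered yet. (Which is probably what people mean by: "and yet none of this is any use at all!")
To find out what happens next, stay tuned for the next installment!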