Some work can only be performed on a single server, the master being the typical example. HA (High Availability) therefore requires, first, that several servers be deployed, and second, that those servers automatically elect one of themselves to be active while the rest stay standby; only the active server is allowed to perform the restricted operations. When the active server can no longer serve for whatever reason (for example it crashes or loses its network connection), a new active server is immediately and automatically elected from the standbys to take over, so the service fails over seamlessly.
Hadoop master HA is implemented on top of ZooKeeper. It comes in two flavors, HDFS HA (HA for the NameNode) and YARN HA (HA for the ResourceManager), which share a common mechanism but differ in the details.
HDFS HA zookeeper paths:
/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb
/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock
Configuration:
<property>
  <name>ha.zookeeper.parent-znode</name>
  <value>/hadoop-ha</value>
  <description>
    The ZooKeeper znode under which the ZK failover controller stores
    its information. Note that the nameservice ID is automatically
    appended to this znode, so it is not normally necessary to
    configure this, even in a federated environment.
  </description>
</property>
<property>
  <name>dfs.nameservices</name>
  <value></value>
  <description>
    Comma-separated list of nameservices.
  </description>
</property>
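As a concrete illustration, here is a tiny sketch (hypothetical values, not Hadoop code) of how the final znode paths are composed from the two settings above: the parent znode, plus the nameservice ID, plus the two fixed child names:

// Hypothetical illustration of the path layout; "mycluster" is a made-up
// dfs.nameservices value, the other strings match the defaults quoted above.
public class HdfsHaZnodePaths {
  public static void main(String[] args) {
    String parentZnode = "/hadoop-ha";   // ha.zookeeper.parent-znode (default)
    String nameservice = "mycluster";    // hypothetical dfs.nameservices value
    String base = parentZnode + "/" + nameservice;
    System.out.println(base + "/ActiveStandbyElectorLock"); // the election lock
    System.out.println(base + "/ActiveBreadCrumb");         // the fencing breadcrumb
  }
}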
YARN HA zookeeper paths:
/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveBreadCrumb
/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveStandbyElectorLock
Configuration:
<property>
  <description>The base znode path to use for storing leader information,
    when using ZooKeeper based leader election.</description>
  <name>yarn.resourcemanager.ha.automatic-failover.zk-base-path</name>
  <value>/yarn-leader-election</value>
</property>
<property>
  <description>Name of the cluster. In a HA setting,
    this is used to ensure the RM participates in leader
    election for this cluster and ensures it does not affect
    other clusters</description>
  <name>yarn.resourcemanager.cluster-id</name>
  <!--value>yarn-cluster</value-->
</property>
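Similarly, a quick hypothetical sketch of the YARN path layout, assuming the commented-out example value yarn-cluster for yarn.resourcemanager.cluster-id:

// Hypothetical illustration only; "yarn-cluster" is just the example value above.
public class YarnHaZnodePaths {
  public static void main(String[] args) {
    String zkBasePath = "/yarn-leader-election"; // zk-base-path (default)
    String clusterId = "yarn-cluster";           // yarn.resourcemanager.cluster-id
    String electionZNode = zkBasePath + "/" + clusterId;
    System.out.println(electionZNode + "/ActiveStandbyElectorLock");
    System.out.println(electionZNode + "/ActiveBreadCrumb");
  }
}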
Why are there two znodes on ZooKeeper, ActiveBreadCrumb and ActiveStandbyElectorLock? ActiveStandbyElectorLock is the node used for the actual locking (the election), while ActiveBreadCrumb is used for fencing the previous active.
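To make that division of labor concrete, here is a minimal hand-written sketch (not Hadoop code) of the idea, assuming a ZooKeeper ensemble at localhost:2181 and a hypothetical parent znode /hadoop-ha/mycluster that already exists: the lock node is EPHEMERAL, so it vanishes when the active's session dies and the standbys can race to re-create it, while the breadcrumb is a persistent node recording who was active last, so the new active knows whom to fence:

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class LockAndBreadCrumbSketch {
  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, event -> { });
    String parent = "/hadoop-ha/mycluster";        // hypothetical, assumed to exist
    byte[] myInfo = "nn1".getBytes(StandardCharsets.UTF_8);
    try {
      // The election itself: whoever creates this EPHEMERAL node is active.
      zk.create(parent + "/ActiveStandbyElectorLock", myInfo,
          ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      // Won the election: leave a PERSISTENT breadcrumb describing ourselves.
      Stat stat = zk.exists(parent + "/ActiveBreadCrumb", false);
      if (stat == null) {
        zk.create(parent + "/ActiveBreadCrumb", myInfo,
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      } else {
        byte[] oldActive = zk.getData(parent + "/ActiveBreadCrumb", false, stat);
        // In the real elector, the old active described by oldActive would be
        // fenced here before the breadcrumb is overwritten with our own info.
        zk.setData(parent + "/ActiveBreadCrumb", myInfo, stat.getVersion());
      }
    } catch (KeeperException.NodeExistsException e) {
      // Lost the race: someone else holds the lock, stay standby.
    }
    zk.close();
  }
}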
Both HDFS HA and YARN HA ultimately rely on ActiveStandbyElector; let's go through them one by one.
The zkfc startup command
$HADOOP_HOME/bin/hdfs
elif [ "$COMMAND" = "zkfc" ] ; then
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
Code:
org.apache.hadoop.hdfs.tools.DFSZKFailoverController
public static void main(String args[]) throws Exception {
  if (DFSUtil.parseHelpArgument(args, ZKFailoverController.USAGE, System.out, true)) {
    System.exit(0);
  }
  GenericOptionsParser parser = new GenericOptionsParser(
      new HdfsConfiguration(), args);
  DFSZKFailoverController zkfc = DFSZKFailoverController.create(
      parser.getConfiguration());
  int retCode = 0;
  try {
    retCode = zkfc.run(parser.getRemainingArgs());
  } catch (Throwable t) {
    LOG.fatal("Got a fatal error, exiting now", t);
  }
  System.exit(retCode);
}
DFSZKFailoverController.main calls run; run here is a method of the parent class ZKFailoverController, and it in turn calls doRun. Let's look at the parent class:
org.apache.hadoop.ha.ZKFailoverController
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";

private int doRun(String[] args)
    throws HadoopIllegalArgumentException, IOException, InterruptedException {
  try {
    initZK();
  } catch (KeeperException ke) {
    LOG.fatal("Unable to start failover controller. Unable to connect "
        + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
        + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
        + "ZooKeeper is running.");
    return ERR_CODE_NO_ZK;
  }
  if (args.length > 0) {
    if ("-formatZK".equals(args[0])) {
      boolean force = false;
      boolean interactive = true;
      for (int i = 1; i < args.length; i++) {
        if ("-force".equals(args[i])) {
          force = true;
        } else if ("-nonInteractive".equals(args[i])) {
          interactive = false;
        } else {
          badArg(args[i]);
        }
      }
      return formatZK(force, interactive);
    } else {
      badArg(args[0]);
    }
  }
  if (!elector.parentZNodeExists()) {
    LOG.fatal("Unable to start failover controller. "
        + "Parent znode does not exist.\n"
        + "Run with -formatZK flag to initialize ZooKeeper.");
    return ERR_CODE_NO_PARENT_ZNODE;
  }
  try {
    localTarget.checkFencingConfigured();
  } catch (BadFencingConfigurationException e) {
    LOG.fatal("Fencing is not configured for " + localTarget + ".\n"
        + "You must configure a fencing method before using automatic "
        + "failover.", e);
    return ERR_CODE_NO_FENCER;
  }
  initRPC();
  initHM();
  startRPC();
  try {
    mainLoop();
  } finally {
    rpcServer.stopAndJoin();
    elector.quitElection(true);
    healthMonitor.shutdown();
    healthMonitor.join();
  }
  return 0;
}

private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  ...
  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks(), maxRetryNum);
}

private String getParentZnode() {
  String znode = conf.get(ZK_PARENT_ZNODE_KEY, ZK_PARENT_ZNODE_DEFAULT);
  if (!znode.endsWith("/")) {
    znode += "/";
  }
  return znode + getScopeInsideParentNode();
}

class ElectorCallbacks implements ActiveStandbyElectorCallback {
  @Override
  public void becomeActive() throws ServiceFailedException {
    ZKFailoverController.this.becomeActive();
  }

  @Override
  public void becomeStandby() {
    ZKFailoverController.this.becomeStandby();
  }

  @Override
  public void enterNeutralMode() {
  }

  @Override
  public void notifyFatalError(String errorMessage) {
    fatalError(errorMessage);
  }

  @Override
  public void fenceOldActive(byte[] data) {
    ZKFailoverController.this.fenceOldActive(data);
  }

  @Override
  public String toString() {
    synchronized (ZKFailoverController.this) {
      return "Elector callbacks for " + localTarget;
    }
  }
}
doRun calls several methods; the two most important are initZK and initHM.
initZK determines the zk path via getParentZnode and creates the ActiveStandbyElector. The most important part is that an instance of the inner class ElectorCallbacks is passed into ActiveStandbyElector; every subsequent zk state change is then propagated along the call chain ActiveStandbyElector -> ElectorCallbacks -> ZKFailoverController, which finally carries out the state transition, e.g. becomeActive or becomeStandby.
initZK only determines the zk path and wires up the callbacks; nothing is actually created yet. The concrete work is driven from initHM, so let's look at initHM:
org.apache.hadoop.ha.ZKFailoverController
private void initHM() {
  healthMonitor = new HealthMonitor(conf, localTarget);
  healthMonitor.addCallback(new HealthCallbacks());
  healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
  healthMonitor.start();
}
initHM creates a HealthMonitor, registers HealthCallbacks on it, and then starts the HealthMonitor. Let's look at HealthMonitor:
org.apache.hadoop.ha.HealthMonitor
void start() {
  daemon.start();
}

private void doHealthChecks() throws InterruptedException {
  while (shouldRun) {
    HAServiceStatus status = null;
    boolean healthy = false;
    try {
      status = proxy.getServiceStatus();
      proxy.monitorHealth();
      healthy = true;
    } catch (HealthCheckFailedException e) {
      LOG.warn("Service health check failed for " + targetToMonitor
          + ": " + e.getMessage());
      enterState(State.SERVICE_UNHEALTHY);
    } catch (Throwable t) {
      LOG.warn("Transport-level exception trying to monitor health of " +
          targetToMonitor + ": " + t.getLocalizedMessage());
      RPC.stopProxy(proxy);
      proxy = null;
      enterState(State.SERVICE_NOT_RESPONDING);
      Thread.sleep(sleepAfterDisconnectMillis);
      return;
    }
    if (status != null) {
      setLastServiceStatus(status);
    }
    if (healthy) {
      enterState(State.SERVICE_HEALTHY);
    }
    Thread.sleep(checkIntervalMillis);
  }
}

private synchronized void enterState(State newState) {
  if (newState != state) {
    LOG.info("Entering state " + newState);
    state = newState;
    synchronized (callbacks) {
      for (Callback cb : callbacks) {
        cb.enteredState(newState);
      }
    }
  }
}
HealthMonitor.start starts the internal MonitorDaemon thread, which repeatedly calls HealthMonitor.doHealthChecks; doHealthChecks calls enterState whenever the observed state changes, and enterState iterates over and notifies all registered callbacks. This is the Observer pattern, and the interesting part is the callback itself, namely HealthCallbacks.
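As an aside, here is a stripped-down sketch (not the Hadoop classes) of that observer wiring: the monitor holds a state and a list of callbacks, and entering a new state notifies every registered callback, just like enterState above:

import java.util.ArrayList;
import java.util.List;

class MonitorObserverSketch {
  enum State { SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

  interface Callback {
    void enteredState(State newState);
  }

  private final List<Callback> callbacks = new ArrayList<>();
  private State state = State.SERVICE_NOT_RESPONDING;

  void addCallback(Callback cb) {
    callbacks.add(cb);
  }

  synchronized void enterState(State newState) {
    if (newState != state) {
      state = newState;
      for (Callback cb : callbacks) {
        cb.enteredState(newState);   // in Hadoop this reaches HealthCallbacks
      }
    }
  }

  public static void main(String[] args) {
    MonitorObserverSketch m = new MonitorObserverSketch();
    m.addCallback(s -> System.out.println("observer saw " + s));
    m.enterState(State.SERVICE_HEALTHY);   // prints: observer saw SERVICE_HEALTHY
  }
}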
First, the MonitorDaemon thread:
org.apache.hadoop.ha.HealthMonitor.MonitorDaemon
public void run() {
  while (shouldRun) {
    try {
      loopUntilConnected();
      doHealthChecks();
    } catch (InterruptedException ie) {
      Preconditions.checkState(!shouldRun,
          "Interrupted but still supposed to run");
    }
  }
}
Then HealthCallbacks:
org.apache.hadoop.ha.ZKFailoverController.HealthCallbacks
class HealthCallbacks implements HealthMonitor.Callback {
  @Override
  public void enteredState(HealthMonitor.State newState) {
    setLastHealthState(newState);
    recheckElectability();
  }
}
This in turn calls ZKFailoverController.recheckElectability:
org.apache.hadoop.ha.ZKFailoverController
private void recheckElectability() {
  // Maintain lock ordering of elector -> ZKFC
  synchronized (elector) {
    synchronized (this) {
      boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
      long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
      if (remainingDelay > 0) {
        if (healthy) {
          LOG.info("Would have joined master election, but this node is " +
              "prohibited from doing so for " +
              TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
        }
        scheduleRecheck(remainingDelay);
        return;
      }
      switch (lastHealthState) {
      case SERVICE_HEALTHY:
        elector.joinElection(targetToData(localTarget));
        if (quitElectionOnBadState) {
          quitElectionOnBadState = false;
        }
        break;
      case INITIALIZING:
        LOG.info("Ensuring that " + localTarget + " does not " +
            "participate in active master election");
        elector.quitElection(false);
        serviceState = HAServiceState.INITIALIZING;
        break;
      case SERVICE_UNHEALTHY:
      case SERVICE_NOT_RESPONDING:
        LOG.info("Quitting master election for " + localTarget +
            " and marking that fencing is necessary");
        elector.quitElection(true);
        serviceState = HAServiceState.INITIALIZING;
        break;
      case HEALTH_MONITOR_FAILED:
        fatalError("Health monitor failed!");
        break;
      default:
        throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
      }
    }
  }
}
In the healthy case it calls ActiveStandbyElector.joinElection; let's look at ActiveStandbyElector:
org.apache.hadoop.ha.ActiveStandbyElector
public class ActiveStandbyElector implements StatCallback, StringCallback {

  public ActiveStandbyElector(String zookeeperHostPorts,
      int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
      List<ZKAuthInfo> authInfo,
      ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
      HadoopIllegalArgumentException, KeeperException {
    ...
    znodeWorkingDir = parentZnodeName;
    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
    ...

  public synchronized void joinElection(byte[] data)
      throws HadoopIllegalArgumentException {
    if (data == null) {
      throw new HadoopIllegalArgumentException("data cannot be null");
    }
    if (wantToBeInElection) {
      LOG.info("Already in election. Not re-connecting.");
      return;
    }
    appData = new byte[data.length];
    System.arraycopy(data, 0, appData, 0, data.length);
    LOG.debug("Attempting active election for " + this);
    joinElectionInternal();
  }

  private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
      if (!reEstablishSession()) {
        fatalError("Failed to reEstablish connection with ZooKeeper");
        return;
      }
    }
    createRetryCount = 0;
    wantToBeInElection = true;
    createLockNodeAsync();
  }

  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
        this, zkClient);
  }
ActiveStandbyElector implements two ZooKeeper callback interfaces, StatCallback and StringCallback. The call sequence is joinElection -> joinElectionInternal -> createLockNodeAsync, which ultimately invokes the asynchronous ZooKeeper.create and passes the elector itself in as the callback. Subsequent ZooKeeper events therefore call back into ActiveStandbyElector.processResult, which in turn calls back into ElectorCallbacks; at this point the whole chain is connected.
ZooKeeper's StringCallback interface looks like this:
org.apache.zookeeper.AsyncCallback.StringCallback
interface StringCallback extends AsyncCallback {
  public void processResult(int rc, String path, Object ctx, String name);
}
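For reference, here is a minimal standalone sketch (not Hadoop code, hypothetical paths) of the asynchronous create + callback pattern, assuming a ZooKeeper ensemble at localhost:2181. The point is that create returns immediately and the outcome arrives later in processResult, which is exactly the hook ActiveStandbyElector relies on:

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class AsyncCreateSketch implements AsyncCallback.StringCallback {
  private final CountDownLatch done = new CountDownLatch(1);

  @Override
  public void processResult(int rc, String path, Object ctx, String name) {
    // ZooKeeper calls back in here once the create completes, the same way
    // ActiveStandbyElector.processResult is invoked after createLockNodeAsync.
    KeeperException.Code code = KeeperException.Code.get(rc);
    if (code == KeeperException.Code.OK) {
      System.out.println("won the election, created " + name);
    } else if (code == KeeperException.Code.NODEEXISTS) {
      System.out.println("lock already held, staying standby");
    } else {
      System.out.println("create failed: " + code);
    }
    done.countDown();
  }

  public static void main(String[] args) throws Exception {
    AsyncCreateSketch cb = new AsyncCreateSketch();
    ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, event -> { });
    // Asynchronous create: returns immediately, result delivered to processResult.
    zk.create("/demo-election/ActiveStandbyElectorLock",
        "node1".getBytes(StandardCharsets.UTF_8),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, cb, null);
    cb.done.await();
    zk.close();
  }
}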
Now for YARN ResourceManager HA, which reuses the same ActiveStandbyElector through EmbeddedElectorService.
org.apache.hadoop.yarn.conf.YarnConfiguration
public static final String AUTO_FAILOVER_ZK_BASE_PATH =
    AUTO_FAILOVER_PREFIX + "zk-base-path";
public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH =
    "/yarn-leader-election";
Here you can see where the configuration defaults shown earlier come from.
org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService
protected void serviceInit(Configuration conf) throws Exception {
  ...
  String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  String electionZNode = zkBasePath + "/" + clusterId;
  ...
  elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
      electionZNode, zkAcls, zkAuths, this, maxRetryNum);
  ...

@Override
protected void serviceStart() throws Exception {
  elector.joinElection(localActiveNodeInfo);
  super.serviceStart();
}
The process is much the same as in ZKFailoverController above: EmbeddedElectorService.serviceInit computes the zk path and creates the ActiveStandbyElector, and serviceStart then calls ActiveStandbyElector.joinElection.
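To round things off, here is the shape of the callback that any user of ActiveStandbyElector has to provide (a bare-bones sketch, not the real EmbeddedElectorService or ElectorCallbacks implementation); this interface is how the elector drives the actual active/standby transitions in both the ZKFC and the ResourceManager:

import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ServiceFailedException;

class ElectorCallbackSketch implements ActiveStandbyElectorCallback {
  @Override
  public void becomeActive() throws ServiceFailedException {
    // transition the local service (NameNode / ResourceManager) to active
  }

  @Override
  public void becomeStandby() {
    // transition the local service to standby
  }

  @Override
  public void enterNeutralMode() {
    // zk connection lost; neither active nor standby can be assumed
  }

  @Override
  public void notifyFatalError(String errorMessage) {
    // unrecoverable elector error
  }

  @Override
  public void fenceOldActive(byte[] oldActiveData) {
    // data read from ActiveBreadCrumb describing the previous active to fence
  }
}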