【原創】大數據基礎之Hadoop(1)HA實現原理

有些工做只能在一臺server上進行,好比master,這時HA(High Availability)首先要求部署多個server,其次要求多個server自動選舉出一個active狀態server,其餘server處於standby狀態,只有active狀態的server容許進行特定的操做;當active狀態的server因爲各類緣由沒法服務以後(好比掛了或者斷網),其餘standby狀態的server中會立刻自動選舉出一個active的server來提供服務,實現服務的無縫切換;node

hadoop master ha是經過zookeeper實現的,其中又分爲hdfs ha(namenode的ha)和yarn ha(resourcemanager的ha),二者既有共同點,又有差異;apache

1 現象

1.1 hdfs ha

zookeeper path:app

/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb
/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock異步

配置:ide

<property>函數

  <name>ha.zookeeper.parent-znode</name>oop

  <value>/hadoop-ha</value>ui

  <description>this

    The ZooKeeper znode under which the ZK failover controller storesspa

    its information. Note that the nameservice ID is automatically

    appended to this znode, so it is not normally necessary to

    configure this, even in a federated environment.

  </description>

</property>

<property>

  <name>dfs.nameservices</name>

  <value></value>

  <description>

    Comma-separated list of nameservices.

  </description>

</property>

1.2 yarn ha

zookeeper path:

/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveBreadCrumb
/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveStandbyElectorLock

配置:

  <property>

    <description>The base znode path to use for storing leader information,

      when using ZooKeeper based leader election.</description>

    <name>yarn.resourcemanager.ha.automatic-failover.zk-base-path</name>

    <value>/yarn-leader-election</value>

  </property>

  <property>

    <description>Name of the cluster. In a HA setting,

      this is used to ensure the RM participates in leader

      election for this cluster and ensures it does not affect

      other clusters</description>

    <name>yarn.resourcemanager.cluster-id</name>

    <!--value>yarn-cluster</value-->

  </property>

爲何zookeeper上有兩個節點ActiveBreadCrumb和ActiveStandbyElectorLock,ActiveStandbyElectorLock是用來實際加鎖的,ActiveBreadCrumb是用來作fence的;

2 代碼實現

hdfs和yarn的ha最終都用到了ActiveStandbyElector,逐一來看

2.1 hdfs ha

zkfc啓動命令

$HADOOP_HOME/bin/hdfs

elif [ "$COMMAND" = "zkfc" ] ; then
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"

代碼

org.apache.hadoop.hdfs.tools.DFSZKFailoverController

  public static void main(String args[])
      throws Exception {
    if (DFSUtil.parseHelpArgument(args, 
        ZKFailoverController.USAGE, System.out, true)) {
      System.exit(0);
    }
    
    GenericOptionsParser parser = new GenericOptionsParser(
        new HdfsConfiguration(), args);
    DFSZKFailoverController zkfc = DFSZKFailoverController.create(
        parser.getConfiguration());
    int retCode = 0;
    try {
      retCode = zkfc.run(parser.getRemainingArgs());
    } catch (Throwable t) {
      LOG.fatal("Got a fatal error, exiting now", t);
    }
    System.exit(retCode);
  }

DFSZKFailoverController.main會調用run,這裏的run是父類ZKFailoverController的方法,其中會調用doRun,下面看父類:

org.apache.hadoop.ha.ZKFailoverController

  private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
  
  static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
  
  private int doRun(String[] args)
      throws HadoopIllegalArgumentException, IOException, InterruptedException {
    try {
      initZK();
    } catch (KeeperException ke) {
      LOG.fatal("Unable to start failover controller. Unable to connect "
          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
          + "ZooKeeper is running.");
      return ERR_CODE_NO_ZK;
    }
    if (args.length > 0) {
      if ("-formatZK".equals(args[0])) {
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
          if ("-force".equals(args[i])) {
            force = true;
          } else if ("-nonInteractive".equals(args[i])) {
            interactive = false;
          } else {
            badArg(args[i]);
          }
        }
        return formatZK(force, interactive);
      } else {
        badArg(args[0]);
      }
    }

    if (!elector.parentZNodeExists()) {
      LOG.fatal("Unable to start failover controller. "
          + "Parent znode does not exist.\n"
          + "Run with -formatZK flag to initialize ZooKeeper.");
      return ERR_CODE_NO_PARENT_ZNODE;
    }

    try {
      localTarget.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
          "You must configure a fencing method before using automatic " +
          "failover.", e);
      return ERR_CODE_NO_FENCER;
    }

    initRPC();
    initHM();
    startRPC();
    try {
      mainLoop();
    } finally {
      rpcServer.stopAndJoin();
      
      elector.quitElection(true);
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
  }
  
  private void initZK() throws HadoopIllegalArgumentException, IOException,
      KeeperException {
...
    elector = new ActiveStandbyElector(zkQuorum,
        zkTimeout, getParentZnode(), zkAcls, zkAuths,
        new ElectorCallbacks(), maxRetryNum);
  }
  
  private String getParentZnode() {
    String znode = conf.get(ZK_PARENT_ZNODE_KEY,
        ZK_PARENT_ZNODE_DEFAULT);
    if (!znode.endsWith("/")) {
      znode += "/";
    }
    return znode + getScopeInsideParentNode();
  }

  class ElectorCallbacks implements ActiveStandbyElectorCallback {
    @Override
    public void becomeActive() throws ServiceFailedException {
      ZKFailoverController.this.becomeActive();
    }

    @Override
    public void becomeStandby() {
      ZKFailoverController.this.becomeStandby();
    }

    @Override
    public void enterNeutralMode() {
    }

    @Override
    public void notifyFatalError(String errorMessage) {
      fatalError(errorMessage);
    }

    @Override
    public void fenceOldActive(byte[] data) {
      ZKFailoverController.this.fenceOldActive(data);
    }
    
    @Override
    public String toString() {
      synchronized (ZKFailoverController.this) {
        return "Elector callbacks for " + localTarget;
      }
    }
  }

doRun中會調用幾個方法,最重要的兩個是initZK和initHM:

在initZK中會經過getParentZnode建立zk路徑,同時建立ActiveStandbyElector,這裏最重要的是把內部類ElectorCallbacks的對象傳到ActiveStandbyElector,後續各類zk狀態都是經過ActiveStandbyElector->ElectorCallbacks->ZKFailoverController這個調用鏈傳遞的,最終實現狀態變動,好比becomeActive,becomeStandby等;

initZK中只是肯定了zk路徑以及各類回調函數,尚未實際的建立操做,具體的操做在initHM中,下面看initHM:

org.apache.hadoop.ha.ZKFailoverController

  private void initHM() {
    healthMonitor = new HealthMonitor(conf, localTarget);
    healthMonitor.addCallback(new HealthCallbacks());
    healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
    healthMonitor.start();
  }

initHM中會建立HealthMonitor,傳入HealthCallbacks,而後啓動HealthMonitor,下面看HealthMonitor:

org.apache.hadoop.ha.HealthMonitor

  void start() {
    daemon.start();
  }
  
  private void doHealthChecks() throws InterruptedException {
    while (shouldRun) {
      HAServiceStatus status = null;
      boolean healthy = false;
      try {
        status = proxy.getServiceStatus();
        proxy.monitorHealth();
        healthy = true;
      } catch (HealthCheckFailedException e) {
        LOG.warn("Service health check failed for " + targetToMonitor
            + ": " + e.getMessage());
        enterState(State.SERVICE_UNHEALTHY);
      } catch (Throwable t) {
        LOG.warn("Transport-level exception trying to monitor health of " +
            targetToMonitor + ": " + t.getLocalizedMessage());
        RPC.stopProxy(proxy);
        proxy = null;
        enterState(State.SERVICE_NOT_RESPONDING);
        Thread.sleep(sleepAfterDisconnectMillis);
        return;
      }
      
      if (status != null) {
        setLastServiceStatus(status);
      }
      if (healthy) {
        enterState(State.SERVICE_HEALTHY);
      }

      Thread.sleep(checkIntervalMillis);
    }
  }
  
  private synchronized void enterState(State newState) {
    if (newState != state) {
      LOG.info("Entering state " + newState);
      state = newState;
      synchronized (callbacks) {
        for (Callback cb : callbacks) {
          cb.enteredState(newState);
        }
      }
    }
  }

HealthMonitor.start會啓動內部的MonitorDaemon線程,而MonitorDaemon線程中中會循環調用HealthMonitor.doHealthChecks,doHealthChecks會根據各類狀態變化調用enterState,而enterState會迭代回調全部的callbacks,這是一個Observer模式,重點在callback上,即HealthCallbacks;

先看MonitorDaemon線程:

org.apache.hadoop.ha.HealthMonitor.MonitorDaemon

    public void run() {
      while (shouldRun) {
        try { 
          loopUntilConnected();
          doHealthChecks();
        } catch (InterruptedException ie) {
          Preconditions.checkState(!shouldRun,
              "Interrupted but still supposed to run");
        }
      }
    }

再看HealthCallbacks:

org.apache.hadoop.ha.ZKFailoverController.HealthCallbacks

  class HealthCallbacks implements HealthMonitor.Callback {
    @Override
    public void enteredState(HealthMonitor.State newState) {
      setLastHealthState(newState);
      recheckElectability();
    }
  }

這裏會調用到ZKFailoverController.recheckElectability

org.apache.hadoop.ha.ZKFailoverController

  private void recheckElectability() {
    // Maintain lock ordering of elector -> ZKFC
    synchronized (elector) {
      synchronized (this) {
        boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
    
        long remainingDelay = delayJoiningUntilNanotime - System.nanoTime(); 
        if (remainingDelay > 0) {
          if (healthy) {
            LOG.info("Would have joined master election, but this node is " +
                "prohibited from doing so for " +
                TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
          }
          scheduleRecheck(remainingDelay);
          return;
        }
    
        switch (lastHealthState) {
        case SERVICE_HEALTHY:
          elector.joinElection(targetToData(localTarget));
          if (quitElectionOnBadState) {
            quitElectionOnBadState = false;
          }
          break;
          
        case INITIALIZING:
          LOG.info("Ensuring that " + localTarget + " does not " +
              "participate in active master election");
          elector.quitElection(false);
          serviceState = HAServiceState.INITIALIZING;
          break;
    
        case SERVICE_UNHEALTHY:
        case SERVICE_NOT_RESPONDING:
          LOG.info("Quitting master election for " + localTarget +
              " and marking that fencing is necessary");
          elector.quitElection(true);
          serviceState = HAServiceState.INITIALIZING;
          break;
          
        case HEALTH_MONITOR_FAILED:
          fatalError("Health monitor failed!");
          break;
          
        default:
          throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
        }
      }
    }
  }

在health的狀況下會調用ActiveStandbyElector.joinElection,下面看ActiveStandbyElector:

org.apache.hadoop.ha.ActiveStandbyElector

public class ActiveStandbyElector implements StatCallback, StringCallback {

  public ActiveStandbyElector(String zookeeperHostPorts,
      int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
      List<ZKAuthInfo> authInfo,
      ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
      HadoopIllegalArgumentException, KeeperException {
...
    znodeWorkingDir = parentZnodeName;
    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
...

  public synchronized void joinElection(byte[] data)
      throws HadoopIllegalArgumentException {
    
    if (data == null) {
      throw new HadoopIllegalArgumentException("data cannot be null");
    }
    
    if (wantToBeInElection) {
      LOG.info("Already in election. Not re-connecting.");
      return;
    }

    appData = new byte[data.length];
    System.arraycopy(data, 0, appData, 0, data.length);

    LOG.debug("Attempting active election for " + this);
    joinElectionInternal();
  }
  
  private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
      if (!reEstablishSession()) {
        fatalError("Failed to reEstablish connection with ZooKeeper");
        return;
      }
    }

    createRetryCount = 0;
    wantToBeInElection = true;
    createLockNodeAsync();
  }
  
  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
        this, zkClient);
  }

ActiveStandbyElector實現了兩個zookeeper的callback接口StatCallback和StringCallback,調用過程爲joinElection->joinElectionInternal->createLockNodeAsync,最終會調用ZooKeeper.create異步方法,同時把本身做爲callback傳進去,這樣zookeeper後續的變動都會回調ActiveStandbyElector.processResult,而processResult中會回調ElectorCallbacks,至此整個流程打通。

zookeeper的StringCallback接口以下:

org.apache.zookeeper.AsyncCallback.StringCallback

    interface StringCallback extends AsyncCallback {
        public void processResult(int rc, String path, Object ctx, String name);
    }

2.2 yarn ha

org.apache.hadoop.yarn.conf.YarnConfiguration

  public static final String AUTO_FAILOVER_ZK_BASE_PATH =
      AUTO_FAILOVER_PREFIX + "zk-base-path";
  public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH =
      "/yarn-leader-election";

這裏能夠看到配置

org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService

  protected void serviceInit(Configuration conf)
      throws Exception {
...
    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
    String electionZNode = zkBasePath + "/" + clusterId;
...
    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
        electionZNode, zkAcls, zkAuths, this, maxRetryNum);
...

  @Override
  protected void serviceStart() throws Exception {
    elector.joinElection(localActiveNodeInfo);
    super.serviceStart();
  }

過程和上述ZKFailoverController差很少,EmbeddedElectorService.serviceInit中會建立zk路徑同時建立ActiveStandbyElector,而後在serviceStart中會調用ActiveStandbyElector.joinElection

相關文章
相關標籤/搜索