Yarn ResourceManager High Availability

在 Hadoop 生態中(Hadoop2.x及之後版本), JobTracker 和 TaskTracker 演變爲 Yarn 做爲 Hadoop 的資源管理器。 同時, MapReduce、Spark、Flink、等計算框架也支持 Yarn 來調度, 所以, Yarn 高可用極爲重要。 關於 Yarn 相關內容, 詳情查看Apache Hadoop YARN Architecture, 這裏對 Yarn ResourceManager 的 HA 作一個簡單介紹。 文章部分來自官方文檔。html

ResourceManager HA 的目的是當 Active RM 沒法工做時, Standby RM 可以階梯正在服務的 Active RM, 防止集羣出現不可用狀態。java

體系結構

Yarn HA Overview

ResourceManager HA 經過 Active/Standby 體系結構實現, 即在任意時刻, 都有一個 RM(ResourceManager) 處於 Active 狀態, 一個或多個 RM 處於 Standby 狀態, 若是 Active RM 發生故障, 由 Standby 的 RM 接管 Active RM 的工做。 啓動自動故障轉移時, 經過 Admin 接口或集成故障轉移器將 Standby RM 轉換爲 Active RM。node

手動轉換和故障轉移

不啓動自動故障轉移時, 管理員必須手動將其中一個 RM 轉換成 Active 狀態。 要從另外一個 RM 進行故障轉移到另外一個 RM, 須要先將 Active-RM 轉換成 Standby, 並將備用的 RM 轉換成 Active。 這些操做能夠經過 Yarn admin client 完成。web

自動故障轉移

RM 能夠選擇基於 zookeeper 的 ActiveStandbyElector 來決定那個 RM 是 Active 的, 當 Active 的 RM 中止或無響應時, 自動選擇一個 Standby RM 做爲 Active RM 來接管。 RM 的 HA 不須要單獨的守護進程(如 HDFS 的 ZKFC), 而是由嵌入到 RM 中的 ActiveStandbyElector 充當故障檢測器和 leader elector。apache

RM 故障轉移中的 Client、ApplicationMaster、NodeManager

當存在多個 RM 時, Client 和 全部節點的配置(yarn-site.xml)中須要列出全部的 RM, Client、AM(ApplicationMaster)、NodeManager 將循環嘗試鏈接 RM, 知道鏈接到 Active 的 RM。 若是 Active 的 RM 中止了, 將繼續輪詢, 直到能鏈接到新的 Active 的 RM 爲止。 能夠經過實現 org.apache.hadoop.yarn.client.RMFailoverProxyProvider 或者 配置 yarn.client.failover-proxy-provider 來實現。安全

Active-RM 狀態恢復

啓用 ResourceManager 後, 將 RM 狀態轉換成 Active 狀態須要加載 RM 內部狀態, 並根據 RM Restart 特性儘量從以前中止的位置繼續執行。 對於以前提交給 RM 託管的 Application, 都會生成一個新的 Application。 Application 能夠按期 CheckPoint, 以避免丟失任何 work。 狀態存儲必須在 Active & Standby 的 RM 中可見, 目前有兩種用於持久化的 RMStateStore 實現: FileSystemRMStateStoreZKRMStateStoreZKRMStateStore 隱式容許任什麼時候間任何節點對單個 RM 進行寫訪問, 所以官方推薦使用 ZKRMStateStore。 在使用 ZKRMStateStore 時, 不須要單獨的隔離機制來處理潛在的腦裂狀況。 在這種狀況下, 多個 RM 能夠潛在地承擔活動角色。 在使用 ZKRMStateStore 時, 官方建議不要在 zookeeper 集羣中設置 zookeeper.DigestAuthenticationProvider, 同時 zookeeper 管理員用戶不能有 Yarn 的 application/user 的憑證信息。bash

配置清單

配置項 描述
yarn.resourcemanager.zk-address zk-quorum的地址。同時用於狀態存儲和leader選舉。
yarn.resourcemanager.ha.enabled Enable RM HA.
yarn.resourcemanager.ha.rm-ids RM 的邏輯 ID, 好比 rm1, rm2
yarn.resourcemanager.hostname.rm-id 爲每一個 rm-id 聲明一個對應的主機名, 也能夠聲明 RM 的服務地址來替換。
yarn.resourcemanager.address.rm-id 爲每一個 rm-id 聲明一個對應的 RM 服務地址, 也能夠聲明 rm 對應主機名來替換。
yarn.resourcemanager.scheduler.address.rm-id For each rm-id, specify scheduler host:port for ApplicationMasters to obtain resources. If set, overrides the hostname set inv yarn.resourcemanager.hostname.rm-id.
yarn.resourcemanager.resource-tracker.address.rm-id For each rm-id, specify host:port for NodeManagers to connect. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id.
yarn.resourcemanager.admin.address.rm-id For each rm-id, specify host:port for administrative commands. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id.
yarn.resourcemanager.webapp.address.rm-id For each rm-id, specify host:port of the RM web application corresponds to. You do not need this if you set yarn.http.policy to HTTPS_ONLY. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id.
yarn.resourcemanager.webapp.https.address.rm-id For each rm-id, specify host:port of the RM https web application corresponds to. You do not need this if you set yarn.http.policy to HTTP_ONLY. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id.
yarn.resourcemanager.ha.id Identifies the RM in the ensemble. This is optional; however, if set, admins have to ensure that all the RMs have their own IDs in the config.
yarn.resourcemanager.ha.automatic-failover.enabled 啓動自動故障轉移, 啓用 RM HA 後默認開啓。
yarn.resourcemanager.ha.automatic-failover.embedded 啓用後, 經過內置的 leader 選舉來選 Active RM。 啓用 RM HA 時默認開啓。
yarn.resourcemanager.cluster-id 集羣標識, 確保 RM 不會接管另外一個集羣(即不會成爲其餘集羣的 Active RM)。
yarn.client.failover-proxy-provider Client、AM、NM 鏈接 Active RM 故障轉移的類。
yarn.client.failover-max-attempts FailoverProxyProvider 嘗試故障轉移的最大次數。
yarn.client.failover-sleep-base-ms 故障轉移之間計算延遲的 sleep 毫秒數。
yarn.client.failover-sleep-max-ms 故障轉移之間的 sleep 最大毫秒數。
yarn.client.failover-retries 每次鏈接 RM 的重試次數。
yarn.client.failover-retries-on-socket-timeouts 每次鏈接 RM 的 socket 超時重試次數。

能夠根據以上配置項對 RM HA 進行優化。session

簡單文件 sample:app

<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>master1</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>master2</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>master1:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>master2:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>zk1:2181,zk2:2181,zk3:2181</value>
</property>

切換 Active RM

# 獲取 active 狀態
yarn rmadmin -getServiceState rm1
yarn rmadmin -getServiceState rm2

# 切換 rm1 到 active 狀態
yarn rmadmin -transitionToActive rm1

RM HA & ZK

RM HA 中使用 ZK 的地方是 ZKRMStateStoreZKFailoverController框架

ZKRMStateStore

ZKRMStateStore 繼承了抽象類 RMStateStore, 用來存儲 RM 的狀態。

RMStateStore 中包含對 RMState, RMDTSecretManagerState, ApplicationStaateData, ApplicationAttemptStateData 的 store, load, remove, update 操做。

  1. RMState: ResourceManager 的狀態。
  2. ApplicationStateData: Application 狀態的數據。
  3. ApplicationAttemptStateData: Application 重試狀態, 一個 ApplicationState 可能包含多個 ApplicationAttemptState(假如客戶端設置重試2次, 第一次失敗, 第二次成功, 則 ApplicationState 會保存兩個 ApplicationAttemptState)。

在 RM 啓動時, 會加載上述幾種狀態(RMStateStore#loadState(), 見 ResourceManager#RMActiveServices#serviceStart()):

protected void serviceStart() throws Exception {
  RMStateStore rmStore = rmContext.getStateStore();
  // The state store needs to start irrespective of recoveryEnabled as apps
  // need events to move to further states.
  rmStore.start();

  pauseMonitor.start();

  if(recoveryEnabled) {
    try {
      LOG.info("Recovery started");
      rmStore.checkVersion();
      if (rmContext.isWorkPreservingRecoveryEnabled()) {
        rmContext.setEpoch(rmStore.getAndIncrementEpoch());
      }
      // 加載上一次的 RMState
      RMState state = rmStore.loadState();
      recover(state);
      LOG.info("Recovery ended");
    } catch (Exception e) {
      // the Exception from loadState() needs to be handled for
      // HA and we need to give up master status if we got fenced
      LOG.error("Failed to load/recover state", e);
      throw e;
    }
  }
  super.serviceStart();
}

ZK 中存儲 RM 狀態目錄結構以下:

ROOT_DIR_PATH
|--- VERSION_INFO
|--- EPOCH_NODE # RM 重啓的元信息
|--- RM_ZK_FENCING_LOCK
|--- RM_APP_ROOT
|     |----- (#ApplicationId1)
|     |        |----- (#ApplicationAttemptIds)
|     |
|     |----- (#ApplicationId2)
|     |       |----- (#ApplicationAttemptIds)
|     ....
|
|--- RM_DT_SECRET_MANAGER_ROOT
       |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
       |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
       |       |----- Token_1
       |       |----- Token_2
       |       ....
       |
       |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
       |      |----- Key_1
       |      |----- Key_2
               ....
|--- AMRMTOKEN_SECRET_MANAGER_ROOT
       |----- currentMasterKey
       |----- nextMasterKey
|-- RESERVATION_SYSTEM_ROOT
       |------PLAN_1
       |      |------ RESERVATION_1
       |      |------ RESERVATION_2
       |      ....
       |------PLAN_2
       ....

存儲的信息主要包 ApplicationSECRET_MANAGER (安全與權限相關) 的信息。

實現隔離

ZKRMStoreStateStore#startInternal() 會隔離相關路徑、ACL、OPS。

private synchronized void fence() throws Exception {
  if (LOG.isTraceEnabled()) {
    logRootNodeAcls("Before fencing\n");
  }

  curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
  delete(fencingNodePath);

  if (LOG.isTraceEnabled()) {
    logRootNodeAcls("After fencing\n");
  }
}

還原狀態

RMActiveServices 在啓動時 (serviceStart) 會調用 RMStateStore#loadState() 方法加載已經存儲的 RM 狀態。 RM HA 啓動後默認進入 Standby 狀態, 經過手動或者配置自動選舉的方式選擇 Active, 此時 RM 會加載已經存儲的狀態並還原。

ActiveStandbyElector

ActiveStandbyElector 主要負責完成自動的主被選舉(NameNode/ResourceManager), 內部封裝 ZK 的處理邏輯, 一旦主被選舉完成, 回調進行切換主備。

實現分析

  • 建立鎖節點

若是目前尚未進行過主備選舉的話, 那麼相應的 ActiveStandbyElector 就會發起一次主備選舉, Zookeeper 的寫一致性會保證最終只會有一個 ActiveStandbyElector 建立成功, 建立成功的 ActiveStandbyElector 對應的 RM 切換成 Active RM, 建立失敗的 ActiveStandbyElector 對應的 RM 爲 Stabdby RM, ActiveStandbyElector 回調 EmbeddedElectorService 的方法將對應的 RM 切換爲相應的 RM。

call back for become active or standby

  • 註冊 Watcher 監聽

註冊 Watcher 的實如今 org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore 中, 關注點爲 org.apache.zookeeper.Watcher.Event.EventType#NodeDeletedorg.apache.zookeeper.Watcher.Event.EventType#NodeDataChanged 的事件, 詳見 org.apache.hadoop.ha.ActiveStandbyElector#processWatchEvent()。 具體實現以下:

/**
 * interface implementation of Zookeeper watch events (connection and node),
 * 監控對應 ZNode 的 change 或 delete 事件。
 * proxied by {@link WatcherWithClientRef}.
 */
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
  Event.EventType eventType = event.getType();
  if (isStaleClient(zk)) return;
  LOG.debug("Watcher event type: " + eventType + " with state:"
      + event.getState() + " for path:" + event.getPath()
      + " connectionState: " + zkConnectionState
      + " for " + this);

  if (eventType == Event.EventType.None) {
    // the connection state has changed
    switch (event.getState()) {
    case SyncConnected:
      LOG.info("Session connected.");
      // if the listener was asked to move to safe state then it needs to
      // be undone
      ConnectionState prevConnectionState = zkConnectionState;
      zkConnectionState = ConnectionState.CONNECTED;
      if (prevConnectionState == ConnectionState.DISCONNECTED &&
          wantToBeInElection) {
        monitorActiveStatus();
      }
      break;
    case Disconnected:
      LOG.info("Session disconnected. Entering neutral mode...");
      // ask the app to move to safe state because zookeeper connection
      // is not active and we dont know our state
      zkConnectionState = ConnectionState.DISCONNECTED;
      enterNeutralMode();
      break;
    case Expired:
      // the connection got terminated because of session timeout
      // call listener to reconnect
      LOG.info("Session expired. Entering neutral mode and rejoining...");
      enterNeutralMode();
      reJoinElection(0);
      break;
    case SaslAuthenticated:
      LOG.info("Successfully authenticated to ZooKeeper using SASL.");
      break;
    default:
      fatalError("Unexpected Zookeeper watch event state: "
          + event.getState());
      break;
  }
  return;
}
/**
 * Watcher implementation which forward events to the ZKRMStateStore This
 * hides the ZK methods of the store from its public interface
 */
private final class ForwardingWatcher implements Watcher {
  private ZooKeeper watchedZkClient;

  public ForwardingWatcher(ZooKeeper client) {
    this.watchedZkClient = client;
  }

  @Override
  public void process(WatchedEvent event) {
    try {
      ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
    } catch (Throwable t) {
      LOG.error("Failed to process watcher event " + event + ": "
          + StringUtils.stringifyException(t));
    }
  }
}
  • 自動觸發主備選舉

監控到對應的 ZNode 被刪除的事件, 做出相應的操做:

switch (eventType) {
  case NodeDeleted:
    if (state == State.ACTIVE) {
      enterNeutralMode();
    }
    joinElectionInternal();
    break;
  case NodeDataChanged:
    monitorActiveStatus();
    break;
  default:
    LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
    monitorActiveStatus();
}
private void enterNeutralMode() {
  if (state != State.NEUTRAL) {
    LOG.debug("Entering neutral mode for " + this);
    state = State.NEUTRAL;
    appClient.enterNeutralMode();
  }
}
  • 防止腦裂

Zookeeper 在工程實踐中常常發生 Zookeeper Client 假死, 致使 Zookeeper Client 到 Zookeeper Server 的心跳不能正常發出, 超過 Zookeeper Session Timeout 後, Zookeeper Server 會認爲 Client 的 Session 已通過期而關閉 Session。 假死 可能引起分佈式系統常說的雙主腦裂(brain-split)現象。 致使 Zookeeper Client 假死 的緣由多是 ZK Client 正在進行 Full GC 或 Client 所在機器負載太高等。 Zookeeper 社區針對這種問題的解決方法是隔離, 將舊的 Active RM 隔離起來, 使其不能對外提供服務。

爲實現隔離, ActiveStandbyElector 會建立一個 fencing 節點, 在 RM 中是 RM_ZK_FENCING_LOCK, 其實現相似於 ZKFC。 暫未去模擬腦裂的場景。

對 ActiveStandbyElector 主被選舉狀態變化的處理

ActiveStandbyElector 的貯備選舉狀態發生變化時, 會調用 EmbeddedElectorService 中註冊的回調函數進行相應的處理。

  • 若是 ActiveStandbyElector 選主成功, 那麼 ActiveStandbyElector 對應的 RM 成爲 Active RM, ActiveStandbyElector 會回調 EmbeddedElectorServicebecomeActive 方法。
  • 若是 ActiveStandbyElector 選主失敗, 那麼 EmbeddedElectorService 對應的 RM 成爲 Standby RM, ActiveStandbyElector 會回調 EmbeddedElectorService 對應的 becomeStandby 方法。
相關文章
相關標籤/搜索