在 Hadoop 生態中(Hadoop2.x及之後版本), JobTracker 和 TaskTracker 演變爲 Yarn 做爲 Hadoop 的資源管理器。 同時, MapReduce、Spark、Flink、等計算框架也支持 Yarn 來調度, 所以, Yarn 高可用極爲重要。 關於 Yarn 相關內容, 詳情查看Apache Hadoop YARN Architecture, 這裏對 Yarn ResourceManager 的 HA 作一個簡單介紹。 文章部分來自官方文檔。html
ResourceManager HA 的目的是當 Active RM 沒法工做時, Standby RM 可以階梯正在服務的 Active RM, 防止集羣出現不可用狀態。java
ResourceManager HA 經過 Active/Standby 體系結構實現, 即在任意時刻, 都有一個 RM(ResourceManager) 處於 Active 狀態, 一個或多個 RM 處於 Standby 狀態, 若是 Active RM 發生故障, 由 Standby 的 RM 接管 Active RM 的工做。 啓動自動故障轉移時, 經過 Admin 接口或集成故障轉移器將 Standby RM 轉換爲 Active RM。node
不啓動自動故障轉移時, 管理員必須手動將其中一個 RM 轉換成 Active 狀態。 要從另外一個 RM 進行故障轉移到另外一個 RM, 須要先將 Active-RM 轉換成 Standby, 並將備用的 RM 轉換成 Active。 這些操做能夠經過 Yarn admin client 完成。web
RM 能夠選擇基於 zookeeper 的 ActiveStandbyElector
來決定那個 RM 是 Active 的, 當 Active 的 RM 中止或無響應時, 自動選擇一個 Standby RM 做爲 Active RM 來接管。 RM 的 HA 不須要單獨的守護進程(如 HDFS 的 ZKFC), 而是由嵌入到 RM 中的 ActiveStandbyElector
充當故障檢測器和 leader elector。apache
當存在多個 RM 時, Client 和 全部節點的配置(yarn-site.xml
)中須要列出全部的 RM, Client、AM(ApplicationMaster)、NodeManager 將循環嘗試鏈接 RM, 知道鏈接到 Active 的 RM。 若是 Active 的 RM 中止了, 將繼續輪詢, 直到能鏈接到新的 Active 的 RM 爲止。 能夠經過實現 org.apache.hadoop.yarn.client.RMFailoverProxyProvider
或者 配置 yarn.client.failover-proxy-provider
來實現。安全
啓用 ResourceManager 後, 將 RM 狀態轉換成 Active 狀態須要加載 RM 內部狀態, 並根據 RM Restart 特性儘量從以前中止的位置繼續執行。 對於以前提交給 RM 託管的 Application, 都會生成一個新的 Application。 Application 能夠按期 CheckPoint, 以避免丟失任何 work。 狀態存儲必須在 Active & Standby 的 RM 中可見, 目前有兩種用於持久化的 RMStateStore
實現: FileSystemRMStateStore
和 ZKRMStateStore
。 ZKRMStateStore
隱式容許任什麼時候間任何節點對單個 RM 進行寫訪問, 所以官方推薦使用 ZKRMStateStore
。 在使用 ZKRMStateStore
時, 不須要單獨的隔離機制來處理潛在的腦裂狀況。 在這種狀況下, 多個 RM 能夠潛在地承擔活動角色。 在使用 ZKRMStateStore
時, 官方建議不要在 zookeeper 集羣中設置 zookeeper.DigestAuthenticationProvider
, 同時 zookeeper 管理員用戶不能有 Yarn 的 application/user 的憑證信息。bash
配置項 | 描述 |
---|---|
yarn.resourcemanager.zk-address | zk-quorum的地址。同時用於狀態存儲和leader選舉。 |
yarn.resourcemanager.ha.enabled | Enable RM HA. |
yarn.resourcemanager.ha.rm-ids | RM 的邏輯 ID, 好比 rm1 , rm2 |
yarn.resourcemanager.hostname.rm-id | 爲每一個 rm-id 聲明一個對應的主機名, 也能夠聲明 RM 的服務地址來替換。 |
yarn.resourcemanager.address.rm-id | 爲每一個 rm-id 聲明一個對應的 RM 服務地址, 也能夠聲明 rm 對應主機名來替換。 |
yarn.resourcemanager.scheduler.address.rm-id | For each rm-id, specify scheduler host:port for ApplicationMasters to obtain resources. If set, overrides the hostname set inv yarn.resourcemanager.hostname.rm-id. |
yarn.resourcemanager.resource-tracker.address.rm-id | For each rm-id, specify host:port for NodeManagers to connect. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
yarn.resourcemanager.admin.address.rm-id | For each rm-id, specify host:port for administrative commands. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
yarn.resourcemanager.webapp.address.rm-id | For each rm-id, specify host:port of the RM web application corresponds to. You do not need this if you set yarn.http.policy to HTTPS_ONLY. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
yarn.resourcemanager.webapp.https.address.rm-id | For each rm-id, specify host:port of the RM https web application corresponds to. You do not need this if you set yarn.http.policy to HTTP_ONLY. If set, overrides the hostname set in yarn.resourcemanager.hostname.rm-id. |
yarn.resourcemanager.ha.id | Identifies the RM in the ensemble. This is optional; however, if set, admins have to ensure that all the RMs have their own IDs in the config. |
yarn.resourcemanager.ha.automatic-failover.enabled | 啓動自動故障轉移, 啓用 RM HA 後默認開啓。 |
yarn.resourcemanager.ha.automatic-failover.embedded | 啓用後, 經過內置的 leader 選舉來選 Active RM。 啓用 RM HA 時默認開啓。 |
yarn.resourcemanager.cluster-id | 集羣標識, 確保 RM 不會接管另外一個集羣(即不會成爲其餘集羣的 Active RM)。 |
yarn.client.failover-proxy-provider | Client、AM、NM 鏈接 Active RM 故障轉移的類。 |
yarn.client.failover-max-attempts | FailoverProxyProvider 嘗試故障轉移的最大次數。 |
yarn.client.failover-sleep-base-ms | 故障轉移之間計算延遲的 sleep 毫秒數。 |
yarn.client.failover-sleep-max-ms | 故障轉移之間的 sleep 最大毫秒數。 |
yarn.client.failover-retries | 每次鏈接 RM 的重試次數。 |
yarn.client.failover-retries-on-socket-timeouts | 每次鏈接 RM 的 socket 超時重試次數。 |
能夠根據以上配置項對 RM HA 進行優化。session
簡單文件 sample:app
<property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <property> <name>yarn.resourcemanager.cluster-id</name> <value>cluster1</value> </property> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>rm1,rm2</value> </property> <property> <name>yarn.resourcemanager.hostname.rm1</name> <value>master1</value> </property> <property> <name>yarn.resourcemanager.hostname.rm2</name> <value>master2</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm1</name> <value>master1:8088</value> </property> <property> <name>yarn.resourcemanager.webapp.address.rm2</name> <value>master2:8088</value> </property> <property> <name>yarn.resourcemanager.zk-address</name> <value>zk1:2181,zk2:2181,zk3:2181</value> </property>
# 獲取 active 狀態 yarn rmadmin -getServiceState rm1 yarn rmadmin -getServiceState rm2 # 切換 rm1 到 active 狀態 yarn rmadmin -transitionToActive rm1
RM HA 中使用 ZK 的地方是 ZKRMStateStore
和 ZKFailoverController
。框架
ZKRMStateStore
繼承了抽象類 RMStateStore
, 用來存儲 RM 的狀態。
RMStateStore
中包含對 RMState
, RMDTSecretManagerState
, ApplicationStaateData
, ApplicationAttemptStateData
的 store, load, remove, update 操做。
在 RM 啓動時, 會加載上述幾種狀態(RMStateStore#loadState()
, 見 ResourceManager#RMActiveServices#serviceStart()
):
protected void serviceStart() throws Exception { RMStateStore rmStore = rmContext.getStateStore(); // The state store needs to start irrespective of recoveryEnabled as apps // need events to move to further states. rmStore.start(); pauseMonitor.start(); if(recoveryEnabled) { try { LOG.info("Recovery started"); rmStore.checkVersion(); if (rmContext.isWorkPreservingRecoveryEnabled()) { rmContext.setEpoch(rmStore.getAndIncrementEpoch()); } // 加載上一次的 RMState RMState state = rmStore.loadState(); recover(state); LOG.info("Recovery ended"); } catch (Exception e) { // the Exception from loadState() needs to be handled for // HA and we need to give up master status if we got fenced LOG.error("Failed to load/recover state", e); throw e; } } super.serviceStart(); }
ZK 中存儲 RM 狀態目錄結構以下:
ROOT_DIR_PATH |--- VERSION_INFO |--- EPOCH_NODE # RM 重啓的元信息 |--- RM_ZK_FENCING_LOCK |--- RM_APP_ROOT | |----- (#ApplicationId1) | | |----- (#ApplicationAttemptIds) | | | |----- (#ApplicationId2) | | |----- (#ApplicationAttemptIds) | .... | |--- RM_DT_SECRET_MANAGER_ROOT |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME | |----- Token_1 | |----- Token_2 | .... | |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME | |----- Key_1 | |----- Key_2 .... |--- AMRMTOKEN_SECRET_MANAGER_ROOT |----- currentMasterKey |----- nextMasterKey |-- RESERVATION_SYSTEM_ROOT |------PLAN_1 | |------ RESERVATION_1 | |------ RESERVATION_2 | .... |------PLAN_2 ....
存儲的信息主要包 Application
和 SECRET_MANAGER
(安全與權限相關) 的信息。
ZKRMStoreStateStore#startInternal()
會隔離相關路徑、ACL、OPS。
private synchronized void fence() throws Exception { if (LOG.isTraceEnabled()) { logRootNodeAcls("Before fencing\n"); } curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath); delete(fencingNodePath); if (LOG.isTraceEnabled()) { logRootNodeAcls("After fencing\n"); } }
RMActiveServices
在啓動時 (serviceStart) 會調用 RMStateStore#loadState()
方法加載已經存儲的 RM 狀態。 RM HA 啓動後默認進入 Standby 狀態, 經過手動或者配置自動選舉的方式選擇 Active, 此時 RM 會加載已經存儲的狀態並還原。
ActiveStandbyElector
主要負責完成自動的主被選舉(NameNode/ResourceManager), 內部封裝 ZK 的處理邏輯, 一旦主被選舉完成, 回調進行切換主備。
若是目前尚未進行過主備選舉的話, 那麼相應的 ActiveStandbyElector 就會發起一次主備選舉, Zookeeper 的寫一致性會保證最終只會有一個 ActiveStandbyElector 建立成功, 建立成功的 ActiveStandbyElector 對應的 RM 切換成 Active RM, 建立失敗的 ActiveStandbyElector 對應的 RM 爲 Stabdby RM, ActiveStandbyElector 回調 EmbeddedElectorService
的方法將對應的 RM 切換爲相應的 RM。
註冊 Watcher 的實如今 org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
中, 關注點爲 org.apache.zookeeper.Watcher.Event.EventType#NodeDeleted
和 org.apache.zookeeper.Watcher.Event.EventType#NodeDataChanged
的事件, 詳見 org.apache.hadoop.ha.ActiveStandbyElector#processWatchEvent()
。 具體實現以下:
/** * interface implementation of Zookeeper watch events (connection and node), * 監控對應 ZNode 的 change 或 delete 事件。 * proxied by {@link WatcherWithClientRef}. */ synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) { Event.EventType eventType = event.getType(); if (isStaleClient(zk)) return; LOG.debug("Watcher event type: " + eventType + " with state:" + event.getState() + " for path:" + event.getPath() + " connectionState: " + zkConnectionState + " for " + this); if (eventType == Event.EventType.None) { // the connection state has changed switch (event.getState()) { case SyncConnected: LOG.info("Session connected."); // if the listener was asked to move to safe state then it needs to // be undone ConnectionState prevConnectionState = zkConnectionState; zkConnectionState = ConnectionState.CONNECTED; if (prevConnectionState == ConnectionState.DISCONNECTED && wantToBeInElection) { monitorActiveStatus(); } break; case Disconnected: LOG.info("Session disconnected. Entering neutral mode..."); // ask the app to move to safe state because zookeeper connection // is not active and we dont know our state zkConnectionState = ConnectionState.DISCONNECTED; enterNeutralMode(); break; case Expired: // the connection got terminated because of session timeout // call listener to reconnect LOG.info("Session expired. Entering neutral mode and rejoining..."); enterNeutralMode(); reJoinElection(0); break; case SaslAuthenticated: LOG.info("Successfully authenticated to ZooKeeper using SASL."); break; default: fatalError("Unexpected Zookeeper watch event state: " + event.getState()); break; } return; }
/** * Watcher implementation which forward events to the ZKRMStateStore This * hides the ZK methods of the store from its public interface */ private final class ForwardingWatcher implements Watcher { private ZooKeeper watchedZkClient; public ForwardingWatcher(ZooKeeper client) { this.watchedZkClient = client; } @Override public void process(WatchedEvent event) { try { ZKRMStateStore.this.processWatchEvent(watchedZkClient, event); } catch (Throwable t) { LOG.error("Failed to process watcher event " + event + ": " + StringUtils.stringifyException(t)); } } }
監控到對應的 ZNode 被刪除的事件, 做出相應的操做:
switch (eventType) { case NodeDeleted: if (state == State.ACTIVE) { enterNeutralMode(); } joinElectionInternal(); break; case NodeDataChanged: monitorActiveStatus(); break; default: LOG.debug("Unexpected node event: " + eventType + " for path: " + path); monitorActiveStatus(); }
private void enterNeutralMode() { if (state != State.NEUTRAL) { LOG.debug("Entering neutral mode for " + this); state = State.NEUTRAL; appClient.enterNeutralMode(); } }
Zookeeper 在工程實踐中常常發生 Zookeeper Client 假死
, 致使 Zookeeper Client 到 Zookeeper Server 的心跳不能正常發出, 超過 Zookeeper Session Timeout
後, Zookeeper Server 會認爲 Client 的 Session 已通過期而關閉 Session。 假死
可能引起分佈式系統常說的雙主
或腦裂(brain-split)
現象。 致使 Zookeeper Client 假死
的緣由多是 ZK Client 正在進行 Full GC 或 Client 所在機器負載太高等。 Zookeeper 社區針對這種問題的解決方法是隔離, 將舊的 Active RM 隔離起來, 使其不能對外提供服務。
爲實現隔離, ActiveStandbyElector
會建立一個 fencing 節點, 在 RM 中是 RM_ZK_FENCING_LOCK
, 其實現相似於 ZKFC。 暫未去模擬腦裂的場景。
當 ActiveStandbyElector
的貯備選舉狀態發生變化時, 會調用 EmbeddedElectorService
中註冊的回調函數進行相應的處理。
ActiveStandbyElector
選主成功, 那麼 ActiveStandbyElector
對應的 RM 成爲 Active RM, ActiveStandbyElector
會回調 EmbeddedElectorService
的 becomeActive
方法。ActiveStandbyElector
選主失敗, 那麼 EmbeddedElectorService
對應的 RM 成爲 Standby RM, ActiveStandbyElector
會回調 EmbeddedElectorService
對應的 becomeStandby
方法。