1、概述java
本文將介紹ResourceManager在Yarn中的功能做用,從更細的粒度分析RM內部組成的各個組件功能和他們相互的交互方式。node
2、ResourceManager的交互協議與基本職能安全
一、ResourceManager交互協議架構
在整個Yarn框架中主要涉及到7個協議,分別是ApplicationClientProtocol、MRClientProtocol、ContainerManagementProtocol、ApplicationMasterProtocol、ResourceTracker、LocalizationProtocol、TaskUmbilicalProtocol,這些協議封裝了各個組件交互的信息。ResourceManager現實功能須要和NodeManager以及ApplicationMaster進行信息交互,其中涉及到的RPC協議有ResourceTrackerProtocol、ApplicationMasterProtocol和ResourceTrackerProtocol。
app
ResourceTracker框架
NodeManager經過該協議向ResourceManager中註冊、彙報節點健康狀況以及Container的運行狀態,而且領取ResourceManager下達的從新初始化、清理Container等命令。NodeManager和ResourceManager這種RPC通訊採用了和MRv1相似的「pull模型」(ResourceManager充當RPC server角色,NodeManager充當RPC client角色),NodeManager週期性主動地向ResourceManager發起請求,而且領取下達給本身的命令。異步
ApplicationMasterProtocolide
應用程序的ApplicationMaster同過該協議向ResourceManager註冊、申請和釋放資源。該協議和上面協議一樣也是採用了「pull模型」,其中在RPC機制中,ApplicationMaster充當RPC client角色,ResourceManager充當RPC server角色。函數
ApplicationClientProtocoloop
客戶端經過該協議向ResourceManager提交應用程序、控制應用程序(如殺死job)以及查詢應用程序的運行狀態等。在該RPC 協議中應用程序客戶端充當RPC client角色,ResourceManager充當RPC server角色。
整理一下ResourceManager與NodeManager、ApplicationMaster和客戶端RPC協議交互的信息:
上圖中的ResourceTrackeServer、ApplicationMasterService 、ClientRMServer是ResourceManager中處理上述功能的組件。
一、ResourceManager基本職能
ResourceManager基本職能歸納起來就如下幾方面:
與客戶端進行交互,處理來自於客戶端的請求,如查詢應用的運行狀況等。
啓動和管理各個應用的ApplicationMaster,而且爲ApplicationMaster申請第一個Container用於啓動和在它運行失敗時將它從新啓動。
管理NodeManager,接收來自NodeManager的資源和節點健康狀況彙報,並向NodeManager下達管理資源命令,例如kill掉某個container。
資源管理和調度,接收來自ApplicationMaster的資源申請,而且爲其進行分配。這個是它的最重要的職能。
3、ResourceManager內部組成架構分析
ResourceManager在底層代碼實現上將各個功能模塊分的比較細,各個模塊功能具備很強的獨立性。下圖所示的是ResourceManager中的大概的功能模塊組成:
一、用戶交互模塊
用戶交互模塊即上圖顯示的User Service管理模塊。在這裏邊還能夠看到根據不一樣的用戶類型啓用了不一樣的服務進行處理,AdminService處理管理員相關請求,ClientRMService處理普通客戶相關請求,這樣使得管理員不會由於普通客戶請求太多而形成堵塞。下面看看這2個服務的具體實現代碼:
ClientRMService
public class ClientRMService extends AbstractService implements ApplicationClientProtocol { private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT = new ArrayList<ApplicationReport>(); private static final Log LOG = LogFactory.getLog(ClientRMService.class); final private AtomicInteger applicationCounter = new AtomicInteger(0); final private YarnScheduler scheduler;//調度器 final private RMContext rmContext;//RM上下文對象,其包含了RM大部分運行時信息,如節點列表、隊列列表、應用程序列表等 private final RMAppManager rmAppManager;//app管理對象 private Server server;//一個RPC Server protected RMDelegationTokenSecretManager rmDTSecretManager; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); InetSocketAddress clientBindAddress; //訪問控制對象,例如,一些應用程序在提交時設置了查看權限的話,其餘普通用戶就沒法查看。 private final ApplicationACLsManager applicationsACLsManager; private final QueueACLsManager queueACLsManager; ...... @Override protected void serviceStart() throws Exception { Configuration conf = getConfig(); YarnRPC rpc = YarnRPC.create(conf); this.server = //實現RPC協議ApplicationClientProtocol rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, conf, this.rmDTSecretManager, conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT)); // Enable service authorization? if (conf.getBoolean( CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { refreshServiceAcls(conf, new RMPolicyProvider()); } this.server.start(); ...... }
從上面ClientRMService的基本代碼架構咱們能夠看出:
(1)ClientRMService是一個RPC Server,主要爲來自於普通客戶端的各類RPC請求。從代碼實現的角度看,它是ApplicationClientProtocol協議的一個實現。
(2)以前咱們已經說了,普通用戶能夠經過該服務來得到正在運行應用程序的相關信息,如進度狀況、應用程序列表等。上面代碼中都將ResourceManager運行信息封裝在RMContxt接口中了,下面來看看這個接口的一個實現對象RMContextImpl:
public class RMContextImpl implements RMContext { //中央異步調度器。RM中的各個服務和組件以及它們處理和輸出的事件類型都是經過中央異步調度器組織在一塊兒的,這樣能夠有效提升系統的吞吐量。 private final Dispatcher rmDispatcher; private final ConcurrentMap<ApplicationId, RMApp> applications//應用程序列表 = new ConcurrentHashMap<ApplicationId, RMApp>(); private final ConcurrentMap<NodeId, RMNode> nodes//節點列表 = new ConcurrentHashMap<NodeId, RMNode>(); private final ConcurrentMap<String, RMNode> inactiveNodes//非活躍節點列表 = new ConcurrentHashMap<String, RMNode>(); //正在運行中的AP心跳監控對象 private AMLivelinessMonitor amLivelinessMonitor;//正在運行中的AP心跳監控對象 //運行完畢後的AM心跳監控對象 private AMLivelinessMonitor amFinishingMonitor; //用於存儲ResourceManager運行狀態 private RMStateStore stateStore = null; //用於Container的超時監控,應用程序必須在必定時間內(默認10Min)使用分配到的Container去運行task,不然會被回收 private ContainerAllocationExpirer containerAllocationExpirer; //下面變量都是與安全管理相關的對象 private final DelegationTokenRenewer delegationTokenRenewer; private final AMRMTokenSecretManager amRMTokenSecretManager; private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; private ClientRMService clientRMService; private RMDelegationTokenSecretManager rmDelegationTokenSecretManager; ...... }
AdminService
AdminService和ClientRMService同樣都是做爲RPC的服務端,它針對的處理管理員RPC請求,負責訪問權限的控制,中Yarn中管理員權限的設定能夠在yarn-site.xml中yarn.admi.acl項進行設置,該項的默認值是*,也就是說若是不進行設置的話就當全部的用戶都是管理員。從代碼上看,它是ResourceManagerAdministrationProtocol協議的一個實現:
public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol { private static final Log LOG = LogFactory.getLog(AdminService.class); private final Configuration conf; private final ResourceScheduler scheduler; private final RMContext rmContext; private final NodesListManager nodesListManager; private final ClientRMService clientRMService; private final ApplicationMasterService applicationMasterService; private final ResourceTrackerService resourceTrackerService; private Server server; private InetSocketAddress masterServiceAddress; private AccessControlList adminAcl; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); ..... }
AdminService代碼和ClientRMService比較類似,它各種功能對象也差很少。
二、NodeManager管理
NodeManager主要是經過NMLivelinessMonitor、ResourceTrackerService和NodeListManager這3大組件來對NodeManager的生命週期、心跳處理以及黑名單處理。
(1)ResourceTrackerService
ResourceTrackerService是RPC協議ResourceTracker的一個實現,它做爲一個RPC Server端接收NodeManager的RPC請求,請求主要包含2種信息,註冊NodeManager和處理心跳信息。NodeManger啓動時第一件事就是像ResourceManager註冊,註冊時NodeManager發給ResourceTrackerService的RPC包主要包含NodeManager所在節點的可用資源總量、對外開放的htpp端口、節點的host和port等信息,具體代碼看ResourceTrackerService#registerNodeManager方法:
@SuppressWarnings("unchecked") @Override public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerRequest request) throws YarnException, IOException { NodeId nodeId = request.getNodeId();//從NodeManager帶來的NodeID String host = nodeId.getHost();//NodeManager所在節點的host int cmPort = nodeId.getPort(); //NodeManager所在節點的port int httpPort = request.getHttpPort();//對外開放的http端口 Resource capability = request.getResource();//得到NodeManager所在節點的資源上限 RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); // Check if this node is a 'valid' node //檢測節點host名稱的的合法性 if (!this.nodesListManager.isValidNode(host)) { String message = "Disallowed NodeManager from " + host + ", Sending SHUTDOWN signal to the NodeManager."; LOG.info(message); response.setDiagnosticsMessage(message); response.setNodeAction(NodeAction.SHUTDOWN); return response; } ..... }
ResourceTrackerService另一種功能就是處理心跳信息了,當NodeManager啓動後,它會週期性地調用RPC函數ResourceTracker#nodeHeartbeat彙報心跳,心跳信息主要包含該節點的各個Container的運行狀態、正在運行的Application列表、節點的健康情況等,隨後ResourceManager爲該NodeManager返回須要釋放的Container列表、Application列表等信息。其中心跳信息處理的流程:首先,從NodeManager發來的心跳包中得到節點的狀態狀態信息,而後檢測該節點是否已經註冊過,而後檢測該節點的host名稱是否合法,例如是否在excluded列表中,而後再檢測該次心跳是否是第一次心跳信息,這點很是重要,由於關係到心跳的重複發送與應答的相關問題。其實ResourceTrackerService和NodeManager的心跳處理機制和以前Hadoop1.x中的JobTracker與TaskTacker之間的心跳處理很想象,具體請看我以前寫的一篇blog:http://zengzhaozheng.blog.51cto.com/8219051/1359887 ,再而後,爲NodeManager返回心跳應答信息,最後,想RMNode發送該NodeManager的狀態信息而且保存最近一次心跳應答信息。再具體看看ResourceTracker#nodeHeart方法:
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnException, IOException { //從RPC Clinet中得到nodeManager所在節點的健康情況 NodeStatus remoteNodeStatus = request.getNodeStatus(); /** * Here is the node heartbeat sequence... * 1. Check if it's a registered node * 2. Check if it's a valid (i.e. not excluded) node * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat * 4. Send healthStatus to RMNode */ NodeId nodeId = remoteNodeStatus.getNodeId(); // 1. Check if it's a registered node RMNode rmNode = this.rmContext.getRMNodes().get(nodeId); if (rmNode == null) { /* node does not exist */ String message = "Node not found resyncing " + remoteNodeStatus.getNodeId(); LOG.info(message); resync.setDiagnosticsMessage(message); return resync; } // Send ping this.nmLivelinessMonitor.receivedPing(nodeId); // 2. Check if it's a valid (i.e. not excluded) node if (!this.nodesListManager.isValidNode(rmNode.getHostName())) { String message = "Disallowed NodeManager nodeId: " + nodeId + " hostname: " + rmNode.getNodeAddress(); LOG.info(message); shutDown.setDiagnosticsMessage(message); this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION)); return shutDown; } // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse .getResponseId()) { LOG.info("Received duplicate heartbeat from node " + rmNode.getNodeAddress()); return lastNodeHeartbeatResponse; } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse .getResponseId()) { String message = "Too far behind rm response id:" + lastNodeHeartbeatResponse.getResponseId() + " nm response id:" + remoteNodeStatus.getResponseId(); LOG.info(message); resync.setDiagnosticsMessage(message); // TODO: Just sending reboot is not enough. Think more. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); return resync; } // Heartbeat response NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils .newNodeHeartbeatResponse(lastNodeHeartbeatResponse. getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); // 4. Send status to RMNode, saving the latest response. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse)); return nodeHeartBeatResponse; }
(2)NodeListManager
NodeListManager主要分管黑名單(include列表)和白名單(exlude列表)管理功能,分別有yarnresouecemanager.nodes.include-path和yarnresourcemanager.nodes.exclude-path指定。黑名單列表中的nodes不可以和RM直接通訊(直接拋出RPC異常),管理員能夠對這兩個列表進行編輯,而後使用$HADOOP_HOME/bin/yarn rmadmin-refreshNodes動態加載修改後的列表,使之生效。
(3)NMLivelinessMonitor
NMLivelinessMonitor主要是分管心跳異常請求。該服務會週期性地遍歷集羣中的全部NodeManager,若是某個NodeManager在必定時間內(默認10min,能夠有參數yarn.nm.liveness-monitor.expiry-interval-ms配置)沒有進行心跳彙報,那麼則認爲它已經死掉,同時在該節點上運行的Container也會被置爲運行失敗釋放資源。那麼這些被置爲失敗的Container是不會直接被RM分配執行的,RM只是負責將這些被置爲失敗的Container信息告訴它們所對應的ApplicationMaster,需不須要從新運行它說的算,若是須要重新運行的話,該ApplicationMaster要重新向RM申請資源,而後由ApplicationMaster與對應的NodeManager通訊以重新運行以前失敗的Container。
二、ApplicationMaster管理模塊