總結了AM起來後註冊、申請資源的過程。客戶端實現的ApplicationMaster使用AMRMClientAsync與RM交互,AMRMClientAsync內部使用AMRMClient,而AMRMClient中使用ApplicationMasterProtocol與RM交互。
一、AM運行起來後,需要向ResourceManager註冊。
/**
 * Creates and starts the asynchronous RM client and the asynchronous NM client.
 *
 * <p>The RM client is created with a heartbeat interval of 1000 ms (the first
 * argument of {@code createAMRMClientAsync}) and delivers allocation events to an
 * {@code RMCallbackHandler}; the NM client delivers container events to an
 * {@code NMCallbackHandler}.
 */
private void init() {
  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
  amRMClient.init(conf);
  amRMClient.start();

  containerListener = new NMCallbackHandler(this);
  nmClient = new NMClientAsyncImpl(containerListener);
  nmClient.init(conf);
  nmClient.start();
}

/**
 * Registers this AM with the ResourceManager and queues one container request
 * per container still needed.
 *
 * <p>The per-container memory and vcores are capped at the maximum capability
 * reported in the registration response. Containers carried over from a previous
 * attempt are counted as already allocated, so only the remainder is requested.
 *
 * @throws YarnException if registration with the RM fails at the YARN level
 * @throws IOException if communication with the RM fails
 */
private void run() throws YarnException, IOException {
  RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(
      appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);

  // Resource#getMemory() is deprecated in favour of getMemorySize(). Clamp to int
  // the same way the deprecated accessor does, so maxMem keeps its int type.
  Resource maxCapability = response.getMaximumResourceCapability();
  int maxMem = (int) Math.min(Integer.MAX_VALUE, maxCapability.getMemorySize());
  int maxVCores = maxCapability.getVirtualCores();
  containerMem = Math.min(maxMem, containerMem);
  containerVCores = Math.min(maxVCores, containerVCores);

  List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts();
  numAllocatedContainers.addAndGet(previousAMRunningContainers.size());

  int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size();
  for (int i = 0; i < numTotalContainersToRequest; ++i) {
    // One ContainerRequest object per container: the client library records each
    // request object individually (RemoteRequestsTable adds it to containerRequests).
    Priority pri = Priority.newInstance(requestPriority);
    Resource capability = Resource.newInstance(containerMem, containerVCores);
    ContainerRequest request = new ContainerRequest(capability, null, null, pri);
    amRMClient.addContainerRequest(request);
  }
  numRequestedContainers.set(numTotalContainers);
}
AM這邊使用到的類是AMRMClientAsync,實現類是../hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java。
在上面的run()方法中,AM先向RM註冊。註冊的實現如下。註冊之後heartbeatThread開始啓動,會定期向ResourceManager報告心跳。
/**
 * Registers this application master with the ResourceManager. Only after the
 * registration call has returned is the heartbeat thread started; that thread
 * then repeatedly calls {@code allocate()} on the RM.
 *
 * @param appHostName host name on which this AM is running
 * @param appHostPort RPC port this AM listens on
 * @param appTrackingUrl tracking URL reported for this AM
 * @return the response returned by the RM for the registration
 * @throws YarnException on a YARN-level error reported by the RM
 * @throws IOException if the call to the RM fails
 */
public RegisterApplicationMasterResponse registerApplicationMaster(
    String appHostName, int appHostPort, String appTrackingUrl)
    throws YarnException, IOException {
  final RegisterApplicationMasterResponse registration =
      client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
  // If the call above throws, control never gets here and no heartbeats are sent.
  heartbeatThread.start();
  return registration;
}
這裏的client是AMRMClientImpl類的實例。後面addContainerRequest()的實現,以及報告心跳的allocate()方法,也都是在這個類裏面實現的。
二、申請資源,調用addContainerRequest()。
1 @Override 2 public synchronized void addContainerRequest(T req) { 3 LOG.info("***enter addContainerRequest***"); 4 Preconditions.checkArgument(req != null, 5 "Resource request can not be null."); 6 Set<String> dedupedRacks = new HashSet<String>(); 7 if (req.getRacks() != null) { 8 dedupedRacks.addAll(req.getRacks()); 9 if(req.getRacks().size() != dedupedRacks.size()) { 10 Joiner joiner = Joiner.on(','); 11 LOG.warn("ContainerRequest has duplicate racks: " 12 + joiner.join(req.getRacks())); 13 } 14 } 15 Set<String> inferredRacks = resolveRacks(req.getNodes()); 16 inferredRacks.removeAll(dedupedRacks); 17 18 // check that specific and non-specific requests cannot be mixed within a 19 // priority 20 checkLocalityRelaxationConflict(req.getAllocationRequestId(), 21 req.getPriority(), ANY_LIST, req.getRelaxLocality()); 22 // check that specific rack cannot be mixed with specific node within a 23 // priority. If node and its rack are both specified then they must be 24 // in the same request. 
25 // For explicitly requested racks, we set locality relaxation to true 26 checkLocalityRelaxationConflict(req.getAllocationRequestId(), 27 req.getPriority(), dedupedRacks, true); 28 checkLocalityRelaxationConflict(req.getAllocationRequestId(), 29 req.getPriority(), inferredRacks, req.getRelaxLocality()); 30 // check if the node label expression specified is valid 31 checkNodeLabelExpression(req); 32 33 if (req.getNodes() != null) { 34 HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes()); 35 if(dedupedNodes.size() != req.getNodes().size()) { 36 Joiner joiner = Joiner.on(','); 37 LOG.warn("ContainerRequest has duplicate nodes: " 38 + joiner.join(req.getNodes())); 39 } 40 for (String node : dedupedNodes) { 41 addResourceRequest(req.getPriority(), node, 42 req.getExecutionTypeRequest(), req.getCapability(), req, true, 43 req.getNodeLabelExpression()); 44 } 45 } 46 47 for (String rack : dedupedRacks) { 48 addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), 49 req.getCapability(), req, true, req.getNodeLabelExpression()); 50 } 51 52 // Ensure node requests are accompanied by requests for 53 // corresponding rack 54 for (String rack : inferredRacks) { 55 addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), 56 req.getCapability(), req, req.getRelaxLocality(), 57 req.getNodeLabelExpression()); 58 } 59 // Off-switch 60 addResourceRequest(req.getPriority(), ResourceRequest.ANY, 61 req.getExecutionTypeRequest(), req.getCapability(), req, 62 req.getRelaxLocality(), req.getNodeLabelExpression()); 63 }
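在這個方法的開頭加入了打印,但是不知爲何在ResourceManager的log中沒有看到這句話被打印出來。其實這是因爲AMRMClientImpl是在AM進程中運行的客戶端(由上面AM代碼中的amRMClient調用),這句日誌會寫到AM所在container的日誌裏,而不是ResourceManager的日誌中。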
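方法的前面是與relax locality相關的機架、節點處理及衝突檢查,後面調用addResourceRequest()方法申請資源。之所以調用了這麼多次,是因爲本地性是用不同粒度的ResourceRequest來表達的:每個去重後的節點調用一次,每個顯式指定的機架調用一次,由所請求節點推斷出(且未被顯式指定)的每個機架再調用一次,最後針對ResourceRequest.ANY(off-switch)調用一次。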
addResourceRequest()方法:
1 private void addResourceRequest(Priority priority, String resourceName, 2 ExecutionTypeRequest execTypeReq, Resource capability, T req, 3 boolean relaxLocality, String labelExpression) { 4 RemoteRequestsTable<T> remoteRequestsTable = 5 getTable(req.getAllocationRequestId()); 6 if (remoteRequestsTable == null) { 7 remoteRequestsTable = new RemoteRequestsTable<T>(); 8 putTable(req.getAllocationRequestId(), remoteRequestsTable); 9 } 10 @SuppressWarnings("unchecked") 11 ResourceRequestInfo resourceRequestInfo = remoteRequestsTable 12 .addResourceRequest(req.getAllocationRequestId(), priority, 13 resourceName, execTypeReq, capability, req, relaxLocality, 14 labelExpression); 15 // Note this down for next interaction with ResourceManager 16 addResourceRequestToAsk(resourceRequestInfo.remoteRequest); 17 18 if (LOG.isDebugEnabled()) { 19 LOG.debug("addResourceRequest:" + " applicationId=" 20 + " priority=" + priority.getPriority() 21 + " resourceName=" + resourceName + " numContainers=" 22 + resourceRequestInfo.remoteRequest.getNumContainers() 23 + "remoteRequest=" + resourceRequestInfo.remoteRequest 24 + " #asks=" + ask.size() + " capacity=" + capability); 25 } 26 }
資源的表示ResourceRequestInfo有如下元素:
// The ResourceRequest sent to the RM for one (priority, resourceName,
// execution type, capability) key; its numContainers is incremented once per
// container request folded into this entry.
ResourceRequest remoteRequest;
// Container requests added to this entry with relaxLocality == true
// (insertion-ordered, without duplicates).
LinkedHashSet<T> containerRequests;
其中remoteRequest對象描述了所請求container的具體信息,參考ResourceRequest類的註釋:
ResourceRequest represents the request made by an application to the ResourceManager to obtain various Container allocations.
包括了如下內容:
* Priority :請求的優先級
* name: 指定期望被分配到的host或rack的名字,值爲ANY表明沒有位置要求
* Resource:每個請求需要的資源量
* Number of containers:上述要求下的container數量
* relaxLocality:指定是否可以放寬本地性要求(relax locality)
(這裏可以對應上董西城的《Hadoop技術內幕》p177中ResourceRequestProto的描述)
RemoteRequestsTable的addResourceRequest()實現負責修改NumContainers。這裏是根據ResourceRequestInfo來區分container請求的:priority、resourceName、executionType及capability都相同的container請求會共用同一個ResourceRequestInfo。因此,當新增的container請求與之前的一致時,只需將NumContainers加1即可。
1 ResourceRequestInfo addResourceRequest(Long allocationRequestId, 2 Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, 3 Resource capability, T req, boolean relaxLocality, 4 String labelExpression) { 5 ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, 6 execTypeReq.getExecutionType(), capability); 7 if (resourceRequestInfo == null) { 8 resourceRequestInfo = 9 new ResourceRequestInfo(allocationRequestId, priority, resourceName, 10 capability, relaxLocality); 11 put(priority, resourceName, execTypeReq.getExecutionType(), capability, 12 resourceRequestInfo); 13 } 14 resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq); 15 resourceRequestInfo.remoteRequest.setNumContainers( 16 resourceRequestInfo.remoteRequest.getNumContainers() + 1); 17 18 if (relaxLocality) { 19 resourceRequestInfo.containerRequests.add(req); 20 } 21 22 if (ResourceRequest.ANY.equals(resourceName)) { 23 resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); 24 } 25 return resourceRequestInfo; 26 }
回到上面的addResourceRequestToAsk()方法,註釋裏也說明了,它會在下一次與ResourceManager交互時提交。該方法所做的修改,是在TreeSet<ResourceRequest>類型的ask變量中加入這個ResourceRequest。
三、HeartbeatThread的工作
1 private class HeartbeatThread extends Thread { 2 public HeartbeatThread() { 3 super("AMRM Heartbeater thread"); 4 } 5 6 public void run() { 7 while (true) { 8 Object response = null; 9 // synchronization ensures we don't send heartbeats after unregistering 10 synchronized (unregisterHeartbeatLock) { 11 if (!keepRunning) { 12 return; 13 } 14 15 try { 16 response = client.allocate(progress); 17 } catch (ApplicationAttemptNotFoundException e) { 18 handler.onShutdownRequest(); 19 LOG.info("Shutdown requested. Stopping callback."); 20 return; 21 } catch (Throwable ex) { 22 LOG.error("Exception on heartbeat", ex); 23 response = ex; 24 } 25 if (response != null) { 26 while (true) { 27 try { 28 responseQueue.put(response); 29 break; 30 } catch (InterruptedException ex) { 31 LOG.debug("Interrupted while waiting to put on response queue", ex); 32 } 33 } 34 } 35 } 36 try { 37 Thread.sleep(heartbeatIntervalMs.get()); 38 } catch (InterruptedException ex) { 39 LOG.debug("Heartbeater interrupted", ex); 40 } 41 } 42 } 43 } 44
可以看到,它主要就是在反覆調用client.allocate()並處理返回的response。response會被放入一個BlockingQueue中,由另一個線程CallbackHandlerThread從隊列中take出來處理。注意:allocate()拋出的異常(ApplicationAttemptNotFoundException除外)也會被當作response放入隊列。
四、整個過程已經理清了,下面補充細節
AM -> RM <- NM