yarn Capacity Schedulery的實現(2)--AM提交資源申請的過程

       總結了AM起來後註冊、申請資源的過程。客戶端實現的ApplicationMaster使用AMRMClientAsync交互,AMRMClientAsync使用AMRMClient與RM交互,AMRMClient中使用ApplicationMasterProtocol與RM交互。java

一、AM運行起來後,須要向ResourceManager註冊。node

private void init() { AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); containerListener = new NMCallbackHandler(this); nmClient = new NMClientAsyncImpl(containerListener); nmClient.init(conf); nmClient.start(); } private void run() throws YarnException, IOException { RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl); int maxMem = response.getMaximumResourceCapability().getMemory(); int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); containerMem = Math.min(maxMem,containerMem); containerVCores = Math.min(maxVCores, containerVCores); List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); numAllocatedContainers.addAndGet(previousAMRunningContainers.size()); int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size(); for (int i = 0; i < numTotalContainersToRequest; ++i) { Priority pri = Priority.newInstance(requestPriority); Resource capability = Resource.newInstance(containerMem, containerVCores); ContainerRequest request = new ContainerRequest(capability, null, null, pri); amRMClient.addContainerRequest(request); } numRequestedContainers.set(numTotalContainers); }
View Code

AM這邊使用到的類是AMRMClientAsync,實現類是../hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java。express

在上面的run()方法中,AM先向RM註冊了。註冊的實現以下。註冊以後hearbeatThread開始啓動,開始會按期向Resourcemanager報告心跳。api

/** * Registers this application master with the resource manager. On successful * registration, starts the heartbeating thread. * @throws YarnException * @throws IOException */
  public RegisterApplicationMasterResponse registerApplicationMaster( String appHostName, int appHostPort, String appTrackingUrl) throws YarnException, IOException { RegisterApplicationMasterResponse response = client .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); heartbeatThread.start(); return response; }
View Code

這裏面client是類AMRMClientImpl的實例。後面addContainerRequest()的實現以及報告心跳的allocate()方法也是在這個類裏面實現。app

 

二、申請資源,調用addContainerRequest()。async

 1 @Override  2   public synchronized void addContainerRequest(T req) {  3     LOG.info("***enter addContainerRequest***");  4     Preconditions.checkArgument(req != null,  5         "Resource request can not be null.");  6     Set<String> dedupedRacks = new HashSet<String>();  7     if (req.getRacks() != null) {  8  dedupedRacks.addAll(req.getRacks());  9       if(req.getRacks().size() != dedupedRacks.size()) { 10         Joiner joiner = Joiner.on(','); 11         LOG.warn("ContainerRequest has duplicate racks: "
12             + joiner.join(req.getRacks())); 13  } 14  } 15     Set<String> inferredRacks = resolveRacks(req.getNodes()); 16  inferredRacks.removeAll(dedupedRacks); 17 
18     // check that specific and non-specific requests cannot be mixed within a 19     // priority
20  checkLocalityRelaxationConflict(req.getAllocationRequestId(), 21  req.getPriority(), ANY_LIST, req.getRelaxLocality()); 22     // check that specific rack cannot be mixed with specific node within a 23     // priority. If node and its rack are both specified then they must be 24     // in the same request. 25     // For explicitly requested racks, we set locality relaxation to true
26  checkLocalityRelaxationConflict(req.getAllocationRequestId(), 27         req.getPriority(), dedupedRacks, true); 28  checkLocalityRelaxationConflict(req.getAllocationRequestId(), 29  req.getPriority(), inferredRacks, req.getRelaxLocality()); 30     // check if the node label expression specified is valid
31  checkNodeLabelExpression(req); 32 
33     if (req.getNodes() != null) { 34       HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes()); 35       if(dedupedNodes.size() != req.getNodes().size()) { 36         Joiner joiner = Joiner.on(','); 37         LOG.warn("ContainerRequest has duplicate nodes: "
38             + joiner.join(req.getNodes())); 39  } 40       for (String node : dedupedNodes) { 41  addResourceRequest(req.getPriority(), node, 42             req.getExecutionTypeRequest(), req.getCapability(), req, true, 43  req.getNodeLabelExpression()); 44  } 45  } 46 
47     for (String rack : dedupedRacks) { 48  addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), 49           req.getCapability(), req, true, req.getNodeLabelExpression()); 50  } 51 
52     // Ensure node requests are accompanied by requests for 53     // corresponding rack
54     for (String rack : inferredRacks) { 55  addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), 56  req.getCapability(), req, req.getRelaxLocality(), 57  req.getNodeLabelExpression()); 58  } 59     // Off-switch
60  addResourceRequest(req.getPriority(), ResourceRequest.ANY, 61  req.getExecutionTypeRequest(), req.getCapability(), req, 62  req.getRelaxLocality(), req.getNodeLabelExpression()); 63   }
View Code

這個方法的一開始加入了打印,可是不知爲什麼在ResourceManager的log中沒有看到這句話被打印出來。ide

方法的前面是與relax locality的機架、節點選擇,後面調用addResourceRequest()方法申請資源(不知道爲何會調用這麼屢次??)。oop

addResourceRequest()方法:this

 1 private void addResourceRequest(Priority priority, String resourceName,  2  ExecutionTypeRequest execTypeReq, Resource capability, T req,  3       boolean relaxLocality, String labelExpression) {  4     RemoteRequestsTable<T> remoteRequestsTable =
 5  getTable(req.getAllocationRequestId());  6     if (remoteRequestsTable == null) {  7       remoteRequestsTable = new RemoteRequestsTable<T>();  8  putTable(req.getAllocationRequestId(), remoteRequestsTable);  9  } 10     @SuppressWarnings("unchecked") 11     ResourceRequestInfo resourceRequestInfo = remoteRequestsTable 12  .addResourceRequest(req.getAllocationRequestId(), priority, 13  resourceName, execTypeReq, capability, req, relaxLocality, 14  labelExpression); 15     // Note this down for next interaction with ResourceManager
16  addResourceRequestToAsk(resourceRequestInfo.remoteRequest); 17 
18     if (LOG.isDebugEnabled()) { 19       LOG.debug("addResourceRequest:" + " applicationId="
20           + " priority=" + priority.getPriority() 21           + " resourceName=" + resourceName + " numContainers="
22           + resourceRequestInfo.remoteRequest.getNumContainers() 23           + "remoteRequest=" + resourceRequestInfo.remoteRequest 24         + " #asks=" + ask.size() + " capacity=" + capability); 25  } 26   }
View Code

資源的表示ResourceRequestInfo有如下元素:spa

ResourceRequest remoteRequest;
LinkedHashSet<T> containerRequests;

其中remoteRequest對象描述了container的具體信息,參考註釋

 ResourceRequest represents the request made by an application to the ResourceManager to obtain various Container allocations.

包括瞭如下內容:
* Priority :請求的優先級
* name: 指按期望被分配到的host或rack的名字,值爲ANY代表無要求
* Resource:每一個請求須要的資源量
* Number of containers:上述要求下的container數量
* relaxLocality:指定是否能夠relax locality

(這邊能夠對應上董西城的《Hadoop技術內幕》p177中的ResourceRequestProto的描述)

 

remoteRequestTable的addResourcRequest()的實現中有負責修改了NumContainer.這裏判斷是根據resourceInfo來區分container的,若是是priority、resourceName、executionType及capability都同樣的的container會有同一個resourceReqeustInfo,所以當增長的container請求與以前的一致時,會將NumContainers+1便可。

 1 ResourceRequestInfo addResourceRequest(Long allocationRequestId,  2  Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,  3       Resource capability, T req, boolean relaxLocality,  4  String labelExpression) {  5     ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,  6  execTypeReq.getExecutionType(), capability);  7     if (resourceRequestInfo == null) {  8       resourceRequestInfo =
 9           new ResourceRequestInfo(allocationRequestId, priority, resourceName, 10  capability, relaxLocality); 11  put(priority, resourceName, execTypeReq.getExecutionType(), capability, 12  resourceRequestInfo); 13  } 14  resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq); 15  resourceRequestInfo.remoteRequest.setNumContainers( 16         resourceRequestInfo.remoteRequest.getNumContainers() + 1); 17 
18     if (relaxLocality) { 19  resourceRequestInfo.containerRequests.add(req); 20  } 21 
22     if (ResourceRequest.ANY.equals(resourceName)) { 23  resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression); 24  } 25     return resourceRequestInfo; 26   }
View Code

回到上面的addResourceRequestToAsk()方法,註釋裏也說了會在下次interaction的時候交與resourcemanager。該方法裏作的修改是在TreeSet<ResourceRequest>類型的ask變量中加入這個ResourceRequest。

 

三、HeartbeatThread的工做

 1 private class HeartbeatThread extends Thread {  2     public HeartbeatThread() {  3       super("AMRM Heartbeater thread");  4  }  5     
 6     public void run() {  7       while (true) {  8         Object response = null;  9         // synchronization ensures we don't send heartbeats after unregistering
10         synchronized (unregisterHeartbeatLock) { 11           if (!keepRunning) { 12             return; 13  } 14 
15           try { 16             response = client.allocate(progress); 17           } catch (ApplicationAttemptNotFoundException e) { 18  handler.onShutdownRequest(); 19             LOG.info("Shutdown requested. Stopping callback."); 20             return; 21           } catch (Throwable ex) { 22             LOG.error("Exception on heartbeat", ex); 23             response = ex; 24  } 25           if (response != null) { 26             while (true) { 27               try { 28  responseQueue.put(response); 29                 break; 30               } catch (InterruptedException ex) { 31                 LOG.debug("Interrupted while waiting to put on response queue", ex); 32  } 33  } 34  } 35  } 36         try { 37  Thread.sleep(heartbeatIntervalMs.get()); 38         } catch (InterruptedException ex) { 39           LOG.debug("Heartbeater interrupted", ex); 40  } 41  } 42  } 43  } 44
View Code

能夠看到主要就是在反覆調用client.allocate()以及處理返回的response.response會放入一個BlockingQueue中,另外一個線程CallbackHandlerThread會從隊列中take。

 

四、整個過程理清了。。。補充細節

 AM -> RM <- NM

相關文章
相關標籤/搜索