The ApplicationMaster here manages exactly one Framework. It is therefore designed as a micro kernel that links together all of its subservices:
ZookeeperStore, hdfsStore
YarnClient, LauncherClient
AMRMClientAsync, NMClientAsync
StatusManager, RequestManager
RMResyncHandler, SelectionManager
The flow below mainly uses AMRMClientAsync, NMClientAsync, StatusManager and RequestManager.
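Besides the subservices above, the ApplicationMaster also has these members:
conf -- configuration
SystemTaskQueue -- a thread pool of the ScheduledThreadPool kind with a single thread
Map<String, Container> allocatedContainers -- records every container that has been allocated
Map<String, > -- records an exceed count for each container??? (this looks like the containerConnectionExceedCount map used in allocateContainer() below)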
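Every custom ApplicationMaster has to implement AMCallbackHandler and NMCallbackHandler, and AMCallbackHandler has an onContainersAllocated() method. Let's step into that method and see what the AM does once containers have been allocated.
As the code below shows, it hands the allocateContainers(containers) task to the SystemTaskQueue thread pool mentioned above (the field is named transitionTaskStateQueue in the code). After queuing the task, it also checks whether the number of tasks waiting in the queue exceeds the allowed limit.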
    public void onContainersAllocated(List<Container> containers) {
      if (containers.size() <= 0) {
        return;
      }

      LOGGER.logInfo(
          "onContainersAllocated: Allocated Containers: %s.",
          containers.size());

      // Hand the work to the single-threaded queue instead of running it on the callback thread
      transitionTaskStateQueue.queueSystemTask(() -> {
        allocateContainers(containers);
      });
    }
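To make the "single-threaded queue" idea concrete, here is a minimal sketch of such a queue. It is not the project's actual SystemTaskQueue: the class name `SystemTaskQueueSketch` and the methods `pendingTaskCount()`, `shutdownAndWait()` and `runInOrder()` are hypothetical names made up for illustration. It only shows that a `ScheduledThreadPoolExecutor` with one thread runs the queued tasks one after another, in submission order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the post's SystemTaskQueue, not the project's class.
class SystemTaskQueueSketch {
    // A single worker thread, so queued tasks never run concurrently.
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    void queueSystemTask(Runnable task) {
        executor.execute(task);
    }

    // Number of tasks still waiting; a caller could compare this against a limit.
    int pendingTaskCount() {
        return executor.getQueue().size();
    }

    // Stops accepting new tasks and waits for the already queued ones to finish.
    void shutdownAndWait() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Queues n tasks and returns the order in which they actually ran.
    static List<Integer> runInOrder(int n) {
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        SystemTaskQueueSketch queue = new SystemTaskQueueSketch();
        for (int i = 0; i < n; i++) {
            final int id = i;
            queue.queueSystemTask(() -> order.add(id));
        }
        queue.shutdownAndWait();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because every state change goes through one thread, the handlers themselves do not need to lock the task state against each other.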
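Next comes allocateContainers(containers). It receives a batch of containers, so a for loop calls allocateContainer(container) on each one.
Containers can be launched in two ways: either a container is launched as soon as a Task gets one, or launching waits until every Task has been given a container and then all of them are launched together. In the second case, the working directory of each container holds a file listing the IP addresses of the containers that host the Tasks.
The Boolean generateContainerIpList decides which of the two modes is used.
The detailed flow is in the code below: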
    private void allocateContainer(Container container) throws Exception {
      String containerId = container.getId().toString();
      Boolean generateContainerIpList = requestManager.getPlatParams().getGenerateContainerIpList();

      LOGGER.logInfo(
          "[%s]: allocateContainer: Try to Allocate Container to Task: Container: %s",
          containerId, HadoopExts.toString(container));

      // 0. findTask
      TaskStatus taskStatus = findTask(container);
      if (taskStatus == null) {
        LOGGER.logDebug(
            "[%s]: Cannot find a suitable Task to accept the Allocate Container. It should be exceeded.",
            containerId);
        tryToReleaseContainer(containerId);
        return;
      }
      String taskRoleName = taskStatus.getTaskRoleName();
      TaskStatusLocator taskLocator = new TaskStatusLocator(taskRoleName, taskStatus.getTaskIndex());

      // 1. removeContainerRequest
      removeContainerRequest(taskStatus);

      // 2. testContainer
      if (!testContainer(container)) {
        LOGGER.logInfo(
            "%s[%s]: Container is Rejected, Release Container and Request again",
            taskLocator, containerId);
        tryToReleaseContainer(containerId);
        statusManager.transitionTaskState(taskLocator, TaskState.TASK_WAITING);
        addContainerRequest(taskStatus);
        return;
      }

      // 3. allocateContainer
      try {
        statusManager.transitionTaskState(taskLocator, TaskState.CONTAINER_ALLOCATED,
            new TaskEvent().setContainer(container));
        LOGGER.logInfo("%s[%s]: Succeeded to Allocate Container to Task", taskLocator, containerId);

        if (containerConnectionExceedCount.containsKey(containerId)) {
          // Pending Exceed Container now is settled to live associated Container
          containerConnectionExceedCount.remove(containerId);
        }
      } catch (Exception e) {
        LOGGER.logWarning(e,
            "%s[%s]: Failed to Allocate Container to Task, Release Container and Request again",
            taskLocator, containerId);
        tryToReleaseContainer(containerId);
        statusManager.transitionTaskState(taskLocator, TaskState.TASK_WAITING);
        addContainerRequest(taskStatus);
        return;
      }

      // 4. launchContainer
      if (!generateContainerIpList) {
        launchContainer(taskStatus, container);
      } else {
        allocatedContainers.put(containerId, container);

        int neverBeenAllocatedTaskCount = statusManager.getTaskCount(
            new HashSet<>(Arrays.asList(TaskState.TASK_WAITING, TaskState.CONTAINER_REQUESTED)));
        if (neverBeenAllocatedTaskCount == 0) {
          launchContainersTogether();
        } else {
          LOGGER.logInfo(
              "Waiting for %s never been CONTAINER_ALLOCATED Tasks to become CONTAINER_ALLOCATED, " +
                  "since GenerateContainerIpList enabled",
              neverBeenAllocatedTaskCount);
        }
      }
    }
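Following the numbered comments in the code, step by step:
0. Find a suitable Task for this container. If one is found, a TaskStatus object is returned; later on it is what supplies the container's environment, command, local resources and so on. If none is found, the container is released.
1. Remove the container request of this Task.
2. Check whether the container is usable. (I don't understand this ordering: why not check first? The check doesn't seem to have anything to do with the Task.)
3. Allocate the container. StatusManager marks the Task as CONTAINER_ALLOCATED, and a new TaskEvent object records some information about the Task and the Container.
4. Launch the container. There are the two cases described above:
1) No ContainerIpList is needed, so launchContainer() is called right away to start this container.
2) A ContainerIpList is needed, so the container is first recorded in allocatedContainers, and then the AM looks at how the others are doing, i.e. it counts the Tasks whose state is TASK_WAITING or CONTAINER_REQUESTED. If the count is 0, launchContainersTogether() starts them all at once. If it is not 0, the code only writes a log line and does nothing else.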
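The launch decision in step 4 can be boiled down to a small state check. The sketch below is a simplified model under my own assumptions, not the AM's code: `LaunchGateSketch`, `onAllocated()` and `launched()` are hypothetical names, Tasks are plain strings, and every Task starts in CONTAINER_REQUESTED. Only the state names and the "count the never-allocated Tasks" rule are taken from the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of step 4; Tasks are identified by plain names.
class LaunchGateSketch {
    enum TaskState { TASK_WAITING, CONTAINER_REQUESTED, CONTAINER_ALLOCATED, CONTAINER_LAUNCHED }

    private final boolean generateContainerIpList;
    private final Map<String, TaskState> taskStates = new LinkedHashMap<>();
    private final List<String> launched = new ArrayList<>();

    LaunchGateSketch(boolean generateContainerIpList, List<String> taskNames) {
        this.generateContainerIpList = generateContainerIpList;
        for (String name : taskNames) {
            taskStates.put(name, TaskState.CONTAINER_REQUESTED);
        }
    }

    // Called once a container has been bound to the Task (step 3 succeeded).
    void onAllocated(String task) {
        taskStates.put(task, TaskState.CONTAINER_ALLOCATED);
        if (!generateContainerIpList) {
            launch(task); // case 1: launch immediately
            return;
        }
        // case 2: wait until no Task is left in TASK_WAITING or CONTAINER_REQUESTED
        long neverAllocated = taskStates.values().stream()
                .filter(s -> s == TaskState.TASK_WAITING || s == TaskState.CONTAINER_REQUESTED)
                .count();
        if (neverAllocated == 0) {
            for (String name : new ArrayList<>(taskStates.keySet())) {
                if (taskStates.get(name) == TaskState.CONTAINER_ALLOCATED) {
                    launch(name);
                }
            }
        }
    }

    private void launch(String task) {
        taskStates.put(task, TaskState.CONTAINER_LAUNCHED);
        launched.add(task);
    }

    List<String> launched() {
        return launched;
    }

    public static void main(String[] args) {
        LaunchGateSketch gate = new LaunchGateSketch(true, List.of("worker-0", "worker-1"));
        gate.onAllocated("worker-0");
        System.out.println(gate.launched()); // nothing launched yet
        gate.onAllocated("worker-1");
        System.out.println(gate.launched()); // both launched together
    }
}
```

Note that the gate is re-evaluated on every allocation, so whichever Task gets its container last is the one that triggers the joint launch.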
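launchContainer() takes two arguments, a TaskStatus and a Container. After setting up the launch context, it starts the container through NMClientAsync, and then StatusManager moves the Task to the CONTAINER_LAUNCHED state.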
    private void launchContainer(TaskStatus taskStatus, Container container) throws Exception {
      String taskRoleName = taskStatus.getTaskRoleName();
      TaskStatusLocator taskLocator = new TaskStatusLocator(taskRoleName, taskStatus.getTaskIndex());
      String containerId = container.getId().toString();
      assert containerId.equals(taskStatus.getContainerId());

      LOGGER.logInfo("%s[%s]: launchContainer", taskLocator, containerId);

      ContainerLaunchContext launchContext = setupContainerLaunchContext(taskStatus);
      nmClient.startContainerAsync(container, launchContext);
      statusManager.transitionTaskState(taskLocator, TaskState.CONTAINER_LAUNCHED);
    }
The launch context, ContainerLaunchContext, is built from three things: the command (that is, the entryPoint), the localResources and the environment variables.
    private ContainerLaunchContext setupContainerLaunchContext(TaskStatus taskStatus) throws Exception {
      String taskRoleName = taskStatus.getTaskRoleName();
      Integer taskIndex = taskStatus.getTaskIndex();
      Integer serviceVersion = getServiceVersion(taskRoleName);

      UserDescriptor user = requestManager.getUser();
      Boolean generateContainerIpList = requestManager.getPlatParams().getGenerateContainerIpList();
      List<String> sourceLocations = requestManager.getTaskServices().get(taskRoleName).getSourceLocations();
      String entryPoint = requestManager.getTaskServices().get(taskRoleName).getEntryPoint();

      // SetupLocalResources
      Map<String, LocalResource> localResources = new HashMap<>();
      try {
        for (String location : sourceLocations) {
          HadoopUtils.addToLocalResources(localResources, location);
        }
      } catch (Exception e) {
        // User is likely to set an invalid SourceLocations, and it contains HDFS OP,
        // so handle the corresponding Exception ASAP
        handleException(e);
      }

      if (generateContainerIpList) {
        String location = hdfsStore.getHdfsStruct().getContainerIpListFilePath(conf.getFrameworkName());
        HadoopUtils.addToLocalResources(localResources, location);
      }

      // SetupLocalEnvironment
      Map<String, String> localEnvs = new HashMap<>();
      localEnvs.put(GlobalConstants.ENV_VAR_HADOOP_USER_NAME, user.getName());
      localEnvs.put(GlobalConstants.ENV_VAR_FRAMEWORK_NAME, conf.getFrameworkName());
      localEnvs.put(GlobalConstants.ENV_VAR_FRAMEWORK_VERSION, conf.getFrameworkVersion().toString());
      localEnvs.put(GlobalConstants.ENV_VAR_TASK_ROLE_NAME, taskRoleName);
      localEnvs.put(GlobalConstants.ENV_VAR_TASK_INDEX, taskIndex.toString());
      localEnvs.put(GlobalConstants.ENV_VAR_SERVICE_VERSION, serviceVersion.toString());
      localEnvs.put(GlobalConstants.ENV_VAR_ZK_CONNECT_STRING, conf.getZkConnectString());
      localEnvs.put(GlobalConstants.ENV_VAR_ZK_ROOT_DIR, conf.getZkRootDir());
      localEnvs.put(GlobalConstants.ENV_VAR_ZK_COMPRESSION_ENABLE, conf.getZkCompressionEnable().toString());
      localEnvs.put(GlobalConstants.ENV_VAR_AM_VERSION, conf.getAmVersion().toString());
      localEnvs.put(GlobalConstants.ENV_VAR_APP_ID, conf.getApplicationId());
      localEnvs.put(GlobalConstants.ENV_VAR_ATTEMPT_ID, conf.getAttemptId());
      localEnvs.put(GlobalConstants.ENV_VAR_CONTAINER_GPUS, taskStatus.getContainerGpus().toString());

      if (generateContainerIpList) {
        // Since one machine may have many external IPs, we assigned a specific one to
        // help the UserService to locate itself in CONTAINER_IP_LIST_FILE
        localEnvs.put(GlobalConstants.ENV_VAR_CONTAINER_IP, taskStatus.getContainerIp());
      }

      // SetupEntryPoint
      String command = String.format(
          "%1$s 1>%2$sstdout 2>%2$sstderr",
          entryPoint,
          ApplicationConstants.LOG_DIR_EXPANSION_VAR + File.separator);

      ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
      launchContext.setLocalResources(localResources);
      launchContext.setCommands(Collections.singletonList(command));
      launchContext.setServiceData(new HashMap<>());
      launchContext.setEnvironment(localEnvs);

      return launchContext;
    }
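As you can see, the command and the localResources are both obtained from requestManager using the Task's TaskRoleName.
And if a ContainerIpList is needed, the IP list file on HDFS is added to the local resources as well, so that it gets downloaded into the container.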
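The entry point command is the least obvious of the three pieces, because the format string reuses its second argument. Below is a small sketch of what that `String.format` call produces. The class `EntryPointCommandSketch`, the method `buildCommand()` and the example entry point `python train.py` are made up for illustration. I also assume that `ApplicationConstants.LOG_DIR_EXPANSION_VAR` is the placeholder `<LOG_DIR>`, which YARN replaces with the container's real log directory at launch time, and I pass the path separator explicitly instead of using `File.separator`.

```java
// Hypothetical helper that reproduces only the command formatting step.
class EntryPointCommandSketch {
    // Assumed value of ApplicationConstants.LOG_DIR_EXPANSION_VAR in Hadoop YARN.
    static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";

    // %1$s is the entry point; %2$s (used twice) is the log directory prefix.
    static String buildCommand(String entryPoint, String separator) {
        return String.format(
                "%1$s 1>%2$sstdout 2>%2$sstderr",
                entryPoint,
                LOG_DIR_EXPANSION_VAR + separator);
    }

    public static void main(String[] args) {
        // prints: python train.py 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
        System.out.println(buildCommand("python train.py", "/"));
    }
}
```

In other words, whatever the user put in entryPoint runs as a shell command, with its stdout and stderr redirected to two files in the container's log directory.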
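launchContainersTogether() has one more step than launchContainer(): through StatusManager it fetches every Task in the CONTAINER_ALLOCATED state, writes their container IPs into a file and uploads that file to HDFS.
After that, a for loop calls launchContainer() on each Task.
The code is as follows: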
    private void launchContainersTogether() throws Exception {
      List<TaskStatus> taskStatuses = statusManager.getTaskStatus(
          new HashSet<>(Collections.singletonList(TaskState.CONTAINER_ALLOCATED)));
      Boolean generateContainerIpList = requestManager.getPlatParams().getGenerateContainerIpList();

      LOGGER.logInfo("launchContainersTogether: %s Tasks", taskStatuses.size());

      if (generateContainerIpList) {
        // One container IP per line
        StringBuilder fileContent = new StringBuilder();
        for (TaskStatus taskStatus : taskStatuses) {
          fileContent.append(taskStatus.getContainerIp());
          fileContent.append("\n");
        }
        CommonUtils.writeFile(GlobalConstants.CONTAINER_IP_LIST_FILE, fileContent.toString());

        try {
          hdfsStore.uploadContainerIpListFile(conf.getFrameworkName());
          HadoopUtils.invalidateLocalResourcesCache();
        } catch (Exception e) {
          // It contains HDFS OP, so handle the corresponding Exception ASAP
          handleException(e);
        }
      }

      for (TaskStatus taskStatus : taskStatuses) {
        String containerId = taskStatus.getContainerId();
        assert allocatedContainers.containsKey(containerId);

        launchContainer(taskStatus, allocatedContainers.get(taskStatus.getContainerId()));
      }
    }
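For reference, here is a tiny sketch of the IP list file format and of how a user service might use it. The format (one IP per line, each followed by a newline) is what the StringBuilder loop above produces. The lookup in `indexOfSelf()` is only my guess at what "locating itself" with the IP passed in via ENV_VAR_CONTAINER_IP could look like; the class and method names are hypothetical and the IPs are example values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the IP list file content and a possible lookup.
class ContainerIpListSketch {
    // Same format as in launchContainersTogether(): one IP per line.
    static String buildFileContent(List<String> containerIps) {
        StringBuilder fileContent = new StringBuilder();
        for (String ip : containerIps) {
            fileContent.append(ip);
            fileContent.append("\n");
        }
        return fileContent.toString();
    }

    // Assumed usage on the service side: find the line that matches our own IP.
    // Returns -1 if the IP is not listed.
    static int indexOfSelf(String fileContent, String ownIp) {
        return Arrays.asList(fileContent.split("\n")).indexOf(ownIp);
    }

    public static void main(String[] args) {
        String content = buildFileContent(Arrays.asList("10.0.0.1", "10.0.0.2", "10.0.0.3"));
        System.out.print(content);
        System.out.println(indexOfSelf(content, "10.0.0.2")); // prints 1
    }
}
```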
That's all. My feeling is that the key to understanding the AM is knowing how the work is divided among its subservices.