YARN(Yet Another Resource Negotiator,另一種資源協調者)是一種新的 Hadoop 資源管理器。它是一個通用的資源管理系統,可爲上層應用提供統一的資源管理和調度。
Client編程
// Build the YARN configuration on top of the configuration returned by getConf().
Configuration yarnConfig = new YarnConfiguration(getConf());

// Create the YARN client, then initialise it with that configuration and start it.
YarnClient client = YarnClient.createYarnClient();
client.init(yarnConfig);
client.start();

// Create a new application through the client and keep the response,
// which is later used to obtain the application id.
YarnClientApplication app = client.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
// 1. 設置應用程序提交上下文基本信息 ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); appContext.setApplicationId(appResponse.getApplicationId()); appContext.setApplicationName(config.getProperty("app.name")); appContext.setApplicationType(config.getProperty("app.type")); appContext.setApplicationTags(new LinkedHashSet<>(Arrays.asList(config.getProperty("app.tags").split(",")))); // queue:默認是default appContext.setQueue(config.getProperty("app.queue")); appContext.setPriority(Priority.newInstance(Integer.parseInt(config.getProperty("app.priority")))); appContext.setResource(Resource.newInstance(Integer.parseInt(config.getProperty("am.memory")), Integer.parseInt(config.getProperty("am.vCores")))); //2. 設置am container啓動上下文 ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); // 3. 設置am localResources Map<String, LocalResource> amLocalResources = new LinkedHashMap<>(); LocalResource drillArchive = Records.newRecord(LocalResource.class); drillArchive.setResource(ConverterUtils.getYarnUrlFromPath(drillArchiveFileStatus.getPath())); drillArchive.setSize(drillArchiveFileStatus.getLen()); drillArchive.setTimestamp(drillArchiveFileStatus.getModificationTime()); drillArchive.setType(LocalResourceType.ARCHIVE); drillArchive.setVisibility(LocalResourceVisibility.PUBLIC); amLocalResources.put(config.getProperty("drill.archive.name"), drillArchive); amContainer.setLocalResources(amLocalResources); // 4. 
設置am environment Map<String, String> amEnvironment = new LinkedHashMap<>(); // add Hadoop Classpath for (String classpath : yarnConfig.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { Apps.addToEnvironment(amEnvironment, Environment.CLASSPATH.name(), classpath.trim(), ApplicationConstants.CLASS_PATH_SEPARATOR); } Apps.addToEnvironment(amEnvironment, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*", ApplicationConstants.CLASS_PATH_SEPARATOR); StringWriter sw = new StringWriter(); config.store(sw, ""); String configBase64Binary = DatatypeConverter.printBase64Binary(sw.toString().getBytes("UTF-8")); Apps.addToEnvironment(amEnvironment, "DRILL_ON_YARN_CONFIG", configBase64Binary, ApplicationConstants.CLASS_PATH_SEPARATOR); amContainer.setEnvironment(amEnvironment); // 5. 設置am command List<String> commands = new ArrayList<>(); commands.add(Environment.SHELL.$$()); commands.add(config.getProperty("drill.archive.name") + "/bin/drill-am.sh"); commands.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDOUT); commands.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDERR); StringBuilder amCommand = new StringBuilder(); for (String str : commands) { amCommand.append(str).append(" "); } amCommand.setLength(amCommand.length() - " ".length()); amContainer.setCommands(Collections.singletonList(amCommand.toString())); // 6. 設置安全令牌 if (UserGroupInformation.isSecurityEnabled()) { Credentials credentials = new Credentials(); String tokenRenewer = yarnConfig.get(YarnConfiguration.RM_PRINCIPAL); final Token<?> tokens[] = fileSystem.addDelegationTokens(tokenRenewer, credentials); DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); amContainer.setTokens(fsTokens); } appContext.setAMContainerSpec(amContainer);
client.submitApplication(appContext);
ApplicationMaster(AM)安全
YarnConfiguration yarnConfig = new YarnConfiguration();

// Create the asynchronous AM<->RM client with AMRMCallbackHandler as its callback handler.
// The client is parameterised with ContainerRequest instead of being used as a raw type,
// so later addContainerRequest(...) calls are type-checked.
// 5000 is the interval argument of createAMRMClientAsync (milliseconds per the API docs).
AMRMClientAsync<ContainerRequest> amrmClientAsync =
    AMRMClientAsync.createAMRMClientAsync(5000, new AMRMCallbackHandler());
amrmClientAsync.init(yarnConfig);
amrmClientAsync.start();
YarnConfiguration yarnConfig = new YarnConfiguration();

// Create the asynchronous NodeManager client with NMCallbackHandler as its
// callback handler, then initialise and start it.
NMClientAsync nmClientAsync =
    NMClientAsync.createNMClientAsync(new NMCallbackHandler());
nmClientAsync.init(yarnConfig);
nmClientAsync.start();
// InetAddress.getLocalHost() returns an InetAddress, not a String, so the host
// name has to be extracted explicitly before it can be passed on.
String thisHostName = InetAddress.getLocalHost().getHostName();

// Register this ApplicationMaster, passing the host name, 0 and an empty string
// as the remaining two arguments.
amrmClientAsync.registerApplicationMaster(thisHostName, 0, "");
for (NodeReport containerReport : containerReports) { ContainerRequest containerRequest = new ContainerRequest(capability, new String[] {containerReport.getNodeId().getHost()}, null, priority, false); amrmClientAsync.addContainerRequest(containerRequest); }
private static class AMRMCallbackHandler implements AMRMClientAsync.CallbackHandler { @Override public void onContainersAllocated(List<Container> containers) { for (Container container : containers) { ContainerLaunchContext containerContext = Records.newRecord(ContainerLaunchContext.class); // setEnvironment Map<String, String> containerEnvironment = new LinkedHashMap<>(); // add Hadoop Classpath for (String classpath : yarnConfig.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { Apps.addToEnvironment(containerEnvironment, Environment.CLASSPATH.name(), classpath.trim(), ApplicationConstants.CLASS_PATH_SEPARATOR); } Apps.addToEnvironment(containerEnvironment, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*", ApplicationConstants.CLASS_PATH_SEPARATOR); containerContext.setEnvironment(containerEnvironment); // setContainerResource Map<String, LocalResource> containerResources = new LinkedHashMap<>(); LocalResource drillArchive = Records.newRecord(LocalResource.class); String drillArchivePath = appConfig.getProperty("fs.upload.dir") + appConfig.getProperty( "drill.archive.name"); Path path = new Path(drillArchivePath); FileStatus fileStatus = FileSystem.get(yarnConfig).getFileStatus(path); drillArchive.setResource(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath())); drillArchive.setSize(fileStatus.getLen()); drillArchive.setTimestamp(fileStatus.getModificationTime()); drillArchive.setType(LocalResourceType.ARCHIVE); drillArchive.setVisibility(LocalResourceVisibility.PUBLIC); containerResources.put(appConfig.getProperty("drill.archive.name"), drillArchive); containerContext.setLocalResources(containerResources); // setContainerCommand List<String> commands = new ArrayList<>(); commands.add(Environment.SHELL.$$()); commands.add(appConfig.getProperty("drill.archive.name") + "/bin/drillbit.sh run"); commands.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + 
ApplicationConstants.STDOUT); commands.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDERR); StringBuilder containerCommand = new StringBuilder(); for (String str : commands) { containerCommand.append(str).append(" "); } containerCommand.setLength(containerCommand.length() - " ".length()); containerContext.setCommands(Collections.singletonList(containerCommand.toString())); nmClientAsync.startContainerAsync(container, containerContext); } } }
// Unregister this ApplicationMaster, passing the final status and message;
// the third argument is left null.
amrmClientAsync.unregisterApplicationMaster(
    appStatus,
    appMessage,
    null);