YARN

1. What is YARN

Yet Another Resource Negotiator (YARN) is the next-generation Hadoop resource manager. It is a general-purpose resource management system that provides unified resource management and scheduling for the applications that run on top of it.

2. YARN Architecture

  1. ResourceManager (RM): a pure scheduler that is solely responsible for allocating and managing the available resources in the cluster.
  2. Container: the abstract representation of the resources allocated to a specific application, including memory, CPU, and disk.
  3. NodeManager (NM): manages a node's local resources; it launches the application's Containers, monitors their resource usage, and reports it to the RM.
  4. ApplicationMaster (AM): an instance of a framework-specific library; it negotiates resources with the RM and works with the NMs to run and monitor the Containers and their resource consumption. The AM itself runs inside a Container.
  5. Client: an entity in the cluster that can submit an application to the RM and that specifies the type of AM needed to run the application.
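
To make these roles concrete, here is a minimal sketch (not from the original article) that uses the YarnClient API to ask the RM for cluster-wide metrics and per-node reports; the capability and usage figures it prints are what the NMs report back to the RM:

    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.api.records.NodeState;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ClusterInfo {
        public static void main(String[] args) throws Exception {
            YarnClient client = YarnClient.createYarnClient();
            client.init(new YarnConfiguration());
            client.start();
            // The RM keeps the global view of the cluster ...
            System.out.println("NodeManagers: " + client.getYarnClusterMetrics().getNumNodeManagers());
            // ... built from what each NM reports about its local resources.
            for (NodeReport node : client.getNodeReports(NodeState.RUNNING)) {
                System.out.println(node.getNodeId() + " used=" + node.getUsed()
                    + " capability=" + node.getCapability());
            }
            client.stop();
        }
    }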

[Figure: YARN architecture]

[Figure: MR Client and Drill Client]

3. How to Write a YARN Application

  1. Client programming

    • Initialize and start a YarnClient
    Configuration yarnConfig = new YarnConfiguration(getConf());
    YarnClient client = YarnClient.createYarnClient();
    client.init(yarnConfig);
    client.start();
    • Create an application
    YarnClientApplication app = client.createApplication();
    GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
    • Set up the application submission context
    // 1. Set the basic information of the submission context
    //    (config is assumed to be a java.util.Properties object holding the application settings)
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationId(appResponse.getApplicationId());
    appContext.setApplicationName(config.getProperty("app.name"));
    appContext.setApplicationType(config.getProperty("app.type"));
    appContext.setApplicationTags(new LinkedHashSet<>(Arrays.asList(config.getProperty("app.tags").split(","))));
    // queue: defaults to "default"
    appContext.setQueue(config.getProperty("app.queue"));
    appContext.setPriority(Priority.newInstance(Integer.parseInt(config.getProperty("app.priority"))));
    appContext.setResource(Resource.newInstance(Integer.parseInt(config.getProperty("am.memory")),
        Integer.parseInt(config.getProperty("am.vCores"))));
    
    // 2. Set up the AM container launch context
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    
    // 3. Set the AM localResources
    Map<String, LocalResource> amLocalResources = new LinkedHashMap<>();
    LocalResource drillArchive = Records.newRecord(LocalResource.class);
    // drillArchiveFileStatus is assumed to be the FileStatus of the Drill archive already uploaded to HDFS
    drillArchive.setResource(ConverterUtils.getYarnUrlFromPath(drillArchiveFileStatus.getPath()));
    drillArchive.setSize(drillArchiveFileStatus.getLen());
    drillArchive.setTimestamp(drillArchiveFileStatus.getModificationTime());
    drillArchive.setType(LocalResourceType.ARCHIVE);
    drillArchive.setVisibility(LocalResourceVisibility.PUBLIC);
    amLocalResources.put(config.getProperty("drill.archive.name"), drillArchive);
    amContainer.setLocalResources(amLocalResources);
    
    // 4. Set the AM environment
    Map<String, String> amEnvironment = new LinkedHashMap<>();
    // add Hadoop Classpath
    for (String classpath : yarnConfig.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
        Apps.addToEnvironment(amEnvironment, Environment.CLASSPATH.name(),
            classpath.trim(), ApplicationConstants.CLASS_PATH_SEPARATOR);
    }
    Apps.addToEnvironment(amEnvironment, Environment.CLASSPATH.name(),
        Environment.PWD.$() + File.separator + "*", ApplicationConstants.CLASS_PATH_SEPARATOR);
    StringWriter sw = new StringWriter();
    config.store(sw, "");
    String configBase64Binary = DatatypeConverter.printBase64Binary(sw.toString().getBytes("UTF-8"));
    Apps.addToEnvironment(amEnvironment, "DRILL_ON_YARN_CONFIG", configBase64Binary,
        ApplicationConstants.CLASS_PATH_SEPARATOR);
    amContainer.setEnvironment(amEnvironment);
    
    // 5. Set the AM command
    
    List<String> commands = new ArrayList<>();
    commands.add(Environment.SHELL.$$());
    commands.add(config.getProperty("drill.archive.name") + "/bin/drill-am.sh");
    commands.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDOUT);
    commands.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDERR);
    StringBuilder amCommand = new StringBuilder();
    for (String str : commands) {
        amCommand.append(str).append(" ");
    }
    amCommand.setLength(amCommand.length() - " ".length());
    amContainer.setCommands(Collections.singletonList(amCommand.toString()));
    
    // 6. Set security tokens (fileSystem is assumed to be the HDFS FileSystem instance obtained elsewhere)
    if (UserGroupInformation.isSecurityEnabled()) {
        Credentials credentials = new Credentials();
        String tokenRenewer = yarnConfig.get(YarnConfiguration.RM_PRINCIPAL);
        final Token<?> tokens[] = fileSystem.addDelegationTokens(tokenRenewer, credentials);
        DataOutputBuffer dob = new DataOutputBuffer();
        credentials.writeTokenStorageToStream(dob);
        ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        amContainer.setTokens(fsTokens);
    }
    
    appContext.setAMContainerSpec(amContainer);
    • Submit the application
    client.submitApplication(appContext);
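    • Poll the application status (a minimal sketch that is not part of the original code; it reuses the client and appResponse objects from above and waits, via YarnClient.getApplicationReport, until the application reaches a terminal state)
    ApplicationId appId = appResponse.getApplicationId();
    YarnApplicationState state;
    do {
        Thread.sleep(1000);
        ApplicationReport report = client.getApplicationReport(appId);
        state = report.getYarnApplicationState();
        System.out.println("state=" + state + ", tracking URL=" + report.getTrackingUrl());
    } while (state != YarnApplicationState.FINISHED
        && state != YarnApplicationState.KILLED
        && state != YarnApplicationState.FAILED);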
  2. ApplicationMaster (AM) programming

    1. Initialize AMRMClientAsync
    YarnConfiguration yarnConfig = new YarnConfiguration();
    AMRMClientAsync amrmClientAsync = AMRMClientAsync.createAMRMClientAsync(5000, new AMRMCallbackHandler());
    amrmClientAsync.init(yarnConfig);
    amrmClientAsync.start();
    2. Initialize NMClientAsync
    YarnConfiguration yarnConfig = new YarnConfiguration();
    NMClientAsync nmClientAsync = NMClientAsync.createNMClientAsync(new NMCallbackHandler());
    nmClientAsync.init(yarnConfig);
    nmClientAsync.start();
    3. Register the ApplicationMaster (AM)
    String thisHostName = InetAddress.getLocalHost().getHostName();
    amrmClientAsync.registerApplicationMaster(thisHostName, 0, "");
    4. Add ContainerRequests
    // capability (Resource) and priority (Priority) are assumed to have been created earlier,
    // e.g. via Resource.newInstance(...) and Priority.newInstance(...); containerReports lists the candidate nodes
    for (NodeReport containerReport : containerReports) {
        ContainerRequest containerRequest = new ContainerRequest(capability,
            new String[] {containerReport.getNodeId().getHost()},
            null, priority, false);
        amrmClientAsync.addContainerRequest(containerRequest);
    }
    5. Launch containers
    private static class AMRMCallbackHandler implements AMRMClientAsync.CallbackHandler {
        // The remaining CallbackHandler methods (onContainersCompleted, onShutdownRequest,
        // onNodesUpdated, getProgress, onError) are omitted here for brevity.
        @Override
        public void onContainersAllocated(List<Container> containers) {
            for (Container container : containers) {
                ContainerLaunchContext containerContext = Records.newRecord(ContainerLaunchContext.class);
    
                // setEnvironment
                Map<String, String> containerEnvironment = new LinkedHashMap<>();
                // add Hadoop Classpath
                for (String classpath : yarnConfig.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                    YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
                    Apps.addToEnvironment(containerEnvironment, Environment.CLASSPATH.name(),
                        classpath.trim(), ApplicationConstants.CLASS_PATH_SEPARATOR);
                }
                Apps.addToEnvironment(containerEnvironment, Environment.CLASSPATH.name(),
                    Environment.PWD.$() + File.separator + "*", ApplicationConstants.CLASS_PATH_SEPARATOR);
                containerContext.setEnvironment(containerEnvironment);
    
                // setContainerResource
                Map<String, LocalResource> containerResources = new LinkedHashMap<>();
                LocalResource drillArchive = Records.newRecord(LocalResource.class);
                // drillArchivePath: HDFS path of the Drill archive (fs.upload.dir is assumed to end with "/")
                String drillArchivePath = appConfig.getProperty("fs.upload.dir") + appConfig.getProperty(
                    "drill.archive.name");
                Path path = new Path(drillArchivePath);
                FileStatus fileStatus;
                try {
                    fileStatus = FileSystem.get(yarnConfig).getFileStatus(path);
                } catch (IOException e) {
                    // onContainersAllocated cannot throw checked exceptions
                    throw new RuntimeException(e);
                }
                drillArchive.setResource(ConverterUtils.getYarnUrlFromPath(fileStatus.getPath()));
                drillArchive.setSize(fileStatus.getLen());
                drillArchive.setTimestamp(fileStatus.getModificationTime());
                drillArchive.setType(LocalResourceType.ARCHIVE);
                drillArchive.setVisibility(LocalResourceVisibility.PUBLIC);
                containerResources.put(appConfig.getProperty("drill.archive.name"), drillArchive);
                containerContext.setLocalResources(containerResources);
    
                // setContainerCommand
                List<String> commands = new ArrayList<>();
                commands.add(Environment.SHELL.$$());
                commands.add(appConfig.getProperty("drill.archive.name") + "/bin/drillbit.sh run");
                commands.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDOUT);
                commands.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + ApplicationConstants.STDERR);
                StringBuilder containerCommand = new StringBuilder();
                for (String str : commands) {
                    containerCommand.append(str).append(" ");
                }
                containerCommand.setLength(containerCommand.length() - " ".length());
                containerContext.setCommands(Collections.singletonList(containerCommand.toString()));
    
                nmClientAsync.startContainerAsync(container, containerContext);
            }
    
        }
    }
    6. Unregister the ApplicationMaster (AM)
    amrmClientAsync.unregisterApplicationMaster(appStatus, appMessage, null);
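
    The NMCallbackHandler passed to NMClientAsync.createNMClientAsync above is not shown in the original article. A minimal no-op sketch that satisfies the NMClientAsync.CallbackHandler interface might look as follows (a real AM would at least log the error callbacks and release or re-request the affected containers):
    private static class NMCallbackHandler implements NMClientAsync.CallbackHandler {
        @Override
        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
            // The container launch succeeded; record it here if bookkeeping is needed.
        }
        @Override
        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) { }
        @Override
        public void onContainerStopped(ContainerId containerId) { }
        @Override
        public void onStartContainerError(ContainerId containerId, Throwable t) { }
        @Override
        public void onGetContainerStatusError(ContainerId containerId, Throwable t) { }
        @Override
        public void onStopContainerError(ContainerId containerId, Throwable t) { }
    }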

[Figure: Example of the resource request a client sends to the ResourceManager]

[Figure: Interaction between the ApplicationMaster and the NodeManager]

5. Why YARN

  1. Scalability: experience showed that scaling the JobTracker in MRv1 to around 4,000 nodes was extremely difficult, because the JobTracker took on too many responsibilities, including resource scheduling and task tracking and monitoring; as the cluster grew, it became more and more overloaded. In YARN the JobTracker is split into the RM and the AM, so responsibilities are clearer, each component carries less, and the components are more lightweight.
  2. Support for diverse programming models: in MRv1 the design of the JobTracker and TaskTracker was tightly coupled to the MapReduce framework, so MRv1 supported only MapReduce and offered no support for parallel computing, stream computing, or in-memory computing. In YARN the specific framework is handled by the AM and is under the application's own control, while YARN provides unified resource scheduling.
  3. Easier framework upgrades: in MRv1, upgrading the MapReduce framework required restarting the whole Hadoop cluster, which was very risky. In YARN the specific framework is handled by the AM and controlled by the application itself, so only the user's program libraries need to be upgraded.
  4. Cluster resource utilization: MRv1 introduced the notion of a "slot" to represent the compute resources of a node. Each node's resources (CPU, memory, disk, etc.) were carved into equal-sized slots, a task could occupy several slots as needed, and slots were further divided into map slots and reduce slots that could not be shared. When a job starts, map slots are scarce while reduce slots sit idle; once all map tasks finish, reduce slots become scarce while map slots sit idle, which clearly lowers resource utilization. In YARN, resources are expressed as Containers that bundle memory, CPU, and so on, so the resource abstraction is finer-grained than in MRv1; in addition, the RM's pluggable Scheduler (FIFO, Capacity, Fair) keeps resource allocation elastic.

6. References

  1. 《Hadoop YARN權威指南》
  2. 《Hadoop技術內幕:深入解析YARN架構設計與實現原理》