hadoop2.7之做業提交詳解(下)

接着做業提交詳解(上)繼續寫:在上一篇(hadoop2.7之做業提交詳解(上))中已經講到了YARNRunner.submitJob()html

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob()]sql

那麼如今接着從YARNRunner.submitJob()開始說:shell

先簡單看一下YARNRunner這個類(摘錄一部分):apache

package org.apache.hadoop.mapred;
public class YARNRunner implements ClientProtocol {
  private ResourceMgrDelegate resMgrDelegate; //這是RM派駐在「地方」上的特派員
  private ClientCache clientCache;
  private Configuration conf;
  private final FileContext defaultFileContext;
  
  public YARNRunner(Configuration conf) {//構造函數,須要建立特派員,而後調用下一個構造函數
   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
  }

  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {//須要建立ClientCache
   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
  }

  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {//這是最終的構造函數
    this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
  
  addHistoryToken(ts);//用於爲歷史記錄服務,與「做業歷史(JobHistory)」有關
  
  // Construct necessary information to start the MR AM
  //構建MR AM的必要啓動信息
  //建立一個ApplicationSubmissionContext,並將conf中的相關信息轉移過去
  ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
 /* 將做業提交給資源管理者(ResourceManager)*/
//RM受理了所提交的做業之後,會把這個ContainerLaunchContext轉發到某個NM節點
//上,在那裏執行這個shell命令行,另起一個Java虛擬機,讓它執行MRAppMaster.class。
//因而可知,這個ApplicationSubmissionContext對象appContext,真的是「表明着ResourceManager
//爲發起該應用的ApplicationMaster所需的所有信息」
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
}

其中createApplicationSubmissionContext方法的做用:
一、設置資源:默認內存爲1536M,cpu的core爲1
二、設置本地資源,好比臨時工做目錄,jar包等
三、設置安全票據tokens
四、設置啓動AM的命令
五、檢查map和reduce的配置信息
六、設置環境CLASSPATH等
七、爲AM的container設置ContainerLaunchContext
八、設置ApplicationSubmissionContext
九、設置MRAppMaster的執行路徑
並把配置塊conf中當前的相關信息、已上傳資料所在的目錄路徑以及有關身份和訪問權限的信息都複製轉移過去。提供了有關ApplicationMaster即「項目組長」該用哪個Shell(例如bash)以及有關某些環境變量的信息。再如做業的名稱等。安全

接下來就是調用ResourceMgrDelegate.submitApplication方法:(因此咱們先看一下ResourceMgrDelegate這個類)bash

public class ResourceMgrDelegate extends YarnClient {   
  private YarnConfiguration conf;
  private ApplicationSubmissionContext application;
  private ApplicationId applicationId;
  protected YarnClient client;//其實是YarnClientImpl類的對象,那也是對YarnClient的繼承和擴展
  private Text rmDTService;
  //這是ResourceMgrDelegate的構造方法
  public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
  //建立YarnClient對象client
  //YarnClient.createYarnClient()建立的是YarnClientImpl
    this.client = YarnClient.createYarnClient();
    init(conf);//這是由AbstractService類提供的,YarnClient是對AbstractService的擴展
    start();//這也是由AbstractService類提供的
  }
public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    return client.submitApplication(appContext);//調用YarnClientImpl.submitApplication方法
  }

從前面全部的代碼中咱們能夠得知:網絡

ResourceMgrDelegate對象是在YARNRunner的構造函數中建立的。而YARNRunner,則是在前面的Cluster.Initialize()中建立的。再往上追溯,則Cluster類對象是在首次調用connect()時建立的。因此,任何一個節點,只要曾經調用過connect(),即曾經與「集羣」有過鏈接,節點上就會有個Cluster類對象,從而就會有個YARNRunner對象,也就會有個ResourceMgrDelegate對象,並且以下所述就會有個YarnClientImpl對象。app

如今爲止,咱們的做業提交路徑是:框架

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication()]async

解下來咱們繼續看YarnClientImpl.submitApplication()方法:

public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    //建立一個SubmitApplicationRequestPBImpl類的記錄塊
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);//設置好記錄塊中的Context
    // Automatically add the timeline DT into the CLC
    // Only when the security and the timeline service are both enabled
    if (isSecurityEnabled() && timelineServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    rmClient.submitApplication(request);//實際的跨節點提交

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,
                                 YarnApplicationState.NEW_SAVING,
                                 YarnApplicationState.SUBMITTED);
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,
                                  YarnApplicationState.KILLED);        
    while (true) {
      try {
    //獲取來自RM節點的應用狀態報告,從中獲取本應用的當前狀態
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;//做業已進入運行階段,結束while循環
        }

        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          throw new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }

        // Notify the client through the log every 10 poll, in case the client
        // is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished, " +
              "submitted application " + applicationId +
              " is still in " + state);
        }
        try {
          Thread.sleep(submitPollIntervalMillis);
        } catch (InterruptedException ie) {
          String msg = "Interrupted while waiting for application "
              + applicationId + " to be successfully submitted.";
          LOG.error(msg);
          throw new YarnException(msg, ie);
        }
      } catch (ApplicationNotFoundException ex) {
        // FailOver or RM restart happens before RMStateStore saves
        // ApplicationState
        LOG.info("Re-submit application " + applicationId + "with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);//失敗後的再次提交
      }
    }

    return applicationId;
  }

從上看來只要是調用了rmClient.submitApplication(request)方法,那這兒rmClient又是個什麼呢?咱們接着來看一下YarnClientImpl這個類的簡單定義:

public class YarnClientImpl extends YarnClient {

  private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);

  protected ApplicationClientProtocol rmClient;
  protected long submitPollIntervalMillis;
  private long asyncApiPollIntervalMillis;
  private long asyncApiPollTimeoutMillis;
  protected AHSClient historyClient;
  private boolean historyServiceEnabled;
  protected TimelineClient timelineClient;
  @VisibleForTesting
  Text timelineService;
  @VisibleForTesting
  String timelineDTRenewer;
  protected boolean timelineServiceEnabled;
  protected boolean timelineServiceBestEffort;

  private static final String ROOT = "root";

  public YarnClientImpl() {
    super(YarnClientImpl.class.getName());
  }

從上能夠看出rmClient是一個ApplicationClientProtocol對象,這個又是一個接口,具體的實現類是ApplicationClientProtocolPBClientImpl ,接下來咱們看一下這個類:

public class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol,
    Closeable {

  private ApplicationClientProtocolPB proxy;

  public ApplicationClientProtocolPBClientImpl(long clientVersion,
      InetSocketAddress addr, Configuration conf) throws IOException {
//將配置項「rpc.engine.ApplicationClientProtocolPB」設置成ProtobufRpcEngine 
    RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
      ProtobufRpcEngine.class);
//建立proxy
//這個proxy存在於用戶爲提交運行具體應用而起的那個JVM上,它既不屬於
//ResourceManager,也不屬於NodeManager,而是一個獨立的Java虛擬機,能夠是在集羣//內的任何一臺機器上
    proxy = RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf);
  }
  
  public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException,
    IOException {
//從請求request中取出其協議報文(message)部分 
  SubmitApplicationRequestProto requestProto =
      ((SubmitApplicationRequestPBImpl) request).getProto();
  try {
//交由proxy將報文發送出去,並等候服務端迴應 
//將服務端迴應包裝成SubmitApplicationResponsePBImpl對象 
    return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto));
  } catch (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    return null;
  }
}
 }

ApplicationClientProtocolPBClientImpl的submitApplication方法,在其裏面就是調用proxy.submitApplication方法,而proxy是在構造函數中建立的。

  經過proxy發出的SubmitApplicationRequest,是以RM節點爲目標的,最終經由操做系統提供的網絡傳輸層以TCP報文的方式送達RM所在節點機上的對等層,那上面是
ProtoBuf,它會從TCP報文中還原出對端所發送的對象。再往上,那就是一樣也實現了ApplicationClientProtocolPB界面的ApplicationClientProtocolPBServiceImpl,ProtoBuf這一
層會根據對方請求直接就調用其submitApplication()。這樣,Client一側對於ApplicationClientProtocolPBClientImpl所提供函數的調用就轉化成Server一側對於applicationClientProtocolPBServiceImpl所提供的對應函數的調用。固然,Server一側函數調用的返回值也會轉化成Client一側的返回值,這就實現了遠程過程調用RPC。不言而喻,Client/Server雙方的這兩個對象必須提供對同一個界面的實現,在這裏就是ApplicationClientProtocolPB。

Client端
YARNRunner.submitJob() //這是處於頂層的應用層
ResourceMgrDelegate.submitApplication() //這是RM的代理
YarnClientImpl.submitApplication() //YARN框架的Client一側
ApplicationClientProtocolPBClientImpl.submitApplication()//ApplicationClientProtocol界面
proxy.submitApplication() //ApplicationClientProtocolPB界面
Protocol內部實現的submitApplication() //在TCP/IP的基礎上發送應用層的請求
Socket和TCP/IP //這是網絡鏈接的最低層

 

Server端:
Server這一邊就不一樣了。在Server這一邊,結構的層次和函數調用的層次是相反的,結構上處於最底層的Socket和TCP/IP反倒處於函數調用棧的最高層,愈往下調用實質上就愈往結構上的高層走。這是由於TCP/IP報文最初到達的是底層,而後逐層往上遞交的過程通常都是經過函數調用實現的,因此層層往下調用的過程反倒變成了層層往上遞交的過程。

那麼接下來就是經過tcp/ip調用服務端ApplicationClientProtocolPBServiceImpl.submitApplication()方法;

public class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB {

  private ApplicationClientProtocol real;
  
  public ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl) {
    this.real = impl;
  }
  public SubmitApplicationResponseProto submitApplication(RpcController arg0,
    SubmitApplicationRequestProto proto) throws ServiceException {
  SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);//建立一個請求
  try {
    SubmitApplicationResponse response = real.submitApplication(request); 
//real爲ClientRMService類對象 ,該對象在RM初始化時由createClientRMService() 方法建立
    return ((SubmitApplicationResponsePBImpl)response).getProto();
  } catch (YarnException e) {
    throw new ServiceException(e);
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}
}

接下來調用ClientRMService.submitApplication(request); 方法

public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException {
  ApplicationSubmissionContext submissionContext = request
      .getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();

  // ApplicationSubmissionContext needs to be validated for safety - only
  // those fields that are independent of the RM's configuration will be
  // checked here, those that are dependent on RM configuration are validated
  // in RMAppManager.

  String user = null;
  try {
    // Safety
    user = UserGroupInformation.getCurrentUser().getShortUserName();//獲取用戶
  } catch (IOException ie) {
    LOG.warn("Unable to get the current user.", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
  }

  // Check whether app has already been put into rmContext,
  // If it is, simply return the response
//判斷做業是否已經存在,若是是則直接返回實例
  if (rmContext.getRMApps().get(applicationId) != null) {
    LOG.info("This is an earlier submitted application: " + applicationId);
    return SubmitApplicationResponse.newInstance();
  }
  //若是沒有設置隊列,則使用默認隊列
   if (submissionContext.getQueue() == null) { 
    submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
  }
  //若是沒有設置application名字,則使用默認的命名規則
  if (submissionContext.getApplicationName() == null) {
    submissionContext.setApplicationName(
        YarnConfiguration.DEFAULT_APPLICATION_NAME);
  }
  //若是沒有指定提交類型,則指定默認爲yarn模式
  if (submissionContext.getApplicationType() == null) {
    submissionContext
      .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
  } else {
    if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
      submissionContext.setApplicationType(submissionContext
        .getApplicationType().substring(0,
          YarnConfiguration.APPLICATION_TYPE_LENGTH));
    }
  }

  try {
    // call RMAppManager to submit application directly
    rmAppManager.submitApplication(submissionContext,
        System.currentTimeMillis(), user);//提交做業到rmAppManager手中

    LOG.info("Application with id " + applicationId.getId() + 
        " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (YarnException e) {
    LOG.info("Exception in submitting application with id " +
        applicationId.getId(), e);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        e.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw e;
  }

  SubmitApplicationResponse response = recordFactory
      .newRecordInstance(SubmitApplicationResponse.class);
  return response;
}

從做業提交的角度看,一旦進入了 RM 節點上的RMAppManagers. ubmitApplication(),做業的提交就已完成。 至於這之後的處理,那是 RM的事了,做業提交的最終流程就是:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.sbumitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication() -> ApplicationClientProtocolPBClientImpl.submitApplication() -> ApplicationClientProtocolPBServiceImpl.submitApplication() -> ClientRMService.submitApplication() -> RMAppManager.submitApplication() ]

相關文章
相關標籤/搜索