Hadoop 2.7 Job Submission Explained (Part 1)

We'll walk through the submission path using the classic wordcount example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author: LUGH1
 * @date: 2019-4-8
 * @description:
 */
public class WordCount {
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://192.168.88.130:9000");
    Job job = Job.getInstance(conf);
    job.setJarByClass(WordCount.class);
    job.setMapperClass(WdMapper.class);
    job.setReducerClass(WdReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path("/test/word.txt"));
    FileOutputFormat.setOutputPath(job, new Path("/test/output"));
    boolean result = job.waitForCompletion(true);
    System.out.println("good job"); // print before exiting; a statement placed after System.exit() would never run
    System.exit(result ? 0 : 1);
  }
}

class WdMapper extends Mapper<Object, Text, Text, IntWritable> {
  @Override
  protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String[] split = line.split(" ");
    for (String word : split) {
      context.write(new Text(word), new IntWritable(1));
    }
  }
}

class WdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int count = 0;
    for (IntWritable i : values) {
      count += i.get();
    }
    context.write(key, new IntWritable(count));
  }
}

This is a simple wordcount, so we won't explain it line by line. Start with the main method: we obtain a Job object, apply a series of settings, and finally call the waitForCompletion method:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  //.... details omitted ....
  boolean result = job.waitForCompletion(true);  // submit the job via the waitForCompletion() method provided by the Job class
  System.exit(result ? 0 : 1);
}

Next, let's look at the Job class that provides waitForCompletion (the class is large, so only the parts we need are shown):

public class Job extends JobContextImpl implements JobContext {
  private static final Log LOG = LogFactory.getLog(Job.class);
  public static enum JobState {DEFINE, RUNNING}; // the two possible job states
  private static final long MAX_JOBSTATUS_AGE = 1000 * 2;  // refresh the status at most every 2000 ms
  public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
  public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval";
  static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
  public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = "mapreduce.client.progressmonitor.pollinterval";
  static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
  public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used";
  public static final String SUBMIT_REPLICATION =  "mapreduce.client.submit.file.replication";
  public static final int DEFAULT_SUBMIT_REPLICATION = 10;
  public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  static {
    ConfigUtil.loadResources();  // load the configuration resources
  }
  private JobState state = JobState.DEFINE;  // a job starts out in the DEFINE state by default
  private JobStatus status;
  private long statustime;
  private Cluster cluster;
  private ReservationId reservationId;    

  // Key methods (signatures abbreviated):
  boolean waitForCompletion(boolean verbose)
  submit()
  setUseNewAPI()
  connect()
  getJobSubmitter(FileSystem fs, ClientProtocol submitClient)
  isUber() // "uber" mode: the MapTasks and ReduceTasks run on the same node
  setPartitionerClass() // a Partitioner may distribute Mapper output among multiple Reducers by some rule
  setMapSpeculativeExecution() // whether speculative Mappers should stand by as backups
  setReduceSpeculativeExecution() // whether speculative Reducers should stand by as backups
  setCacheFiles()
}
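To make these knobs concrete, here is a hedged sketch of how a client might set them; the class name JobKnobsDemo and all the values are purely illustrative, not recommendations:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

import java.io.IOException;

public class JobKnobsDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.setBoolean("mapreduce.job.ubertask.enable", true);     // let small jobs run "uber" inside the AM's JVM
    conf.setInt("mapreduce.client.submit.file.replication", 5); // override SUBMIT_REPLICATION (default 10)
    Job job = Job.getInstance(conf);
    job.setPartitionerClass(HashPartitioner.class); // routes map output keys to reducers (this is also the default)
    job.setMapSpeculativeExecution(true);    // allow backup map attempts
    job.setReduceSpeculativeExecution(false); // forbid backup reduce attempts
  }
}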

The Job class contains many static variables and a static block. In Java these are initialized when the class is first loaded, so when main calls Job.getInstance(conf), the static variables and the static block are processed first. They set up various parameters, such as the job's default DEFINE state, and load the configuration files. The loading method looks like this:

public static void loadResources() {
    addDeprecatedKeys();
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
    Configuration.addDefaultResource("yarn-default.xml");
    Configuration.addDefaultResource("yarn-site.xml");
  }
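Because of this ordering, the defaults are already in place before Job.getInstance() even returns. As a minimal, non-Hadoop sketch of why (StaticInitDemo is a hypothetical class):

public class StaticInitDemo {
  static { System.out.println("static block: defaults loaded"); } // runs once, on first class load

  StaticInitDemo() { System.out.println("constructor: state = DEFINE"); }

  public static void main(String[] args) {
    new StaticInitDemo(); // prints the static line first, then the constructor line
    new StaticInitDemo(); // the static block does not run again
  }
}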

Loading these resources just means loading Hadoop's standard configuration files, so all of this is already in place before we call waitForCompletion. Now let's look at the waitForCompletion method itself:

// from the Job class in org.apache.hadoop.mapreduce
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) { // check that the job is in the DEFINE state, to prevent double submission
    submit(); // submit the job
  }
  if (verbose) { // after submitting, monitor the job until it finishes
    monitorAndPrintJob(); // report the job's progress periodically
  } else { // otherwise just periodically ask whether the job has completed
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

  

  From the standpoint of the submission flow, this method could hardly be simpler: it is really just a call to Job.submit(), preceded by a check that the job is still in the DEFINE state, which guarantees a job cannot be submitted more than once. As noted above, JobState has only two values, DEFINE and RUNNING. The Job() constructor sets it to DEFINE when the Job object is created, and once the job has been submitted successfully it is switched to RUNNING, closing the door on resubmission.
  Under normal circumstances Job.submit() returns quickly, because its only task is to hand the job over; it does not wait for the job to run or finish. Job.waitForCompletion(), by contrast, does not return until after Job.submit() has returned and the job has completed. While it waits, if the verbose parameter is true it reports the job's progress periodically; otherwise it just periodically checks whether the job is done.
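To make that difference concrete, here is a hedged sketch of the non-blocking alternative: calling Job.submit() yourself and polling, which is essentially what waitForCompletion(false) does internally (PollDemo is a hypothetical class; `job` is assumed to be fully configured):

import org.apache.hadoop.mapreduce.Job;

public class PollDemo {
  static boolean runAndWait(Job job) throws Exception {
    job.submit();               // returns quickly; the job now runs on the cluster
    while (!job.isComplete()) { // the same check waitForCompletion() loops on
      Thread.sleep(5000);       // mirrors DEFAULT_COMPLETION_POLL_INTERVAL (5000 ms)
    }
    return job.isSuccessful();
  }
}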

So the submission flow so far is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()]

Next, let's take a look at the submit method:

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE); // make sure the job is in the DEFINE state
  setUseNewAPI(); // decide from the configuration whether to submit with the new API
  connect(); // connect to the cluster, creating the Cluster instance `cluster`
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); // obtain a JobSubmitter instance
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { // ugi.doAs handles permissions
    public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster); // this is where the job is really submitted
    }
  });
  state = JobState.RUNNING; // set the job's state to RUNNING
  LOG.info("The url to track the job: " + getTrackingURL());
}

Let's look at the connect method first:

private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) { // if cluster is null, create a Cluster instance
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
        return new Cluster(getConfiguration()); // create the Cluster
      }
    });
  }
}

So connect() simply guarantees that the node has a Cluster object, creating one if it does not exist yet. Let's look at the Cluster class (partial listing):

public class Cluster {
  @InterfaceStability.Evolving
  public static enum JobTrackerStatus {INITIALIZING, RUNNING}; // job tracker status

  private ClientProtocolProvider clientProtocolProvider; // YarnClientProtocolProvider on a cluster, LocalClientProtocolProvider in local mode
  private ClientProtocol client; // on a cluster, the channel and protocol for talking to the outside world
  private UserGroupInformation ugi; // permission control
  private Configuration conf; // configuration
  private FileSystem fs = null; // file system
  private Path sysDir = null; // system directory
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null; // job history directory
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // A ServiceLoader for ClientProtocolProvider, created via ServiceLoader.load().
  // ServiceLoader implements the Iterable interface and provides an iterator(),
  // so it can be used in a for loop; its load() method loads classes through the ClassLoader.
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    ConfigUtil.loadResources(); // load the configuration files
  }

  // constructors
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf); // call the initialize method
  }

  // the goal is to create the ClientProtocolProvider and the ClientProtocol
  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    synchronized (frameworkLoader) { // lock so that multiple threads cannot enter this section at once
      for (ClientProtocolProvider provider : frameworkLoader) { // iterate over frameworkLoader to get each provider
        LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
          if (jobTrackAddr == null) {
            // create the clientProtocol via the ClientProtocolProvider's create method
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }
          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol; // the ClientProtocol object has been created: a YARNRunner or LocalJobRunner
            LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
            break; // success, so stop the loop
          } else { // failure; log it
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", e);
        }
      }
    }
    if (null == clientProtocolProvider || null == client) { // check that both objects were created
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
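As a side note, Cluster can also be constructed directly from client code; the hedged sketch below (ClusterProbe is a hypothetical class) shows that merely creating one runs initialize() and the provider selection described above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;

public class ClusterProbe {
  public static void main(String[] args) throws Exception {
    Cluster cluster = new Cluster(new Configuration()); // the constructor calls initialize()
    System.out.println("tracker status: " + cluster.getJobTrackerStatus()); // INITIALIZING or RUNNING
    System.out.println("staging dir: " + cluster.getStagingAreaDir());
    cluster.close();
  }
}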

  So Job's connect() method just ensures that the cluster instance exists; if it does not, it is built through Cluster's constructor, after loading configuration via ConfigUtil.loadResources() and initializing static members such as frameworkLoader. The Cluster constructor always ends up calling Cluster.initialize(). Now, about ClientProtocolProvider and ClientProtocol: a user submits a job to the RM node and asks the RM to arrange its execution, so the RM acts as the service provider and the user is the client. That relationship needs a protocol governing how the two sides interact and how the service is provided. In the Hadoop code this Protocol is elevated all the way to the level of the computing framework; even the choice of whether to use YARN falls under it. ClientProtocol plays exactly that role, and ClientProtocolProvider is, as the name suggests, the provider of a ClientProtocol, acting somewhat like a Factory.

As for ServiceLoader<ClientProtocolProvider>, it is what loads the ClientProtocolProvider implementations.
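Here is a hedged sketch of the generic pattern (assuming ClientProtocolProvider lives in org.apache.hadoop.mapreduce.protocol; ProviderScan is a hypothetical class): iterating a ServiceLoader lazily instantiates every implementation registered on the classpath.

import java.util.ServiceLoader;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class ProviderScan {
  public static void main(String[] args) {
    ServiceLoader<ClientProtocolProvider> loader =
        ServiceLoader.load(ClientProtocolProvider.class);
    for (ClientProtocolProvider provider : loader) { // instantiates each registered provider on demand
      System.out.println("found provider: " + provider.getClass().getName());
    }
  }
}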

Let's first look at the ClientProtocolProvider class itself. It is clearly an abstract class, which means only concrete classes that extend it can be instantiated:

public abstract class ClientProtocolProvider {
  public abstract ClientProtocol create(Configuration conf) throws IOException;
  public abstract ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException;
  public abstract void close(ClientProtocol clientProtocol) throws IOException;
}

Next, the two subclasses of this abstract class, YarnClientProtocolProvider and LocalClientProtocolProvider:

package org.apache.hadoop.mapred;

public class YarnClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf); // YARNRunner implements the ClientProtocol interface
    }
    return null;
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
    if (clientProtocol instanceof YARNRunner) {
      ((YARNRunner)clientProtocol).close();
    }
  }
}

package org.apache.hadoop.mapred;

public class LocalClientProtocolProvider extends ClientProtocolProvider {

  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    String framework = conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
      return null;
    }
    conf.setInt(JobContext.NUM_MAPS, 1); // the number of maps is 1
    return new LocalJobRunner(conf); // LocalJobRunner implements the ClientProtocol interface
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf) {
    return null; // LocalJobRunner doesn't use a socket
  }

  @Override
  public void close(ClientProtocol clientProtocol) {
    // no clean up required
  }
}
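Before going back to initialize(), note how ServiceLoader discovers exactly these two classes: each mapreduce-client jar is expected to register its provider in a services file on the classpath. A hedged sketch of those registrations (the file path is shown as a comment; which jar carries which entry may differ):

# META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
org.apache.hadoop.mapred.LocalClientProtocolProvider
org.apache.hadoop.mapred.YarnClientProtocolProvider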

Now let's come back to the Cluster.initialize() method:

  ServiceLoader implements the Iterable interface and provides an iterator(), so it can be used in a for loop. It also provides a load() method that loads classes through the ClassLoader, plus the ability to parse the provider configuration files. Once frameworkLoader has been loaded as a ServiceLoader object, its internal LinkedHashMap holds the two provider entries mentioned above, and its iterator() can then visit them in turn.

  Then the Cluster constructor calls initialize(), whose purpose is to create the ClientProtocolProvider and the ClientProtocol.

  But ClientProtocolProvider is an abstract class, so only concrete classes that extend it can be instantiated. Exactly two classes in the Hadoop source do so: LocalClientProtocolProvider and YarnClientProtocolProvider.

 

  As you would expect, the ClientProtocol supplied by these two providers differs as well. ClientProtocol is in fact an interface, and it too has exactly two implementations: LocalJobRunner and YARNRunner. Only one of them can actually be in use at a time.

  The for loop in initialize() is the iteration over the ServiceLoader described above, i.e. a loop over the two ClientProtocolProviders. Its goal is to create, via ClientProtocolProvider.create(), the ClientProtocol the user asked for, which is either a LocalJobRunner or a YARNRunner. As soon as one creation succeeds the loop stops, because only one choice is possible; if both attempts fail, the program cannot continue, because it has no way to get the RM to provide computing service. Whether creation succeeds depends on the configuration settings mentioned earlier. Since ClientProtocolProvider is abstract, the classes actually tried in turn are LocalClientProtocolProvider and YarnClientProtocolProvider. If the first round of the loop tries the former, the flow is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> LocalClientProtocolProvider.create()]

If it tries the latter, the flow is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit()  -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create()]

Here we assume the job is submitted in yarn mode, so the flow is the second one.

Through YarnClientProtocolProvider.create(), what is ultimately returned is a new YARNRunner(conf) object.
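Which branch wins is driven by a single configuration key, mapreduce.framework.name (MRConfig.FRAMEWORK_NAME), normally set in mapred-site.xml rather than in code. A hedged sketch (FrameworkChoice is a hypothetical class):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

public class FrameworkChoice {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.set("mapreduce.framework.name", "yarn");    // YarnClientProtocolProvider.create() returns a YARNRunner
    // conf.set("mapreduce.framework.name", "local"); // LocalClientProtocolProvider would return a LocalJobRunner
    Job job = Job.getInstance(conf);
  }
}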

  Good; back to Job.submit(). The connect method has now finished, and the next step is the call to getJobSubmitter(). This function creates a JobSubmitter object, and Job.submit() then calls its submitJobInternal() method to complete the submission. The two arguments used to build the JobSubmitter are the two passed to getJobSubmitter(): cluster.getFileSystem() and cluster.getClient(). cluster.getClient() returns the YARNRunner or LocalJobRunner; cluster.getFileSystem() returns, for a YARNRunner, the URL of the file system on the RM node, and for a LocalJobRunner, a local directory with the relative path "mapred/system".

  Next, a look at the JobSubmitter class (partial listing):

 

package org.apache.hadoop.mapreduce;
class JobSubmitter {
  protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
  private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; // algorithm for the shuffle secret key
  private static final int SHUFFLE_KEY_LENGTH = 64;
  private FileSystem jtFs;
  private ClientProtocol submitClient;
  private String submitHostName;
  private String submitHostAddress;
  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) 
  throws IOException {
    this.submitClient = submitClient; // a YARNRunner when running on a cluster
    this.jtFs = submitFs;
  }

  // Key methods (signatures abbreviated):
  compareFs(FileSystem srcFs, FileSystem destFs) // check whether two file systems are the same
  getPathURI()
  checkSpecs()
  copyRemoteFiles()
  copyAndConfigureFiles()
  copyJar(Path originalJarPath, Path submitJarFile, short replication)
  addMRFrameworkToDistributedCache()
  submitJobInternal(Job job, Cluster cluster) // submits the job to the cluster
  writeNewSplits(JobContext job, Path jobSubmitDir)
  getJobSubmitter(FileSystem fs, ClientProtocol submitClient) // just wraps the JobSubmitter constructor
}

 

Next, let's look at the submitJobInternal method:

JobStatus submitJobInternal(Job job, Cluster cluster) 
throws ClassNotFoundException, InterruptedException, IOException {

  // validate the job's output specs (output format, output path, etc.)
  checkSpecs(job);

  Configuration conf = job.getConfiguration(); // get the configuration
  addMRFrameworkToDistributedCache(conf); // add the MR framework to the distributed cache

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // get the staging directory path
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost(); // get the IP address of this node (the submitting host)
  if (ip != null) {
    submitHostAddress = ip.getHostAddress(); // this node's IP address as a string
    submitHostName = ip.getHostName(); // this node's host name
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); // write both into conf
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID(); // get a new job ID (job IDs are unique)
  job.setJobID(jobId); // set the job's ID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString()); // the job's temporary subdirectory is named after the job ID
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName()); // the user name
    conf.set("hadoop.http.filter.initializers", 
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); // prepare the filter initializer for the HTTP interface
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); // set the job submission path
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
        + " as the submit dir");

    // get delegation token for the dir -- prepare the access tokens
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf); // obtain the tokens needed to talk to the NameNode
    
    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    // (the key that secures the data flowing between Mappers and Reducers)
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
              "data spill is enabled");
    }

    copyAndConfigureFiles(job, submitJobDir); // copy the jar and other executables to HDFS; 10 replicas by default, spread across nodes

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // path of the job configuration file
    
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir); // compute the number of maps (how this number is determined deserves an article of its own)
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted" to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME); // the default scheduling queue is named "default"

    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); // set the ACL permissions

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf); // clear cached token referrals

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids (only if the tracking mechanism is enabled)
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId()); // collect all the tracking ids
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()])); // record them in the configuration
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile); // write the contents of conf into an .xml file
    
    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());

    // submit the job via YARNRunner.submitJob() or LocalJobRunner.submitJob()
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status; // return the status
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true); // delete the temporary directory

    }
  }
}

From submitJobInternal we can see that two kinds of resources and information must accompany the job submission:

  one kind goes to the resource manager (RM) itself, for use when the RM registers and schedules the job;

  the other is not used by the RM directly, but by the nodes that actually perform the computation. The first kind includes the submitting node's IP address, host name, user name, job ID, information about the MapReduce input files, and the tokens provided for the submission. These are bundled up and handed to the RM; this is job submission in the narrow sense, and the main body of the flow. The second kind includes the jar with the job's executable code, external libraries, and so on; if the input files are local, they belong here too. These resources do not need to be submitted to the RM, because the RM itself has no use for them, but they must be copied or moved into the global HDFS file system so that the nodes that carry out the computation can fetch them.

  To upload these resources and information, a directory must be created for the job in HDFS. HDFS has a directory dedicated to job submission, called the staging directory, so the code obtains its path from the cluster via JobSubmissionFiles.getStagingDir(). It then creates a temporary subdirectory inside the staging directory, named after the job's ID: that is the submitJobDir in the code. From then on, everything related to this job is uploaded into that subdirectory.
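For reference, the helpers in JobSubmissionFiles name the files that land in this per-job subdirectory; a hedged sketch follows (StagingLayout is a hypothetical class, the staging path is illustrative, and the commented outputs assume the usual naming):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;

public class StagingLayout {
  public static void main(String[] args) {
    Path submitJobDir = new Path("/tmp/hadoop-yarn/staging/lugh/.staging/job_1554700000000_0001"); // hypothetical
    System.out.println(JobSubmissionFiles.getJobJar(submitJobDir));           // .../job.jar
    System.out.println(JobSubmissionFiles.getJobConfPath(submitJobDir));      // .../job.xml
    System.out.println(JobSubmissionFiles.getJobSplitFile(submitJobDir));     // .../job.split
    System.out.println(JobSubmissionFiles.getJobSplitMetaFile(submitJobDir)); // .../job.splitmetainfo
  }
}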

  The method also sets the number of maps, the execution queue, and so on, and finally calls the submitJob method of the YARNRunner (or LocalJobRunner) object created in connect(). With that, our job has been handed to the RM, and the flow is:

[WordCount.main() -> Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob()]

Continue reading: Hadoop 2.7 Job Submission Explained (Part 2).
