Hadoop source code: job submission

Class: JobSubmitter.submitJobInternal()

The job submission process involves:

  1. Checking the input and output specifications of the job.
  2. Computing the InputSplit values for the job.
  3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
  4. Copying the job's jar and configuration to the MapReduce system directory on the FileSystem.
  5. Submitting the job to the ResourceManager and optionally monitoring its status.
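For context, these five steps are what runs when a driver program calls job.waitForCompletion(true). A minimal driver might look like the following sketch; MyDriver, MyMapper, MyReducer and the paths are placeholders, not part of the source discussed here.

// Hypothetical driver (new API); class names and paths are placeholders.
// Imports: org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path,
// org.apache.hadoop.io.*, org.apache.hadoop.mapreduce.Job,
// org.apache.hadoop.mapreduce.lib.input.FileInputFormat,
// org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "example-job");
job.setJarByClass(MyDriver.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/data/in"));
FileOutputFormat.setOutputPath(job, new Path("/data/out"));
// waitForCompletion() calls submit(), which ends up in JobSubmitter.submitJobInternal()
System.exit(job.waitForCompletion(true) ? 0 : 1);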

An excerpt from the method:

//validate the jobs output specs
checkSpecs(job);

Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

Judging from the source, adding resources to the distributed cache actually happens as the second step, not where the javadoc lists it.

1 Checking the job's output path

/**
 * Check for validity of the output-specification for the job.
 *  
 * <p>This is to validate the output specification for the job when it
 * is submitted.  Typically checks that it does not already exist,
 * throwing an exception when it already exists, so that output is not
 * overwritten.</p>
 *
 * @param ignored
 * @param job job configuration.
 * @throws IOException when output should not be attempted
 */
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;

As the source comment explains, an exception is thrown here if the output path already exists, so that existing output is not overwritten and lost.
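For illustration, the check done by a typical FileOutputFormat boils down to something like the sketch below (simplified, not the exact Hadoop code; the types come from org.apache.hadoop.mapred and org.apache.hadoop.fs):

// Simplified sketch of an output-specification check.
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
  Path outDir = FileOutputFormat.getOutputPath(job);
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set in JobConf.");
  }
  FileSystem fs = outDir.getFileSystem(job);
  if (fs.exists(outDir)) {
    // refuse to run rather than silently overwrite existing output
    throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
  }
}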

2 Checking resource permissions

/**
 * Initializes the staging directory and returns the path. It also
 * keeps track of all necessary ownership & permissions
 * @param cluster
 * @param conf
 */
public static Path getStagingDir(Cluster cluster, Configuration conf)
    throws IOException, InterruptedException {

A later step validates the path returned by this method once more:

/**
 * Make sure that a path specifies a FileSystem.
 * @param path to use
 */
public Path makeQualified(Path path) {
  checkPath(path);
  return path.makeQualified(this.getUri(), this.getWorkingDirectory());
}
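As a quick illustration of what qualification does (the URI and paths below are made up): a scheme-less, relative path is resolved against the FileSystem's URI and working directory.

// Hypothetical example: qualify a relative path against an HDFS FileSystem.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);                  // e.g. backed by hdfs://namenode:8020
Path qualified = fs.makeQualified(new Path("staging"));
// If the working directory is /user/alice, 'qualified' becomes
// hdfs://namenode:8020/user/alice/staging
System.out.println(qualified);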

3 Computing the input splits

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

In the new API, split computation is delegated to the InputFormat. Look at the getSplits method in the source:

/**
 * Logically split the set of input files for the job.  
 *
 * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
 * for processing.</p>
 *
 * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
 * input files are not physically split into chunks. For e.g. a split could
 * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
 * also creates the {@link RecordReader} to read the {@link InputSplit}.
 *
 * @param context job configuration.
 * @return an array of {@link InputSplit}s for the job.
 */
public abstract
  List<InputSplit> getSplits(JobContext context
                             ) throws IOException, InterruptedException;

The most important point in that comment: the split is logical, not physical. A MapReduce program uses TextInputFormat by default; TextInputFormat extends FileInputFormat, and FileInputFormat is the base class that actually implements getSplits().

/**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
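As an aside, the default can be stated explicitly (or overridden) on the job; the job variable below is assumed to be an org.apache.hadoop.mapreduce.Job:

// TextInputFormat is already the default; setting it just makes the choice explicit.
// Any other InputFormat subclass can be plugged in the same way.
job.setInputFormatClass(TextInputFormat.class);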

Next, how the split size is computed:

protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

maxSize comes from the user-settable property mapreduce.input.fileinputformat.split.maxsize and defaults to Long.MAX_VALUE.
minSize is controlled by the property mapreduce.input.fileinputformat.split.minsize and defaults to 1L.
So by default, the split size equals the block size.
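Plugging in concrete numbers makes the formula tangible; the 128 MB block size below is assumed purely for illustration.

// splitSize = max(minSize, min(maxSize, blockSize))
long blockSize  = 128L * 1024 * 1024;                                                 // assumed HDFS block size
long defaults   = Math.max(1L, Math.min(Long.MAX_VALUE, blockSize));                  // = 128 MB, one split per block
long smallerMax = Math.max(1L, Math.min(64L * 1024 * 1024, blockSize));               // = 64 MB, two splits per block
long largerMin  = Math.max(256L * 1024 * 1024, Math.min(Long.MAX_VALUE, blockSize));  // = 256 MB, splits span multiple blocks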

Back to the main thread (InputFormat will be analyzed in detail in a later post). Once the List<InputSplit> has been obtained, the splits are sorted by size, largest first, so that the biggest tasks are scheduled earliest. The comparator is as follows:

private static class SplitComparator implements Comparator<InputSplit> {
   @Override
   public int compare(InputSplit o1, InputSplit o2) {
     try {
       long len1 = o1.getLength();
       long len2 = o2.getLength();
       if (len1 < len2) {
         return 1;
       } else if (len1 == len2) {
         return 0;
       } else {
         return -1;
       }
     } catch (IOException ie) {
       throw new RuntimeException("exception in compare", ie);
     } catch (InterruptedException ie) {
       throw new RuntimeException("exception in compare", ie);
     }
   }
 }

The old API

//method to write splits for old api mapper.
 private int writeOldSplits(JobConf job, Path jobSubmitDir)
 throws IOException {
   org.apache.hadoop.mapred.InputSplit[] splits =
   job.getInputFormat().getSplits(job, job.getNumMapTasks());
   // sort the splits into order based on size, so that the biggest
   // go first
   Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
     public int compare(org.apache.hadoop.mapred.InputSplit a,
                        org.apache.hadoop.mapred.InputSplit b) {
       try {
         long left = a.getLength();
         long right = b.getLength();
         if (left == right) {
           return 0;
         } else if (left < right) {
           return 1;
         } else {
           return -1;
         }
       } catch (IOException ie) {
         throw new RuntimeException("Problem getting input split size", ie);
       }
     }
   });
   JobSplitWriter.createSplitFiles(jobSubmitDir, job,
       jobSubmitDir.getFileSystem(job), splits);
   return splits.length;
 }

The old-API implementation is similar.

4 Submitting the job

// Write job file to submit dir
writeConf(conf, submitJobFile);

//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
    jobId, submitJobDir.toString(), job.getCredentials());

Wait, isn't something missing? Where does the job jar get submitted? In fact the files were already uploaded before the splits (and thus the number of map tasks) were computed. The order of the steps in the javadoc does not necessarily match the order of the code!

/**
 * configure the jobconf of the user with the command line options of
 * -libjars, -files, -archives.
 * @param job
 * @throws IOException
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
throws IOException {
  JobResourceUploader rUploader = new JobResourceUploader(jtFs);
  rUploader.uploadFiles(job, jobSubmitDir);

  // Set the working directory
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(jtFs.getWorkingDirectory());
  }
}
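Putting the pieces together, the call order inside submitJobInternal is roughly the following (heavily abridged; token handling, queue ACLs and error handling are omitted):

// Rough skeleton of JobSubmitter.submitJobInternal (abridged)
checkSpecs(job);                                  // 1. validate the output specification
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// ... a JobID is allocated and submitJobDir is derived from the staging area ...
copyAndConfigureFiles(job, submitJobDir);         // 2. upload jar, -files, -libjars, -archives
int maps = writeSplits(job, submitJobDir);        // 3. compute and write the input splits
conf.setInt(MRJobConfig.NUM_MAPS, maps);
writeConf(conf, submitJobFile);                   // 4. write job.xml to the submit dir
status = submitClient.submitJob(                  // 5. hand the job to the ResourceManager
    jobId, submitJobDir.toString(), job.getCredentials());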

By the way, here are the five states a Hadoop job can be in:

RUNNING(1),
SUCCEEDED(2),
FAILED(3),
PREP(4),
KILLED(5);
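A minimal way to observe these states from a driver; the job variable is assumed to be an org.apache.hadoop.mapreduce.Job that has not been submitted yet.

// Submit asynchronously and poll the state until the job finishes.
job.submit();
while (!job.isComplete()) {
  System.out.println("state = " + job.getStatus().getState());
  Thread.sleep(5000);
}
System.out.println("final state = " + job.getStatus().getState());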