JobSubmitter.submitJobInternal()
The job submission process involves:
An excerpt of the relevant code:
//validate the jobs output specs
checkSpecs(job);

Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Judging from the source, adding resources to the distributed cache is actually the second step.
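Before walking through the individual steps, a minimal driver sketch shows where this code is reached from: waitForCompletion() calls submit(), which ends up in JobSubmitter.submitJobInternal(). The class name and the input/output arguments below are illustrative and not taken from the source above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "submit-demo");
        job.setJarByClass(SubmitDemo.class);
        // identity mapper, zero reducers: input records are written out unchanged
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion() -> submit() -> JobSubmitter.submitJobInternal()
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}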
1 Check the job output path
/**
 * Check for validity of the output-specification for the job.
 *
 * <p>This is to validate the output specification for the job when it is
 * a job is submitted.  Typically checks that it does not already exist,
 * throwing an exception when it already exists, so that output is not
 * overwritten.</p>
 *
 * @param ignored
 * @param job job configuration.
 * @throws IOException when output should not be attempted
 */
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
As the source comment shows, to avoid losing data, an exception is thrown here if the output path already exists.
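Because of this check, a common driver-side pattern is to remove the output directory before submitting. A minimal sketch, assuming the output directory is passed as the first argument:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanOutputDir {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path output = new Path(args[0]);      // the job's output directory
        FileSystem fs = output.getFileSystem(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);          // recursive delete, so checkSpecs() will not throw
        }
    }
}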
2 Check resource permissions
/**
 * Initializes the staging directory and returns the path. It also
 * keeps track of all necessary ownership & permissions
 * @param cluster
 * @param conf
 */
public static Path getStagingDir(Cluster cluster, Configuration conf)
    throws IOException, InterruptedException {
A later method takes the path returned here and validates it again:
/**
 * Make sure that a path specifies a FileSystem.
 * @param path to use
 */
public Path makeQualified(Path path) {
  checkPath(path);
  return path.makeQualified(this.getUri(), this.getWorkingDirectory());
}
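To see what qualification adds, here is a small sketch against the local file system (the relative path is illustrative): makeQualified() fills in the scheme, authority and working directory that a bare path is missing.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QualifyDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem localFs = FileSystem.getLocal(conf);
        // a relative path carries no scheme or authority yet
        Path staging = new Path("tmp/hadoop-staging");
        // prints something like file:/home/user/tmp/hadoop-staging
        System.out.println(localFs.makeQualified(staging));
    }
}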
3 Compute the input splits
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
In the new API the splitting is delegated to the InputFormat. Look at the getSplits method in the source:
/**
 * Logically split the set of input files for the job.
 *
 * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
 * for processing.</p>
 *
 * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
 * input files are not physically split into chunks. For e.g. a split could
 * be <i><input-file-path, start, offset></i> tuple. The InputFormat
 * also creates the {@link RecordReader} to read the {@link InputSplit}.
 *
 * @param context job configuration.
 * @return an array of {@link InputSplit}s for the job.
 */
public abstract List<InputSplit> getSplits(JobContext context)
    throws IOException, InterruptedException;
The most important point in this comment: the split is logical, not physical. A MapReduce program uses TextInputFormat by default; TextInputFormat extends FileInputFormat, a base class that provides the getSplits() implementation (excerpted after the sketch below).
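That default is easy to confirm from a driver; a minimal sketch (nothing here comes from submitJobInternal() itself):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DefaultInputFormatDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // no setInputFormatClass() call was made, so the default applies:
        // org.apache.hadoop.mapreduce.lib.input.TextInputFormat
        System.out.println(job.getInputFormatClass().getName());
    }
}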
/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
Next, look at how the split size is computed:
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
maxSize comes from the user-settable property mapreduce.input.fileinputformat.split.maxsize and defaults to Long.MAX_VALUE.
minSize can be controlled through the property mapreduce.input.fileinputformat.split.minsize and defaults to 1L.
So by default the split size equals blockSize.
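A quick worked example of the formula, together with the FileInputFormat helpers that write the two properties above (the 64 MB cap is an illustrative choice):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeDemo {
    // same formula as FileInputFormat.computeSplitSize()
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) throws Exception {
        long blockSize = 128L * 1024 * 1024;                                     // 128 MB HDFS block
        // defaults: minSize = 1, maxSize = Long.MAX_VALUE -> split = blockSize
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));     // 134217728
        // capping maxSize below the block size shrinks the splits
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));  // 67108864

        // the driver-side setters that write the two properties shown above
        Job job = Job.getInstance();
        FileInputFormat.setMinInputSplitSize(job, 1L);
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
    }
}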
Back to the main thread; InputFormat will be analyzed in detail later.
After the List<InputSplit> is obtained, the splits are sorted by size so that the biggest ones come first:
private static class SplitComparator implements Comparator<InputSplit> {
  @Override
  public int compare(InputSplit o1, InputSplit o2) {
    try {
      long len1 = o1.getLength();
      long len2 = o2.getLength();
      if (len1 < len2) {
        return 1;
      } else if (len1 == len2) {
        return 0;
      } else {
        return -1;
      }
    } catch (IOException ie) {
      throw new RuntimeException("exception in compare", ie);
    } catch (InterruptedException ie) {
      throw new RuntimeException("exception in compare", ie);
    }
  }
}
The old API:
//method to write splits for old api mapper.
private int writeOldSplits(JobConf job, Path jobSubmitDir)
    throws IOException {
  org.apache.hadoop.mapred.InputSplit[] splits =
      job.getInputFormat().getSplits(job, job.getNumMapTasks());
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
    public int compare(org.apache.hadoop.mapred.InputSplit a,
                       org.apache.hadoop.mapred.InputSplit b) {
      try {
        long left = a.getLength();
        long right = b.getLength();
        if (left == right) {
          return 0;
        } else if (left < right) {
          return 1;
        } else {
          return -1;
        }
      } catch (IOException ie) {
        throw new RuntimeException("Problem getting input split size", ie);
      }
    }
  });
  JobSplitWriter.createSplitFiles(jobSubmitDir, job,
      jobSubmitDir.getFileSystem(job), splits);
  return splits.length;
}
The old-API implementation is similar.
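To tie the split step together, here is a hedged driver-side sketch that asks TextInputFormat for the splits it would compute for an input directory; it shows that each split is only a (path, start, length) description, not a physical chunk. The input path argument is illustrative.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SplitPreview {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-preview");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // getSplits() only describes byte ranges; no file is physically cut
        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        for (InputSplit split : splits) {
            // FileSplit.toString() prints path, start offset and length
            System.out.println(split + " length=" + split.getLength());
        }
    }
}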
4 Submit the job
// Write job file to submit dir
writeConf(conf, submitJobFile);

//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
    jobId, submitJobDir.toString(), job.getCredentials());
Wait, is something missing? Where does the job jar get submitted? The files were in fact uploaded before the maps were computed; the order in the comments is not necessarily the order of the code!
/**
 * configure the jobconf of the user with the command line options of
 * -libjars, -files, -archives.
 * @param job
 * @throws IOException
 */
private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
    throws IOException {
  JobResourceUploader rUploader = new JobResourceUploader(jtFs);
  rUploader.uploadFiles(job, jobSubmitDir);

  // Set the working directory
  if (job.getWorkingDirectory() == null) {
    job.setWorkingDirectory(jtFs.getWorkingDirectory());
  }
}
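The -libjars/-files/-archives options mentioned in that comment are normally parsed by GenericOptionsParser, which is what running the driver through ToolRunner gives you. A hedged sketch (the jar and file names in the comment are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolDemo extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already carries whatever -libjars/-files/-archives options
        // GenericOptionsParser stripped off the command line
        Job job = Job.getInstance(getConf(), "tool-demo");
        job.setJarByClass(ToolDemo.class);
        job.setMapperClass(Mapper.class);   // identity mapper; default output types apply
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // e.g.: hadoop jar tool-demo.jar ToolDemo -libjars extra.jar -files lookup.txt <in> <out>
        System.exit(ToolRunner.run(new Configuration(), new ToolDemo(), args));
    }
}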
Incidentally, the five Hadoop job states:
RUNNING(1), SUCCEEDED(2), FAILED(3), PREP(4), KILLED(5);
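A small sketch that polls these states while a job runs, assuming a fully configured Job is passed in (mapper, input and output setup is omitted):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;

public class JobStateWatcher {
    // assumes 'job' was configured elsewhere with mapper, input and output
    static void watch(Job job) throws Exception {
        job.submit();                          // asynchronous submission
        while (!job.isComplete()) {            // still PREP or RUNNING
            JobStatus.State state = job.getJobState();
            System.out.println(job.getJobID() + " is " + state);
            Thread.sleep(5000);
        }
        // final state: SUCCEEDED, FAILED or KILLED
        System.out.println("final state: " + job.getJobState());
    }
}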