MapReduce V1:Job提交流程之JobClient端分析

咱們基於Hadoop 1.2.1源碼分析MapReduce V1的處理流程。
MapReduce V1實現中,主要存在3個主要的分佈式進程(角色):JobClient、JobTracker和TaskTracker,咱們主要是以這三個角色的實際處理活動爲主線,並結合源碼,分析實際處理流程。下圖是《Hadoop權威指南》一書給出的MapReduce V1處理Job的抽象流程圖:web

如上圖,咱們展開陰影部分的處理邏輯,詳細分析Job提交在JobClient端的具體流程。
在編寫好MapReduce程序之後,須要將Job提交給JobTracker,那麼咱們就須要瞭解在提交Job的過程當中,在JobClient端都作了哪些工做,或者說執行了哪些處理。在JobClient端提交Job的處理流程,以下圖所示:

上圖所描述的Job的提交流程,說明以下所示:apache

 

  1. 在MR程序中建立一個Job實例,設置Job狀態app

  2. 建立一個JobClient實例,準備將建立的Job實例提交到JobTracker分佈式

  3. 在建立JobClient的過程當中,首先必須保證創建到JobTracker的RPC鏈接ide

  4. 基於JobSubmissionProtocol協議遠程調用JobTracker獲取一個新的Job IDoop

  5. 根據MR程序中配置的Job,在HDFS上建立Job相關目錄,並將配置的tmpfiles、tmpjars、tmparchives,以及Job對應jar文件等資源複製到HDFS源碼分析

  6. 根據Job配置的InputFormat,計算該Job輸入的Split信息和元數據(SplitMetaInfo)信息,以及計算出map和reduce的個數,最後將這些信息連通Job配置寫入到HDFS(保證JobTracker可以讀取)this

  7. 經過JobClient基於JobSubmissionProtocol協議方法submitJob提交Job到JobTrackercode

MR程序建立Joborm

下面的MR程序示例代碼,已經很熟悉了:

01 public static void main(String[] args) throws Exception {
02   Configuration conf = new Configuration();
03   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
04   if (otherArgs.length != 2) {
05     System.err.println("Usage: wordcount <in> <out>");
06     System.exit(2);
07   }
08   Job job = new Job(conf, "word count");
09   job.setJarByClass(WordCount.class);
10   job.setMapperClass(TokenizerMapper.class);
11   job.setCombinerClass(IntSumReducer.class);
12   job.setReducerClass(IntSumReducer.class);
13   job.setOutputKeyClass(Text.class);
14   job.setOutputValueClass(IntWritable.class);
15   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
16   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
17   System.exit(job.waitForCompletion(true) ? 0 1);
18 }

在MR程序中,首先建立一個Job,並進行配置,而後經過調用Job的waitForCompletion方法將Job提交到MapReduce集羣。這個過程當中,Job存在兩種狀態:Job.JobState.DEFINE和Job.JobState.RUNNING,建立一個Job後,該Job的狀態爲Job.JobState.DEFINE,Job內部經過JobClient基於org.apache.hadoop.mapred.JobSubmissionProtocol協議提交給JobTracker,而後該Job的狀態變爲Job.JobState.RUNNING。

Job提交目錄submitJobDir

經過以下代碼能夠看到,Job提交目錄是如何建立的:

1 JobConf jobCopy = job;
2 Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); // 獲取到StagingArea目錄
3 JobID jobId = jobSubmitClient.getNewJobId();
4 Path submitJobDir = new Path(jobStagingArea, jobId.toString());

獲取StagingArea目錄,JobClient須要經過JobSubmissionProtocol協議的遠程方法getStagingAreaDir從JobTracker端獲取到,咱們看一下JobTracker端的getStagingAreaDirInternal方法,以下所示:

1 private String getStagingAreaDirInternal(String user) throws IOException {
2   final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir","/tmp/hadoop/mapred/staging"));
3   final FileSystem fs = stagingRootDir.getFileSystem(conf);
4   return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
5 }

最終獲取到的StagingArea目錄爲${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/,例如,若是使用默認的mapreduce.jobtracker.staging.root.dir值,用戶爲shirdrn,則StagingArea目錄/tmp/hadoop/mapred/staging/shirdrn/.staging/。經過Path submitJobDir = new Path(jobStagingArea, jobId.toString());能夠獲得submitJobDir,假如一個job的ID爲job_200912121733_0002,則submitJobDir的值爲/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/

拷貝資源文件

在配置Job的時候,能夠指定tmpfiles、tmpjars、tmparchives,JobClient會將對應的資源文件拷貝到指定的目錄中,對應目錄以下代碼所示:

1     Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
2     Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
3     Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
4 ...
5     Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
6     job.setJar(submitJarFile.toString());
7     fs.copyFromLocalFile(originalJarFile, submitJarFile);

上面已經知道Job提交目錄,能夠分別獲得對應的資源所在目錄:

  • tmpfiles目錄:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files

  • tmpjars目錄:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars

  • tmparchives目錄:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives

  • Job Jar文件:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar

而後,就能夠將對應的資源文件拷貝到對應的目錄中。

計算並存儲Split數據

根據Job配置中設置的InputFormat,計算該Job的數據數據文件是如何進行分片的,代碼以下所示:

1 Configuration conf = job.getConfiguration();
2 InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
3 List<InputSplit> splits = input.getSplits(job);

實際上就是調用InputFormat的getSplits方法,若是不適用Hadoop自帶的FileInputFormat的默認getSplits方法實現,能夠自定義實現,重寫該默認實現邏輯來定義數據數據文件分片的規則。
計算出輸入文件的分片信息,而後須要將這些分片數據寫入到HDFS供JobTracker查詢初始化MapTask,寫入分片數據的實現代碼:

1 T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
2 // sort the splits into order based on size, so that the biggest
3 // go first
4 Arrays.sort(array, new SplitComparator()); // 根據InputSplit的長度作了一個逆序排序
5 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);// 將split及其元數據信息寫入HDFS

接着調用JobSplitWriter.createSplitFiles方法存儲Split信息,並建立元數據信息,並保存元數據信息。存儲Split信息,代碼實現以下所示:

01 SerializationFactory factory = new SerializationFactory(conf);
02 int i = 0;
03 long offset = out.getPos();
04 for(T split: array) {
05   long prevCount = out.getPos();
06   Text.writeString(out, split.getClass().getName());
07   Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
08   serializer.open(out);
09   serializer.serialize(split); // 將split序列化寫入到HDFS文件中
10   long currCount = out.getPos();
11   String[] locations = split.getLocations();
12   final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
13   if (locations.length > max_loc) {
14     LOG.warn("Max block location exceeded for split: "+ split + " splitsize: " + locations.length + " maxsize: " + max_loc);
15     locations = Arrays.copyOf(locations, max_loc);
16   }
17   info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); // 建立SplitMetaInfo實例
18   offset += currCount - prevCount;
19 }

咱們先看一下FileSplit包含的分片內容,以下所示:

1 private Path file;
2 private long start;
3 private long length;
4 private String[] hosts;

在序列化保存FileSplit到HDFS,能夠經過查看FileSplit的write方法,以下所示:

1 @Override
2 public void write(DataOutput out) throws IOException {
3   Text.writeString(out, file.toString());
4   out.writeLong(start);
5   out.writeLong(length);
6 }

須要注意的是,這裏面並無將FileSplit的hosts信息保存,而是存儲到了SplitMetaInfo中new JobSplit.SplitMetaInfo(locations, offset, split.getLength())。
下面是保存SplitMetaInfo信息的實現:

01 private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
02     FsPermission p, int splitMetaInfoVersion,
03     JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
04   // write the splits meta-info to a file for the job tracker
05   FSDataOutputStream out = FileSystem.create(fs, filename, p);
06   out.write(JobSplit.META_SPLIT_FILE_HEADER); // 寫入META頭信息:META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
07   WritableUtils.writeVInt(out, splitMetaInfoVersion); // META版本信息:1
08   WritableUtils.writeVInt(out, allSplitMetaInfo.length); // META對象的數量:每一個InputSplit對應一個SplitMetaInfo
09   for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
10     splitMetaInfo.write(out); // 每一個都進行存儲
11   }
12   out.close();
13 }

看一下SplitMetaInfo存儲時包含的數據信息:

1 public void write(DataOutput out) throws IOException {
2   WritableUtils.writeVInt(out, locations.length); // location個數
3   for (int i = 0; i < locations.length; i++) {
4     Text.writeString(out, locations[i]); // 寫入每個location位置信息
5   }
6   WritableUtils.writeVLong(out, startOffset); // 偏移量
7   WritableUtils.writeVLong(out, inputDataLength); // 數據長度
8 }

最後,咱們看一下這些數據保存的目錄和文件狀況。前面已經知道Job提交目錄,下面看split存儲的文件是如何構建的:

1 FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
2 SplitMetaInfo[] info = writeNewSplits(conf, splits, out);

那麼split保存的文件爲:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split。
一樣,split元數據信息文件構建以下所示:

1 writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
2     new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);

split元數據信息文件爲:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo。

保存Job配置數據

在提交Job到JobTracker以前,還須要保存Job的配置信息,這些配置數據根據用戶在MR程序中配置,覆蓋默認的配置值,最後保存到XML文件(job.xml)到HDFS,供JobTracker查詢。以下代碼,建立submitJobFile文件並寫入job配置數據:

01 ...
02         Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
03         jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
04 ...
05         // Write job file to JobTracker's fs      
06         FSDataOutputStream out = FileSystem.create(fs, submitJobFile, newFsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
07 ...
08           try {
09             jobCopy.writeXml(out);
10           finally {
11             out.close();
12           }

前面已經知道Job提交目錄,咱們很容易就能獲得job.xml文件的存儲路徑:/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml。

最後,全部的數據都已經準備完成,JobClient就能夠基於JobSubmissionProtocol協議方法submitJob,提交Job到JobTracker運行。

相關文章
相關標籤/搜索