We analyze the processing flow of MapReduce V1 based on the Hadoop 1.2.1 source code.
In the MapReduce V1 implementation, there are three main distributed processes (roles): JobClient, JobTracker, and TaskTracker. We follow the actual processing activities of these three roles as the main thread, and analyze the actual processing flow in combination with the source code. The figure below, from "Hadoop: The Definitive Guide", shows the abstract flow of how MapReduce V1 processes a Job:
As shown in the figure above, we expand the processing logic of the shaded part and analyze in detail the Job submission flow on the JobClient side.
After a MapReduce program has been written, the Job needs to be submitted to the JobTracker, so we need to understand what work is done, or what processing is performed, on the JobClient side during Job submission. The flow of submitting a Job on the JobClient side is shown in the figure below:
The Job submission flow described in the figure above can be summarized as follows:
Create a Job instance in the MR program and set the Job state
Create a JobClient instance, preparing to submit the created Job instance to the JobTracker
During the creation of the JobClient, an RPC connection to the JobTracker must first be established
Remotely call the JobTracker via the JobSubmissionProtocol to obtain a new Job ID
Based on the Job configured in the MR program, create the Job-related directories on HDFS, and copy the configured tmpfiles, tmpjars, and tmparchives, as well as the Job's jar file and other resources, to HDFS
Based on the InputFormat configured for the Job, compute the Job's input split information and split metadata (SplitMetaInfo), as well as the number of map and reduce tasks, and finally write this information, together with the Job configuration, to HDFS (so that the JobTracker can read it)
Through the JobClient, submit the Job to the JobTracker via the submitJob method of the JobSubmissionProtocol
Creating a Job in the MR program
The example MR program code below should already be familiar:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
In an MR program, a Job is first created and configured, and then submitted to the MapReduce cluster by calling the Job's waitForCompletion method. During this process, the Job has two states: Job.JobState.DEFINE and Job.JobState.RUNNING. After a Job is created, its state is Job.JobState.DEFINE; internally, the Job is submitted to the JobTracker through a JobClient based on the org.apache.hadoop.mapred.JobSubmissionProtocol protocol, after which the Job's state becomes Job.JobState.RUNNING.
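To make the state transition concrete, here is a simplified sketch of Job.submit from the Hadoop 1.x source (details omitted): a Job may only be submitted while in the DEFINE state, and it flips to RUNNING once submitJobInternal returns:
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);             // only a Job in the DEFINE state may be submitted
    setUseNewAPI();
    info = jobClient.submitJobInternal(conf); // hands the Job to the JobTracker via JobSubmissionProtocol
    state = JobState.RUNNING;                 // the Job is now RUNNING
}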
The Job submission directory submitJobDir
The following code shows how the Job submission directory is created:
Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this, jobCopy); // obtain the StagingArea directory
JobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
To obtain the StagingArea directory, the JobClient needs to call the remote method getStagingAreaDir of the JobSubmissionProtocol on the JobTracker side. Let's look at the getStagingAreaDirInternal method on the JobTracker side, as shown below:
private String getStagingAreaDirInternal(String user) throws IOException {
    final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging"));
    final FileSystem fs = stagingRootDir.getFileSystem(conf);
    return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString();
}
The StagingArea directory finally obtained is ${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/. For example, with the default value of mapreduce.jobtracker.staging.root.dir and user shirdrn, the StagingArea directory is /tmp/hadoop/mapred/staging/shirdrn/.staging/. Through Path submitJobDir = new Path(jobStagingArea, jobId.toString()); we obtain submitJobDir; if a job's ID is job_200912121733_0002, then submitJobDir is /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/.
Copying resource files
When configuring a Job, tmpfiles, tmpjars, and tmparchives can be specified; the JobClient copies the corresponding resource files into designated directories, as shown in the following code:
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
Path submitJarFile = JobSubmissionFiles.getJobJar(submitJobDir);
job.setJar(submitJarFile.toString());
fs.copyFromLocalFile(originalJarFile, submitJarFile);
Since the Job submission directory is known from above, the directories for the corresponding resources can be derived:
tmpfiles directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/files
tmpjars directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/libjars
tmparchives directory: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/archives
Job jar file: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.jar
The corresponding resource files can then be copied into these directories.
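For illustration, these resources originate from the tmpfiles, tmpjars, and tmparchives properties of the job configuration, which are normally populated by GenericOptionsParser when -files, -libjars, and -archives are passed on the command line. A minimal sketch, with hypothetical local paths:
Configuration conf = new Configuration();
// hypothetical local resources; JobClient copies them into the
// files/libjars/archives subdirectories of submitJobDir shown above
conf.set("tmpfiles", "file:///home/shirdrn/stopwords.txt");
conf.set("tmpjars", "file:///home/shirdrn/lib/extra.jar");
conf.set("tmparchives", "file:///home/shirdrn/dict.zip");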
Computing and storing split data
Based on the InputFormat set in the Job configuration, compute how the Job's input data files are partitioned into splits; the code is shown below:
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
In effect this simply calls the InputFormat's getSplits method. If the default getSplits implementation of Hadoop's built-in FileInputFormat does not fit your needs, you can supply a custom implementation, overriding the default logic to define your own rules for splitting the input data files, as sketched below.
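As a minimal sketch of such a customization (the class name here is made up for illustration): rather than rewriting getSplits wholesale, a FileInputFormat subclass can simply declare its files unsplittable, which makes the inherited getSplits emit exactly one split per input file:
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class WholeFileTextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // forces the inherited getSplits to emit one split per file
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException {
        return new LineRecordReader(); // read the single split line by line
    }
}
A class like this would be wired in with job.setInputFormatClass(WholeFileTextInputFormat.class); the submission code shown above then invokes whatever getSplits logic the configured InputFormat provides.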
Once the split information for the input files has been computed, the split data needs to be written to HDFS so that the JobTracker can read it when initializing MapTasks. The code that writes the split data:
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest go first
Arrays.sort(array, new SplitComparator()); // sorts in descending order of InputSplit length
JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); // write the splits and their metadata to HDFS
Next, the JobSplitWriter.createSplitFiles method is called to store the split information and to create and save the split metadata. The code that stores the split information is shown below:
SplitMetaInfo[] info = new SplitMetaInfo[array.length]; // declared just before this excerpt in writeNewSplits
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
long offset = out.getPos();
for (T split : array) {
    long prevCount = out.getPos();
    Text.writeString(out, split.getClass().getName());
    Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
    serializer.open(out);
    serializer.serialize(split); // serialize the split and write it to the HDFS file
    long currCount = out.getPos();
    String[] locations = split.getLocations();
    final int max_loc = conf.getInt(MAX_SPLIT_LOCATIONS, 10);
    if (locations.length > max_loc) {
        LOG.warn("Max block location exceeded for split: " + split + " splitsize: " + locations.length + " maxsize: " + max_loc);
        locations = Arrays.copyOf(locations, max_loc);
    }
    info[i++] = new JobSplit.SplitMetaInfo(locations, offset, split.getLength()); // create a SplitMetaInfo instance
    offset += currCount - prevCount;
}
Let's first look at the split data a FileSplit contains, as shown below:
private Path file;      // the file containing this split's data
private long start;     // position of the first byte of the split within the file
private long length;    // number of bytes in the split
private String[] hosts; // hosts holding the blocks of the split
To see how a FileSplit is serialized and saved to HDFS, look at FileSplit's write method, as shown below:
public void write(DataOutput out) throws IOException {
    Text.writeString(out, file.toString());
    out.writeLong(start);
    out.writeLong(length);
}
Note that the FileSplit's hosts information is not saved here; instead, it is stored in the SplitMetaInfo via new JobSplit.SplitMetaInfo(locations, offset, split.getLength()).
Below is the implementation that saves the SplitMetaInfo data:
private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
        FsPermission p, int splitMetaInfoVersion,
        JobSplit.SplitMetaInfo[] allSplitMetaInfo) throws IOException {
    // write the splits meta-info to a file for the job tracker
    FSDataOutputStream out = FileSystem.create(fs, filename, p);
    out.write(JobSplit.META_SPLIT_FILE_HEADER); // write the META header: META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
    WritableUtils.writeVInt(out, splitMetaInfoVersion); // META version: 1
    WritableUtils.writeVInt(out, allSplitMetaInfo.length); // number of META entries: one SplitMetaInfo per InputSplit
    for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
        splitMetaInfo.write(out); // store each entry
    }
    out.close();
}
Let's look at what data a SplitMetaInfo contains when it is stored:
public void write(DataOutput out) throws IOException {
    WritableUtils.writeVInt(out, locations.length); // number of locations
    for (int i = 0; i < locations.length; i++) {
        Text.writeString(out, locations[i]); // write each location (host)
    }
    WritableUtils.writeVLong(out, startOffset); // offset into the split file
    WritableUtils.writeVLong(out, inputDataLength); // data length of the split
}
Finally, let's look at the directories and files where this data is saved. The Job submission directory is already known from above; here is how the file storing the splits is constructed:
FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
So the file in which the splits are stored is: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.split.
Similarly, the split metadata file is constructed as follows:
writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info);
The split metadata file is: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.splitmetainfo.
Saving the Job configuration data
Before submitting the Job to the JobTracker, the Job's configuration information also needs to be saved. This configuration data comes from what the user configured in the MR program, overriding the default configuration values, and is finally saved as an XML file (job.xml) on HDFS for the JobTracker to query. The following code creates the submitJobFile and writes the job configuration data into it:
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
// Write job file to JobTracker's fs
FSDataOutputStream out = FileSystem.create(fs, submitJobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try {
    jobCopy.writeXml(out);
} finally {
    out.close();
}
Since the Job submission directory is already known, we can easily derive the storage path of the job.xml file: /tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/job.xml.
Finally, with all of the data in place, the JobClient can call the JobSubmissionProtocol method submitJob to submit the Job to the JobTracker for execution.
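For reference, the JobSubmissionProtocol methods exercised in this flow have roughly the following shape in Hadoop 1.x (an abbreviated, hand-trimmed excerpt; see org.apache.hadoop.mapred.JobSubmissionProtocol for the authoritative interface):
interface JobSubmissionProtocol extends VersionedProtocol {
    // allocate a fresh job ID for the client
    JobID getNewJobId() throws IOException;
    // the staging area root directory for the calling user
    String getStagingAreaDir() throws IOException;
    // submit a job whose resources have already been staged under jobSubmitDir
    JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException;
}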