經過本章節,您能夠學習到:前端
1)job提交流程源碼詳解 waitForCompletion() submit(); // 1創建鏈接 connect(); // 1)建立提交job的代理 new Cluster(getConfiguration()); // (1)判斷是本地yarn仍是遠程 initialize(jobTrackAddr, conf); // 2 提交job submitter.submitJobInternal(Job.this, cluster) // 1)建立給集羣提交數據的Stag路徑 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2)獲取jobid ,並建立job路徑 JobID jobId = submitClient.getNewJobID(); // 3)拷貝jar包到集羣 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4)計算切片,生成切片規劃文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5)向Stag路徑寫xml配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6)提交job,返回提交狀態 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
注意以上代碼只是大過程的提取,並非連續的某處的代碼。要了解詳細的過程,能夠經過編譯器打斷點了解。java
紅色劃分是均分方式,這種方式比較低下。apache
而當前採用的是藍色方式,以一個塊爲一個切片。大體流程以下:oop
block是HDFS上物理上存儲的存儲的數據,切片是對數據邏輯上的劃分。源碼分析
經過如下的學習,咱們能夠總結出如下三個結論:學習
舉個例子加入咱們有如下兩個文件this
file1.txt 320M file2.txt 10M
通過FileInputFormat的切片機制運算後,默認配置下造成的切片信息以下:debug
file1.txt.split1-- 0~128 file1.txt.split2-- 128~256 file1.txt.split3-- 256~320 file2.txt.split1-- 0~10M
經過分析源碼org.apache.hadoop.mapreduce.lib.input.FileInputFormat
,咱們先來看看他的父類InputFormat代理
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by Fernflower decompiler) // package org.apache.hadoop.mapreduce; import java.io.IOException; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; @Public @Stable public abstract class InputFormat<K, V> { public InputFormat() { } public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException; }
父類規定了兩個抽象方法getSplits以及RecordReader。code
再來看看FileInputFormat計算分片大小的相關代碼:
public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = (new StopWatch()).start(); long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); List<InputSplit> splits = new ArrayList(); List<FileStatus> files = this.listStatus(job); Iterator var9 = files.iterator(); while(true) { while(true) { while(var9.hasNext()) { FileStatus file = (FileStatus)var9.next(); Path path = file.getPath(); long length = file.getLen(); if (length != 0L) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus)file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0L, length); } if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } if (bytesRemaining != 0L) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, new String[0])); } } job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; } } }
從中咱們能夠了解到,計算分片大小的邏輯爲
// 初始化值 long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); ... // 計算分片大小 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); ... protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); } ... // minSize默認值爲1L protected long getFormatMinSplitSize() { return 1L; }
也就說,切片主要由這幾個值來運算決定
mapreduce.input.fileinputformat.split.minsize=1 默認值爲1 mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認值Long.MAXValue
所以,默認狀況下,切片大小=blocksize。咱們不可貴到,要想修改分片的大小,徹底能夠經過配置文件的mapreduce.input.fileinputformat.split.minsize
以及mapreduce.input.fileinputformat.split.maxsize
進行配置:
FileInputFormat有多個底層實現,2.7版本的jdk具備以下的繼承樹
默認狀況下Job任務使用的是
// 根據文件類型獲取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit(); // 獲取切片的文件名稱 String name = inputSplit.getPath().getName();
默認狀況下TextInputformat對任務的切片機制是按文件規劃切片,無論文件多小,都會是一個單獨的切片,都會交給一個maptask,這樣若是有大量小文件,就會產生大量的maptask,處理效率極其低下。最好的辦法,在數據處理系統的最前端(預處理/採集),將小文件先合併成大文件,再上傳到HDFS作後續分析。
若是已是大量小文件在HDFS中了,可使用另外一種InputFormat來作切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不一樣:它能夠將多個小文件從邏輯上規劃到一個切片中,這樣,多個小文件就能夠交給一個maptask。
優先知足最小切片大小,不超過最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
若是不設置InputFormat,它默認用的是TextInputFormat.class,所以咱們須要手動指定InputFormat類型,在執行job以前指定:
job.setInputFormatClass(CombineTextInputFormat.class) CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
經過此設置以後,分片會變得更少一些,不會像以前同樣,一個文件造成一個分片(文件太小的狀況尤爲浪費)。