在前面一篇文章中(hadoop2.7之做業提交詳解(上))中涉及到文件的分片。html
JobSubmitter.submitJobInternal方法中調用了
int maps = writeSplits(job, submitJobDir); //設置map的數量,而map的數量是根據文件的大小和分片的大小,以及文件的數量決定的sql
接下來咱們看一下JobSubmitter.writeSplits方法:apache
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); //這裏咱們使用新的方式 } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; }
接下來繼續看JobSubmitter.writeNewSplits方法:數組
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); //輸入對象,InputFormat是個抽象類 List<InputSplit> splits = input.getSplits(job); //調用InputFormat實現類的getSplits方法 T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); //對切片的大小進行排序,最大的放最前面 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);//建立Split文件 return array.length; }
接下來看一下InputFormat這個抽象類:緩存
public abstract class InputFormat<K, V> { //用來返回分片結果 public abstract List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException; //RecordReader是用來從一個輸入分片中讀取一個一個的K-V對的抽象類,咱們能夠將其看做是在InputSplit上的迭代器。 //最主要的方法就是nextKeyvalue()方法,由它獲取分片上的下一個K-V 對。 public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException; }
接下來咱們繼續看這個抽象類的實現類:app
public class TextInputFormat extends FileInputFormat;
public abstract class FileInputFormat<K, V> extends InputFormat;
public abstract class InputFormat。ide
因爲TextInputFormat從抽象類FileInputFormat中繼承,因此大部分的方法都來自於FileInputFormat類,TextInputFormat類只重寫了兩個方法:以下:函數
public class TextInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); //LineRecordReader由一個FileSplit構造出來,start是這個FileSplit的起始位置,pos是當前讀取分片的位置, //end是分片結束位置,in是打開的一個讀取這個分片的輸入流,它是使用這個FileSplit對應的文件名來打開的。 //key和value則分別是每次讀取的K-V對。而後咱們還看到能夠利用getProgress()來跟蹤讀取分片的進度, //這個函數就是根據已經讀取的K-V對佔總K-V對的比例來顯示進度的 return new LineRecordReader(recordDelimiterBytes); } @Override protected boolean isSplitable(JobContext context, Path file) { //若是是壓縮文件就不切分,非壓縮文件就切分。 final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration()).getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; } }
咱們在返回到JobSubmitter.writeNewSplits方法中,有List<InputSplit> splits = input.getSplits(job);主要是調用了TextInputFormat.getSplits()方法,而TextInputFormat繼承了FileInputFormat類,因此調用的就是FileInputFormat.getSplits()方法:oop
public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = new StopWatch().start();//用來計算納秒級別的時間 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //最小值默認爲1 long maxSize = getMaxSplitSize(job); //最大值爲long的最大值,默認爲0x7fffffffffffffffL // generate splits List<InputSplit> splits = new ArrayList<InputSplit>(); List<FileStatus> files = listStatus(job); //得到全部的輸入文件 for (FileStatus file: files) { Path path = file.getPath(); //文件路徑 long length = file.getLen(); //文件大小 if (length != 0) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) {//若是是個含有數據塊位置信息的文件 blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { //通常文件 FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { //判斷是否能夠分片 long blockSize = file.getBlockSize(); //128M long splitSize = computeSplitSize(blockSize, minSize, maxSize); //計算分片的大小,默認爲128M long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //判斷剩餘文件大小是否大於128M*1.1 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);//f返回每一個分片起始位置 splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; // 依次減去分片的大小,對剩餘長度再次分片 } // 屢次分片後,最後的數據長度仍不爲0但又不足一個分片大小 if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } //不可分,則把整個文件做爲一個分片 } else { // not splitable splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //建立空的分片 //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); //設置參數NUM_INPUT_FILES sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; } //public class FileSplit extends InputSplit implements Writable { // private Path file;//輸入文件路徑 // private long start;//分片在文件中的位置(起點) // private long length;//分片長度 // private String[] hosts;//這個分片所在數據塊的多個復份所在節點 // private SplitLocationInfo[] hostInfos;//每一個數據塊復份所在節點,以及是否緩存 //} //makeSplit方法存放的分片格式 protected FileSplit makeSplit(Path file, long start, long length, String[] hosts, String[] inMemoryHosts) { return new FileSplit(file, start, length, hosts, inMemoryHosts); } //計算分片的大小 protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }
經過FileInputFormat.getSplits(),能夠返回一個存放分片的ArraryList,接下繼續回到JobSubmitter.writeNewSplits方法中:post
接下來將ArrayList轉換爲數組,並根據分片的大小排序。而後調用JobSplitWriter.createSplitFiles()方法建立split文件。最後返回數組的長度,也就是map的個數。