By default, the number of splits equals the number of blocks in the input files. Well, not always exactly: if the block size is 128 MB and a file is 128.1 MB, the file has 2 blocks, yet while the application runs you will find that the number of splits is 1, not 2. The mechanism behind this is analyzed later.
Some programs set the number of maps. How does that map count affect the number of splits?
If a file's size is 0, is it still handed to a map task as a split?
1. Obtain the list of input files, files, via listStatus(); this walks the subdirectories of the input directories and filters out certain files, e.g. _SUCCESS.
2. Compute the total size of all files, totalSize.
3. goalSize = totalSize / numMaps, where numMaps is the user-specified number of maps.
4. Take one file, file, from files.
5. Compute splitSize = max(minSplitSize, min(file.blockSize, goalSize)), where minSplitSize is the smallest allowed split size, 1 byte by default (a sketch of this computation follows the list).
6. Cut file into splits of splitSize. While splitting, if the remaining bytes are no more than splitSize * 1.1 and more than 0, the whole remaining region becomes a single split; this prevents a mapper from processing too little data.
7. Add file's splits to splits.
8. Go back to step 4 until every file in files has been processed.
9. Done; return splits.
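To make the introduction's questions concrete, here is a minimal standalone sketch of steps 3 and 5 above (this formula matches the older org.apache.hadoop.mapred API, where the user-requested map count is passed in as numMaps; the class, method name, and figures below are mine, not Hadoop's):

public class OldApiSplitSizeSketch {
    // Hypothetical helper mirroring steps 3 and 5 of the list; not part of Hadoop itself.
    static long splitSize(long totalSize, int numMaps, long blockSize, long minSplitSize) {
        long goalSize = totalSize / (numMaps == 0 ? 1 : numMaps);     // step 3
        return Math.max(minSplitSize, Math.min(blockSize, goalSize)); // step 5
    }

    public static void main(String[] args) {
        long oneGb = 1024L * 1024 * 1024;
        long blockSize = 128L * 1024 * 1024;
        // Asking for 100 maps on a 1 GB file: goalSize ≈ 10.7 MB, below the block size,
        // so splits shrink and roughly 100 of them are produced.
        System.out.println(splitSize(oneGb, 100, blockSize, 1)); // 10737418 (~10.7 MB)
        // Asking for 2 maps: goalSize = 512 MB, but splitSize is capped at the block size,
        // so about 8 splits are produced, not 2.
        System.out.println(splitSize(oneGb, 2, blockSize, 1));   // 134217728 (128 MB)
    }
}

The takeaway: requesting more maps can shrink splits below the block size, but requesting fewer maps cannot grow them past it, because the block size caps goalSize.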
Source code (Hadoop 2.2.0)
The flow is really not that complicated, so I'll just walk through it with the code and its comments.
The following methods are involved:
1. public List<InputSplit> getSplits(JobContext job): called by the client to obtain all splits of the current Job, which are then sent to the JobTracker (in the new API, presumably the ResourceManager); the JobTracker assigns map tasks to TaskTrackers according to where these splits are stored. This method uses listStatus() described below, and from the file information it returns it pulls, from the FileSystem, the locations of the blocks making up each file (BlockLocation) via getFileBlockLocations(file, start, len); that call depends on the FileSystem implementation in use (FileSystem, LocalFileSystem, DistributedFileSystem).
  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                 blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                                 blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }
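The SPLIT_SLOP constant (1.1) in the loop above is what answers the 128.1 MB question from the introduction. A tiny standalone sketch of that arithmetic (the figures are illustrative, not taken from the source):

public class SplitSlopSketch {
    public static void main(String[] args) {
        final double SPLIT_SLOP = 1.1;              // same constant as in FileInputFormat
        long splitSize = 128L * 1024 * 1024;        // assume computeSplitSize() returned the block size
        long length = (long) (128.1 * 1024 * 1024); // a ~128.1 MB file spanning 2 HDFS blocks

        int splits = 0;
        long bytesRemaining = length;
        // 128.1/128 ≈ 1.0008, which is not greater than 1.1, so the loop never runs.
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++; // the whole remainder becomes the single split
        }
        System.out.println(splits); // prints 1, even though the file has 2 blocks
    }
}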
2. protected List<FileStatus> listStatus(JobContext job): first reads the "mapred.input.dir" configuration value to get all the Paths the user specified, then obtains a FileSystem from this JobContext's Configuration (most likely a DistributedFileSystem). Finally it applies any PathFilter the user may have set and, through the FileSystem, fetches the files (FileStatus) represented by all of these Paths. Note: there is quite a lot going on in this method, and much of it is still unfamiliar to me.
  /** List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if zero items.
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                                        job.getConfiguration());

    // Whether we need to recursive look into the directory structure
    boolean recursive = getInputDirRecursive(job);

    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i=0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat: matches) {
          if (globStat.isDirectory()) {
            RemoteIterator<LocatedFileStatus> iter =
                fs.listLocatedStatus(globStat.getPath());
            while (iter.hasNext()) {
              LocatedFileStatus stat = iter.next();
              if (inputFilter.accept(stat.getPath())) {
                if (recursive && stat.isDirectory()) {
                  addInputPathRecursively(result, fs, stat.getPath(),
                      inputFilter);
                } else {
                  result.add(stat);
                }
              }
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }
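For context, a driver-side sketch of how these inputs get configured: addInputPath() fills "mapred.input.dir", setInputPathFilter() supplies the jobFilter seen above, and setInputDirRecursive() controls the recursive flag. The paths and filter class are invented for illustration, and I'm assuming these static helpers are present in this Hadoop branch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class InputPathDemo {
    // A user-supplied filter; listStatus() wraps it together with hiddenFileFilter
    // in a MultiPathFilter, as shown above.
    public static class NoTmpFilter implements PathFilter {
        @Override
        public boolean accept(Path p) {
            return !p.getName().endsWith(".tmp");
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        FileInputFormat.addInputPath(job, new Path("/data/input")); // feeds "mapred.input.dir"
        FileInputFormat.setInputPathFilter(job, NoTmpFilter.class); // the optional jobFilter
        FileInputFormat.setInputDirRecursive(job, true);            // recursive directory listing
    }
}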
3. protected long computeSplitSize(long blockSize, long minSize, long maxSize): computes the split size for the current Job from its configuration, namely the block size clamped between the configured minimum and maximum split sizes.
  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
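Unlike the old-style flow at the top, the new API has no numMaps input; the knobs are the minimum and maximum split sizes that computeSplitSize() clamps the block size against. A hedged driver-side sketch (the values are only examples):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigDemo {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Defaults: minSize = 1, maxSize = Long.MAX_VALUE, so splitSize == blockSize.
        // To get more, smaller splits, lower the maximum (here 64 MB):
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        // To get fewer, larger splits, raise the minimum instead (here 256 MB):
        FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
        // In practice you would set only one of the two, depending on which direction you need.
    }
}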
4. protected int getBlockIndex(BlockLocation[] blkLocations, long offset): since the information about the blocks making up the file has already been obtained, it only needs to find, from the offset, the block that the offset falls in.
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0 ; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length -1];
    long fileLength = last.getOffset() + last.getLength() -1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }
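A small self-contained check of that lookup, with invented hosts and sizes: for two 128 MB blocks, a split starting at the 200 MB offset falls inside the second block, so makeSplit() tags it with that block's hosts for locality.

import org.apache.hadoop.fs.BlockLocation;

public class BlockIndexDemo {
    public static void main(String[] args) {
        // Two 128 MB blocks of a 256 MB file; names/hosts are made up for illustration.
        BlockLocation[] blkLocations = new BlockLocation[] {
            new BlockLocation(new String[]{"dn1:50010"}, new String[]{"dn1"},
                              0L, 128L * 1024 * 1024),
            new BlockLocation(new String[]{"dn2:50010"}, new String[]{"dn2"},
                              128L * 1024 * 1024, 128L * 1024 * 1024),
        };
        long offset = 200L * 1024 * 1024; // a split starting at 200 MB

        // Same linear scan as getBlockIndex() above: find the block whose range contains the offset.
        for (int i = 0; i < blkLocations.length; i++) {
            long start = blkLocations[i].getOffset();
            if (start <= offset && offset < start + blkLocations[i].getLength()) {
                System.out.println("block index = " + i); // prints "block index = 1"
            }
        }
    }
}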
Adapted from: "MapReduce獲取分片數" and "Hadoop中FileInputFormat計算InputSplit的getSplits方法的流程".