1. Hadoop的一個Block默認是64M,那麼對於一個記錄行形式的文本,會不會形成一行記錄被分到兩個Block當中? java
2. 在把文件從Block中讀取出來進行切分時,會不會形成一行記錄被分紅兩個InputSplit,若是被分紅兩個InputSplit,這樣一個InputSplit裏面就有一行不完整的數據,那麼處理這個InputSplit的Mapper會不會得出不正確的結果? apache
對於上面的兩個問題,首先要明確兩個概念:Block和InputSplit: app
1. Block是HDFS存儲文件的單位(默認是64M); ide
2. InputSplit是MapReduce對文件進行處理和運算的輸入單位,只是一個邏輯概念,每一個InputSplit並無對文件實際的切割,只是記錄了要處理的數據的位置(包括文件的path和hosts)和長度(由start和length決定)。 oop
所以以行記錄形式的文本,可能存在一行記錄被劃分到不一樣的Block,甚至不一樣的DataNode上去。經過分析FileInputFormat裏面的getSplits方法,能夠得出,某一行記錄一樣也可能被劃分到不一樣的InputSplit。 this
下面以hadoop-0.22.0源碼進行分析 spa
org.apache.hadoop.mapred.FileInputFormat: debug
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { FileStatus[] files = listStatus(job); // Save the number of input files for metrics/loadgen job.setLong(NUM_INPUT_FILES, files.length); long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); for (FileStatus file: files) { Path path = file.getPath(); FileSystem fs = path.getFileSystem(job); long length = file.getLen(); BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); if ((length != 0) && isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[] splitHosts = getSplitHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts)); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkLocations.length-1].getHosts())); } } else if (length != 0) { String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts)); } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } LOG.debug("Total # of splits: " + splits.size()); return splits.toArray(new FileSplit[splits.size()]); }
從上面的代碼能夠看出,對文件進行切分其實很簡單:獲取文件在HDFS上的路徑和Block信息,而後根據splitSize對文件進行切分,splitSize = computeSplitSize(goalSize, minSize, blockSize);goalSize,minSize,blockSize均可以配置,默認splitSize 就等於blockSize的默認值(64m)。 設計
FileInputFormat對文件的切分是嚴格按照偏移量來的,所以一行記錄比較長的話,可能被切分到不一樣的InputSplit。 但這並不會對Map形成影響,儘管一行記錄可能被拆分到不一樣的InputSplit,可是與FileInputFormat關聯的RecordReader被設計的足夠健壯,當一行記錄跨InputSplit時,其可以到讀取不一樣的InputSplit,直到把這一行記錄讀取完成 。org.apache.hadoop.mapred.TextInputFormat: code
public RecordReader<LongWritable, Text> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new LineRecordReader(job, (FileSplit) genericSplit); }org.apache.hadoop.mapred.LineRecordReader :
/** Read a line. */ public synchronized boolean next(LongWritable key, Text value) throws IOException { // We always read one extra line, which lies outside the upper // split limit i.e. (end - 1) while (getFilePosition() <= end) { key.set(pos); int newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength)); if (newSize == 0) { return false; } pos += newSize; if (newSize < maxLineLength) { return true; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); } return false; }對於跨InputSplit的行,LineRecordReader會自動跨InputSplit去讀取 。
若是一行記錄L跨越了A,B兩個InputSplit,讀A的時候已經讀取了跨越A,B的這條記錄L,那麼對B這個InputSplit讀取的時候,如何作到不讀取L這條記錄在B中的部分呢?
org.apache.hadoop.mapred.LineRecordReader:
// If this is not the first split, we always throw away first record // because we always (except the last split) read one extra line in // next() method. if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); }若是不是first split,則會丟棄第一個record,避免了重複讀取的問題。