How Hadoop MapReduce Handles Records That Span Blocks and InputSplits

1. A Hadoop Block is 64 MB by default. For a text file made of line-oriented records, can a single record line end up split across two Blocks?

2. When the file is read out of its Blocks and divided up for processing, can a record line end up split across two InputSplits? If so, one InputSplit will contain an incomplete line — will the Mapper that processes that InputSplit then produce incorrect results?

To answer these two questions, we first need to be clear about two concepts, Block and InputSplit:

1. A Block is the unit in which HDFS stores a file (64 MB by default);

2. An InputSplit is the unit of input over which MapReduce processes and computes. It is purely a logical concept: an InputSplit does not physically cut the file, it merely records the location of the data to be processed (the file path and hosts) and its extent (determined by start and length), as sketched below.
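As a purely illustrative sketch, an InputSplit over a file is represented by a FileSplit, which holds nothing but that location metadata; the path, offsets and host names below are made up for the example:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

// Hypothetical example: a FileSplit records WHERE the data is;
// it does not copy or cut the file itself.
public class FileSplitSketch {
  public static void main(String[] args) {
    Path path = new Path("/user/demo/input.txt");      // file path on HDFS (made up)
    long start = 64L * 1024 * 1024;                     // logical start offset in bytes
    long length = 64L * 1024 * 1024;                    // logical length in bytes
    String[] hosts = {"datanode1", "datanode2"};        // hosts holding the data (made up)

    FileSplit split = new FileSplit(path, start, length, hosts);
    System.out.println(split);   // prints something like /user/demo/input.txt:67108864+67108864
  }
}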

So for line-oriented text, a single record line may indeed be spread across different Blocks, and even across different DataNodes. By analysing the getSplits method of FileInputFormat, we can see that a record line may likewise be spread across different InputSplits.

The analysis below is based on the hadoop-0.22.0 source code.

org.apache.hadoop.mapred.FileInputFormat:

public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }

    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job);
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(fs, path)) { 
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          String[] splitHosts = getSplitHosts(blkLocations, 
              length-bytesRemaining, splitSize, clusterMap);
          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
                               splitHosts));
          bytesRemaining -= splitSize;
        }
        
        if (bytesRemaining != 0) {
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
                     blkLocations[blkLocations.length-1].getHosts()));
        }
      } else if (length != 0) {
        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
        splits.add(makeSplit(path, 0, length, splitHosts));
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }

As the code above shows, splitting a file is actually quite simple: obtain the file's path on HDFS and its Block information, then cut the file according to splitSize, where splitSize = computeSplitSize(goalSize, minSize, blockSize). goalSize, minSize and blockSize are all configurable; with the defaults, splitSize simply equals the default blockSize (64 MB).
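The clamping that computeSplitSize performs is a one-liner; a sketch of the equivalent logic (paraphrased, not quoted verbatim from 0.22.0) is shown below. With the defaults (minSize = 1, and goalSize at least one block for any sizeable input), the clamp yields blockSize, which is why one split normally corresponds to one block:

  // goalSize = totalSize / numSplits, minSize comes from the configured minimum
  // split size (default 1), blockSize is the file's HDFS block size.
  // The result is goalSize clamped into the range [minSize, blockSize].
  protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }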

FileInputFormat splits the file strictly by byte offset, so a long record line may well be cut across different InputSplits. This does not affect the Map phase, however: although a record line may be split across InputSplits, the RecordReader associated with FileInputFormat is designed to be robust enough that, when a line crosses an InputSplit boundary, it keeps reading into the next InputSplit until the whole line has been read.

org.apache.hadoop.mapred.TextInputFormat:

public RecordReader<LongWritable, Text> getRecordReader(
                                          InputSplit genericSplit, JobConf job,
                                          Reporter reporter)
    throws IOException {
    
    reporter.setStatus(genericSplit.toString());
    return new LineRecordReader(job, (FileSplit) genericSplit);
  }
org.apache.hadoop.mapred.LineRecordReader:

/** Read a line. */
  public synchronized boolean next(LongWritable key, Text value)
    throws IOException {

    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      key.set(pos);

      int newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        return false;
      }
      pos += newSize;
      if (newSize < maxLineLength) {
        return true;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
    }

    return false;
  }
For a line that crosses an InputSplit boundary, LineRecordReader reads past the end of its split automatically: because the loop condition is getFilePosition() <= end, every reader (except the one for the last split) reads one extra line that starts at or before end and may extend into the next InputSplit, so the whole line is consumed by the reader of the split in which it starts.


If a record line L spans two InputSplits A and B, then while reading A we have already consumed the whole of L, including the part that lies in B. When B is read, how do we avoid re-reading the part of L that falls inside B?

org.apache.hadoop.mapred.LineRecordReader:

    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
If this is not the first split, the first record is thrown away, which prevents the same record from being read twice.
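Taken together, the two rules — every reader reads one extra line past its end, and every reader except the first throws its first line away — ensure that each record line is processed exactly once, by the split in which it starts. A minimal, self-contained sketch of the same protocol on an in-memory byte array (a simplified illustration, not Hadoop code; the sample data and split size are made up) is:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy illustration of the boundary rules used by LineRecordReader:
//   1) each reader keeps reading while the line START is still <= end,
//      so its last line may extend past the split boundary;
//   2) every reader except the first discards its first line, because the
//      previous reader has already consumed it.
public class SplitBoundarySketch {

  public static void main(String[] args) {
    byte[] data = "alpha\nbravo charlie\ndelta\n".getBytes(StandardCharsets.UTF_8);
    int splitSize = 10;                         // deliberately cuts "bravo charlie" in half
    List<String> records = new ArrayList<>();

    for (int start = 0; start < data.length; start += splitSize) {
      int end = Math.min(start + splitSize, data.length);
      int pos = start;

      // Rule 2: a non-first split skips its first (possibly partial) line.
      if (start != 0) {
        pos = endOfLine(data, pos);
      }
      // Rule 1: read every line whose start position is still within [start, end].
      while (pos <= end && pos < data.length) {
        int next = endOfLine(data, pos);
        records.add(new String(data, pos, next - pos, StandardCharsets.UTF_8).trim());
        pos = next;
      }
    }
    System.out.println(records);  // [alpha, bravo charlie, delta] -- each line exactly once
  }

  // Returns the index just past the next '\n', or the end of the array.
  private static int endOfLine(byte[] data, int from) {
    int i = from;
    while (i < data.length && data[i] != '\n') i++;
    return Math.min(i + 1, data.length);
  }
}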