FileInputFormat 的實現之TextInputFormat

說明

TextInputFormat默認是按行切分記錄record,本篇在於理解,對於同一條記錄record,若是被切分在不一樣的split時是怎麼處理的。首先getSplits是在邏輯上劃分,並無物理切分,也就是隻是記錄每一個split從文件的個位置讀到哪一個位置,文件仍是一個總體。因此在LineRecordReader中,它的處理方式是每一個split多讀一行,也就是讀到下一個split的第一行。而後除了每一個文件的第一個split,其餘split都跳過第一行,進而避免重複讀取,這種方式去處理。

FileInputFomat 之 getSplits

TextInputFormat 繼承TextInputFormat,並無重寫getSplits,而是沿用父類的getSplits方法,下面看下該方法的源碼
public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    //getFormatMinSplitSize() == 1,getMinSplitSize(job)爲用戶設置的切片最小值,默認1。 job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // getMaxSplitSize(job)爲用戶設置的切片最大值,context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", Long.MAX_VALUE);
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        //LocatedFileStatus帶有blockLocation信息
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //判斷文件是否可切分
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          //真正的切片設置大小判斷,computeSplitSize方法中的實現,返回值 Math.max(minSize, Math.min(maxSize, blockSize));
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }

FileInputFomat 之 createRecordReader,主要是看LineRecordReader

public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    //設置record的分隔符
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

LineRecordReader的方法initialize和nextKeyValue方法

public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    final FileSystem fs = file.getFileSystem(job);
    fileIn = fs.open(file);
    
    //判斷是否壓縮,賦值對應的SplitLineReader
    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
    if (null!=codec) {
      isCompressedInput = true; 
      decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        final SplitCompressionInputStream cIn =
          ((SplittableCompressionCodec)codec).createInputStream(
            fileIn, decompressor, start, end,
            SplittableCompressionCodec.READ_MODE.BYBLOCK);
        in = new CompressedSplitLineReader(cIn, job,
            this.recordDelimiterBytes);
        start = cIn.getAdjustedStart();
        end = cIn.getAdjustedEnd();
        filePosition = cIn;
      } else {
        in = new SplitLineReader(codec.createInputStream(fileIn,
            decompressor), job, this.recordDelimiterBytes);
        filePosition = fileIn;
      }
    } else {
      fileIn.seek(start);
      in = new UncompressedSplitLineReader(
          fileIn, job, this.recordDelimiterBytes, split.getLength());
      filePosition = fileIn;
    }
    //這句是關鍵,因爲getSplits的時候,並不能保證一條record記錄,不被切分到不一樣的split。因此處理方式是,除了每一個文件的第一個split,其餘每一個split多讀一行
    //因此避免重複讀,不是開始的split都跳過第一行。
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

接下來是nextKeyValue

public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    //這個in具體看是CompressedSplitLineReader仍是UncompressedSplitLineReader,重寫了其中的readerLine方法
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        //跳過utf的開頭
        newSize = skipUtfByteOrderMark();
      } else {
        //readerLine有兩種實現方法,一種readCustomLine這種是本身定義了record的分隔符,還有一種是readDefaultLine,這種是沒有自定義分隔符,默認的讀取數據的方式,用\r,\n或者\r\n分割
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }
相關文章
相關標籤/搜索