Hive 各InputFormat切分算法整理

hive中可在建表語句中指定fileformathtml

CREATE TABLE `test`( 
  `id` string COMMENT '',  
  `name` string COMMENT ''
  ) 
COMMENT '' 
ROW FORMAT SERDE  
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'  
STORED AS INPUTFORMAT  
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'  
OUTPUTFORMAT  
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' 
LOCATION 
  'viewfs://xxx' 
複製代碼

blog.csdn.net/Thomson617/… spark.apache.org/docs/latest…算法

  • FileInputFormat

說明 詳情
全類名 org.apache.hadoop.mapreduce.lib.input.FileInputFormat
計算公式 Math.max(minSize, Math.min(maxSize, blockSize))
maxSize mapreduce.input.fileinputformat.split.maxsize,默認 Integer.MAX_VALUE
minSize mapreduce.input.fileinputformat.split.minsize,默認 1
blockSize hdfs上設置的一個塊的大小,默認128M
算法含義 若maxSize小於blockSize(min<max<block),則按照maxSize切分文件(一個block切分紅多個split);若minSize大於blockSize(block<min<max),則按照minSize切分文件(多個block組成一個split);不然(min<block<max),按照block切分文件
protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
 }
 
 if (isSplitable(job, path)) {
      long blockSize = file.getBlockSize();
      // 計算每一個split大小
      long splitSize = computeSplitSize(blockSize, minSize, maxSize);
      long bytesRemaining = length;
      // SPLIT_SLOP = 1.1 ,含義爲剩餘10%不切分
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        // 當前要切分的split在哪一個block中
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    blkLocations[blkIndex].getHosts(),
                    blkLocations[blkIndex].getCachedHosts()));
        bytesRemaining -= splitSize;
      }
      // 剩餘未切分的文件
      if (bytesRemaining != 0) {
        int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
        splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                   blkLocations[blkIndex].getHosts(),
                   blkLocations[blkIndex].getCachedHosts()));
      }
    } else { // not splitable
      splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                  blkLocations[0].getCachedHosts()));
}
複製代碼
  • OrcInputFormat

說明 詳情
全類名 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
strategy hive.exec.orc.split.strategy 默認 HYBRID
maxSize mapreduce.input.fileinputformat.split.maxsize,默認 Integer.MAX_VALUE
minSize mapreduce.input.fileinputformat.split.minsize,默認 1
計算公式 strategy = HYBRID? split by file (BI) : merge stripe if less then minSize(ETL)
算法說明 hive.exec.orc.split.strategy參數控制在讀取ORC表時生成split的策略。BI策略以文件爲粒度進行split劃分;ETL策略會將文件進行切分,多個stripe組成一個split;HYBRID策略爲:當文件的平均大小大於hadoop最大split值(默認256 * 1024 * 1024)時使用ETL策略,不然使用BI策略。 --- 對於一些較大的ORC表,可能其footer較大,ETL策略可能會致使其從hdfs拉取大量的數據來切分split,甚至會致使driver端OOM,所以這類表的讀取建議使用BI策略。對於一些較小的尤爲有數據傾斜的表(這裏的數據傾斜指大量stripe存儲於少數文件中),建議使用ETL策略。--- 另外,spark.hadoop.mapreduce.input.fileinputformat.split.minsize參數能夠控制在ORC切分時stripe的合併處理。具體邏輯是,當幾個stripe的大小小於spark.hadoop.mapreduce.input.fileinputformat.split.minsize時,會合併到一個task中處理。能夠適當調小該值,以此增大讀ORC表的併發。
switch(context.splitStrategyKind) {
          case BI:
            // BI strategy requested through config
            splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
                deltas, covered);
            break;
          case ETL:
            // ETL strategy requested through config
            splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
                deltas, covered);
            break;
          default:
            // HYBRID strategy
            if (avgFileSize > context.maxSize) {
              splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
                  covered);
            } else {
              splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
                  covered);
            }
            break;
        }
複製代碼
/**
   * BI strategy is used when the requirement is to spend less time in split generation
   * as opposed to query execution (split generation does not read or cache file footers).
   */
  static final class BISplitStrategy extends ACIDSplitStrategy {
    List<FileStatus> fileStatuses;
    boolean isOriginal;
    List<Long> deltas;
    FileSystem fs;
    Context context;
    Path dir;

    public BISplitStrategy(Context context, FileSystem fs,
        Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
        List<Long> deltas, boolean[] covered) {
      super(dir, context.numBuckets, deltas, covered);
      this.context = context;
      this.fileStatuses = fileStatuses;
      this.isOriginal = isOriginal;
      this.deltas = deltas;
      this.fs = fs;
      this.dir = dir;
    }

    @Override
    public List<OrcSplit> getSplits() throws IOException {
      List<OrcSplit> splits = Lists.newArrayList();
      for (FileStatus fileStatus : fileStatuses) {
        String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
            .getHosts();
        OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts,
            null, isOriginal, true, deltas, -1);
        splits.add(orcSplit);
      }

      // add uncovered ACID delta splits
      splits.addAll(super.getSplits());
      return splits;
    }

    @Override
    public String toString() {
      return BISplitStrategy.class.getSimpleName() + " strategy for " + dir;
    }
  }
複製代碼
/**
   * ETL strategy is used when spending little more time in split generation is acceptable
   * (split generation reads and caches file footers).
   */
  static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
    Context context;
    FileSystem fs;
    List<FileStatus> files;
    boolean isOriginal;
    List<Long> deltas;
    Path dir;
    boolean[] covered;

    public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
        boolean isOriginal, List<Long> deltas, boolean[] covered) {
      this.context = context;
      this.dir = dir;
      this.fs = fs;
      this.files = children;
      this.isOriginal = isOriginal;
      this.deltas = deltas;
      this.covered = covered;
    }

    private FileInfo verifyCachedFileInfo(FileStatus file) {
      context.numFilesCounter.incrementAndGet();
      FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath());
      if (fileInfo != null) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Info cached for path: " + file.getPath());
        }
        if (fileInfo.modificationTime == file.getModificationTime() &&
            fileInfo.size == file.getLen()) {
          // Cached copy is valid
          context.cacheHitCounter.incrementAndGet();
          return fileInfo;
        } else {
          // Invalidate
          Context.footerCache.invalidate(file.getPath());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Meta-Info for : " + file.getPath() +
                " changed. CachedModificationTime: "
                + fileInfo.modificationTime + ", CurrentModificationTime: "
                + file.getModificationTime()
                + ", CachedLength: " + fileInfo.size + ", CurrentLength: " +
                file.getLen());
          }
        }
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Info not cached for path: " + file.getPath());
        }
      }
      return null;
    }

    @Override
    public List<SplitInfo> getSplits() throws IOException {
      List<SplitInfo> result = Lists.newArrayList();
      for (FileStatus file : files) {
        FileInfo info = null;
        if (context.cacheStripeDetails) {
          info = verifyCachedFileInfo(file);
        }
        // ignore files of 0 length
        if (file.getLen() > 0) {
          result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true, dir, covered));
        }
      }
      return result;
    }

    @Override
    public String toString() {
      return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir;
    }
  }
複製代碼
long currentOffset = -1;
      long currentLength = 0;
      int idx = -1;
      for (StripeInformation stripe : stripes) {
        idx++;

        if (!includeStripe[idx]) {
          // create split for the previous unfinished stripe
          if (currentOffset != -1) {
            splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
            currentOffset = -1;
          }
          continue;
        }

        // if we are working on a stripe, over the min stripe size, and
        // crossed a block boundary, cut the input split here.
        if (currentOffset != -1 && currentLength > context.minSize &&
            (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
          splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
          currentOffset = -1;
        }
        // if we aren't building a split, start a new one. if (currentOffset == -1) { currentOffset = stripe.getOffset(); currentLength = stripe.getLength(); } else { currentLength = (stripe.getOffset() + stripe.getLength()) - currentOffset; } if (currentLength >= context.maxSize) { splits.add(createSplit(currentOffset, currentLength, fileMetaInfo)); currentOffset = -1; } } if (currentOffset != -1) { splits.add(createSplit(currentOffset, currentLength, fileMetaInfo)); } // add uncovered ACID delta splits splits.addAll(deltaSplits); 複製代碼
相關文章
相關標籤/搜索