In Hive, the file format can be specified directly in the CREATE TABLE statement:
CREATE TABLE `test`(
`id` string COMMENT '',
`name` string COMMENT ''
)
COMMENT ''
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'viewfs://xxx'
References: blog.csdn.net/Thomson617/… spark.apache.org/docs/latest…
Item | Details |
---|---|
Full class name | org.apache.hadoop.mapreduce.lib.input.FileInputFormat |
Formula | Math.max(minSize, Math.min(maxSize, blockSize)) |
maxSize | mapreduce.input.fileinputformat.split.maxsize, default Integer.MAX_VALUE |
minSize | mapreduce.input.fileinputformat.split.minsize, default 1 |
blockSize | the block size configured on HDFS, default 128 MB |
Algorithm | If maxSize is smaller than blockSize (min < max < block), files are split by maxSize, so one block is cut into several splits; if minSize is larger than blockSize (block < min < max), files are split by minSize, so several blocks are combined into one split; otherwise (min < block < max), files are split by blockSize. |
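The three cases can be checked with a minimal standalone sketch that re-implements the same formula; the concrete minSize/maxSize values below are illustrative assumptions, not the defaults:

```java
public class SplitSizeDemo {
    // same formula as FileInputFormat.computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;                                          // 128 MB HDFS block

        // min < max < block: split by maxSize, one block is cut into several splits
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));       // 64 MB

        // block < min < max: split by minSize, several blocks are combined into one split
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Integer.MAX_VALUE)); // 256 MB

        // min < block < max: split by blockSize
        System.out.println(computeSplitSize(blockSize, 1L, Integer.MAX_VALUE));       // 128 MB
    }
}
```

The corresponding Hadoop source is shown below: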
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
// compute the split size for this file
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
// SPLIT_SLOP = 1.1: a remainder within 10% of splitSize is not split further
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// find the block that contains the start of the current split
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
// the remaining, un-split tail of the file
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
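The SPLIT_SLOP rule above means the last chunk of a file is not cut off as a separate split when it is within 10% of splitSize. Here is a small sketch of just that loop, using a hypothetical 260 MB file and a 128 MB split size (both values are illustrative assumptions):

```java
public class SplitSlopDemo {
    private static final double SPLIT_SLOP = 1.1;   // same constant as FileInputFormat

    public static void main(String[] args) {
        long splitSize = 128L * 1024 * 1024;         // assumed split size
        long bytesRemaining = 260L * 1024 * 1024;    // assumed file length

        // keep cutting full-size splits while the remainder is more than 10% larger than splitSize
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            System.out.println("split of " + splitSize + " bytes");
            bytesRemaining -= splitSize;
        }
        // the 132 MB tail (about 1.03 * splitSize) is emitted as one final split
        if (bytesRemaining != 0) {
            System.out.println("final split of " + bytesRemaining + " bytes");
        }
    }
}
```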
Item | Details |
---|---|
Full class name | org.apache.hadoop.hive.ql.io.orc.OrcInputFormat |
strategy | hive.exec.orc.split.strategy, default HYBRID |
maxSize | mapreduce.input.fileinputformat.split.maxsize, default Integer.MAX_VALUE |
minSize | mapreduce.input.fileinputformat.split.minsize, default 1 |
Formula | BI: split by file; ETL: merge stripes smaller than minSize; HYBRID: choose between them by average file size |
Algorithm | The hive.exec.orc.split.strategy parameter controls how splits are generated when reading ORC tables. The BI strategy creates splits at file granularity; the ETL strategy cuts files up, with multiple stripes forming one split; the HYBRID strategy uses ETL when the average file size is larger than the Hadoop max split size (default 256 * 1024 * 1024) and BI otherwise. --- For some large ORC tables the footers can be large, and the ETL strategy may pull a lot of data from HDFS just to compute splits, possibly even causing an OOM on the driver, so the BI strategy is recommended for reading such tables. For smaller tables, especially skewed ones (skew here meaning a large number of stripes stored in few files), the ETL strategy is recommended. --- In addition, spark.hadoop.mapreduce.input.fileinputformat.split.minsize controls stripe merging during ORC split generation: when several stripes together are smaller than this value, they are merged into one task. Lowering this value appropriately increases the parallelism of reads on ORC tables. |
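As a practical illustration, both knobs can be set per job. The sketch below shows one way to do this from a Spark application in Java; routing the Hive setting through the spark.hadoop.* prefix, the table name "db.test", and the concrete values (BI strategy, 256 MB minsize) are all assumptions for illustration, not recommended defaults:

```java
import org.apache.spark.sql.SparkSession;

public class OrcSplitTuningExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("orc-split-tuning")
                // pick the ORC split strategy explicitly (BI: one split per file,
                // no footer reads during split generation)
                .config("spark.hadoop.hive.exec.orc.split.strategy", "BI")
                // under the ETL strategy, stripes smaller than this are merged into one task
                .config("spark.hadoop.mapreduce.input.fileinputformat.split.minsize",
                        String.valueOf(256L * 1024 * 1024))
                .enableHiveSupport()
                .getOrCreate();

        // read a Hive ORC table; "db.test" is a placeholder name
        spark.table("db.test").show();

        spark.stop();
    }
}
```

The strategy selection in the Hive source is shown below: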
switch(context.splitStrategyKind) {
case BI:
// BI strategy requested through config
splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal,
deltas, covered);
break;
case ETL:
// ETL strategy requested through config
splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal,
deltas, covered);
break;
default:
// HYBRID strategy
if (avgFileSize > context.maxSize) {
splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas,
covered);
} else {
splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas,
covered);
}
break;
}
/**
* BI strategy is used when the requirement is to spend less time in split generation
* as opposed to query execution (split generation does not read or cache file footers).
*/
static final class BISplitStrategy extends ACIDSplitStrategy {
List<FileStatus> fileStatuses;
boolean isOriginal;
List<Long> deltas;
FileSystem fs;
Context context;
Path dir;
public BISplitStrategy(Context context, FileSystem fs,
Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
List<Long> deltas, boolean[] covered) {
super(dir, context.numBuckets, deltas, covered);
this.context = context;
this.fileStatuses = fileStatuses;
this.isOriginal = isOriginal;
this.deltas = deltas;
this.fs = fs;
this.dir = dir;
}
@Override
public List<OrcSplit> getSplits() throws IOException {
List<OrcSplit> splits = Lists.newArrayList();
for (FileStatus fileStatus : fileStatuses) {
String[] hosts = SHIMS.getLocationsWithOffset(fs, fileStatus).firstEntry().getValue()
.getHosts();
OrcSplit orcSplit = new OrcSplit(fileStatus.getPath(), 0, fileStatus.getLen(), hosts,
null, isOriginal, true, deltas, -1);
splits.add(orcSplit);
}
// add uncovered ACID delta splits
splits.addAll(super.getSplits());
return splits;
}
@Override
public String toString() {
return BISplitStrategy.class.getSimpleName() + " strategy for " + dir;
}
}
/**
* ETL strategy is used when spending little more time in split generation is acceptable
* (split generation reads and caches file footers).
*/
static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
Context context;
FileSystem fs;
List<FileStatus> files;
boolean isOriginal;
List<Long> deltas;
Path dir;
boolean[] covered;
public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
boolean isOriginal, List<Long> deltas, boolean[] covered) {
this.context = context;
this.dir = dir;
this.fs = fs;
this.files = children;
this.isOriginal = isOriginal;
this.deltas = deltas;
this.covered = covered;
}
private FileInfo verifyCachedFileInfo(FileStatus file) {
context.numFilesCounter.incrementAndGet();
FileInfo fileInfo = Context.footerCache.getIfPresent(file.getPath());
if (fileInfo != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Info cached for path: " + file.getPath());
}
if (fileInfo.modificationTime == file.getModificationTime() &&
fileInfo.size == file.getLen()) {
// Cached copy is valid
context.cacheHitCounter.incrementAndGet();
return fileInfo;
} else {
// Invalidate
Context.footerCache.invalidate(file.getPath());
if (LOG.isDebugEnabled()) {
LOG.debug("Meta-Info for : " + file.getPath() +
" changed. CachedModificationTime: "
+ fileInfo.modificationTime + ", CurrentModificationTime: "
+ file.getModificationTime()
+ ", CachedLength: " + fileInfo.size + ", CurrentLength: " +
file.getLen());
}
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Info not cached for path: " + file.getPath());
}
}
return null;
}
@Override
public List<SplitInfo> getSplits() throws IOException {
List<SplitInfo> result = Lists.newArrayList();
for (FileStatus file : files) {
FileInfo info = null;
if (context.cacheStripeDetails) {
info = verifyCachedFileInfo(file);
}
// ignore files of 0 length
if (file.getLen() > 0) {
result.add(new SplitInfo(context, fs, file, info, isOriginal, deltas, true, dir, covered));
}
}
return result;
}
@Override
public String toString() {
return ETLSplitStrategy.class.getSimpleName() + " strategy for " + dir;
}
}
long currentOffset = -1;
long currentLength = 0;
int idx = -1;
for (StripeInformation stripe : stripes) {
  idx++;
  if (!includeStripe[idx]) {
    // create split for the previous unfinished stripe
    if (currentOffset != -1) {
      splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
      currentOffset = -1;
    }
    continue;
  }
  // if we are working on a stripe, over the min stripe size, and
  // crossed a block boundary, cut the input split here.
  if (currentOffset != -1 && currentLength > context.minSize &&
      (currentOffset / blockSize != stripe.getOffset() / blockSize)) {
    splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
    currentOffset = -1;
  }
  // if we aren't building a split, start a new one.
  if (currentOffset == -1) {
    currentOffset = stripe.getOffset();
    currentLength = stripe.getLength();
  } else {
    currentLength = (stripe.getOffset() + stripe.getLength()) - currentOffset;
  }
  if (currentLength >= context.maxSize) {
    splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
    currentOffset = -1;
  }
}
if (currentOffset != -1) {
  splits.add(createSplit(currentOffset, currentLength, fileMetaInfo));
}
// add uncovered ACID delta splits
splits.addAll(deltaSplits);
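To see how this loop groups stripes into splits, here is a self-contained sketch that applies the same merging rules to a list of made-up stripe sizes; the stripe lengths, blockSize, minSize, and maxSize below are all illustrative assumptions:

```java
import java.util.ArrayList;
import java.util.List;

public class StripeMergeDemo {
    public static void main(String[] args) {
        long blockSize = 256L * 1024 * 1024;   // assumed HDFS block size
        long minSize   = 64L  * 1024 * 1024;   // assumed ...split.minsize
        long maxSize   = 256L * 1024 * 1024;   // assumed ...split.maxsize

        // made-up stripe lengths of one ORC file, laid out back to back
        long[] stripeLengths = {10L << 20, 20L << 20, 40L << 20, 200L << 20, 30L << 20};

        List<long[]> splits = new ArrayList<>();   // each entry: {offset, length}
        long currentOffset = -1, currentLength = 0, offset = 0;

        for (long len : stripeLengths) {
            // cut a split when we cross a block boundary and already exceed minSize
            if (currentOffset != -1 && currentLength > minSize &&
                (currentOffset / blockSize != offset / blockSize)) {
                splits.add(new long[]{currentOffset, currentLength});
                currentOffset = -1;
            }
            if (currentOffset == -1) {             // start a new split
                currentOffset = offset;
                currentLength = len;
            } else {                               // extend the current split with this stripe
                currentLength = (offset + len) - currentOffset;
            }
            if (currentLength >= maxSize) {        // cut once the split reaches maxSize
                splits.add(new long[]{currentOffset, currentLength});
                currentOffset = -1;
            }
            offset += len;
        }
        if (currentOffset != -1) {                 // flush the last unfinished split
            splits.add(new long[]{currentOffset, currentLength});
        }

        for (long[] s : splits) {
            System.out.println("split at offset " + (s[0] >> 20) + " MB, length " + (s[1] >> 20) + " MB");
        }
    }
}
```

With these made-up values the first four stripes are merged into one 270 MB split (cut when it exceeds maxSize) and the last 30 MB stripe becomes a split of its own; raising minSize merges more stripes per task, lowering it increases read parallelism.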