用Spark读取SequenceFile时非常耗时。默认情况下，SequenceFileInputFormat沿用FileInputFormat的切分逻辑，大文件会按HDFS block size的大小切分。如果想切得更小、以提高Spark任务的并行度，可以自己实现一个InputFormat（SequenceFileRecordReader会从每个切片的起始位置跳到下一个同步标记(sync marker)再开始读取，所以按任意字节偏移切分是安全的）：
/**
 * SequenceFile input format (new `mapreduce` API) that cuts each input file into splits of a
 * configurable byte size instead of the file's HDFS block size, so that Spark produces more
 * partitions (and therefore more parallel tasks) when reading large SequenceFiles.
 *
 * The split size is read from the job configuration key [[SPLIT_SIZE_KEY]] and defaults to
 * [[sequenceFileBlockSize]] when that key is not set.
 *
 * NOTE(review): cutting at arbitrary byte offsets relies on `SequenceFileRecordReader`
 * re-synchronising to the next sync marker at the start of each split (Hadoop's reader does).
 *
 * @tparam K record key type
 * @tparam V record value type
 */
class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {

  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[K, V]])

  /** Default target split size in bytes (30,000,000 bytes, roughly 30 MB). */
  val sequenceFileBlockSize = 30000000

  /** Job configuration key that overrides [[sequenceFileBlockSize]] (in bytes) without recompiling. */
  val SPLIT_SIZE_KEY: String = "mysequencefile.input.split.size"

  /**
   * A file's tail is emitted as one split when it is at most 10% larger than the split size,
   * instead of producing a tiny trailing split.
   */
  val SPLIT_SLOP: Double = 1.1

  /** Configuration key under which the number of listed input files is recorded. */
  val NUM_INPUT_FILES: String = "mapreduce.input.fileinputformat.numinputfiles"

  @throws[IOException]
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]

  /** Lower bound for any split size, in bytes. */
  override protected def getFormatMinSplitSize: Long = 2000L

  /**
   * Lists the input files, resolving each directory to the `data` file inside it and dropping
   * zero-length files.
   *
   * @param job job whose input paths are listed
   * @return the non-empty input files
   */
  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val listed: List[FileStatus] = super.listStatus(job)
    val nonEmpty: List[FileStatus] = new ArrayList[FileStatus](listed.size)
    for (i <- 0 until listed.size) {
      val status = listed.get(i)
      // Directories are treated as MapFile-style directories whose records live in `<dir>/data`.
      val resolved =
        if (status.isDirectory) {
          val dir: Path = status.getPath
          dir.getFileSystem(job.getConfiguration).getFileStatus(new Path(dir, "data"))
        } else {
          status
        }
      if (resolved.getLen != 0L) nonEmpty.add(resolved)
    }
    nonEmpty
  }

  /**
   * Generates splits of `computeSplitSize(targetSize, minSize, maxSize)` bytes per splittable
   * file. `targetSize` is the value of [[SPLIT_SIZE_KEY]], or [[sequenceFileBlockSize]] when
   * unset. Because `computeSplitSize` clamps to `minSize` (at least 2000 bytes), a non-positive
   * configured value still yields a positive split size.
   *
   * @param job the job being planned; the number of input files is written into its configuration
   * @return one split per chunk (or per whole file when the file is not splittable)
   */
  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    // System.nanoTime replaces Guava's Stopwatch, whose constructor/elapsedMillis were removed.
    val startNanos = System.nanoTime()
    val conf = job.getConfiguration
    val minSize: Long = Math.max(getFormatMinSplitSize, FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)
    // Used where FileInputFormat would use the file's HDFS block size.
    val targetSize: Long = conf.getLong(SPLIT_SIZE_KEY, sequenceFileBlockSize.toLong)

    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    for (i <- 0 until files.size) {
      val file = files.get(i)
      val path: Path = file.getPath
      val length: Long = file.getLen
      if (length != 0L) {
        val blkLocations: Array[BlockLocation] = file match {
          case located: LocatedFileStatus => located.getBlockLocations
          case _ => path.getFileSystem(conf).getFileBlockLocations(file, 0, length)
        }
        if (isSplitable(job, path)) {
          val splitSize: Long = computeSplitSize(targetSize, minSize, maxSize)
          var bytesRemaining: Long = length
          while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
            val offset = length - bytesRemaining
            // Prefer hosts holding the HDFS block that contains this split's first byte.
            val blk = blkLocations(getBlockIndex(blkLocations, offset))
            splits.add(makeSplit(path, offset, splitSize, blk.getHosts, blk.getCachedHosts))
            bytesRemaining -= splitSize
          }
          if (bytesRemaining != 0L) {
            val offset = length - bytesRemaining
            val blk = blkLocations(getBlockIndex(blkLocations, offset))
            splits.add(makeSplit(path, offset, bytesRemaining, blk.getHosts, blk.getCachedHosts))
          }
        } else {
          // Not splittable: the whole file becomes a single split.
          splits.add(makeSplit(path, 0, length, blkLocations(0).getHosts, blkLocations(0).getCachedHosts))
        }
      } else {
        // Zero-length file: empty split with no preferred hosts.
        splits.add(makeSplit(path, 0, length, new Array[String](0)))
      }
    }

    // Save the number of input files for metrics/loadgen.
    conf.setLong(NUM_INPUT_FILES, files.size)
    val elapsedMillis = (System.nanoTime() - startNanos) / 1000000L
    if (LOG.isDebugEnabled) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size + ", TimeTaken: " + elapsedMillis)
    }
    splits
  }
}
把 sequenceFileBlockSize 改为自己想要的切分大小（单位：字节）。
使用方法：
// Read the SequenceFiles under `sourceDir` with the custom input format, so large files are cut
// into smaller splits (more partitions); each value is decoded to a String and passed to `function`.
val dd = sc
  .newAPIHadoopFile[BytesWritable, BytesWritable, MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir)
  .flatMap { case (_, value) =>
    // BytesWritable.getBytes returns the whole backing buffer, which may be longer than the valid
    // data; decode only the first getLength bytes.
    // NOTE(review): assumes values are UTF-8 text — change the charset if they are not.
    function(new String(value.getBytes, 0, value.getLength, java.nio.charset.StandardCharsets.UTF_8))
  }