修改SequenceFileInputFormat hdfs blocksize

用 Spark 讀取 SequenceFile 時非常消耗時間。默認情況下,SequenceFileInputFormat 切分文件是沿用 FileInputFormat 的邏輯,對於大文件會按 HDFS block size 的大小切分;如果想切得更小,以增加 Spark 任務的並行度,可以自己修改:

/**
 * Input format for SequenceFiles that sizes its splits from a fixed, class-defined
 * block size ([[sequenceFileBlockSize]]) instead of each file's HDFS block size.
 *
 * A smaller nominal block size yields more splits per file, and therefore more
 * tasks when the format is used from Spark.
 *
 * @tparam K key type of the records
 * @tparam V value type of the records
 */
class MySequenceFileInputFormat[K, V] extends FileInputFormat[K, V] {
  private val LOG: Log = LogFactory.getLog(classOf[MySequenceFileInputFormat[K, V]])

  /** Nominal block size in bytes used when computing splits (30,000,000 bytes, roughly 30 MB). */
  val sequenceFileBlockSize = 30000000

  /** The last split of a file may be up to 10% larger than the split size. */
  val SPLIT_SLOP: Double = 1.1

  /** Configuration key under which the number of input files is recorded. */
  val NUM_INPUT_FILES: String = "mapreduce.input.fileinputformat.numinputfiles"

  /**
   * Creates the reader used to iterate over the records of one split.
   *
   * @param split   the split to read
   * @param context the task attempt context
   * @return a new SequenceFile record reader
   */
  @throws[IOException]
  def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[K, V] =
    new SequenceFileRecordReader[K, V]

  /** Lower bound, in bytes, that this format imposes on the split size. */
  override protected def getFormatMinSplitSize: Long = 2000L

  /**
   * Lists the input files, replacing every directory by its `data` child file and
   * dropping zero-length files.
   *
   * The list returned by the parent class is compacted in place and a prefix view
   * of it is returned.
   *
   * @param job the job whose input paths are listed
   * @return the non-empty input files
   */
  @throws[IOException]
  override protected def listStatus(job: JobContext): List[FileStatus] = {
    val files: List[FileStatus] = super.listStatus(job)
    var kept = 0

    for (i <- 0 until files.size) {
      val status = files.get(i)
      // NOTE(review): a directory is presumably a MapFile whose records live in its
      // "data" file -- confirm against how the inputs are written.
      val resolved =
        if (status.isDirectory) {
          val dir: Path = status.getPath
          dir.getFileSystem(job.getConfiguration).getFileStatus(new Path(dir, "data"))
        } else {
          status
        }
      if (resolved.getLen != 0L) {
        files.set(kept, resolved)
        kept += 1
      }
    }

    files.subList(0, kept)
  }

  /**
   * Generates the input splits for the job.
   *
   * Splittable files are cut into chunks whose size is derived from
   * [[sequenceFileBlockSize]] together with the configured minimum and maximum
   * split sizes; non-splittable files become a single split.
   *
   * Timing uses `System.nanoTime` rather than Guava's `Stopwatch`, whose public
   * constructor and `elapsedMillis` are not available in newer Guava releases.
   *
   * @param job the job to generate splits for
   * @return the list of splits covering all input files
   */
  @throws[IOException]
  override def getSplits(job: JobContext): List[InputSplit] = {
    val startNanos = System.nanoTime()
    val minSize: Long = Math.max(getFormatMinSplitSize, FileInputFormat.getMinSplitSize(job))
    val maxSize: Long = FileInputFormat.getMaxSplitSize(job)

    // generate splits
    val splits: ArrayList[InputSplit] = new ArrayList[InputSplit]
    val files: List[FileStatus] = listStatus(job)
    for (i <- 0 until files.size) {
      val file = files.get(i)
      val path: Path = file.getPath
      val length: Long = file.getLen
      if (length == 0L) {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0L, length, new Array[String](0)))
      } else {
        val blkLocations: Array[BlockLocation] = file match {
          case located: LocatedFileStatus => located.getBlockLocations
          case _ =>
            path.getFileSystem(job.getConfiguration).getFileBlockLocations(file, 0L, length)
        }
        if (isSplitable(job, path)) {
          // Use the class-defined block size instead of file.getBlockSize.
          val blockSize: Long = sequenceFileBlockSize
          val splitSize: Long = computeSplitSize(blockSize, minSize, maxSize)

          var bytesRemaining: Long = length
          while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
            val offset = length - bytesRemaining
            val location = blkLocations(getBlockIndex(blkLocations, offset))
            splits.add(makeSplit(path, offset, splitSize,
              location.getHosts, location.getCachedHosts))
            bytesRemaining -= splitSize
          }

          // Whatever is left (at most SPLIT_SLOP * splitSize) becomes the final split.
          if (bytesRemaining != 0L) {
            val offset = length - bytesRemaining
            val location = blkLocations(getBlockIndex(blkLocations, offset))
            splits.add(makeSplit(path, offset, bytesRemaining,
              location.getHosts, location.getCachedHosts))
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0L, length,
            blkLocations(0).getHosts, blkLocations(0).getCachedHosts))
        }
      }
    }

    // Save the number of input files for metrics/loadgen
    job.getConfiguration.setLong(NUM_INPUT_FILES, files.size)
    val elapsedMillis: Long = (System.nanoTime() - startNanos) / 1000000L
    if (LOG.isDebugEnabled) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + elapsedMillis)
    }
    splits
  }
}


將 sequenceFileBlockSize 改爲自己想要的大小(單位:字節)。


使用:

// Read the SequenceFile through the custom input format and expand every record's value.
// `function` stands for the caller's own logic that turns a String into a collection.
val dd = sc.newAPIHadoopFile[BytesWritable, BytesWritable, MySequenceFileInputFormat[BytesWritable, BytesWritable]](sourceDir).flatMap { case (_, value) =>
  // BytesWritable.getBytes returns the whole backing buffer, which can be longer than the
  // record's payload; only the first getLength bytes are valid for this record.
  // NOTE(review): decoding uses the platform default charset, as before -- confirm the encoding.
  function(new String(value.getBytes, 0, value.getLength))
}
相關文章
相關標籤/搜索