[Original] Uncle's Troubleshooting Notes (27): rdd.cache in Spark

Spark 2.1.1

Some tasks in a Spark application were very slow, running for about 10 hours. The log of one such task is as follows:

2019-01-24 21:38:56,024 [dispatcher-event-loop-22] INFO org.apache.spark.executor.CoarseGrainedExecutorBackend - Got assigned task 4031
2019-01-24 21:38:56,024 [Executor task launch worker for task 4031] INFO org.apache.spark.executor.Executor - Running task 11.0 in stage 98.0 (TID 4031)
2019-01-24 21:38:56,050 [Executor task launch worker for task 4031] INFO org.apache.spark.MapOutputTrackerWorker - Don't have map outputs for shuffle 13, fetching them
2019-01-24 21:38:56,050 [Executor task launch worker for task 4031] INFO org.apache.spark.MapOutputTrackerWorker - Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@server1:30384)
2019-01-24 21:38:56,052 [Executor task launch worker for task 4031] INFO org.apache.spark.MapOutputTrackerWorker - Got the output locations
2019-01-24 21:38:56,052 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Getting 200 non-empty blocks out of 200 blocks
2019-01-24 21:38:56,054 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.ShuffleBlockFetcherIterator - Started 19 remote fetches in 2 ms

2019-01-25 07:07:54,200 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_108_11 stored as values in memory (estimated size 222.6 MB, free 1893.2 MB)
2019-01-25 07:07:54,546 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_117_11 stored as values in memory (estimated size 87.5 MB, free 1805.8 MB)
2019-01-25 07:07:54,745 [Executor task launch worker for task 4031] INFO org.apache.spark.storage.memory.MemoryStore - Block rdd_118_11 stored as values in memory (estimated size 87.5 MB, free 1718.3 MB)
2019-01-25 07:07:54,987 [Executor task launch worker for task 4031] INFO org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer - Sorting complete. Writing out partition files one at a time.
2019-01-25 07:07:57,425 [Executor task launch worker for task 4031] INFO org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - Saved output of task 'attempt_20190124213852_0098_m_000011_0' to hdfs://namenode/user/hive/warehouse/db_name.db/table_name/.hive-staging_hive_2019-01-24_21-38-52_251_7997709482427937209-1/-ext-10000/_temporary/0/task_20190124213852_0098_m_000011
2019-01-25 07:07:57,425 [Executor task launch worker for task 4031] INFO org.apache.spark.mapred.SparkHadoopMapRedUtil - attempt_20190124213852_0098_m_000011_0: Committed
2019-01-25 07:07:57,426 [Executor task launch worker for task 4031] INFO org.apache.spark.executor.Executor - Finished task 11.0 in stage 98.0 (TID 4031). 4259 bytes result sent to driver

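There is no log output at all between 2019-01-24 21:38:56 and 2019-01-25 07:07:54. The application had not finished yet and several very slow tasks were still running. A thread dump of the executors hosting those tasks showed that each was stuck in a thread with this stack: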

java.lang.Thread.sleep(Native Method)
app.package.AppClass.do(AppClass.scala:228)
org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
org.apache.spark.sql.execution.MapElementsExec$$anonfun$8$$anonfun$apply$1.apply(objects.scala:237)
scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1005)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:936)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
org.apache.spark.scheduler.Task.run(Task.scala:99)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

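Here app.package.AppClass.do is a very time-consuming operation that runs once for every element of the RDD. The puzzle is that the RDD had already been cached right after this operation, so why was it recomputed when later stages depended on it?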

The problem can be simplified as follows:

// doLongTime stands for the expensive per-element operation
val mapped = rdd.map(item => doLongTime(item))
mapped.cache()
// takes a long time
println(mapped.count())
// takes a long time too, why?
println(mapped.count())

Looking at the code:

RDD.compute is overridden by subclasses, and an implementation usually calls RDD.iterator on its parent RDD.

org.apache.spark.rdd.RDD

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  /**
   * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
   */
  private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    val blockId = RDDBlockId(id, partition.index)
    var readCachedBlock = true
    // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
      readCachedBlock = false
      computeOrReadCheckpoint(partition, context)
    }) match {
      case Left(blockResult) =>
        if (readCachedBlock) {
          val existingMetrics = context.taskMetrics().inputMetrics
          existingMetrics.incBytesRead(blockResult.bytes)
          new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
            override def next(): T = {
              existingMetrics.incRecordsRead(1)
              delegate.next()
            }
          }
        } else {
          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        }
      case Right(iter) =>
        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
    }
  }

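RDD.iterator branches on storageLevel. With no storage level set, it computes the partition or restores it from a checkpoint; otherwise it gets the partition from the cache or computes it. An RDD that has been cached therefore goes through RDD.getOrCompute, which in turn calls BlockManager.getOrElseUpdate.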
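The branch in RDD.iterator can be modelled without Spark. The sketch below is only a toy under stated assumptions: `ToyRdd`, its `cache` map and `evict` are hypothetical names that stand in for the RDD, the block cache and an eviction, and none of them are Spark APIs. It shows that a persisted partition is computed once while its block is present, and computed again after the block disappears.

```scala
import scala.collection.mutable

object IteratorDispatchSketch {
  // Hypothetical stand-in for an RDD with a single computation step.
  final class ToyRdd(values: Seq[Int], persisted: Boolean) {
    var computeCalls = 0
    // Hypothetical block cache: partition index -> materialized values.
    private val cache = mutable.Map.empty[Int, Vector[Int]]

    private def compute(): Iterator[Int] = {
      computeCalls += 1
      values.iterator
    }

    // Same shape as the branch in RDD.iterator: only a persisted RDD consults the cache.
    def iterator(index: Int): Iterator[Int] =
      if (persisted) cache.getOrElseUpdate(index, compute().toVector).iterator
      else compute()

    // Simulates the block being dropped from memory.
    def evict(index: Int): Unit = {
      cache.remove(index)
      ()
    }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new ToyRdd(Seq(1, 2, 3), persisted = true)
    rdd.iterator(0).size
    rdd.iterator(0).size
    println(rdd.computeCalls) // 1: the second read was served from the cache
    rdd.evict(0)
    rdd.iterator(0).size
    println(rdd.computeCalls) // 2: the block was gone, so compute ran again
  }
}
```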

org.apache.spark.storage.BlockManager

  /**
   * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
   * to compute the block, persist it, and return its values.
   *
   * @return either a BlockResult if the block was successfully cached, or an iterator if the block
   *         could not be cached.
   */
  def getOrElseUpdate[T](
      blockId: BlockId,
      level: StorageLevel,
      classTag: ClassTag[T],
      makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // Attempt to read the block from local or remote storage. If it's present, then we don't need
    // to go through the local-get-or-put path.
    get[T](blockId)(classTag) match {
      case Some(block) =>
        return Left(block)
      case _ =>
        // Need to compute the block.
    }
    // Initially we hold no locks on this block.
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
      case None =>
        // doPut() didn't hand work back to us, so the block already existed or was successfully
        // stored. Therefore, we now hold a read lock on the block.
        val blockResult = getLocalValues(blockId).getOrElse {
          // Since we held a read lock between the doPut() and get() calls, the block should not
          // have been evicted, so get() not returning the block indicates some internal error.
          releaseLock(blockId)
          throw new SparkException(s"get() failed for block $blockId even though we held a lock")
        }
        // We already hold a read lock on the block from the doPut() call and getLocalValues()
        // acquires the lock again, so we need to call releaseLock() here so that the net number
        // of lock acquisitions is 1 (since the caller will only call release() once).
        releaseLock(blockId)
        Left(blockResult)
      case Some(iter) =>
        // The put failed, likely because the data was too large to fit in memory and could not be
        // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
        // that they can decide what to do with the values (e.g. process them without caching).
       Right(iter)
    }
  }

BlockManager.getOrElseUpdate first tries to get the block from the cache; if the block is not there, it calls doPutIterator to compute the block and put it into the cache:

org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:996)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:700)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)

So the doPutIterator frames in the jstack output show that the block was not in the cache and had to be recomputed.
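The Either contract of getOrElseUpdate is what makes a silent cache miss possible. A minimal sketch, assuming a memory budget counted in elements: `ToyBlockManager` and `capacity` are hypothetical, and unlike Spark, which unrolls the iterator incrementally, this toy materializes the whole block before deciding. Left means the block is stored; Right means the values are handed back uncached, so the next access has to compute them again.

```scala
import scala.collection.mutable

object GetOrElseUpdateSketch {
  // Hypothetical memory-only store with a fixed element budget (not Spark's real accounting).
  final class ToyBlockManager(capacity: Int) {
    private val blocks = mutable.Map.empty[String, Vector[Int]]
    private def used: Int = blocks.valuesIterator.map(_.size).sum

    def getOrElseUpdate(
        blockId: String,
        makeIterator: () => Iterator[Int]): Either[Vector[Int], Iterator[Int]] =
      blocks.get(blockId) match {
        case Some(block) =>
          Left(block) // cache hit: nothing is computed
        case None =>
          val values = makeIterator().toVector // cache miss: compute the block
          if (used + values.size <= capacity) {
            blocks(blockId) = values
            Left(values) // stored successfully
          } else {
            Right(values.iterator) // did not fit: returned to the caller uncached
          }
      }

    def isCached(blockId: String): Boolean = blocks.contains(blockId)
  }
}
```

With a budget of 3 elements, a 3-element block is stored and later reads return Left without calling `makeIterator`; any further block comes back as Right on every access and is recomputed each time.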

org.apache.spark.rdd.RDD

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

cache uses the StorageLevel MEMORY_ONLY. If memory runs short, some partitions may be evicted; the exact policy is implemented in org.apache.spark.storage.memory.MemoryStore.
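To make the eviction idea concrete, here is a rough sketch that assumes a least-recently-used policy with sizes in MB. `ToyMemoryStore` is hypothetical and much simpler than MemoryStore, which among other things tracks unroll memory and treats blocks of the same RDD specially; treat it as an illustration of the principle, not of Spark's exact behaviour.

```scala
import scala.collection.mutable

object EvictionSketch {
  // Hypothetical bounded store; entries are kept from least to most recently used.
  final class ToyMemoryStore(capacityMb: Long) {
    private val entries = mutable.LinkedHashMap.empty[String, Long]
    private var usedMb = 0L

    // Reading a block moves it to the most-recently-used end.
    def get(blockId: String): Option[Long] =
      entries.remove(blockId).map { size =>
        entries.put(blockId, size)
        size
      }

    def contains(blockId: String): Boolean = entries.contains(blockId)

    // Stores a block, evicting least recently used blocks first; returns the evicted ids.
    def put(blockId: String, sizeMb: Long): List[String] =
      if (sizeMb > capacityMb) Nil // can never fit, so nothing is evicted or stored
      else {
        val evicted = mutable.ArrayBuffer.empty[String]
        while (usedMb + sizeMb > capacityMb && entries.nonEmpty) {
          val (victim, victimSize) = entries.head
          entries.remove(victim)
          usedMb -= victimSize
          evicted += victim
        }
        entries.put(blockId, sizeMb)
        usedMb += sizeMb
        evicted.toList
      }
  }
}
```

An evicted block is simply gone under a memory-only level, so the next task that needs it takes the doPutIterator path again.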

Next, look at StorageLevel:

org.apache.spark.storage.StorageLevel

/**
 * :: DeveloperApi ::
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
 * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
 * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
 * to replicate the RDD partitions on multiple nodes.
 *
 * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
 * for commonly useful storage levels. To create your own storage level object, use the
 * factory method of the singleton object (`StorageLevel(...)`).
 */
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
...

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
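The positional booleans in these constants are easy to misread. The sketch below mirrors the constructor argument order from the listing (useDisk, useMemory, useOffHeap, deserialized, replication) in a hypothetical `Flags` case class; the `onMemoryPressure` helper and its strings are my own summary of the consequence, not anything Spark exposes.

```scala
object StorageLevelFlagsSketch {
  // Hypothetical mirror of the StorageLevel constructor arguments, in the same order.
  final case class Flags(
      useDisk: Boolean,
      useMemory: Boolean,
      useOffHeap: Boolean,
      deserialized: Boolean,
      replication: Int = 1)

  // Same argument values as MEMORY_ONLY and MEMORY_AND_DISK in the listing.
  val MemoryOnly: Flags = Flags(false, true, false, true)
  val MemoryAndDisk: Flags = Flags(true, true, false, true)

  // What can happen to a block that no longer fits in memory under these flags.
  def onMemoryPressure(f: Flags): String =
    if (f.useMemory && f.useDisk) "spill to disk"
    else if (f.useMemory) "drop and recompute on next access"
    else "not held in memory"
}
```

The only difference between the two levels is the first flag, and that flag decides whether a block under memory pressure is kept on disk or lost.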

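So after an expensive operation, do not assume that calling RDD.cache is enough to avoid recomputation. MEMORY_ONLY only makes a best effort to keep the data in memory and gives no guarantee. Use RDD.persist(StorageLevel.MEMORY_AND_DISK) instead.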
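The cost difference can be illustrated with a toy model rather than a real cluster. Everything here is an assumption made for illustration: `ToyCache` holds a fixed number of partitions in memory, evicts the oldest one when full, and keeps the evicted partition in a second map only when `useDisk` is true. It counts how often the expensive computation runs over two passes, like the two count calls in the simplified example.

```scala
import scala.collection.mutable

object PersistPolicySketch {
  // Hypothetical cache: `memorySlots` partitions fit in memory, the rest overflow.
  final class ToyCache(memorySlots: Int, useDisk: Boolean) {
    require(memorySlots >= 1, "memorySlots must be at least 1")
    private val memory = mutable.LinkedHashMap.empty[Int, Vector[Int]]
    private val disk = mutable.Map.empty[Int, Vector[Int]]
    var computeCalls = 0

    def getOrCompute(partition: Int)(expensive: => Vector[Int]): Vector[Int] =
      memory.get(partition).orElse(disk.get(partition)).getOrElse {
        computeCalls += 1
        val values = expensive
        if (memory.size >= memorySlots) {
          // Evict the oldest partition; keep it only if a disk tier is enabled.
          val (oldId, oldValues) = memory.head
          memory.remove(oldId)
          if (useDisk) disk.put(oldId, oldValues)
        }
        memory.put(partition, values)
        values
      }
  }

  // Runs two passes over all partitions and returns how many times compute ran.
  def computeCallsForTwoPasses(partitions: Int, memorySlots: Int, useDisk: Boolean): Int = {
    val cache = new ToyCache(memorySlots, useDisk)
    for (_ <- 1 to 2; p <- 0 until partitions) cache.getOrCompute(p)(Vector(p))
    cache.computeCalls
  }

  def main(args: Array[String]): Unit = {
    println(computeCallsForTwoPasses(4, 2, useDisk = false)) // 8: every access recomputes
    println(computeCallsForTwoPasses(4, 2, useDisk = true))  // 4: each partition computed once
  }
}
```

With four partitions and room for two, the memory-only variant recomputes on every access in the second pass, while the variant with a disk tier computes each partition exactly once.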
