Spark 代碼走讀之 Cache

Spark是基於內存的計算模型,可是當compute chain很是長或者某個計算代價很是大時,能將某些計算的結果進行緩存就顯得很方便了。Spark提供了兩種緩存的方法 Cache 和 checkPoint。本章只關注 Cache (基於spark-core_2.10),在後續的章節中會提到 checkPoint.html

主要從如下三方面來看java

  1. persist時發生什麼
  2. 執行action時如何去緩存及讀取緩存
  3. 如何釋放緩存

定義緩存

spark的計算是lazy的,只有在執行action時才真正去計算每一個RDD的數據。要使RDD緩存,必須在執行某個action以前定義RDD.persist(),此時也就定義了緩存,可是沒有真正去作緩存。RDD.persist會調用到SparkContext.persistRDD(rdd),同時將RDD註冊到ContextCleaner中(後面會講到這個ContextCleaner)。git

def persist(newLevel: StorageLevel): this.type = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    sc.persistRDD(this)
    // Register the RDD with the ContextCleaner for automatic GC-based cleanup
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    storageLevel = newLevel
    this
  }

sc.persistRDD很簡單,將(rdd.id, rdd)加到persistentRdds中。persistentRDDs一個HashMap,key就是rdd.id,value是一個包含時間戳的對rdd的弱引用。persistentRDDs用來跟蹤已經被標記爲persist的RDD的引用的。github

因此在定義緩存階段,作了兩件事:一是設置了rdd的StorageLevel,而是將rdd加到了persistentRdds中並在ContextCleaner中註冊。apache

緩存

當執行到某個action時,真正計算纔開始,這時會調用DAGScheduler.submitJob去提交job,經過rdd.iterator()來計算partition。緩存

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

iterator的邏輯很清楚,若是srorageLevel被標記過了就去CacheManager取,不然本身compute或者從checkPoint讀取。markdown

在cacheManager.getOrCompute中,經過RDDBlockId嘗試去BlockManager中獲得緩存的數據。若是緩存得不到(第一次計算),並調用computeOrReadCheckPoint去計算,並將結果cache起來,cache是經過putInBlockManger實現。根據StorageLevel,若是是緩存在內存中,會將結果存在MemoryStore的一個HashMap中,若是是在disk,結果經過DiskStore.put方法存到磁盤的某個文件夾中。這個文件及最終由Utils中的方法肯定ide

private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
    if (isRunningInYarnContainer(conf)) {
      // If we are in yarn mode, systems can have different disk layouts so we must set it
      // to what Yarn on this system said was available. Note this assumes that Yarn has
      // created the directories already, and that they are secured so that only the
      // user has access to them.
      getYarnLocalDirs(conf).split(",")
    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
    } else {
      // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
      // configuration to point to a secure directory. So create a subdirectory with restricted
      // permissions under each listed directory.
      Option(conf.getenv("SPARK_LOCAL_DIRS"))
        .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
        .split(",")
        .flatMap { root =>
          try {
            val rootDir = new File(root)
            if (rootDir.exists || rootDir.mkdirs()) {
              val dir = createTempDir(root)
              chmod700(dir)
              Some(dir.getAbsolutePath)
            } else {
              logError(s"Failed to create dir in $root. Ignoring this directory.")
              None
            }
          } catch {
            case e: IOException =>
            logError(s"Failed to create local root dir in $root. Ignoring this directory.")
            None
          }
        }
        .toArray
    }
  }

若是已經緩存了,那麼cacheManager.getOrCompute在調用blockManger.get(RDDBlockId)時會返回結果。get會先調用getLocal在本地獲取,若是本地沒有則調用getRemote去遠程尋找,getRemote會call BlockMangerMaster.getLocation獲得緩存的地址。ui

釋放

Spark經過調用rdd.unpersit來釋放緩存,這是經過SparkContext.unpersistRDD來實現的。在unpersistRDD中,rdd會從persistentRdds中移除,並通知BlockManagerMaster去刪除數據緩存。BlockManagerMaster會經過消息機制告訴exectutor去刪除內存或者disk上的緩存數據。this

那麼問題來了,若是用戶不經過手動來unpersit,那緩存豈不是越積越多,最後爆掉嗎?

是的,你的想法徹底合理。所以Spark會自動刪除不在scope內的緩存。「不在scope」指的是在用戶程序中已經沒有了該RDD的引用,RDD的數據是不可讀取的。這裏就要用到以前提到的ContextCleaner。ContextCleaner存了CleanupTaskWeakReference弱引用及存放該引用的隊列。當系統發生GC將沒有強引用的rdd對象回收後,這個弱引用會加入到隊列中。ContextCleaner起了單獨的一個線程輪詢該隊列,將隊列中的弱引用取出,根據引用中的rddId觸發sc.unpersistRDD。經過這樣Spark能及時的將已經垃圾回收的RDD對應的cache進行釋放。這裏要清楚rdd與數據集的關係,rdd只是一個定義了計算邏輯的對象,對象自己不會包含其所表明的數據,數據要經過rdd.compute計算獲得。因此係統回收rdd,只是回收了rdd對象,並無回收rdd表明的數據集。

此外,SparkContext中還有一個MetadataCleaner,該cleaner會移除persistentRdds中的過時的rdd。(筆者一直沒清楚這個移除和cache釋放有什麼關係??)

 

Reference:

https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence

http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md

http://blog.csdn.net/yueqian_zhu/article/details/48177353

http://www.cnblogs.com/jiaan-geng/p/5189177.html

相關文章
相關標籤/搜索