Spark is an in-memory computation model, but when the compute chain is very long or a particular computation is very expensive, being able to cache intermediate results becomes very handy. Spark offers two caching mechanisms: cache and checkpoint. This chapter only looks at Cache (based on spark-core_2.10); checkpoint will be covered in a later chapter.
We will look at this from three aspects: how a cache is defined, how cached data is computed and read back, and how a cache is released.
Spark's computation is lazy: the data of each RDD is only actually computed when an action runs. To cache an RDD, RDD.persist() must be called before some action is executed; this merely defines the cache, it does not actually perform any caching yet. RDD.persist calls SparkContext.persistRDD(rdd) and also registers the RDD with the ContextCleaner (the ContextCleaner is discussed later).
def persist(newLevel: StorageLevel): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}
sc.persistRDD is simple: it adds (rdd.id, rdd) to persistentRdds. persistentRdds is a HashMap whose key is rdd.id and whose value is a time-stamped weak reference to the RDD; it is used to track references to the RDDs that have been marked for persistence.
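For intuition, the map behaves roughly like a weak-value hash map keyed by rdd.id (in the 1.x codebase it is a TimeStampedWeakValueHashMap). Below is a minimal sketch of that behaviour, not the actual Spark class:

import java.lang.ref.WeakReference
import scala.collection.concurrent.TrieMap
import org.apache.spark.rdd.RDD

// Simplified illustration: each value holds a weak reference plus an insertion
// timestamp, so an RDD with no remaining strong references elsewhere can still
// be garbage collected even while it is registered here.
class WeakRddMap {
  private case class Entry(ref: WeakReference[RDD[_]], timestamp: Long)
  private val entries = TrieMap[Int, Entry]()

  def put(rdd: RDD[_]): Unit =
    entries.put(rdd.id, Entry(new WeakReference(rdd), System.currentTimeMillis()))

  def get(id: Int): Option[RDD[_]] =
    entries.get(id).flatMap(e => Option(e.ref.get()))

  def remove(id: Int): Unit = entries.remove(id)
}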
So the cache-definition phase does two things: first, it sets the RDD's StorageLevel; second, it adds the RDD to persistentRdds and registers it with the ContextCleaner.
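From the user's point of view, defining and then triggering a cache looks like this (assuming a SparkContext named sc as in the spark-shell; the input path and transformations are made up for illustration):

import org.apache.spark.storage.StorageLevel

// Hypothetical input path and transformations, purely for illustration
val lines  = sc.textFile("hdfs:///data/events.log")
val parsed = lines.map(_.split("\t")).filter(_.length > 3)

parsed.persist(StorageLevel.MEMORY_AND_DISK) // only marks the RDD; nothing is cached yet
parsed.count()   // first action: partitions are computed and stored through the BlockManager
parsed.take(10)  // later actions read the cached blocks instead of recomputing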
When some action is executed, the real computation starts: DAGScheduler.submitJob is called to submit the job, and each partition is computed through rdd.iterator().
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
The logic of iterator is clear: if a storageLevel has been set, fetch the partition through the CacheManager; otherwise compute it directly or read it from a checkpoint.
In cacheManager.getOrCompute, the cached data is first looked up in the BlockManager by RDDBlockId. If nothing is found (the first computation), computeOrReadCheckpoint is called to compute the partition, and the result is then cached via putInBlockManager. Depending on the StorageLevel, if the data is cached in memory the result is kept in a HashMap inside MemoryStore; if it is cached on disk the result is written by DiskStore.put into a file under a local directory. That directory is ultimately determined by a method in Utils:
private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
  if (isRunningInYarnContainer(conf)) {
    // If we are in yarn mode, systems can have different disk layouts so we must set it
    // to what Yarn on this system said was available. Note this assumes that Yarn has
    // created the directories already, and that they are secured so that only the
    // user has access to them.
    getYarnLocalDirs(conf).split(",")
  } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
    conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
  } else {
    // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
    // configuration to point to a secure directory. So create a subdirectory with restricted
    // permissions under each listed directory.
    Option(conf.getenv("SPARK_LOCAL_DIRS"))
      .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
      .split(",")
      .flatMap { root =>
        try {
          val rootDir = new File(root)
          if (rootDir.exists || rootDir.mkdirs()) {
            val dir = createTempDir(root)
            chmod700(dir)
            Some(dir.getAbsolutePath)
          } else {
            logError(s"Failed to create dir in $root. Ignoring this directory.")
            None
          }
        } catch {
          case e: IOException =>
            logError(s"Failed to create local root dir in $root. Ignoring this directory.")
            None
        }
      }
      .toArray
  }
}
If the data has already been cached, cacheManager.getOrCompute returns the result when it calls blockManager.get(RDDBlockId). get first calls getLocal to look up the block locally; if it is not there, getRemote fetches it from a remote executor, calling BlockManagerMaster.getLocations to find out where the cached block lives.
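Putting the two paths together, here is a simplified sketch of the getOrCompute control flow. It is not the actual CacheManager source; the three function parameters are placeholders standing in for the real BlockManager and RDD calls:

import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Simplified sketch of the cache read/compute path for one partition.
def getOrComputeSketch[T](
    rddId: Int,
    partitionIndex: Int,
    level: StorageLevel)(
    readBlock: RDDBlockId => Option[Iterator[T]],              // blockManager.get: local, then remote
    compute: Int => Iterator[T],                                // computeOrReadCheckpoint
    putBlock: (RDDBlockId, Iterator[T], StorageLevel) => Unit   // putInBlockManager
): Iterator[T] = {
  val blockId = RDDBlockId(rddId, partitionIndex)
  readBlock(blockId) match {
    case Some(cached) => cached                    // cache hit: return the stored values
    case None =>
      val values = compute(partitionIndex).toSeq   // first time: compute the partition
      putBlock(blockId, values.iterator, level)    // store via MemoryStore or DiskStore, per StorageLevel
      values.iterator                              // and return the freshly computed values
  }
}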
Spark releases a cache when rdd.unpersist is called, which is implemented via SparkContext.unpersistRDD. In unpersistRDD the RDD is removed from persistentRdds, and the BlockManagerMaster is notified to delete the cached data; the BlockManagerMaster then uses its messaging mechanism to tell the executors to drop the cached data from memory or disk.
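Continuing the user-level example above, releasing the cache by hand is a single call (in the 1.x API the blocking flag defaults to true):

parsed.unpersist()                 // removes the RDD from persistentRdds and asks executors to drop its blocks
parsed.unpersist(blocking = false) // or return immediately without waiting for the blocks to be deleted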
A question arises, though: if the user never calls unpersist manually, won't the cached data keep piling up until it blows up?
Yes, that concern is entirely reasonable, which is why Spark automatically removes caches that are no longer in scope. "No longer in scope" means the user program holds no reference to the RDD anymore, so its data can no longer be read. This is where the ContextCleaner mentioned earlier comes in. The ContextCleaner keeps CleanupTaskWeakReference weak references together with the reference queue they are registered with. When a GC collects an RDD object that has no remaining strong references, its weak reference is enqueued. The ContextCleaner runs a dedicated thread that polls this queue, takes each weak reference out, and triggers sc.unpersistRDD using the rddId stored in the reference. In this way Spark can promptly release the cache belonging to RDDs that have already been garbage collected. Keep in mind the relationship between an RDD and its dataset: an RDD is just an object that defines the computation logic; the object itself does not contain the data it represents, which is produced by rdd.compute. So when the JVM reclaims an RDD it only reclaims the RDD object, not the dataset the RDD stands for.
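The mechanism underneath is the standard JVM WeakReference plus ReferenceQueue pattern. A minimal sketch of a cleaner built on that pattern (an illustration only, not Spark's ContextCleaner code) looks like this:

import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

object CleanerSketch {
  // A weak reference that remembers the id of its referent, so we still know
  // what to clean up after the referent itself has been garbage collected.
  class TrackedRef(val id: Int, referent: AnyRef, queue: ReferenceQueue[AnyRef])
    extends WeakReference[AnyRef](referent, queue)

  private val queue = new ReferenceQueue[AnyRef]()
  // Hold strong references to the TrackedRef wrappers (not to the referents),
  // otherwise the wrappers themselves could be collected before being processed.
  private val refs = mutable.Set[TrackedRef]()

  def register(id: Int, obj: AnyRef): Unit =
    refs.synchronized { refs += new TrackedRef(id, obj, queue) }

  private val cleaner = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        // remove() blocks until GC has collected a referent and enqueued its reference
        val ref = queue.remove().asInstanceOf[TrackedRef]
        refs.synchronized { refs -= ref }
        // Spark would trigger sc.unpersistRDD(ref.id) at this point to drop the cached blocks
        println(s"cleaning cached data for id ${ref.id}")
      }
    }
  })
  cleaner.setDaemon(true)
  cleaner.start()
}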
In addition, SparkContext also has a MetadataCleaner, which removes stale entries from persistentRdds. (The author has never been quite clear how this removal relates to releasing the cache itself?)
Reference:
https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md