Apache Spark 的設計與實現(cache和checkpoint功能)

Cache 和 Checkpoint

做爲區別於 Hadoop 的一個重要 feature,cache 機制保證了須要訪問重複數據的應用(如迭代型算法和交互式應用)能夠運行的更快。與 Hadoop MapReduce job 不一樣的是 Spark 的邏輯/物理執行圖可能很龐大,task 中 computing chain 可能會很長,計算某些 RDD 也可能會很耗時。這時,若是 task 中途運行出錯,那麼 task 的整個 computing chain 須要重算,代價過高。所以,有必要將計算代價較大的 RDD checkpoint 一下,這樣,當下遊 RDD 計算出錯時,能夠直接從 checkpoint 過的 RDD 那裏讀取數據繼續算。

Cache 機制

回到 Overview 提到的 GroupByTest 的例子,裏面對 FlatMappedRDD 進行了 cache,這樣 Job 1 在執行時就直接從 FlatMappedRDD 開始算了。可見 cache 可以讓重複數據在同一個 application 中的 jobs 間共享。 邏輯執行圖: deploy物理執行圖: deploy 問題:哪些 RDD 須要 cache? 會被重複使用的(但不能太大)。 問題:用戶怎麼設定哪些 RDD 要 cache? 由於用戶只與 driver program 打交道,所以只能用 rdd.cache() 去 cache 用戶能看到的 RDD。所謂能看到指的是調用 transformation() 後生成的 RDD,而某些在 transformation() 中 Spark 本身生成的 RDD 是不能被用戶直接 cache 的,好比 reduceByKey() 中會生成的 ShuffledRDD、MapPartitionsRDD 是不能被用戶直接 cache 的。 問題:driver program 設定 rdd.cache() 後,系統怎麼對 RDD 進行 cache? 先不看實現,本身來想象一下如何完成 cache:當 task 計算獲得 RDD 的某個 partition 的第一個 record 後,就去判斷該 RDD 是否要被 cache,若是要被 cache 的話,將這個 record 及後續計算的到的 records 直接丟給本地 blockManager 的 memoryStore,若是 memoryStore 存不下就交給 diskStore 存放到磁盤。 實際實現與設想的基本相似,區別在於:將要計算 RDD partition 的時候(而不是已經計算獲得第一個 record 的時候)就去判斷 partition 要不要被 cache。若是要被 cache 的話,先將 partition 計算出來,而後 cache 到內存。cache 只使用 memory,寫磁盤的話那就叫 checkpoint 了。 調用 rdd.cache() 後, rdd 就變成 persistRDD 了,其 StorageLevel 爲 MEMORY_ONLY。persistRDD 會告知 driver 說本身是須要被 persist 的。 cache 若是用代碼表示:
rdd.iterator()
=> SparkEnv.get.cacheManager.getOrCompute(thisRDD, split, context, storageLevel)
=> key = RDDBlockId(rdd.id, split.index)
=> blockManager.get(key)
=> computedValues = rdd.computeOrReadCheckpoint(split, context)
      if (isCheckpointed) firstParent[T].iterator(split, context) 
      else compute(split, context)
=> elements = new ArrayBuffer[Any]
=> elements ++= computedValues
=> updatedBlocks = blockManager.put(key, elements, tellMaster = true)
  當 rdd.iterator() 被調用的時候,也就是要計算該 rdd 中某個 partition 的時候,會先去 cacheManager 那裏領取一個 blockId,代表是要存哪一個 RDD 的哪一個 partition,這個 blockId 類型是 RDDBlockId(memoryStore 裏面可能還存放有 task 的 result 等數據,所以 blockId 的類型是用來區分不一樣的數據)。而後去 blockManager 裏面查看該 partition 是否是已經被 checkpoint 了,若是是,代表之前運行過該 task,那就不用計算該 partition 了,直接從 checkpoint 中讀取該 partition 的全部 records 放到叫作 elements 的 ArrayBuffer 裏面。若是沒有被 checkpoint 過,先將 partition 計算出來,而後將其全部 records 放到 elements 裏面。最後將 elements 交給 blockManager 進行 cache。 blockManager 將 elements(也就是 partition) 存放到 memoryStore 管理的 LinkedHashMap[BlockId, Entry] 裏面。若是 partition 大於 memoryStore 的存儲極限(默認是 60% 的 heap),那麼直接返回說存不下。若是剩餘空間也許能放下,會先 drop 掉一些早先被 cached 的 RDD 的 partition,爲新來的 partition 騰地方,若是騰出的地方夠,就把新來的 partition 放到 LinkedHashMap 裏面,騰不出就返回說存不下。注意 drop 的時候不會去 drop 與新來的 partition 同屬於一個 RDD 的 partition。drop 的時候先 drop 最先被 cache 的 partition。(說好的 LRU 替換算法呢?) 問題:cached RDD 怎麼被讀取? 下次計算(通常是同一 application 的下一個 job 計算)時若是用到 cached RDD,task 會直接去 blockManager 的 memoryStore 中讀取。具體地講,當要計算某個 rdd 中的 partition 時候(經過調用 rdd.iterator())會先去 blockManager 裏面查找是否已經被 cache 了,若是 partition 被 cache 在本地,就直接使用 blockManager.getLocal() 去本地 memoryStore 裏讀取。若是該 partition 被其餘節點上 blockManager cache 了,會經過 blockManager.getRemote() 去其餘節點上讀取,讀取過程以下圖。 cacheRead 獲取 cached partitions 的存儲位置:partition 被 cache 後所在節點上的 blockManager 會通知 driver 上的 blockMangerMasterActor 說某 rdd 的 partition 已經被我 cache 了,這個信息會存儲在 blockMangerMasterActor 的 blockLocations: HashMap中。等到 task 執行須要 cached rdd 的時候,會調用 blockManagerMaster 的 getLocations(blockId) 去詢問某 partition 的存儲位置,這個詢問信息會發到 driver 那裏,driver 查詢 blockLocations 得到位置信息並將信息送回。 讀取其餘節點上的 cached partition:task 獲得 cached partition 的位置信息後,將 GetBlock(blockId) 的請求經過 connectionManager 發送到目標節點。目標節點收到請求後從本地 blockManager 那裏的 memoryStore 讀取 cached partition,最後發送回來。

Checkpoint

問題:哪些 RDD 須要 checkpoint? 運算時間很長或運算量太大才能獲得的 RDD,computing chain 過長或依賴其餘 RDD 不少的 RDD。 實際上,將 ShuffleMapTask 的輸出結果存放到本地磁盤也算是 checkpoint,只不過這個 checkpoint 的主要目的是去 partition 輸出數據。 問題:何時 checkpoint? cache 機制是每計算出一個要 cache 的 partition 就直接將其 cache 到內存了。但 checkpoint 沒有使用這種第一次計算獲得就存儲的方法,而是等到 job 結束後另外啓動專門的 job 去完成 checkpoint 。 也就是說須要 checkpoint 的 RDD 會被計算兩次。所以,在使用 rdd.checkpoint() 的時候,建議加上 rdd.cache(),這樣第二次運行的 job 就不用再去計算該 rdd 了,直接讀取 cache 寫磁盤。其實 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,至關於 cache 到磁盤上,這樣能夠作到 rdd 第一次被計算獲得時就存儲到磁盤上,但這個 persist 和 checkpoint 有不少不一樣,以後會討論。 問題:checkpoint 怎麼實現? RDD 須要通過 [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ] 這幾個階段才能被 checkpoint。 Initialized: 首先 driver program 須要使用 rdd.checkpoint() 去設定哪些 rdd 須要 checkpoint,設定後,該 rdd 就接受 RDDCheckpointData 管理。用戶還要設定 checkpoint 的存儲路徑,通常在 HDFS 上。 marked for checkpointing:初始化後,RDDCheckpointData 會將 rdd 標記爲 MarkedForCheckpoint。 checkpointing in progress:每一個 job 運行結束後會調用 finalRdd.doCheckpoint(),finalRdd 會順着 computing chain 回溯掃描,碰到要 checkpoint 的 RDD 就將其標記爲 CheckpointingInProgress,而後將寫磁盤(好比寫 HDFS)須要的配置文件(如 core-site.xml 等)broadcast 到其餘 worker 節點上的 blockManager。完成之後,啓動一個 job 來完成 checkpoint(使用 rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf)))。 checkpointed:job 完成 checkpoint 後,將該 rdd 的 dependency 所有清掉,並設定該 rdd 狀態爲 checkpointed。而後, 爲該 rdd 強加一個依賴,設置該 rdd 的 parent rdd 爲 CheckpointRDD,該 CheckpointRDD 負責之後讀取在文件系統上的 checkpoint 文件,生成該 rdd 的 partition。 有意思的是我在 driver program 裏 checkpoint 了兩個 rdd,結果只有一個(下面的 result)被 checkpoint 成功,pairs2 沒有被 checkpoint,也不知道是 bug 仍是故意只 checkpoint 下游的 RDD:
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), 
    (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)

val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)

pairs2.checkpoint

val result = pairs1.join(pairs2)
result.checkpoint
  問題:怎麼讀取 checkpoint 過的 RDD? 在 runJob() 的時候會先調用 finalRDD 的 partitions() 來肯定最後會有多個 task。rdd.partitions() 會去檢查(經過 RDDCheckpointData 去檢查,由於它負責管理被 checkpoint 過的 rdd)該 rdd 是會否被 checkpoint 過了,若是該 rdd 已經被 checkpoint 過了,直接返回該 rdd 的 partitions 也就是 Array[Partition]。 當調用 rdd.iterator() 去計算該 rdd 的 partition 的時候,會調用 computeOrReadCheckpoint(split: Partition) 去查看該 rdd 是否被 checkpoint 過了,若是是,就調用該 rdd 的 parent rdd 的 iterator() 也就是 CheckpointRDD.iterator(),CheckpointRDD 負責讀取文件系統上的文件,生成該 rdd 的 partition。 這就解釋了爲何那麼 trickly 地爲 checkpointed rdd 添加一個 parent CheckpointRDD。 問題:cache 與 checkpoint 的區別? 關於這個問題,Tathagata Das 有一段回答: There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk(其實只有 memory). But the lineage(也就是 computing chain) of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication). 深刻一點討論,rdd.persist(StorageLevel.DISK_ONLY) 與 checkpoint 也有區別。前者雖然能夠將 RDD 的 partition 持久化到磁盤,但該 partition 由 blockManager 管理。一旦 driver program 執行結束,也就是 executor 所在進程 CoarseGrainedExecutorBackend stop,blockManager 也會 stop,被 cache 到磁盤上的 RDD 也會被清空(整個 blockManager 使用的 local 文件夾被刪除)。而 checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,若是不被手動 remove 掉( 話說怎麼 remove checkpoint 過的 RDD?),是一直存在的,也就是說能夠被下一個 driver program 使用,而 cached RDD 不能被其餘 dirver program 使用。

Discussion

Hadoop MapReduce 在執行 job 的時候,不停地作持久化,每一個 task 運行結束作一次,每一個 job 運行結束作一次(寫到 HDFS)。在 task 運行過程當中也不停地在內存和磁盤間 swap 來 swap 去。 但是諷刺的是,Hadoop 中的 task 太傻,中途出錯須要徹底從新運行,好比 shuffle 了一半的數據存放到了磁盤,下次從新運行時仍然要從新 shuffle。Spark 好的一點在於儘可能不去持久化,因此使用 pipeline,cache 等機制。用戶若是感受 job 可能會出錯能夠手動去 checkpoint 一些 critical 的 RDD,job 若是出錯,下次運行時直接從 checkpoint 中讀取數據。惟一不足的是,checkpoint 須要兩次運行 job。

Example

貌似尚未發現官方給出的 checkpoint 的例子,這裏我寫了一個:
package internals

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object groupByKeyTest {

   def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
    val sc = new SparkContext(conf) 
    sc.setCheckpointDir("/Users/xulijie/Documents/data/checkpoint")

    val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
                                     (3, 'c'), (4, 'd'),
                                     (5, 'e'), (3, 'f'),
                                     (2, 'g'), (1, 'h')
                                    )                                
    val pairs = sc.parallelize(data, 3)

    pairs.checkpoint
    pairs.count

    val result = pairs.groupByKey(2)

    result.foreachWith(i => i)((x, i) => println("[PartitionIndex " + i + "] " + x))

    println(result.toDebugString)
   }
}
 
相關文章
相關標籤/搜索