Spark 速度很是快的一個緣由是 RDD 支持緩存。成功緩存後,若是以後的操做使用到了該數據集,則直接從緩存中獲取。雖然緩存也有丟失的風險,可是因爲 RDD 之間的依賴關係,若是某個分區的緩存數據丟失,只須要從新計算該分區便可。算法
涉及到的算子:persist、cache、unpersist;都是 Transformation數組
緩存是將計算結果寫入不一樣的介質,用戶定義可定義存儲級別(存儲級別定義了緩存存儲的介質,目前支持內存、堆
外內存、磁盤);緩存
經過緩存,Spark避免了RDD上的重複計算,可以極大地提高計算速度;
RDD持久化或緩存,是Spark最重要的特徵之一。能夠說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵因
素;大數據
Spark速度很是快的緣由之一,就是在內存中持久化(或緩存)一個數據集。當持久化一個RDD後,每個節點都將
把計算的分片結果保存在內存中,並在對此數據集(或者衍生出的數據集)進行的其餘動做(Action)中重用。這使
得後續的動做變得更加迅速;使用persist()方法對一個RDD標記爲持久化。之因此說「標記爲持久化」,是由於出現persist()語句的地方,並不會馬
上計算生成RDD並把它持久化,而是要等到遇到第一個行動操做觸發真正計算之後,纔會把計算結果進行持久化;經過persist()或cache()方法能夠標記一個要被持久化的RDD,持久化被觸發,RDD將會被保留在計算節點的內存中
並重用;人工智能
何時緩存數據,須要對空間和速度進行權衡。通常狀況下,若是多個動做須要用到某個 RDD,而它的計算代價
又很高,那麼就應該把這個 RDD 緩存起來;spa
緩存有可能丟失,或者存儲於內存的數據因爲內存不足而被刪除。RDD的緩存的容錯機制保證了即便緩存丟失也能保
證計算的正確執行。經過基於RDD的一系列的轉換,丟失的數據會被重算。RDD的各個Partition是相對獨立的,所以
只須要計算丟失的部分便可,並不須要重算所有Partition。3d
啓動堆外內存須要配置兩個參數:code
Spark 速度很是快的一個緣由是 RDD 支持緩存。成功緩存後,若是以後的操做使用到了該數據集,則直接從緩存中獲取。雖然緩存也有丟失的風險,可是因爲 RDD 之間的依賴關係,若是某個分區的緩存數據丟失,只須要從新計算該分區便可。orm
Spark 支持多種緩存級別 :對象
Storage Level(存儲級別) | Meaning(含義) |
---|---|
MEMORY_ONLY |
默認的緩存級別,將 RDD 以反序列化的 Java 對象的形式存儲在 JVM 中。若是內存空間不夠,則部分分區數據將再也不緩存。 |
MEMORY_AND_DISK |
將 RDD 以反序列化的 Java 對象的形式存儲 JVM 中。若是內存空間不夠,將未緩存的分區數據存儲到磁盤,在須要使用這些分區時從磁盤讀取。 |
MEMORY_ONLY_SER |
將 RDD 以序列化的 Java 對象的形式進行存儲(每一個分區爲一個 byte 數組)。這種方式比反序列化對象節省存儲空間,但在讀取時會增長 CPU 的計算負擔。僅支持 Java 和 Scala 。 |
MEMORY_AND_DISK_SER |
相似於 MEMORY_ONLY_SER ,可是溢出的分區數據會存儲到磁盤,而不是在用到它們時從新計算。僅支持 Java 和 Scala。 |
DISK_ONLY |
只在磁盤上緩存 RDD |
MEMORY_ONLY_2 , MEMORY_AND_DISK_2 |
與上面的對應級別功能相同,可是會爲每一個分區在集羣中的兩個節點上創建副本。 |
OFF_HEAP |
與 MEMORY_ONLY_SER 相似,但將數據存儲在堆外內存中。這須要啓用堆外內存。 |
啓動堆外內存須要配置兩個參數:
緩存數據的方法有兩個:persist
和 cache
。cache
內部調用的也是 persist
,它是 persist
的特殊化形式,等價於 persist(StorageLevel.MEMORY_ONLY)
。示例以下:
// 全部存儲級別均定義在 StorageLevel 對象中 fileRDD.persist(StorageLevel.MEMORY_AND_DISK) fileRDD.cache()
被緩存的RDD在DAG圖中有一個綠色的圓點。
Spark 會自動監視每一個節點上的緩存使用狀況,並按照最近最少使用(LRU)的規則刪除舊數據分區。固然,你也可使用 RDD.unpersist()
方法進行手動刪除。
Spark中對於數據的保存除了持久化操做以外,還提供了檢查點的機制;檢查點本質是經過將RDD寫入高可靠的磁盤,主要目的是爲了容錯。檢查點經過將數據寫入到HDFS文件系統實現了
RDD的檢查點功能。Lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從
作檢查點的RDD開始重作Lineage,就會減小開銷。
cache 和 checkpoint 是有顯著區別的,緩存把 RDD 計算出來而後放在內存中,可是 RDD 的依賴鏈不能丟掉, 當某個點某個 executor 宕了,上面 cache 的RDD就會丟掉, 須要經過依賴鏈重放計算。不一樣的是,checkpoint 是把
RDD 保存在 HDFS中,是多副本可靠存儲,此時依賴鏈能夠丟掉,因此斬斷了依賴鏈。
如下場景適合使用檢查點機制:
DAG中的Lineage過長,若是重算,則開銷太大
在寬依賴上作 Checkpoint 得到的收益更大
與cache相似 checkpoint 也是 lazy 的。
val rdd1 = sc.parallelize(1 to 100000) // 設置檢查點目錄 sc.setCheckpointDir("/tmp/checkpoint") val rdd2 = rdd1.map(_*2) rdd2.checkpoint // checkpoint是lazy操做 rdd2.isCheckpointed // checkpoint以前的rdd依賴關係 rdd2.dependencies(0).rdd rdd2.dependencies(0).rdd.collect // 執行一次action,觸發checkpoint的執行 rdd2.count rdd2.isCheckpointed // 再次查看RDD的依賴關係。能夠看到checkpoint後,RDD的lineage被截斷,變成從checkpointRDD開始 rdd2.dependencies(0).rdd rdd2.dependencies(0).rdd.collect //查看RDD所依賴的checkpoint文件 rdd2.getCheckpointFile
備註:checkpoint的文件做業執行完畢後不會被刪除
吳邪,小三爺,混跡於後臺,大數據,人工智能領域的小菜鳥。
更多請關注