0七、RDD持久化


爲了不屢次計算同一個RDD(如上面的同一result RDD就調用了兩次Action操做),可讓Spark對數據進行持久化。當咱們讓Spark持久化存儲一個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。若是一個有持久化數據的節點發生故障,Spark會在須要用到緩存的數據時重算丟失的數據分區。

Spark很是重要的一個功能特性就是能夠將RDD持久化在內存中。當對RDD執行持久化操做時,每一個節點都會將本身操做的RDD的partition持久化到內存中,而且在以後對該RDD的反覆使用中,直接使用內存緩存的partition。這樣的話,對於針對一個RDD反覆執行多個操做的場景,就只要對RDD計算一次便可,後面直接使用該RDD,而不須要反覆計算屢次該RDD。
 
巧妙使用RDD持久化,甚至在某些場景下,能夠將spark應用程序的性能提高10倍。對於迭代式算法和快速交互式應用來講,RDD持久化,是很是重要的。
 
要持久化一個RDD,只要調用其 cache()或者 persist()方法便可。在該RDD第一次被計算出來時,就會直接緩存在每一個節點中。並且Spark的持久化機制仍是自動容錯的,若是持久化的RDD的任何partition丟失了,那麼Spark會自動經過其源RDD,使用transformation操做從新計算該partition。
 
cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是調用的persist()的無參版本,同時就是調用persist(MEMORY_ONLY),將數據持久化到內存中。若是須要從內存中清理緩存,那麼可使用unpersist()方法。
 
Spark本身也會在shuffle操做時,進行數據的持久化,好比寫入磁盤,主要是爲了在節點失敗時,避免須要從新計算整個過程。
 
 RDD持久化策略
 RDD持久化是能夠手動選擇不一樣的策略的。好比能夠將RDD持久化在內存中、持久化到磁盤上、使用序列化的方式持久化,多持久化的數據進行多路複用。只要在調用persist()時傳入對應的StorageLevel便可。

持久化級別 html

 

MEMORY_ONLY 算法

以非序列化的Java對象的方式持久化在JVM內存中。若是內存沒法徹底存儲RDD全部的partition,那麼那些沒有持久化的partition就會在下一次須要使用它的時候,從新被計算 緩存

MEMORY_AND_DISK app

同上,可是當某些partition沒法存儲在內存中時,會持久化到磁盤中。下次須要使用這些partition時,須要從磁盤上讀取 ide

MEMORY_ONLY_SER 性能

MEMORY_ONLY,可是會使用Java序列化方式,將Java對象序列化後進行持久化。能夠減小內存開銷,可是須要進行反序列化,所以會加大CPU開銷 ui

MEMORY_AND_DSK_SER spa

MEMORY_AND_DSK。可是使用序列化方式持久化Java對象 3d

DISK_ONLY orm

使用非序列化Java對象的方式持久化,徹底存儲到磁盤上

MEMORY_ONLY_2

MEMORY_AND_DISK_2

等等

若是是尾部加了2的持久化級別,表示會將持久化數據複用一份,保存到其餘節點,從而在數據丟失時,不須要再次計算,只須要使用備份數據便可

 

如何選擇RDD持久化策略?
Spark提供的多種持久化級別,主要是爲了在CPU和內存消耗之間進行取捨。下面是一些通用的持久化級別的選擇建議:
 
一、優先使用MEMORY_ONLY,若是能夠緩存全部數據的話,那麼就使用這種策略。由於純內存速度最快,並且沒有序列化,不須要消耗CPU進行反序列化操做。
二、若是MEMORY_ONLY策略,沒法存儲的下全部數據的話,那麼使用MEMORY_ONLY_SER,將數據進行序列化進行存儲,純內存操做仍是很是快,只是要消耗CPU進行反序列化。
三、若是須要進行快速的失敗恢復,那麼就選擇帶後綴爲_2的策略,進行數據的備份,這樣在失敗時,就不須要從新計算了。
四、能不使用DISK相關的策略,就不用使用,有的時候,從磁盤讀取數據,還不如從新計算一次。
 
 正確調用:

注意,須要在第一次調用Acton操做以前就要調用  persist() 方法。
若是要緩存的數據太多,內存中放不下,Spark會自動利用最近最少使用(LRU)的緩存策略把最老的分區從內存中移除。對於僅把數據存放在內存中的緩存級別,下一次要用到已經被移除的分區時,這些分區就須要從新計算。可是對於使用內存與磁盤的緩存級別(MEMORY_AND_DISK、MEMORY_AND_DISK_SER)的分區來講,被移除的分區都會寫入磁盤。不論哪種狀況,都沒必要擔憂你的做業由於緩存了太多數據而被打斷。
 
/**
 * RDD持久化
 */
public class Persist {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Persist").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // cache()或者persist()的使用,是有規則的
        // 必須在transformation或者textFile等建立了一個RDD以後,直接連續調用cache()或persist()才能夠
        // 若是你先建立一個RDD,而後單獨另起一行執行cache()或persist()方法,是沒有用的
        // 並且,會報錯,大量的文件會丟失
        JavaRDD<String> lines = sc.textFile("test.txt").cache();
        long beginTime = System.currentTimeMillis();
        long count = lines.count();
        System.out.println(count);
        long endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds.");
        beginTime = System.currentTimeMillis();
        count = lines.count();
        System.out.println(count);
        endTime = System.currentTimeMillis();
        System.out.println("cost " + (endTime - beginTime) + " milliseconds.");
        sc.close();
    }
}
相關文章
相關標籤/搜索