RDD(Resilient Distributed Dataset),是指彈性分佈式數據集。數據集:Spark中的編程是基於RDD的,將原始數據加載到內存變成RDD,RDD再通過若干次轉化,仍爲RDD。分佈式:讀數據通常都是從分佈式系統中去讀,如hdfs、kafka等,因此原始文件存在磁盤是分佈式的,spark加載完數據的RDD也是分佈式的,換句話說RDD是抽象的概念,實際數據仍在分佈式文件系統中;由於有了RDD,在開發代碼過程會很是方便,只須要將原始數據理解爲一個集合,而後對集合進行操做便可。RDD裏面每一塊數據/partition,分佈在某臺機器的物理節點上,這是物理概念。彈性:這裏是指數據集會進行轉換,因此會忽大忽小,partition數量忽多忽少。node
Spark-1.6.1源碼在org.apache.spark.rdd下的RDD.scala指出了每個RDD都具備五個主要特色,以下:apache
RDD是由一組partition組成。例如要讀取hdfs上的文本文件的話,可使用textFile()方法把hdfs的文件加載過來,把每臺機器的數據放到partition中,而且封裝了一個HadoopRDD,這就是一個抽象的概念。每個partition都對應了機器中的數據。由於在hdfs中的一個Datanode,有不少的block,讀機器的數據時,會將每個block變成一個partition,與MapReduce中split的大小由min split,max split,block size (max(min split, min(max split, block size)))決定的相同,spark中的partition大小實際上對應了一個split的大小。通過轉化,HadoopRDD會轉成其餘RDD,如FilteredRDD、PairRDD等,可是partition仍是相應的partition,只是由於有函數應用裏面的數據變化了。編程
對每一個split(partition)都有函數操做。一個函數應用在一個RDD上,能夠理解爲一個函數對集合(RDD)內的每一個元素(split)的操做。緩存
一個RDD依賴於一組RDD。例如,下列代碼片斷框架
val lines=sc.textFlie("hdfs://namenode:8020/path/file.txt") val wc=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2) wc.foreach(println)
sc.stop()
這裏就存在RDD的依賴關係。dom
該可選項意思是對於一個RDD,若是其中的每個元素是Key-Value形式時,能夠傳一個Partitioner(自定義分區),讓這個RDD從新分區。這種狀況的本質是shuffle,多點到多點的數據傳輸。分佈式
textFile()過程當中,能夠指定加載到性能好的機器中。例如,hdfs中的數據可能放在一大堆破舊的機器上,hdfs數據在磁盤上,磁盤可能很大,CPU、內存的性能不好。Spark默認作的事情是,把數據加載進來,會把數據抽象成一個RDD,抽象進來的數據在內存中,這內存指的是本機的內存,這是由於在分佈式文件系統中,要遵循數據本地性原則,即移動計算(把函數、jar包發過去)而不移動數據(移動數據成本較高)。而通常hdfs的集羣機器的內存比較差,若是要把這麼多數據加載到爛機器的內存中,會存在問題,一是內存可能裝不下,二是CPU差、計算能力差,這就等於沒有發揮出spark的性能。在這種狀況下,Spark的RDD能夠提供一個可選項,能夠指定一個preferred locations,即指定一個位置來加載數據。這樣就能夠指定加載到性能好的機器去計算。例如,能夠將hdfs數據加載到Tachyon內存文件系統中,而後再基於Tachyon來作spark程序。 ide
源碼org.apache.spark.storage包下的StorageLevel.scala中定義緩存策略。函數
StorageLevel類默認的構造器有五個屬性,以下圖所示:oop
class StorageLevel private( private var _useDisk: Boolean,/*使用磁盤*/ private var _useMemory: Boolean,/*使用內存*/ private var _useOffHeap: Boolean,/*不使用堆內存(堆在JVM中)*/ private var _deserialized: Boolean,/*不序列化*/ private var _replication: Int = 1)/*副本數,默認爲1*/
val NONE = new StorageLevel(false, false, false, false)
NONE表示不須要緩存。(不使用磁盤,不用內存,使用堆,序列化)
val DISK_ONLY = new StorageLevel(true, false, false, false)
DISK_ONLY表示使用磁盤。(使用磁盤,不用內存,使用堆,序列化)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
DISK_ONLY_2表示使用磁盤,兩個副本。(使用磁盤,不用內存,使用堆,序列化,2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
MEMORY_ONLY表示只使用內存,例如1G的數據要放入512M的內存,會將數據切成兩份,先將512M加載到內存,剩下的512M還在原來位置(如hdfs),以後若是有RDD的運算,會從內存和磁盤中去找各自的512M數據。(不使用磁盤,使用內存,使用堆,不序列化)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
MEMORY_ONLY_2表示只使用內存,2個副本。(不使用磁盤,使用內存,使用堆,不序列化,2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
MEMORY_ONLY_SER表示只使用內存,序列化。(不使用磁盤,使用內存,使用堆,序列化)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
MEMORY_ONLY_SER表示只使用內存,序列化,2個副本。(不使用磁盤,使用內存,使用堆,序列化,2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
MEMORY_AND_DISK和MEMORY_ONLY很相似,都使用到了內存和磁盤,只是使用的是本機本地磁盤,例如1G數據要加載到512M的內存中,首先將hdfs的1G數據的512M加載到內存,另外的512M加載到本地的磁盤緩存着(和hdfs就沒有關係了),RDD要讀取數據的話就在內存和本地磁盤中找。(使用磁盤,使用內存,使用堆,不序列化)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
MEMORY_AND_DISK_2表示兩個副本。(使用磁盤,使用內存,使用堆,不序列化,2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
MEMORY_AND_DISK_SER本地內存和磁盤,序列化。序列化的好處在於能夠壓縮,可是壓縮就意味着要解壓縮,須要消耗一些CPU。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
MEMORY_AND_DISK_SER2,兩個副本。
val OFF_HEAP = new StorageLevel(false, false, true, false)
OFF_HEAP不使用堆內存(例如可使用Tachyon的分佈式內存文件系統)。(不使用磁盤,不用內存,不使用堆,序列化)
package com.huidoo.spark import org.apache.spark.{SparkConf, SparkContext} object TestCache { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("TestCache").setMaster("local[2]") val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://cdh01:8020/flume/2018-03-23/2230") //目錄下有17個文件,總大小約爲335MB,不作緩存 val beginTime1 = System.currentTimeMillis() //記錄第1個job開始時間 val count1 = lines.count() //調用count()方法,會產生一個job val endTime1 = System.currentTimeMillis() //記錄第1個job結束時間 val beginTime2 = System.currentTimeMillis() //記錄第2個job開始時間 val count2 = lines.count() //調用count()方法,會產生一個job val endTime2 = System.currentTimeMillis() //記錄第2個job結束時間 println(count1) println("第1個job總共消耗時間" + (endTime1 - beginTime1) + "毫秒") println(count2) println("第2個job總共消耗時間" + (endTime2 - beginTime2) + "毫秒") sc.stop() } }
運行結果以下:
可見,全部文件的總行數爲1935077行,第一個job和第二個job的用時分別爲14.7s和12.2s,差異不大。
只需在原代碼基礎上將HadoopRDD lines添加調用cache()方法便可。
val lines = sc.textFile("hdfs://cdh01:8020/flume/2018-03-23/2230").cache() //目錄下有17個文件,總大小約爲335MB,作緩存
運行結果以下:
可見,全部文件的總行數爲1935077行,第一個job和第二個job的用時分別爲19.4s和0.09s,速度相比不作緩存明顯提高。這是由於沒有作緩存,第二個job還須要先從hdfs上讀取數據,須要消耗更長時間;而作了緩存則直接從緩存中讀取(cache方法默認緩存策略是MEMORY_ONLY),因此速度會快不少。
一系列RDD到RDD的transformation操做,稱爲lineage(血統)。某個RDD依賴於它前面的全部RDD。例如一個由10個RDD到RDD的轉化構成的lineage,若是在計算到第9個RDD時失敗了,通常較好的計算框架會自動從新計算。通常地,這種錯誤發生了會去找上一個RDD,可是實際上若是不作緩存是找不到的,由於即便RDD9知道它是由RDD8轉化過來的,可是由於它並無存RDD數據自己,在內存中RDD瞬時轉化,瞬間就會在內存中消失,因此仍是找不到數據。若是這時RDD8作過cache緩存,那麼就是在RDD8的時候進行了數據的保存並記錄了位置,這時若是RDD9失敗了就會從緩存中讀取RDD8的數據;若是RDD8沒有作cache就會找RDD7,以此類推,若是都沒有作cache就須要從新從HDFS中讀取數據。因此所謂的容錯就是指,當計算過程複雜,爲了下降因某些關鍵點計算出錯而須要從新計算的帶來的慘重代價的風險,則須要在某些關鍵點使用cache或用persist方法作一下緩存。
上述緩存策略還存在一個問題。使用cache或persist的緩存策略是使用默認的僅在內存,因此實際的RDD緩存位置是在內存當中,若是機器出現問題,也會形成內存中的緩存RDD數據丟失。因此能夠將要作容錯的RDD數據存到指定磁盤(能夠是hdfs)路徑中,能夠對RDD作doCheckpoint()方法。使用doCheckpoint()方法的前提示,須要在sc中要先設置SparkContext.setCheckpointDir(),設置數據存儲路徑。這時候若是程序計算過程當中出錯了,會先到cache中找緩存數據,若是cache中沒有就會到設置的磁盤路徑中找。
在RDD計算,經過checkpoint進行容錯,作checkpoint有兩種方式,一個是checkpoint data,一個是logging the updates。用戶能夠控制採用哪一種方式來實現容錯,默認是logging the updates方式,經過記錄跟蹤全部生成RDD的轉換(transformations)也就是記錄每一個RDD的lineage(血統)來從新計算生成丟失的分區數據。
//RDD.scala中的doCheckpoint方法: /** * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD * has completed (therefore the RDD has been materialized and potentially stored in memory). * doCheckpoint() is called recursively on the parent RDDs. */ private[spark] def doCheckpoint(): Unit = { RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) { //若是doCheckpointCalled不爲true,就先將其改成true if (!doCheckpointCalled) { doCheckpointCalled = true //若是checkpointData已定義,就把data get出來,而後作一下checkpoint。 if (checkpointData.isDefined) { checkpointData.get.checkpoint() } else { //若是checkpointData沒有的話,就把這個RDD的全部依賴拿出來,foreach一把,把裏面的每一個元素RDD,再遞歸調用本方法。 dependencies.foreach(_.rdd.doCheckpoint()) } } } }
//RDD.scala中的checkpoint()方法 def checkpoint(): Unit = RDDCheckpointData.synchronized { // NOTE: we use a global lock here due to complexities downstream with ensuring // children RDD partitions point to the correct parent partitions. In the future // we should revisit this consideration. //首先檢查context的checkpointDir是否爲空,若是沒有設置就會拋出異常 if (context.checkpointDir.isEmpty) { throw new SparkException("Checkpoint directory has not been set in the SparkContext") } else if (checkpointData.isEmpty) { checkpointData = Some(new ReliableRDDCheckpointData(this)) } }
//SparkContext.scala中的setCheckpointDir方法 /** * Set the directory under which RDDs are going to be checkpointed. The directory must * be a HDFS path if running on a cluster. */ def setCheckpointDir(directory: String) { // If we are running on a cluster, log a warning if the directory is local. // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from // its own local file system, which is incorrect because the checkpoint files // are actually on the executor machines. //若是運行了集羣模式,checkpointDir必須是非本地的。 if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) { logWarning("Checkpoint directory must be non-local " + "if Spark is running on a cluster: " + directory) } checkpointDir = Option(directory).map { dir => val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) fs.getFileStatus(path).getPath.toString } }