Spark Core應用解析

一。RDD概念

1.1。RDD概述

1.1.1。什麼是RDD

  RDD(Resilient Distributed Dataset)叫作分佈式數據集,是Spark中最基本的數據抽象,它表明一個不可變、可分區、裏面的元素可並行計算的集合。在 Spark 中,對數據的全部操做不外乎建立 RDD、轉化已有RDD 以及調用 RDD 操做進行求值。每一個 RDD 都被分爲多個分區,這些分區運行在集羣中的不一樣節點上。RDD 能夠包含 Python、Java、Scala 中任意類型的對象, 甚至能夠包含用戶自定義的對象。RDD具備數據流模型的特色:自動容錯、位置感知性調度和可伸縮性。RDD容許用戶在執行多個查詢時顯式地將工做集緩存在內存中,後續的查詢可以重用工做集,這極大地提高了查詢速度。java

  RDD支持兩種操做:轉化操做和行動操做。RDD 的轉化操做是返回一個新的 RDD的操做,好比 map()和 filter(),而行動操做則是向驅動器程序返回結果或把結果寫入外部系統的操做。好比 count() 和 first()。es6

  Spark採用惰性計算模式,RDD只有第一次在一個行動操做中用到時,纔會真正計算。Spark能夠優化整個計算過程。默認狀況下,Spark 的 RDD 會在你每次對它們進行行動操做時從新計算。若是想在多個行動操做中重用同一個 RDD,可使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。算法

1.1.2。RDD的屬性

1)   一組分片(Partition),即數據集的基本組成單位。對於RDD來講,每一個分片都會被一個計算任務處理,並決定並行計算的粒度。用戶能夠在建立RDD時指定RDD的分片個數,若是沒有指定,那麼就會採用默認值。默認值就是程序所分配到的CPU Core的數目。shell

2)   一個計算每一個分區的函數。Spark中RDD的計算是以分片爲單位的,每一個RDD都會實現compute函數以達到這個目的。compute函數會對迭代器進行復合,不須要保存每次計算的結果。數據庫

3)   RDD之間的依賴關係。RDD的每次轉換都會生成一個新的RDD,因此RDD之間就會造成相似於流水線同樣的先後依賴關係。在部分分區數據丟失時,Spark能夠經過這個依賴關係從新計算丟失的分區數據,而不是對RDD的全部分區進行從新計算。apache

4)   一個Partitioner,即RDD的分片函數。當前Spark中實現了兩種類型的分片函數,一個是基於哈希的HashPartitioner,另一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,纔會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD自己的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。編程

5)一個列表,存儲存取每一個Partition的優先位置(preferred location)。對於一個HDFS文件來講,這個列表保存的就是每一個Partition所在的塊的位置。按照「移動數據不如移動計算」的理念,Spark在進行任務調度的時候,會盡量地將計算任務分配到其所要處理數據塊的存儲位置。數組

  RDD是一個應用層面的邏輯概念。一個RDD多個分片。RDD就是一個元數據記錄集,記錄了RDD內存全部的關係數據。緩存

1.2。RDD彈性

1)   自動進行內存和磁盤數據存儲的切換安全

    Spark優先把數據放到內存中,若是內存放不下,就會放到磁盤裏面,程序進行自動的存儲切換

2)   基於血統的高效容錯機制

    在RDD進行轉換和動做的時候,會造成RDD的Lineage依賴鏈,當某一個RDD失效的時候,能夠經過從新計算上游的RDD來從新生成丟失的RDD數據。

3)   Task若是失敗會自動進行特定次數的重試

    RDD的計算任務若是運行失敗,會自動進行任務的從新計算,默認次數是4次。

4)   Stage若是失敗會自動進行特定次數的重試

    若是Job的某個Stage階段計算失敗,框架也會自動進行任務的從新計算,默認次數也是4次。

5)   Checkpoint和Persist可主動或被動觸發

    RDD能夠經過Persist持久化將RDD緩存到內存或者磁盤,當再次用到該RDD時直接讀取就行。也能夠將RDD進行檢查點,檢查點會將數據存儲在HDFS中,該RDD的全部父RDD依賴都會被移除。

6)   數據調度彈性

    Spark把這個JOB執行模型抽象爲通用的有向無環圖DAG,能夠將多Stage的任務串聯或並行執行,調度引擎自動處理Stage的失敗以及Task的失敗。

7)   數據分片的高度彈性

    能夠根據業務的特徵,動態調整數據分片的個數,提高總體的應用執行效率。

  RDD全稱叫作彈性分佈式數據集(Resilient Distributed Datasets),它是一種分佈式的內存抽象,表示一個只讀的記錄分區的集合,它只能經過其餘RDD轉換而建立,爲此,RDD支持豐富的轉換操做(如map, join, filter, groupBy等),經過這種轉換操做,新的RDD則包含了如何從其餘RDDs衍生所必需的信息,因此說RDDs之間是有依賴關係的。基於RDDs之間的依賴,RDDs會造成一個有向無環圖DAG,該DAG描述了整個流式計算的流程,實際執行的時候,RDD是經過血緣關係(Lineage)一鼓作氣的,即便出現數據分區丟失,也能夠經過血緣關係重建分區。

1.3。RDD特色

  RDD表示只讀的分區的數據集,對RDD進行改動,只能經過RDD的轉換操做,由一個RDD獲得一個新的RDD,新的RDD包含了從其餘RDD衍生所必需的信息。RDDs之間存在依賴,RDD的執行是按照血緣關係延時計算的。若是血緣關係較長,能夠經過持久化RDD來切斷血緣關係。

1.3.1 分區

  RDD邏輯上是分區的,每一個分區的數據是抽象存在的,計算的時候會經過一個compute函數獲得每一個分區的數據。若是RDD是經過已有的文件系統構建,則compute函數是讀取指定文件系統中的數據,若是RDD是經過其餘RDD轉換而來,則compute函數是執行轉換邏輯將其餘RDD的數據進行轉換。

1.3.2 只讀

  RDD是隻讀的,要想改變RDD中的數據,只能在現有的RDD基礎上建立新的RDD。以下圖所示

  由一個RDD轉換到另外一個RDD,能夠經過豐富的操做算子實現,再也不像MapReduce那樣只能寫map和reduce了,以下圖所示。

  RDD的操做算子包括兩類,一類叫作transformations,它是用來將RDD進行轉化,構建RDD的血緣關係;另外一類叫作actions,它是用來觸發RDD的計算,獲得RDD的相關計算結果或者將RDD保存的文件系統中

1.3.3 依賴

  RDDs經過操做算子進行轉換,轉換獲得的新RDD包含了從其餘RDDs衍生所必需的信息,RDDs之間維護着這種血緣關係,也稱之爲依賴。以下圖所示,依賴包括兩種,一種是窄依賴,RDDs之間分區是一一對應的,另外一種是寬依賴,下游RDD的每一個分區與上游RDD(也稱之爲父RDD)的每一個分區都有關,是多對多的關係。

  經過RDDs之間的這種依賴關係,一個任務流能夠描述爲DAG(有向無環圖),以下圖所示,在實際執行過程當中寬依賴對應於Shuffle(圖中的reduceByKey和join),窄依賴中的全部轉換操做能夠經過相似於管道的方式一鼓作氣執行(圖中map和union能夠一塊兒執行)。

1.3.4 緩存

  若是在應用程序中屢次使用同一個RDD,能夠將該RDD緩存起來,該RDD只有在第一次計算的時候會根據血緣關係獲得分區的數據,在後續其餘地方用到該RDD的時候,會直接從緩存處取而不用再根據血緣關係計算,這樣就加速後期的重用。以下圖所示,RDD-1通過一系列的轉換後獲得RDD-n並保存到hdfs,RDD-1在這一過程當中會有個中間結果,若是將其緩存到內存,那麼在隨後的RDD-1轉換到RDD-m這一過程當中,就不會計算其以前的RDD-0了。

1.3.5 checkpoint

  雖然RDD的血緣關係自然地能夠實現容錯,當RDD的某個分區數據失敗或丟失,能夠經過血緣關係重建。可是對於長時間迭代型應用來講,隨着迭代的進行,RDDs之間的血緣關係會愈來愈長,一旦在後續迭代過程當中出錯,則須要經過很是長的血緣關係去重建,勢必影響性能。爲此,RDD支持checkpoint將數據保存到持久化的存儲中,這樣就能夠切斷以前的血緣關係,由於checkpoint後的RDD不須要知道它的父RDDs了,它能夠從checkpoint處拿到數據。

  給定一個RDD咱們至少能夠知道以下幾點信息:

  一、分區數以及分區方式;

  二、由父RDDs衍生而來的相關依賴信息;

  三、計算每一個分區的數據,計算步驟爲:

    1)若是被緩存,則從緩存中取的分區的數據;

    2)若是被checkpoint,則從checkpoint處恢復數據;

    3)根據血緣關係計算分區的數據。

二 RDD編程

2.1 編程模型

  在Spark中,RDD被表示爲對象,經過對象上的方法調用來對RDD進行轉換。通過一系列的transformations定義RDD以後,就能夠調用actions觸發RDD的計算,action能夠是嚮應用程序返回結果(count, collect等),或者是向存儲系統保存數據(saveAsTextFile等)。在Spark中,只有遇到action,纔會執行RDD的計算(即延遲計算),這樣在運行時能夠經過管道的方式傳輸多個轉換。

    要使用Spark,須要編寫一個Driver程序,它被提交到集羣以調度運行Worker,以下圖所示。Driver中定義了一個或多個RDD,並調用RDD上的action,Worker則執行RDD分區計算任務。

  Dirver ,SparkContext ,Executor ,Master ,Worker 關係如圖

2.2 建立RDD

  在Spark中建立RDD的建立方式大概能夠分爲三種:(1)、從集合中建立RDD;(2)、從外部存儲建立RDD;(3)、從其餘RDD建立。

1)由一個已經存在的Scala集合建立,集合並行化。

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

而從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD

makeRDD函數有兩種實現,第一種實現其實徹底和parallelize一致;而第二種實現能夠爲數據提供位置信息,而除此以外的實現和parallelize函數也是一致的。

scala> val gh01= sc.parallelize(List(1,2,3))
gh01: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21
 
scala> val gh02 = sc.makeRDD(List(1,2,3))
gh022: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21
 
scala> val seq = List((1, List("Hello", "World", "Spark")),
     | (2, List("At", "zgh")))
seq: List[(Int, List[String])] = List((1,List(Hello, World, Spark)),
 (2,List(At, zgh)))
 
scala> val gh03 = sc.makeRDD(seq)
gh03: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23
 
scala> guigu3.preferredLocations(gh03.partitions(1))
res26: Seq[String] = List(At, zgh)
 
scala> gh03.preferredLocations(gh03.partitions(0))
res27: Seq[String] = List(Hello, World, Spark)
 
scala> gh01.preferredLocations(gh01.partitions(0))
res28: Seq[String] = List()

2)由外部存儲系統的數據集建立,包括本地的文件系統,還有全部Hadoop支持的數據集,好比HDFS、Cassandra、HBase等

scala> val atgh = sc.textFile("hdfs://master01:9000/RELEASE")
atgh: org.apache.spark.rdd.RDD[String] = hdfs://master01:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

2.3 RDD編程

  RDD通常分爲數值RDD和鍵值對RDD

2.3.1 Transformation

  RDD中的全部轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動做。只有當發生一個要求返回結果給Driver的動做時,這些轉換纔會真正運行。這種設計讓Spark更加有效率地運行。

經常使用的Transformation:

轉換

含義

map(func)

返回一個新的RDD,該RDD由每個輸入元素通過func函數轉換後組成

scala> var source  = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26

scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

filter(func)

返回一個新的RDD,該RDD由通過func函數計算後返回值爲true的輸入元素組成

scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
 
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26

scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)

scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

flatMap(func)

相似於map,可是每個輸入元素能夠被映射爲0或多個輸出元素(因此func應該返回一個序列,而不是單一元素)

 

scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26

scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

mapPartitions(func)

相似於map,但獨立地在RDD的每個分片上運行,所以在類型爲TRDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]。假設有N個元素,有M個分區,那麼map的函數的將被調用N,mapPartitions被調用M,一個函數一次處理全部分區

scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")))

rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24

scala> :paste
// Entering paste mode (ctrl-D to finish)
def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = {

  var woman = List[String]()

  while (iter.hasNext){

    val next = iter.next()

    next match {

       case (_,"female") => woman = next._1 :: woman

       case _ =>

    }

  }

  woman.iterator

}

// Exiting paste mode, now interpreting.

partitionsFun: (iter: Iterator[(String, String)])Iterator[String]

scala> val result = rdd.mapPartitions(partitionsFun)
result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at mapPartitions at <console>:28

scala> result.collect()
res13: Array[String] = Array(kpop, lucy)

mapPartitionsWithIndex(func)

相似於mapPartitions,但func帶有一個整數參數表示分片的索引值,所以在類型爲TRDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U]

scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")))
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[18] at parallelize at <console>:24

scala> :paste
// Entering paste mode (ctrl-D to finish)

def partitionsFun(index : Int, iter : Iterator[(String,String)]) : Iterator[String] = {

  var woman = List[String]()

  while (iter.hasNext){

    val next = iter.next()

    next match {

       case (_,"female") => woman = "["+index+"]"+next._1 :: woman

       case _ =>

    }

  }

  woman.iterator

}

// Exiting paste mode, now interpreting.

partitionsFun: (index: Int, iter: Iterator[(String, String)])Iterator[String]

scala> val result = rdd.mapPartitionsWithIndex(partitionsFun)
result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[19] at mapPartitionsWithIndex at <console>:28

scala> result.collect()
res14: Array[String] = Array([0]kpop, [3]lucy)

sample(withReplacement, fraction, seed)

以指定的隨機種子隨機抽樣出數量爲fraction的數據,withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣seed用於指定隨機數生成器種子。例子從RDD中隨機且有放回的抽出50%的數據,隨機種子值爲3(便可能以1 2 3的其中一個起始值)

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24

scala> rdd.collect()
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> var sample1 = rdd.sample(true,0.4,2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26

scala> sample1.collect()
res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)

scala> var sample2 = rdd.sample(false,0.2,3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26

scala> sample2.collect()
res17: Array[Int] = Array(1, 9)

takeSample

 

Sample的區別是:takeSample返回的是最終的結果集合。

 

union(otherDataset)

對源RDD和參數RDD求並集後返回一個新的RDD

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28

scala> rdd3.collect()
res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

intersection(otherDataset)

對源RDD和參數RDD求交集後返回一個新的RDD

scala> val rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> val rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28

scala> rdd3.collect()
[Stage 15:=============================>                       (2 + 2)                                 
res19: Array[Int] = Array(5, 6, 7)

distinct([numTasks]))

對源RDD進行去重後返回一個新的RDD. 默認狀況下,只有8個並行任務來操做,可是能夠傳入一個可選的numTasks參數改變它。

scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24

scala> val unionRDD = distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26

scala> unionRDD.collect()
[Stage 16:> (0 + 4) [Stage 16:=============================>                            (2 + 2)    
res20: Array[Int] = Array(1, 9, 5, 6, 2) scala> val unionRDD = distinctRdd.distinct(2) unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26 scala> unionRDD.collect() res21: Array[Int] = Array(6, 2, 1, 9, 5)

partitionBy

 

RDD進行分區操做,若是原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 
不然會生成ShuffleRDD. 

scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24

scala> rdd.partitions.size
res24: Int = 4

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26

scala> rdd2.partitions.size
res25: Int = 2

reduceByKey(func, [numTasks])

在一個(K,V)RDD上調用,返回一個(K,V)RDD,使用指定的reduce函數,將相同key的值聚合到一塊兒,reduce任務的個數能夠經過第二個可選的參數來設置

 

scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24

scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26

scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))

groupByKey

groupByKey也是對每一個key進行操做,但只生成一個sequence

 

scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26

scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28

scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31

scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

scala> val map = group.map(t => (t._1, t._2.sum))
map: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[7] at map at <console>:30

scala> map.collect()
res4: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

combineByKey[C](  

createCombiner: V => C,  

mergeValue: (C, V) => C,  

mergeCombiners: (C, C) => C) 

 

對相同K,把V合併成一個集合.

createCombiner: combineByKey() 會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就 和以前的某個元素的鍵相同。若是這是一個新的元素,combineByKey() 會使用一個叫做 createCombiner() 的函數來建立 
那個鍵對應的累加器的初始值

mergeValue: 若是這是一個在處理當前分區以前已經遇到的鍵,它會使用 mergeValue() 方法將該鍵的累加器對應的當前值與這個新的值進行合併

mergeCombiners: 因爲每一個分區都是獨立處理的,所以對於同一個鍵能夠有多個累加器。若是有兩個或者更多的分區都有對應同一個鍵的累加器,就須要使用用戶提供的 mergeCombiners() 方法將各個分區的結果進行合併。

scala> val scores = Array(("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98))
scores: Array[(String, Int)] = Array((Fred,88), (Fred,95), (Fred,91), (Wilma,93), (Wilma,95), (Wilma,98))

scala> val input = sc.parallelize(scores)
input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26

scala> val combine = input.combineByKey(

     |     (v)=>(v,1),

     |     (acc:(Int,Int),v)=>(acc._1+v,acc._2+1),

     |     (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))

combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[53] at combineByKey at <console>:28

scala> val result = combine.map{

     |     case (key,value) => (key,value._1/value._2.toDouble)}

result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at <console>:30

scala> result.collect()
res33: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))

aggregateByKey(zeroValue:U,[partitioner: Partitioner])(seqOp: (U, V) => U,combOp: (U, U) => U)

kv對的RDD中,,按keyvalue進行分組合並,合併時,將每一個value和初始值做爲seq函數的參數,進行計算,返回的結果做爲一個新的kv對,而後再將結果按照key進行合併,最後將每一個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果做爲一個新的kv對輸出。

seqOp函數用於在每個分區中用初始值逐步迭代valuecombOp函數用於合併每一個分區中的結果

例如:分一個分區,以key1的分區爲例,0先和3比較得33在和2比較得33在和4比較得4,因此整個key1的組最終結果爲(14),同理,key2的最終結果爲(23),key3的爲(38.

若是分三個分區,前兩個是一個分區,中間兩個是一個分區,最後兩個是一個分區,第一個分區的最終結果爲(13),第二個分區爲(14)(23),最後一個分區爲(38),combine後爲 (3,8), (1,7), (2,3)

 

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[13] at aggregateByKey at <console>:26

scala> agg.collect()
res7: Array[(Int, Int)] = Array((3,8), (1,7), (2,3))

scala> agg.partitions.size
res8: Int = 3

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),1)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_).collect()
agg: Array[(Int, Int)] = Array((1,4), (3,8), (2,3))

foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

aggregateByKey的簡化操做,seqopcombop相同

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24

scala> val agg = rdd.foldByKey(0)(_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at <console>:26

scala> agg.collect()
res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))

sortByKey([ascending], [numTasks])

在一個(K,V)RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)RDD

 

scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> rdd.sortByKey(true).collect()
res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

scala> rdd.sortByKey(false).collect()
res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

sortBy(func,[ascending], [numTasks])

sortByKey相似,可是更靈活,能夠用func先對數據進行處理,按照處理後的數據比較結果排序。

scala> val rdd = sc.parallelize(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)

join(otherDataset, [numTasks])

在類型爲(K,V)(K,W)RDD上調用,返回一個相同key對應的全部元素對在一塊兒的(K,(V,W))RDD

scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> rdd.join(rdd1).collect()
res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))

cogroup(otherDataset, [numTasks])

在類型爲(K,V)(K,W)RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

 

scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> rdd.cogroup(rdd1).collect()
res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), 
(2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6)))) scala> val rdd2 = sc.parallelize(Array((4,4),(2,5),(3,6))) rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:24 scala> rdd.cogroup(rdd2).collect() res15: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(4))),
(1,(CompactBuffer(a),CompactBuffer())), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6)))) scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24 scala> rdd3.cogroup(rdd2).collect() [Stage 36:> (0 + 0)
res16: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(4))),
(1,(CompactBuffer(d, a),CompactBuffer())), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))

cartesian(otherDataset)

笛卡爾積

scala> val rdd1 = sc.parallelize(1 to 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24

scala> rdd1.cartesian(rdd2).collect()
res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))

pipe(command, [envVars])

對於每一個分區,都執行一個perl或者shell腳本,返回輸出的RDD

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24

scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24

scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)

pipe.sh:

#!/bin/sh

echo "AA"

while read LINE; do

   echo ">>>"${LINE}

done

注意:shell腳本須要集羣中的全部節點都能訪問到。

coalesce(numPartitions)    

縮減分區數,用於大數據集過濾後,提升小數據集的執行效率。

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24

scala> rdd.partitions.size
res20: Int = 4

scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26

scala> coalesceRDD.partitions.size
res21: Int = 3

repartition(numPartitions)

根據分區數,重新經過網絡隨機洗牌全部數據。

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24

scala> rdd.partitions.size
res22: Int = 4

scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26

scala> rerdd.partitions.size
res23: Int = 2

scala> val rerdd = rdd.repartition(4)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[64] at repartition at <console>:26

scala> rerdd.partitions.size
res24: Int = 4

repartitionAndSortWithinPartitions(partitioner)

repartitionAndSortWithinPartitions函數是repartition函數的變種,與repartition函數不一樣的是,repartitionAndSortWithinPartitions在給定的partitioner內部進行排序,性能比repartition要高。 

 

 

glom

將每個分區造成一個數組,造成新的RDD類型時RDD[Array[T]]

scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

mapValues

針對於(K,V)形式的類型只對V進行操做 

scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24

scala> rdd3.mapValues(_+"|||").collect()
res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))

subtract

計算差的一種函數去除兩個RDD中相同的元素,不一樣的RDD將保留下來 

scala> val rdd = sc.parallelize(3 to 8)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24

scala> rdd.subtract(rdd1).collect()
res27: Array[Int] = Array(8, 6, 7)

 

2.3.2 Action

經常使用的Action:

動做

含義

reduce(func)

經過func函數彙集RDD中的全部元素,這個功能必須是可交換且可並聯的

scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24

scala> rdd1.reduce(_+_)
res50: Int = 55

scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)

collect()

在驅動程序中,以數組的形式返回數據集的全部元素

 

count()

返回RDD的元素個數

first()

返回RDD的第一個元素(相似於take(1)

take(n)

返回一個由數據集的前n個元素組成的數組

takeSample(withReplacement,num, [seed])

返回一個數組,該數組由從數據集中隨機採樣的num個元素組成,能夠選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子

takeOrdered(n)

返回前幾個的排序

aggregate (zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

 

aggregate函數將每一個分區裏面的元素經過seqOp和初始值進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。這個函數最終返回的類型不須要和RDD中元素類型一致。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

scala> rdd1.aggregate(1)(

     | {(x : Int,y : Int) => x + y},

     | {(a : Int,b : Int) => a + b}

     | )

res56: Int = 58

scala> rdd1.aggregate(1)(

     | {(x : Int,y : Int) => x * y},

     | {(a : Int,b : Int) => a + b}

     | )

res57: Int = 30361

fold(num)(func)

摺疊操做,aggregate的簡化操做,seqopcombop同樣。

scala> var rdd1 = sc.makeRDD(1 to 4,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at makeRDD at <console>:24

scala> rdd1.aggregate(1)(

     | {(x : Int,y : Int) => x + y},

     | {(a : Int,b : Int) => a + b}

     | )

res59: Int = 13

scala> rdd1.fold(1)(_+_)
res60: Int = 13

saveAsTextFile(path)

將數據集的元素以textfile的形式保存到HDFS文件系統或者其餘支持的文件系統,對於每一個元素,Spark將會調用toString方法,將它裝換爲文件中的文本

saveAsSequenceFile(path) 

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可使HDFS或者其餘Hadoop支持的文件系統。

saveAsObjectFile(path) 

用於將RDD中的元素序列化成對象,存儲到文件中。

 

countByKey()

針對(K,V)類型的RDD,返回一個(K,Int)map,表示每個key對應的元素個數。

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

scala> rdd.countByKey()
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

foreach(func)

在數據集的每個元素上,運行函數func進行更新。

scala> var rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

scala> var sum = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
sum: org.apache.spark.Accumulator[Int] = 0

scala> rdd.foreach(sum+=_)

scala> sum.value
res68: Int = 55

scala> rdd.collect().foreach(println)
1

2

3

4

5

6

7

8

9

10


2.3.3 數值RDD的統計操做

  Spark 對包含數值數據的 RDD 提供了一些描述性的統計操做。 Spark 的數值操做是經過流式算法實現的,容許以每次一個元素的方式構建出模型。這些 統計數據都會在調用 stats() 時經過一次遍歷數據計算出來,並以 StatsCounter 對象返回。

scala> var rdd1 = sc.makeRDD(1 to 100)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[42] at makeRDD at <console>:32

scala> rdd1.sum()
res34: Double = 5050.0

scala> rdd1.max()
res35: Int = 100

2.3.4 向RDD操做傳遞函數注意

  Spark 的大部分轉化操做和一部分行動操做,都須要依賴用戶傳遞的函數來計算。 在 Scala 中,咱們能夠把定義的內聯函數、方法的引用或靜態方法傳遞給 Spark,就像 Scala 的其餘函數式 API 同樣。咱們還要考慮其餘一些細節,好比所傳遞的函數及其引用 的數據須要是可序列化的(實現了 Java 的 Serializable 接口)。 傳遞一個對象的方法或者字段時,會包含對整個對象的引用。

class SearchFunctions(val query: String) extends java.io.Serializable{
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
def getMatchesFunctionReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String]
= { // "isMatch"表示"this.isMatch",所以咱們要傳遞整個"this" rdd.filter(isMatch) }
def getMatchesFieldReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String]
= {   // "query"表示"this.query",所以咱們要傳遞整個"this"   rdd.filter(x => x.contains(query)) }
def getMatchesNoReference(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[String]
= { // 安全:只把咱們須要的字段拿出來放入局部變量中    val query_ = this.query rdd.filter(x => x.contains(query_)) } }

  若是在 Scala 中出現了 NotSerializableException,一般問題就在於咱們傳遞了一個不可序列 化的類中的函數或字段。

2.3.5 在不一樣RDD類型間轉換

  有些函數只能用於特定類型的 RDD,好比 mean() 和 variance() 只能用在數值 RDD 上, 而 join() 只能用在鍵值對 RDD 上。在 Scala 和 Java 中,這些函數都沒有定義在標準的 RDD 類中,因此要訪問這些附加功能,必需要確保得到了正確的專用 RDD 類。

  在 Scala 中,將 RDD 轉爲有特定函數的 RDD(好比在 RDD[Double] 上進行數值操做)是 由隱式轉換來自動處理的。

2.4 RDD持久化

2.4.1 RDD的緩存

  Spark速度很是快的緣由之一,就是在不一樣操做中能夠在內存中持久化或緩存個數據集。當持久化某個RDD後,每個節點都將把計算的分片結果保存在內存中,並在對此RDD或衍生出的RDD進行的其餘動做中重用。這使得後續的動做變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特徵之一。能夠說,緩存是Spark構建迭代式算法和快速交互式查詢的關鍵。若是一個有持久化數據的節點發生故障,Spark 會在須要用到緩存的數據時重算丟失的數據分區。若是 但願節點故障的狀況不會拖累咱們的執行速度,也能夠把數據備份到多個節點上。

2.4.2 RDD緩存方式

  RDD經過persist方法或cache方法能夠將前面的計算結果緩存,默認狀況下 persist() 會把數據以序列化的形式緩存在 JVM 的堆空 間中。

  可是並非這兩個方法被調用時當即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。

  cache最終也是調用了persist方法,默認的存儲級別都是僅在內存存儲一份,Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。

scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at makeRDD at <console>:25

scala> val nocache = rdd.map(_.toString+"["+System.currentTimeMillis+"]")
nocache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[20] at map at <console>:27

scala> val cache =  rdd.map(_.toString+"["+System.currentTimeMillis+"]")
cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at map at <console>:27

scala> cache.cache
res24: cache.type = MapPartitionsRDD[21] at map at <console>:27

scala> nocache.collect
res25: Array[String] = Array(1[1505479375155], 2[1505479374674], 3[1505479374674], 4[1505479375153], 5[1505479375153], 6[1505479374675], 7[1505479375154], 8[1505479375154], 9[1505479374676], 10[1505479374676])

scala> nocache.collect
res26: Array[String] = Array(1[1505479375679], 2[1505479376157], 3[1505479376157], 4[1505479375680], 5[1505479375680], 6[1505479376159], 7[1505479375680], 8[1505479375680], 9[1505479376158], 10[1505479376158])

scala> nocache.collect
res27: Array[String] = Array(1[1505479376743], 2[1505479377218], 3[1505479377218], 4[1505479376745], 5[1505479376745], 6[1505479377219], 7[1505479376747], 8[1505479376747], 9[1505479377218], 10[1505479377218])

scala> cache.collect
res28: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253])

scala> cache.collect
res29: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253])

scala> cache.collect
res30: Array[String] = Array(1[1505479382745], 2[1505479382253], 3[1505479382253], 4[1505479382748], 5[1505479382748], 6[1505479382257], 7[1505479382747], 8[1505479382747], 9[1505479382253], 10[1505479382253])

cache.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

  在存儲級別的末尾加上「_2」來把持久化數據存爲兩份

  緩存有可能丟失,或者存儲存儲於內存的數據因爲內存不足而被刪除,RDD的緩存容錯機制保證了即便緩存丟失也能保證計算的正確執行。經過基於RDD的一系列轉換,丟失的數據會被重算,因爲RDD的各個Partition是相對獨立的,所以只須要計算丟失的部分便可,並不須要重算所有Partition。

  注意:使用 Tachyon能夠實現堆外緩存

2.5 RDD檢查點機制

  Spark中對於數據的保存除了持久化操做以外,還提供了一種檢查點的機制,檢查點(本質是經過將RDD寫入Disk作檢查點)是爲了經過lineage作容錯的輔助,lineage過長會形成容錯成本太高,這樣就不如在中間階段作檢查點容錯,若是以後有節點出現問題而丟失分區,從作檢查點的RDD開始重作Lineage,就會減小開銷。檢查點經過將數據寫入到HDFS文件系統實現了RDD的檢查點功能。

  cache 和 checkpoint 是有顯著區別的,  緩存把 RDD 計算出來而後放在內存中,可是RDD 的依賴鏈(至關於數據庫中的redo 日誌), 也不能丟掉, 當某個點某個 executor 宕了,上面cache 的RDD就會丟掉, 須要經過 依賴鏈重放計算出來, 不一樣的是, checkpoint 是把 RDD 保存在 HDFS中, 是多副本可靠存儲,因此依賴鏈就能夠丟掉了,就斬斷了依賴鏈, 是經過複製實現的高容錯。

  若是存在如下場景,則比較適合使用檢查點機制:

1)   DAG中的Lineage過長,若是重算,則開銷太大(如在PageRank中)。

2)   在寬依賴上作Checkpoint得到的收益更大。

爲當前RDD設置檢查點。該函數將會建立一個二進制的文件,並存儲到checkpoint目錄中,該目錄是用SparkContext.setCheckpointDir()設置的。在checkpoint的過程當中,該RDD的全部依賴於父RDD中的信息將所有被移出。對RDD進行checkpoint操做並不會立刻被執行,必須執行Action操做才能觸發。

scala> val data = sc.parallelize(1 to 100 , 5)
data: org.apache.spark.rdd.RDD[Int] =
  ParallelCollectionRDD[12] at parallelize at <console>:12
 
scala> sc.setCheckpointDir("hdfs://master01:9000/checkpoint")
 
scala> data.checkpoint
 
scala> data.count

scala> val ch1 = sc.parallelize(1 to 2)
ch1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:25

scala> val ch2 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[36] at map at <console>:27

scala> val ch3 = ch1.map(_.toString+"["+System.currentTimeMillis+"]")
ch3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[37] at map at <console>:27

scala> ch3.checkpoint

scala> ch2.collect
res62: Array[String] = Array(1[1505480940726], 2[1505480940243])

scala> ch2.collect
res63: Array[String] = Array(1[1505480941957], 2[1505480941480])

scala> ch2.collect
res64: Array[String] = Array(1[1505480942736], 2[1505480942257])

scala> ch3.collect
res65: Array[String] = Array(1[1505480949080], 2[1505480948603])

scala> ch3.collect
res66: Array[String] = Array(1[1505480948683], 2[1505480949161])

scala> ch3.collect
res67: Array[String] = Array(1[1505480948683], 2[1505480949161])

2.5.1 checkpoint 寫流程

RDD checkpoint 過程當中會通過如下幾個狀態,

[ Initialized → marked for checkpointing → checkpointing in progress → checkpointed ]

轉換流程以下

1)   data.checkpoint 這個函數調用中, 設置的目錄中, 全部依賴的 RDD 都會被刪除, 函數必須在 job 運行以前調用執行, 強烈建議 RDD 緩存 在內存中(注意), 不然保存到文件的時候須要從頭計算。初始化RDD的 checkpointData 變量爲 ReliableRDDCheckpointData。  這時候標記爲 Initialized 狀態,

2)   在全部 job action 的時候, runJob 方法中都會調用 rdd.doCheckpoint ,  這個會向前遞歸調用全部的依賴的RDD, 看看需不須要  checkpoint。 若是須要 checkpoint, 而後調用  checkpointData.get.checkpoint(), 裏面標記 狀態爲 CheckpointingInProgress,  裏面調用具體實現類的 ReliableRDDCheckpointData 的 doCheckpoint 方法,

3)   doCheckpoint -> writeRDDToCheckpointDirectory, 注意這裏會把 job 再運行一次, 若是已經cache 了,就能夠直接使用緩存中的 RDD 了, 就不須要重頭計算一遍了(注意),  這時候直接把RDD, 輸出到 hdfs, 每一個分區一個文件, 會先寫到一個臨時文件, 若是所有輸出完,進行 rename , 若是輸出失敗,就回滾delete。

4)標記 狀態爲 Checkpointed, markCheckpointed方法中清除全部的依賴, 怎麼清除依賴的呢, 就是 吧RDD 變量的強引用 設置爲 null, 垃圾回收了,會觸發 ContextCleaner 裏面監聽清除實際 BlockManager 緩存中的數據

2.5.2 checkpoint 讀流程

  若是一個RDD 咱們已經 checkpoint了那麼是何時用呢, checkpoint 將 RDD 持久化到 HDFS 或本地文件夾,若是不被手動 remove 掉,是一直存在的,也就是說能夠被下一個 driver program 使用。 好比 spark streaming 掛掉了, 重啓後就可使用以前 checkpoint 的數據進行 recover (下面會講到) ,  固然在同一個 driver program 也可使用。 考慮在同一個 driver program 中是怎麼使用 checkpoint 數據的。

  若是 一個 RDD 被checkpoint了, 若是這個 RDD 上有 action 操做時候,或者回溯的這個 RDD 的時候,這個 RDD 進行計算的時候,裏面判斷若是已經 checkpoint 過,  對分區和依賴的處理都是使用的 RDD 內部的 checkpointRDD 變量。

  具體細節以下,

  若是 一個 RDD 被checkpoint了,  那麼這個 RDD 中對分區和依賴的處理都是使用的 RDD 內部的 checkpointRDD 變量, 具體實現是  ReliableCheckpointRDD 類型。 這個是在 checkpoint 寫流程中建立的。依賴和獲取分區方法中先判斷是否已經checkpoint, 若是已經checkpoint了, 就斬斷依賴,  使用ReliableCheckpointRDD, 來處理依賴和獲取分區。
    若是沒有,才往前回溯依賴。  依賴就是沒有依賴, 由於已經斬斷了依賴, 獲取分區數據就是讀取 checkpoint 到 hdfs目錄中不一樣分區保存下來的文件。

2.6 RDD的依賴關係

  RDD和它依賴的父RDD(s)的關係有兩種不一樣的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

2.6.1 窄依賴

窄依賴指的是每個父RDD的Partition最多被子RDD的一個Partition使用

總結:窄依賴咱們形象的比喻爲獨生子女

2.6.2 寬依賴

寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引發shuffle

總結:窄依賴咱們形象的比喻爲超生

2.6.3  Lineage

  RDD只支持粗粒度轉換,即在大量記錄上執行的單個操做。將建立RDD的一系列Lineage(即血統)記錄下來,以便恢復丟失的分區。RDD的Lineage會記錄RDD的元數據信息和轉換行爲,當該RDD的部分分區數據丟失時,它能夠根據這些信息來從新運算和恢復丟失的數據分區。

 

scala> val text = sc.textFile("README.md")
text: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> val words = text.flatMap(_.split)
split   splitAt

scala> val words = text.flatMap(_.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26

scala> words.map((_,1))
res0: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29

scala> res0.reduceByKey
reduceByKey   reduceByKeyLocally

scala> res0.reduceByKey(_+_)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:31

scala> res1.dependencies
res2: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@6cfe48a4)

scala> res0.dependencies
res3: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@6c9e24c4)

2.7 DAG的生成

  DAG(Directed Acyclic Graph)叫作有向無環圖,原始的RDD經過一系列的轉換就就造成了DAG,根據RDD之間的依賴關係的不一樣將DAG劃分紅不一樣的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算。對於寬依賴,因爲有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,所以寬依賴是劃分Stage的依據。

2.8 RDD相關概念關係

輸入可能以多個文件的形式存儲在HDFS上,每一個File都包含了不少塊,稱爲Block。

當Spark讀取這些文件做爲輸入時,會根據具體數據格式對應的InputFormat進行解析,通常是將若干個Block合併成一個輸入分片,稱爲InputSplit,注意InputSplit不能跨越文件。
隨後將爲這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關係。

隨後這些具體的Task每一個都會被分配到集羣上的某個節點的某個Executor去執行。

1)   每一個節點能夠起一個或多個Executor。

2)   每一個Executor由若干core組成,每一個Executor的每一個core一次只能執行一個Task。

3)    每一個Task執行的結果就是生成了目標RDD的一個partiton。

 

注意: 這裏的core是虛擬的core而不是機器的物理CPU核,能夠理解爲就是Executor的一個工做線程。

 

而 Task被執行的併發度 = Executor數目 * 每一個Executor核數。

至於partition的數目:

1)   對於數據讀入階段,例如sc.textFile,輸入文件被劃分爲多少InputSplit就會須要多少初始Task。

2)   在Map階段partition數目保持不變。

3)    在Reduce階段,RDD的聚合會觸發shuffle操做,聚合後的RDD的partition數目跟具體操做有關,例如repartition操做會聚合成指定分區數,還有一些算子是可配置的。

RDD在計算的時候,每一個分區都會起一個task,因此rdd的分區數目決定了總的的task數目。

申請的計算節點(Executor)數目和每一個計算節點核數,決定了你同一時刻能夠並行執行的task。

好比的RDD有100個分區,那麼計算的時候就會生成100個task,你的資源配置爲10個計算節點,每一個2個核,同一時刻能夠並行的task數目爲20,計算這個RDD就須要5個輪次。

若是計算資源不變,你有101個task的話,就須要6個輪次,在最後一輪中,只有一個task在執行,其他核都在空轉。

若是資源不變,你的RDD只有2個分區,那麼同一時刻只有2個task運行,其他18個核空轉,形成資源浪費。這就是在spark調優中,增大RDD分區數目,增大任務並行度的作法。

相關文章
相關標籤/搜索