RDD

1.概念:數據庫

RDD是spark整個體系中最基礎核心的概念,RDD(Resilient Distributed DataSet)即彈性分佈式數據集數組

彈性:數據結構

RDD支持橫向多分區,縱向操做內存不足寫入磁盤,hdfs等,實現數據在內存和外存的靈活切換。架構

RDD能夠在存儲在內存和磁盤之間,而且自動或者手動切換分佈式

RDD具備良好的容錯性(即RDD能夠經過血統轉化爲其餘RDD)ide

Task若是失敗,會進行特定次數的重試(default 4)函數

Stage若是失敗會就行特定次數的重試oop

RDD能夠存儲任意類型的數據性能

RDD的分區數目能夠自行設定this

分佈式:

RDD能夠存儲在多臺主機的內存或者磁盤之上。每一個RDD能夠分爲多個分區,每一個分區就是一個數據集片斷,而且一個RDD的不一樣分區能夠被保存到集羣中不一樣的節點上,從而在集羣中進行分佈式並行計算。

數據集:

RDD是數據集合的抽象,從外部看RDD就是封裝以後的可容錯的數據集

RDD至關因而一個代理,對RDD進行操做其實就是對分區進行操做,就是對每一臺機器上的迭代器進行操做,由於迭代器引用着咱們要操做的數據。
RDD存儲的是邏輯數據結構,不存儲真實數據,像關係數據庫中的view 視圖,只是表結構。

 

2.RDD的五個特徵:

A list of partitioner     一系列分區
A function for computing each split     會有一個函數做用在每一個切片上
A list of depedencies on other RDDs    即RDD具備血統,RDD和RDD之間存在依賴關係
Optionally, a Partitioner for key-value RDDs (可選)若是是RDD中裝的是KV類型的,那麼Shuffle時會有一個分區器。默認是HashPartitioner。目前只有HashPartitioner 和RangeRartitioner
Optionally, a list of preferred locations to compute each split on (可選)若是隻從HDFS中讀取數據,會感知數據則位置,將Executor啓動在數據所在的機器上

 

3.生成RDD的方式:

執行Transform操做(變換操做),根據已有的RDD計算獲得

讀取外部存儲系統的數據集,如HDFS,HBase,或任何與Hadoop有關的數據源。

將Driver的Scala集合經過並行化的方式變成RDD(試驗、測驗)
 

4.RDD的兩種操做:

針對RDD的操做,分兩種,一種是Transformation(變換),一種是Actions(執行)。

Transformation(變換)操做屬於懶操做(算子),不會真正觸發RDD的處理計算。

Actions(執行)操做纔會真正觸發。前者用於執行計算並指定輸出的形式,後者指定RDD之間的相互依賴關係。兩類操做的主要區別是,Transformation轉換操做(好比map、filter、join等)接受RDD並返回RDD,而Actions行動操做(好比count、collect等)接受RDD可是返回非RDD(即輸出一個值或結果)。

RDD採用了惰性調用,即在RDD的執行過程當中,真正的計算髮生在RDD的「行動」操做,對於「行動」以前的全部「轉換」操做,Spark只是記錄下「轉換」操做應用的一些基礎數據集以及RDD生成的軌跡,即相互之間的依賴關係,而不會觸發真正的計算。

<img  data-cke-saved-src='1.jpg' src='1.jpg'>


轉換操做:對於RDD而言,每一次轉換操做都會產生不一樣的RDD,供給下一個「轉換」使用。轉換獲得的RDD是惰性求值的,也就是說,整個轉換過程只是記錄了轉換的軌跡,並不會發生真正的計算,只有遇到行動操做時,纔會發生真正的計算,開始從血緣關係源頭開始,進行物理的轉換操做。
下面列出一些常見的轉換操做(Transformation API):

filter(func):篩選出知足函數func的元素,並返回一個新的數據集
map(func):將每一個元素傳遞到函數func中,並將結果返回爲一個新的數據集
flatMap(func):與map()類似,但每一個輸入元素均可以映射到0或多個輸出結果
reduceByKey(func):應用於(K,V)鍵值對的數據集時,返回一個新的(K, V)形式的數據集,其中的每一個值是將每一個key傳遞到函數func中進行聚合

行動操做:行動操做是真正觸發計算的地方。Spark程序執行到行動操做時,纔會執行真正的計算,從文件中加載數據,完成一次又一次轉換操做,最終,完成行動操做獲得結果。 
下面列出一些常見的行動操做(Action API):

count() 返回數據集中的元素個數
collect() 以數組的形式返回數據集中的全部元素
first() 返回數據集中的第一個元素
take(n) 以數組的形式返回數據集中的前n個元素
reduce(func) 經過函數func(輸入兩個參數並返回一個值)聚合數據集中的元素
foreach(func) 將數據集中的每一個元素傳遞到函數func中運行
 

5.RDD的依賴關係:

RDD的依賴關係是spark計算優於hadoop的重要緣由之一。

RDD中不一樣的操做會使得不一樣RDD中的分區會產生不一樣的依賴。

RDD中的依賴關係分爲窄依賴(Narrow Dependency)與寬依賴(Wide Dependency)

窄依賴:對於窄依賴操做,它們只是將Partition的數據根據轉換的規則進行轉化,並不涉及其餘的處理,能夠簡單地認爲只是將數據從一個形式轉換到另外一個形式。  如 map  filter  union等 

窄依賴表現爲一個父RDD的分區對應於一個子RDD的分區,或多個父RDD的分區對應於一個子RDD的分區

寬依賴:表現爲存在一個父RDD的一個分區對應一個子RDD的多個分區。寬依賴典型的操做包括groupByKey、sortByKey等

窄依賴源碼:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

    //返回子RDD的partitionId依賴的全部的parent RDD的Partition(s)

    def getParents(partitionId: Int): Seq[Int]

    override def rdd: RDD[T] = _rdd

}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

    override def getParents(partitionId: Int) = List(partitionId)

}

寬依賴源碼:

class ShuffleDependency[K, V, C](

    @transient _rdd: RDD[_ <: Product2[K, V]],

    val partitioner: Partitioner,

    val serializer: Option[Serializer] = None,

    val keyOrdering: Option[Ordering[K]] = None,

    val aggregator: Option[Aggregator[K, V, C]] = None,

    val mapSideCombine: Boolean = false)

extends Dependency[Product2[K, V]] {

 

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

//獲取新的shuffleId

val shuffleId: Int = _rdd.context.newShuffleId()

//向ShuffleManager註冊Shuffle的信息

val shuffleHandle: ShuffleHandle =

_rdd.context.env.shuffleManager.registerShuffle(

    shuffleId, _rdd.partitions.size, this)

    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))

}

spark中一旦遇到寬依賴就須要進行shuffle的操做,所謂的shuffle的操做的本質就是將數據彙總後從新分發的過程。

這個過程數據要彙總到一塊兒,數據量可能很大因此不可避免的須要進行數據落磁盤的操做,會下降程序的性能,因此spark並非徹底內存不讀寫磁盤,只能說它盡力避免這樣的過程來提升效率 。

spark中的shuffle,在早期的版本中,會產生多個臨時文件,可是這種多臨時文件的策略形成大量文件的同時的讀寫,磁盤的性能被分攤給多個文件,每一個文件讀寫效率都不高,影響spark的執行效率。因此在後續的spark中(1.2.0以後的版本)的shuffle中,只會產生一個文件,而且數據會通過排序再附加索引信息,減小了文件的數量並經過排序索引的方式提高了性能。

 

6.RDD的運行流程

<img  data-cke-saved-src='5.jpg' src='5.jpg'>

1)Driver端 建立RDD對象 SparkContext根據用戶提交的程序計算RDD之間的依賴關係,構建DAG

2)Driver端 DAGScheduler將DAG 切分Stage(切分的依據是遇到寬依賴shuffle),將stage中生成的Task以TaskSet的形式給TaskScheduler

3)Driver端 TaskScheduler調度Task(根據資源狀況將Task調度到對應的Executor中)

 4)Executor接收Task,而後用實現了Runnable接口的包裝類將Task包裝起來丟入到線程池中執行。

七、RDD底層實現原理
RDD是一個分佈式數據集,顧名思義,其數據應該分部存儲於多臺機器上。事實上,每一個RDD的數據都以Block的形式存儲於多臺機器上,下圖是Spark的RDD存儲架構圖,其中每一個Executor會啓動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點的BlockManagerMaster保存。BlockManagerSlave生成Block後向BlockManagerMaster註冊該Block,BlockManagerMaster管理RDD與Block的關係,當RDD再也不須要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。

八、RDD cache的原理
RDD的轉換過程當中,並非每一個RDD都會存儲,若是某個RDD會被重複使用,或者計算其代價很高,那麼能夠經過顯示調用RDD提供的cache()方法,把該RDD存儲下來。那RDD的cache是如何實現的呢?

RDD中提供的cache()方法只是簡單的把該RDD放到cache列表中。當RDD的iterator被調用時,經過CacheManager把RDD計算出來,並存儲到BlockManager中,下次獲取該RDD的數據時即可直接經過CacheManager從BlockManager讀出。  

相關文章
相關標籤/搜索