理解Spark的核心RDD

與許多專有的大數據處理平臺不一樣,Spark創建在統一抽象的RDD之上,使得它能夠以基本一致的方式應對不一樣的大數據處理場景,包括MapReduce,Streaming,SQL,Machine Learning以及Graph等。這即Matei Zaharia所謂的「設計一個通用的編程抽象(Unified Programming Abstraction)。這正是Spark這朵小火花讓人着迷的地方。 要理解Spark,就需得理解RDD。node

###RDD是什麼? RDD,全稱爲Resilient Distributed Datasets,是一個容錯的、並行的數據結構,可讓用戶顯式地將數據存儲到磁盤和內存中,並能控制數據的分區。同時,RDD還提供了一組豐富的操做來操做這些數據。在這些操做中,諸如map、flatMap、filter等轉換操做實現了monad模式,很好地契合了Scala的集合操做。除此以外,RDD還提供了諸如join、groupBy、reduceByKey等更爲方便的操做(注意,reduceByKey是action,而非transformation),以支持常見的數據運算。編程

一般來說,針對數據處理有幾種常見模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Processing。例如Hadoop MapReduce採用了MapReduces模型,Storm則採用了Stream Processing模型。RDD混合了這四種模型,使得Spark能夠應用於各類大數據處理場景。數組

RDD做爲數據結構,本質上是一個只讀的分區記錄集合。一個RDD能夠包含多個分區,每一個分區就是一個dataset片斷。RDD能夠相互依賴。若是RDD的每一個分區最多隻能被一個Child RDD的一個分區使用,則稱之爲narrow dependency;若多個Child RDD分區均可以依賴,則稱之爲wide dependency。不一樣的操做依據其特性,可能會產生不一樣的依賴。例如map操做會產生narrow dependency,而join操做則產生wide dependency。網絡

Spark之因此將依賴分爲narrow與wide,基於兩點緣由。數據結構

首先,narrow dependencies能夠支持在同一個cluster node上以管道形式執行多條命令,例如在執行了map後,緊接着執行filter。相反,wide dependencies須要全部的父分區都是可用的,可能還須要調用相似MapReduce之類的操做進行跨節點傳遞。架構

其次,則是從失敗恢復的角度考慮。narrow dependencies的失敗恢復更有效,由於它只須要從新計算丟失的parent partition便可,並且能夠並行地在不一樣節點進行重計算。而wide dependencies牽涉到RDD各級的多個Parent Partitions。下圖說明了narrow dependencies與wide dependencies之間的區別: app

本圖來自Matei Zaharia撰寫的論文An Architecture for Fast and General Data Processing on Large Clusters。圖中,一個box表明一個RDD,一個帶陰影的矩形框表明一個partition。分佈式

###RDD如何保障數據處理效率? RDD提供了兩方面的特性persistence和patitioning,用戶能夠經過persist與patitionBy函數來控制RDD的這兩個方面。RDD的分區特性與並行計算能力(RDD定義了parallerize函數),使得Spark能夠更好地利用可伸縮的硬件資源。若將分區與持久化兩者結合起來,就能更加高效地處理海量數據。例如:ide

input.map(parseArticle _).partitionBy(partitioner).cache()

partitionBy函數須要接受一個Partitioner對象,如:函數

val partitioner = new HashPartitioner(sc.defaultParallelism)

RDD本質上是一個內存數據集,在訪問RDD時,指針只會指向與操做相關的部分。例如存在一個面向列的數據結構,其中一個實現爲Int的數組,另外一個實現爲Float的數組。若是隻須要訪問Int字段,RDD的指針能夠只訪問Int數組,避免了對整個數據結構的掃描。

RDD將操做分爲兩類:transformation與action。不管執行了多少次transformation操做,RDD都不會真正執行運算,只有當action操做被執行時,運算纔會觸發。而在RDD的內部實現機制中,底層接口則是基於迭代器的,從而使得數據訪問變得更高效,也避免了大量中間結果對內存的消耗。

在實現時,RDD針對transformation操做,都提供了對應的繼承自RDD的類型,例如map操做會返回MappedRDD,而flatMap則返回FlatMappedRDD。當咱們執行map或flatMap操做時,不過是將當前RDD對象傳遞給對應的RDD對象而已。例如:

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

這些繼承自RDD的類都定義了compute函數。該函數會在action操做被調用時觸發,在函數內部是經過迭代器進行對應的轉換操做:

private[spark]
class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f)
}

###RDD對容錯的支持 支持容錯一般採用兩種方式:數據複製或日誌記錄。對於以數據爲中心的系統而言,這兩種方式都很是昂貴,由於它須要跨集羣網絡拷貝大量數據,畢竟帶寬的數據遠遠低於內存。

RDD天生是支持容錯的。首先,它自身是一個不變的(immutable)數據集,其次,它可以記住構建它的操做圖(Graph of Operation),所以當執行任務的Worker失敗時,徹底能夠經過操做圖得到以前執行的操做,進行從新計算。因爲無需採用replication方式支持容錯,很好地下降了跨網絡的數據傳輸成本。

不過,在某些場景下,Spark也須要利用記錄日誌的方式來支持容錯。例如,在Spark Streaming中,針對數據進行update操做,或者調用Streaming提供的window操做時,就須要恢復執行過程的中間狀態。此時,須要經過Spark提供的checkpoint機制,以支持操做可以從checkpoint獲得恢復。

針對RDD的wide dependency,最有效的容錯方式一樣仍是採用checkpoint機制。不過,彷佛Spark的最新版本仍然沒有引入auto checkpointing機制。 ###總結 RDD是Spark的核心,也是整個Spark的架構基礎。它的特性能夠總結以下:

  • 它是不變的數據結構存儲
  • 它是支持跨集羣的分佈式數據結構
  • 能夠根據數據記錄的key對結構進行分區
  • 提供了粗粒度的操做,且這些操做都支持分區
  • 它將數據存儲在內存中,從而提供了低延遲性
相關文章
相關標籤/搜索