Spark之RDD的定義及五大特性

  RDD是分佈式內存的一個抽象概念,是一種高度受限的共享內存模型,即RDD是隻讀的記錄分區的集合,能橫跨集羣全部節點並行計算,是一種基於工做集的應用抽象。apache

  RDD底層存儲原理:其數據分佈存儲於多臺機器上,事實上,每一個RDD的數據都以Block的形式存儲於多臺機器上,每一個Executor會啓動一個BlockManagerSlave,並管理一部分Block;而Block的元數據由Driver節點上的BlockManagerMaster保存,BlockManagerSlave生成Block後向BlockManagerMaster註冊該Block,BlockManagerMaster管理RDD與Block的關係,當RDD再也不須要存儲的時候,將向BlockManagerSlave發送指令刪除相應的Block。數組

  BlockManager管理RDD的物理分區,每一個Block就是節點上對應的一個數據塊,能夠存儲在內存或者磁盤上。而RDD中的Partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中至關於數據的一個元數據結構,存儲着數據分區及其邏輯結構映射關係,存儲着RDD以前的依賴轉換關係。緩存

  BlockManager在每一個節點上運行管理Block(Driver和Executors),它提供一個接口檢索本地和遠程的存儲變量,如memory、disk、off-heap。使用BlockManager前必須先初始化。BlockManager.scala的部分源碼以下所示:安全

private[spark] class BlockManager(
    executorId: String,
    rpcEnv: RpcEnv,
    val master: BlockManagerMaster,
    serializerManager: SerializerManager,
    val conf: SparkConf,
    memoryManager: MemoryManager,
    mapOutputTracker: MapOutputTracker,
    shuffleManager: ShuffleManager,
    val blockTransferService: BlockTransferService,
    securityManager: SecurityManager,
    numUsableCores: Int)
  extends BlockDataManager with BlockEvictionHandler with Logging {

  BlockManagerMaster會持有整個Application的Block的位置、Block所佔用的存儲空間等元數據信息,在Spark的Driver的DAGScheduler中,就是經過這些信息來確認數據運行的本地性的。Spark支持重分區,數據經過Spark默認的或者用戶自定義的分區器決定數據塊分佈在哪些節點。RDD的物理分區是由Block-Manager管理的,每一個Block就是節點上對應的一個數據塊,能夠存儲在內存或者磁盤。而RDD中的partition是一個邏輯數據塊,對應相應的物理塊Block。本質上,一個RDD在代碼中至關於數據的一個元數據結構(一個RDD就是一組分區),存儲着數據分區及Block、Node等的映射關係,以及其餘元數據信息,存儲着RDD以前的依賴轉換關係。分區是一個邏輯概念,Transformation先後的新舊分區在物理上多是同一塊內存存儲。  數據結構

  Spark經過讀取外部數據建立RDD,或經過其餘RDD執行肯定的轉換Transformation操做(如map、union和groubByKey)而建立,從而構成了線性依賴關係,或者說血統關係(Lineage),在數據分片丟失時能夠從依賴關係中恢復本身獨立的數據分片,對其餘數據分片或計算機沒有影響,基本沒有檢查點開銷,使得實現容錯的開銷很低,失效時只須要從新計算RDD分區,就能夠在不一樣節點上並行執行,而不須要回滾(Roll Back)整個程序。落後任務(即運行很慢的節點)是經過任務備份,從新調用執行進行處理的。分佈式

  由於RDD自己支持基於工做集的運用,因此可使Spark的RDD持久化(persist)到內存中,在並行計算中高效重用。多個查詢時,咱們就能夠顯性地將工做集中的數據緩存到內存中,爲後續查詢提供複用,這極大地提高了查詢的速度。在Spark中,一個RDD就是一個分佈式對象集合,每一個RDD可分爲多個片(Partitions),而分片能夠在集羣環境的不一樣節點上計算。ide

  RDD做爲泛型的抽象的數據結構,支持兩種計算操做算子:Transformation(變換)與Action(行動)。且RDD的寫操做是粗粒度的,讀操做既能夠是粗粒度的,也能夠是細粒度的。RDD.scala的源碼以下: 函數

/**
 * Internally, each RDD is characterized by five main properties:
 * 每一個RDD都有5個主要特性
 *  - A list of partitions    分區列表
 *  - A function for computing each split    每一個分區都有一個計算函數
 *  - A list of dependencies on other RDDs    依賴於其餘RDD的列表
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)    數據類型(key-value)的RDD分區器
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for    每一個分區都有一個分區位置列表
 */
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  其中,SparkContext是Spark功能的主要入口點,一個SparkContext表明一個集羣鏈接,能夠用其在集羣中建立RDD、累加變量、廣播變量等,在每個可用的JVM中只有一個SparkContext,在建立一個新的SparkContext以前,必須先中止該JVM中可用的SparkContext,這種限制可能最終會被修改。SparkContext被實例化時須要一個SparkConf對象去描述應用的配置信息,在這個配置對象中設置的信息,會覆蓋系統默認的配置。大數據

  RDD五大特性:this

  (1)分區列表(a list of partitions)。Spark RDD是被分區的,每個分區都會被一個計算任務(Task)處理,分區數決定並行計算數量,RDD的並行度默認從父RDD傳給子RDD。默認狀況下,一個HDFS上的數據分片就是一個Partition,RDD分片數決定了並行計算的力度,能夠在建立RDD時指定RDD分片個數,若是不指定分區數量,當RDD從集合建立時,則默認分區數量爲該程序所分配到的資源的CPU核數(每一個Core能夠承載2~4個Partition),若是是從HDFS文件建立,默認爲文件的Block數。

  (2)每個分區都有一個計算函數(a function for computing each split)。每一個分區都會有計算函數,Spark的RDD的計算函數是以分片爲基本單位的,每一個RDD都會實現compute函數,對具體的分片進行計算,RDD中的分片是並行的,因此是分佈式並行計算。有一點很是重要,就是因爲RDD有先後依賴關係,遇到寬依賴關係,例如,遇到reduceBykey等寬依賴操做的算子,Spark將根據寬依賴劃分Stage,Stage內部經過Pipeline操做,經過Block Manager獲取相關的數據,由於具體的split要從外界讀數據,也要把具體的計算結果寫入外界,因此用了一個管理器,具體的split都會映射成BlockManager的Block,而具體split會被函數處理,函數處理的具體形式是以任務的形式進行的。

  (3)依賴於其餘RDD的列表(a list of dependencies on other RDDs)。RDD的依賴關係,因爲RDD每次轉換都會生成新的RDD,因此RDD會造成相似流水線的先後依賴關係,固然,寬依賴就不相似於流水線了,寬依賴後面的RDD具體的數據分片會依賴前面全部的RDD的全部的數據分片,這時數據分片就不進行內存中的Pipeline,這時通常是跨機器的。由於有先後的依賴關係,因此當有分區數據丟失的時候,Spark會經過依賴關係從新計算,算出丟失的數據,而不是對RDD全部的分區進行從新計算。RDD之間的依賴有兩種:窄依賴(Narrow Dependency)、寬依賴(Wide Dependency)。RDD是Spark的核心數據結構,經過RDD的依賴關係造成調度關係。經過對RDD的操做造成整個Spark程序。

    RDD有Narrow Dependency和Wide Dependency兩種不一樣類型的依賴,其中的Narrow Dependency指的是每個parent RDD的Partition最多被child RDD的一個Partition所使用,而Wide Dependency指的是多個child RDD的Partition會依賴於同一個parent RDD的Partition。能夠從兩個方面來理解RDD之間的依賴關係:一方面是該RDD的parent RDD是什麼;另外一方面是依賴於parent RDD的哪些Partitions;根據依賴於parent RDD的Partitions的不一樣狀況,Spark將Dependency分爲寬依賴和窄依賴兩種。Spark中寬依賴指的是生成的RDD的每個partition都依賴於父RDD的全部partition,寬依賴典型的操做有groupByKey、sortByKey等,寬依賴意味着shuffle操做,這是Spark劃分Stage邊界的依據,Spark中寬依賴支持兩種Shuffle Manager,即HashShuffleManager和SortShuffleManager,前者是基於Hash的Shuffle機制,後者是基於排序的Shuffle機制。Spark 2.2如今的版本中已經沒有Hash Shuffle的方式。

  (4)key-value數據類型的RDD分區器(-Optionally,a Partitioner for key-value RDDS),控制分區策略和分區數。每一個key-value形式的RDD都有Partitioner屬性,它決定了RDD如何分區。固然,Partition的個數還決定每一個Stage的Task個數。RDD的分片函數,想控制RDD的分片函數的時候能夠分區(Partitioner)傳入相關的參數,如HashPartitioner、RangePartitioner,它自己針對key-value的形式,若是不是key-value的形式,它就不會有具體的Partitioner。Partitioner自己決定了下一步會產生多少並行的分片,同時,它自己也決定了當前並行(parallelize)Shuffle輸出的並行數據,從而使Spark具備可以控制數據在不一樣節點上分區的特性,用戶能夠自定義分區策略,如Hash分區等。Spark提供了「partitionBy」運算符,能經過集羣對RDD進行數據再分配來建立一個新的RDD。

  (5)每一個分區都有一個優先位置列表(-Optionally,a list of preferred locations to compute each split on)。它會存儲每一個Partition的優先位置,對於一個HDFS文件來講,就是每一個Partition塊的位置。觀察運行spark集羣的控制檯會發現Spark的具體計算,具體分片前,它已經清楚地知道任務發生在什麼節點上,也就是說,任務自己是計算層面的、代碼層面的,代碼發生運算以前已經知道它要運算的數據在什麼地方,有具體節點的信息。這就符合大數據中數據不動代碼動的特色。數據不動代碼動的最高境界是數據就在當前節點的內存中。這時有多是memory級別或Alluxio級別的,Spark自己在進行任務調度時候,會盡量將任務分配處處理數據的數據塊所在的具體位置。據Spark的RDD.Scala源碼函數getPreferredLocations可知,每次計算都符合完美的數據本地性。
RDD類源碼文件中的4個方法和一個屬性對應上述闡述的RDD的5大特性。RDD.scala的源碼以下:

  /**
   * :: DeveloperApi ::
   * Implemented by subclasses to compute a given partition. 經過子類實現給定分區的計算
   */
  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   * 經過子類實現,返回一個RDD分區列表,這個方法只被調用一次,它是安全的執行一次耗時計算
   * 
   * 數組中的分區必須符合如下屬性設置
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  protected def getPartitions: Array[Partition]

  /**
   * 返回對父RDD的依賴列表,這個方法僅只被調用一次,它是安全的執行一次耗時計算
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * 可選的,指定優先位置,輸入參數是spilt分片,輸出結果是一組優先的節點位置
   * Optionally overridden by subclasses to specify placement preferences.
   */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** 
   * Optionally overridden by subclasses to specify how they are partitioned. 
   * 可選的,經過子類實現,指定如何分區
   */
  @transient val partitioner: Option[Partitioner] = None

  其中,TaskContext是讀取或改變執行任務的環境,用org.apache.spark.TaskContext.get()可返回當前可用的TaskContext,能夠調用內部的函數訪問正在運行任務的環境信息。Partitioner是一個對象,定義瞭如何在key-Value類型的RDD元素中用Key分區,從0到numPartitions-1區間內映射每個Key到Partition ID。Partition是一個RDD的分區標識符。Partition.scala的源碼以下。  

/**
 * An identifier for a partition in an RDD.
 */
trait Partition extends Serializable {
  /**
   * Get the partition's index within its parent RDD
   */
  def index: Int

  // A better default implementation of HashCode
  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)
}
相關文章
相關標籤/搜索