spark RDD

RDD

RDD是Resilient Distributed Dataset的英文縮寫,是spark的基本數據抽象,表明着一個不可變的、多分區的、可並行操做的元素集合。java

RDD有5個主要屬性:ide

  • 分區列表 (partition list)
  • 計算某個分區函數(compute)
  • 依賴列表 (dependency list)
  • kv類型RDD的分區器(可選的)
  • 計算某個分區最優位置的函數(可選的)
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getDependencies: Seq[Dependency[_]] = deps

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  @transient val partitioner: Option[Partitioner] = None
  
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

可見血統關係是經過deps依賴列表來保存的,若是不指定依賴列表則默認建立一對一的依賴關係OneToOneDependency函數

函數注入

RDD類中定義了一些通用的轉換函數如map``fliter``union等同時RDD的伴生對象中經過隱式轉換的方式定義了一些額外的轉換函數,好比kv類型的RDD一些轉換函數:groupByKey cogroupthis

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }

依賴關係

Dependency抽象類來描述依賴關係,有兩種子類:spa

  • NarrowDependency 這也就是常說的窄依賴,子RDD的每個分區依賴固定個父RDD的分區。這種依賴關係是固定的能夠經過def getParents(partitionId: Int): Seq[Int]函數計算出子RDD的某個分區依賴的父RDD的分區。
  • ShuffleDependency 也就是常說的寬依賴,這種依賴關係會觸發shuffle,也是spark任務劃分stage的標準。

具體實現

  • MapPartitionsRDD 不建立新的分區列表,採用一對一的依賴關係,每一個分區的計算就是在對應父分區上運用傳入的轉換函數。
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context)
  • ParallelCollectionRDD 做爲source類型的RDD,依賴列表爲空,會根據傳入的數據和並行度計算新的分區列表,用ParallelCollectionPartition對象保存每一個分區的數據。
override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
  }

  override def getPreferredLocations(s: Partition): Seq[String] = {
    locationPrefs.getOrElse(s.index, Nil)
  }
  • 把握好5個主要屬性很容易實現自定義的RDD
相關文章
相關標籤/搜索