RDD is short for Resilient Distributed Dataset. It is Spark's basic data abstraction, representing an immutable, partitioned collection of elements that can be operated on in parallel.

An RDD has five main properties, visible in its class definition:
```scala
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  // 1. a function to compute each partition
  def compute(split: Partition, context: TaskContext): Iterator[T]
  // 2. the list of partitions
  protected def getPartitions: Array[Partition]
  // 3. the dependencies on parent RDDs
  protected def getDependencies: Seq[Dependency[_]] = deps
  // 4. preferred locations for computing each partition
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
  // 5. an optional partitioner for key-value RDDs
  @transient val partitioner: Option[Partitioner] = None

  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))
}
```
As we can see, lineage is stored in the `deps` dependency list; if no dependency list is supplied, the auxiliary constructor creates a one-to-one dependency (`OneToOneDependency`) on the single parent by default.
The RDD class defines common transformation functions such as `map`, `filter`, and `union`. In addition, the RDD companion object defines extra transformations via implicit conversion; for example, key-value RDDs gain transformations such as `groupByKey` and `cogroup`:
```scala
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V],
     ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
  new PairRDDFunctions(rdd)
}
```
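The mechanism here is ordinary Scala implicit conversion, not anything Spark-specific. A minimal self-contained sketch of the same enrichment pattern, using hypothetical `Box`/`PairBoxFunctions` names (not Spark's actual classes) in place of `RDD`/`PairRDDFunctions`:

```scala
import scala.language.implicitConversions

// A minimal container standing in for RDD (hypothetical, for illustration only).
class Box[T](val items: Seq[T])

// Extra operations that only make sense for key-value contents,
// mirroring how PairRDDFunctions enriches RDD[(K, V)].
class PairBoxFunctions[K, V](box: Box[(K, V)]) {
  def groupByKey(): Map[K, Seq[V]] =
    box.items.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
}

object Box {
  // The implicit conversion: when in scope, any Box[(K, V)] silently gains groupByKey.
  implicit def boxToPairBoxFunctions[K, V](box: Box[(K, V)]): PairBoxFunctions[K, V] =
    new PairBoxFunctions(box)
}

object ImplicitDemo {
  def main(args: Array[String]): Unit = {
    import Box._
    val pairs = new Box(Seq(("a", 1), ("a", 2), ("b", 3)))
    // Compiles only because the implicit conversion kicks in:
    val grouped = pairs.groupByKey()
    assert(grouped("a") == Seq(1, 2))
    assert(grouped("b") == Seq(3))
  }
}
```

The design benefit is the same as in Spark: `Box[T]` (like `RDD[T]`) stays generic, while key-value operations are only visible when `T` is actually a pair type.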
The `Dependency` abstract class describes the dependency relationship between RDDs. It has two kinds of subclasses: narrow dependencies (`NarrowDependency`) and shuffle dependencies (`ShuffleDependency`).
A narrow dependency's `def getParents(partitionId: Int): Seq[Int]` method computes which partitions of the parent RDD a given partition of the child RDD depends on. For example, `MapPartitionsRDD`, which has a one-to-one dependency on its parent, overrides:

```scala
override val partitioner =
  if (preservesPartitioning) firstParent[T].partitioner else None

override def getPartitions: Array[Partition] = firstParent[T].partitions

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
```
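To make the `getParents` contract concrete, here is a self-contained sketch of the two common narrow-dependency shapes. This is a simplified model (the `Mini*` class names are invented for illustration), patterned after Spark's one-to-one and range dependencies:

```scala
// Simplified model of the narrow-dependency getParents contract.
abstract class MiniNarrowDependency {
  // Which parent partitions does child partition `partitionId` read?
  def getParents(partitionId: Int): Seq[Int]
}

// One-to-one: child partition i reads exactly parent partition i (as in map/filter).
class MiniOneToOneDependency extends MiniNarrowDependency {
  override def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}

// Range: a contiguous block of child partitions maps onto a shifted
// block of parent partitions (as in union).
class MiniRangeDependency(inStart: Int, outStart: Int, length: Int)
    extends MiniNarrowDependency {
  override def getParents(partitionId: Int): Seq[Int] =
    if (partitionId >= outStart && partitionId < outStart + length)
      List(partitionId - outStart + inStart)
    else Nil
}

object DependencyDemo {
  def main(args: Array[String]): Unit = {
    val oneToOne = new MiniOneToOneDependency
    assert(oneToOne.getParents(3) == List(3))

    // The parent's partitions 0..4 appear as child partitions 5..9
    // (e.g. the second input of a union).
    val range = new MiniRangeDependency(inStart = 0, outStart = 5, length = 5)
    assert(range.getParents(7) == List(2))
    assert(range.getParents(2) == Nil)
  }
}
```

Because these answers depend only on the partition id, the scheduler can pipeline narrow dependencies within one stage; a shuffle dependency, by contrast, makes every child partition depend on every parent partition.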
A `ParallelCollectionPartition` object holds the data of each partition:

```scala
override def getPartitions: Array[Partition] = {
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

override def compute(s: Partition, context: TaskContext): Iterator[T] = {
  new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}

override def getPreferredLocations(s: Partition): Seq[String] = {
  locationPrefs.getOrElse(s.index, Nil)
}
```
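The slicing step splits the input into `numSlices` contiguous, nearly equal ranges. Below is a self-contained sketch of that logic for a plain `Seq`; it is a simplified version of the index arithmetic, and the real `ParallelCollectionRDD.slice` additionally special-cases `Range` and numeric ranges to avoid materializing them:

```scala
object SliceDemo {
  // Split `seq` into `numSlices` contiguous slices whose sizes differ by at most one.
  def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "need at least one slice")
    val n = seq.length
    (0 until numSlices).map { i =>
      val start = (i * n.toLong / numSlices).toInt       // inclusive
      val end   = ((i + 1) * n.toLong / numSlices).toInt // exclusive
      seq.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    val slices = slice(1 to 10, 3)
    // 10 elements over 3 slices: sizes 3, 3, 4
    assert(slices.map(_.length) == Seq(3, 3, 4))
    // Concatenating the slices reproduces the original sequence.
    assert(slices.flatten == (1 to 10))
  }
}
```

This is why `sc.parallelize(1 to 10, 3)` yields partitions of sizes 3, 3, and 4: each partition is one contiguous slice of the driver-side collection, stored in its `ParallelCollectionPartition`.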