咱們知道,在Spark中建立RDD的建立方式大概能夠分爲三種:node
而從集合中建立RDD,Spark主要提供了兩種函數:parallelize和makeRDD。咱們能夠先看看這兩個函數的聲明:apache
def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism):RDD[T] def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
咱們能夠從上面看出makeRDD有兩種實現,並且第一個makeRDD函數接收的參數和parallelize徹底一致。其實第一種makeRDD函數實現是依賴了parallelize函數的實現,來看看Spark中是怎麼實現這個makeRDD函數的:函數
def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) }
咱們能夠看出,這個makeRDD函數徹底和parallelize函數一致。可是咱們得看看第二種makeRDD函數函數實現了,它接收的參數類型是Seq[(T, Seq[String])],Spark文檔的說明是this
Distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object. Create a new partition for each collection item.spa
原來,這個函數還爲數據提供了位置信息,來看看咱們怎麼使用:scala
scala> val iteblog1 = sc.parallelize(List(1,2,3)) iteblog1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[10] at parallelize at <console>:21 scala> val iteblog2 = sc.makeRDD(List(1,2,3)) iteblog2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:21 scala> val seq = List((1, List("iteblog.com", "sparkhost1.com", "sparkhost2.com")),(2, List("iteblog.com", "sparkhost2.com"))) seq: List[(Int, List[String])] = List((1,List(iteblog.com, sparkhost1.com,sparkhost2.com)),(2,List(iteblog.com, sparkhost2.com))) scala> val iteblog3 = sc.makeRDD(seq) iteblog3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at makeRDD at <console>:23 scala> iteblog3.preferredLocations(iteblog3.partitions(1)) res26: Seq[String] = List(iteblog.com, sparkhost2.com) scala> iteblog3.preferredLocations(iteblog3.partitions(0)) res27: Seq[String] = List(iteblog.com, sparkhost1.com, sparkhost2.com) scala> iteblog1.preferredLocations(iteblog1.partitions(0)) res28: Seq[String] = List()
咱們能夠看到,makeRDD函數有兩種實現,第一種實現其實徹底和parallelize一致;而第二種實現能夠爲數據提供位置信息,而除此以外的實現和parallelize函數也是一致的,以下:code
def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) }
都是返回ParallelCollectionRDD,並且這個makeRDD的實現不能夠本身指定分區的數量,而是固定爲seq參數的size大小。orm