This article is based on the Spark 1.3 source code.
An RDD can be created from an ordinary in-memory collection, or from a file in the local file system or in HDFS.
Example: create an RDD from an ordinary collection containing the nine numbers 1 through 9, spread across 3 partitions.
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:21
Example: create an RDD by reading the file README.md; each line of the file becomes one element of the RDD.
scala> val file = sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[3] at textFile at <console>:21
There are other ways to create RDDs, but in this article we mainly use the two methods above to create the RDDs that illustrate the transformations.
map applies a given function to every element of the source RDD to produce a new RDD. Each element of the source RDD corresponds to exactly one element in the new RDD.
def map[U: ClassTag](f: T => U): RDD[U]
scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:21

scala> val b = a.map(_.length)
b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:23

scala> val c = a.zip(b)
c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[6] at zip at <console>:25

scala> c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
filter applies f to every element of the source RDD and drops the elements for which f returns false.
def filter(f: T => Boolean): RDD[T]
scala> val file = sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[19] at textFile at <console>:21

scala> file.filter(line => line.contains("spark")).count
res5: Long = 11

scala> file.filter(line => line.contains("spark")).collect
res6: Array[String] = Array(<http://spark.apache.org/>, guide, on the [project web page](http://spark.apache.org/documentation.html), ["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html)., " ./bin/spark-shell", " ./bin/pyspark", "examples to a cluster. This can be a mesos:// or spark:// URL, ", " MASTER=spark://host:7077 ./bin/run-example SparkPi", Testing first requires [building Spark](#building-spark). Once Spark is built, tests, ["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-with-maven.html#specifying-the-hadoop-version), ["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html), Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configurat...
The difference between flatMap and map is that the function passed to map returns exactly one element per input, so the number of elements does not change, while the function passed to flatMap returns a collection (TraversableOnce) of zero or more elements, and all of these results are flattened into the new RDD.
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
scala> val file = sc.textFile("README.md")
file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[25] at textFile at <console>:21

scala> file.flatMap(_.split(" ")).take(5)
res11: Array[String] = Array(#, Apache, Spark, "", Spark)

scala> file.map(_.split(" ")).take(5)
res12: Array[Array[String]] = Array(Array(#, Apache, Spark), Array(""), Array(Spark, is, a, fast, and, general, cluster, computing, system, for, Big, Data., It, provides), Array(high-level, APIs, in, Scala,, Java,, and, Python,, and, an, optimized, engine, that), Array(supports, general, computation, graphs, for, data, analysis., It, also, supports, a))

scala> file.map(_.length).take(5)
res1: Array[Int] = Array(14, 0, 78, 72, 73)
When counting the words in a file we should use flatMap; if we split the lines with map instead, each line yields an array rather than individual words. To compute the length of each line, map is the right choice. A word-count sketch follows below.
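As an illustration, here is a minimal word-count sketch built on flatMap. It assumes README.md is available as in the examples above, and it uses reduceByKey, a pair-RDD transformation that is not covered in this section, to sum the per-word counts.

val file = sc.textFile("README.md")
val wordCounts = file
  .flatMap(_.split(" "))     // one element per word; flatMap flattens the per-line arrays
  .filter(_.nonEmpty)        // drop empty strings produced by consecutive spaces
  .map(word => (word, 1))    // pair each word with a count of 1
  .reduceByKey(_ + _)        // sum the counts for each word
wordCounts.take(5)           // inspect a few (word, count) pairs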
mapPartitions is a variant of map. The input function of map is applied to each element of the RDD, whereas the input function of mapPartitions is applied to each partition, that is, the contents of each partition are processed as a whole.
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Here f is the input function that processes the contents of one partition. The contents of each partition are passed to f as an Iterator[T], and f returns an Iterator[U]. The final RDD is the concatenation of the results from applying f to every partition.
scala> :paste
// Entering paste mode (ctrl-D to finish)

val nums = sc.parallelize(1 to 9, 3)
nums.mapPartitions(iter => {
  var res = List[(Int, Int)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res ::= (pre, cur)
    pre = cur
  }
  res.iterator
}).collect()

// Exiting paste mode, now interpreting.

nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:21
res1: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
In the example above, mapPartitions pairs each element of a partition with the element that follows it in the same partition. Because the last element of a partition has no successor, (3,4) and (6,7) do not appear in the result (see the sketch below, which repeats the experiment with a single partition).
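To confirm that the missing pairs come from the partition boundaries rather than from the pairing logic, the same code can be re-run with a single partition. This sketch is not from the original post, but with one partition every adjacent pair should appear:

val nums1 = sc.parallelize(1 to 9, 1)   // a single partition, so no pairs are lost at boundaries
nums1.mapPartitions(iter => {
  var res = List[(Int, Int)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res ::= (pre, cur)                  // prepend, so the final list is in reverse order
    pre = cur
  }
  res.iterator
}).collect()
// expected: Array((8,9), (7,8), (6,7), (5,6), (4,5), (3,4), (2,3), (1,2))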
mapPartitions has several variants, such as mapPartitionsWithIndex, mapPartitionsWithContext, and mapPartitionsWithSplit, but mapPartitionsWithContext and mapPartitionsWithSplit have been deprecated since 1.2. mapPartitionsWithIndex is introduced below.
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
mapPartitionsWithIndex is similar to mapPartitions, except that its function takes two parameters, the extra one being the partition index.
scala> :paste
// Entering paste mode (ctrl-D to finish)

val nums = sc.parallelize(1 to 9, 3)
nums.mapPartitionsWithIndex((index, iter) => {
  if (index == 0) iter.toList.map(_ * 2).iterator
  else iter
}).collect()

// Exiting paste mode, now interpreting.

nums: org.apache.spark.rdd.RDD[0] = ParallelCollectionRDD[0] at parallelize at <console>:21
res0: Array[Int] = Array(2, 4, 6, 4, 5, 6, 7, 8, 9)
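A common use of mapPartitionsWithIndex is simply to see which elements landed in which partition. This is a minimal sketch, not from the original post, reusing the nums RDD defined above:

nums.mapPartitionsWithIndex((index, iter) =>
  iter.map(x => (index, x))            // tag every element with its partition index
).collect()
// expected with 3 evenly split partitions: Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))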