關鍵字:Spark算子、Spark RDD基本轉換、map、flatMap、distinctes6
將一個RDD中的每一個數據項,經過map中的函數映射變爲一個新的元素。apache
輸入分區與輸出分區一對一,即:有多少個輸入分區,就有多少個輸出分區。數組
- hadoop fs -cat /tmp/lxw1234/1.txt
- hello world
- hello spark
- hello hive
-
-
- //讀取HDFS文件到RDD
- scala> var data = sc.textFile("/tmp/lxw1234/1.txt")
- data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21
-
- //使用map算子
- scala> var mapresult = data.map(line => line.split("\\s+"))
- mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23
-
- //運算map算子結果
- scala> mapresult.collect
- res0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))
-
-
屬於Transformation算子,第一步和map同樣,最後將全部的輸出分區合併成一個。函數
- /使用flatMap算子
- scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))
- flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23
-
- //運算flagMap算子結果
- scala> flatmapresult.collect
- res1: Array[String] = Array(hello, world, hello, spark, hello, hive)
-
使用flatMap時候須要注意:
flatMap會將字符串當作是一個字符數組。
看下面的例子:oop
- scala> data.map(_.toUpperCase).collect
- res32: Array[String] = Array(HELLO WORLD, HELLO SPARK, HELLO HIVE, HI SPARK)
- scala> data.flatMap(_.toUpperCase).collect
- res33: Array[Char] = Array(H, E, L, L, O, , W, O, R, L, D, H, E, L, L, O, , S, P, A, R, K, H, E, L, L, O, , H, I, V, E, H, I, , S, P, A, R, K)
-
再看:spa
- scala> data.map(x => x.split("\\s+")).collect
- res34: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive), Array(hi, spark))
-
- scala> data.flatMap(x => x.split("\\s+")).collect
- res35: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
-
此次的結果好像是預期的,最終結果裏面並無把字符串當成字符數組。
這是由於此次map函數中返回的類型爲Array[String],並非String。
flatMap只會將String扁平化成字符數組,並不會把Array[String]也扁平化成字符數組。scala
參考:
http://alvinalexander.com/scala/collection-scala-flatmap-examples-map-flattenorm
對RDD中的元素進行去重操做。htm
- scala> data.flatMap(line => line.split("\\s+")).collect
- res61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)
-
- scala> data.flatMap(line => line.split("\\s+")).distinct.collect
- res62: Array[String] = Array(hive, hello, world, spark, hi)