There are three RDDs: rddA, rddB, and rddC. We take the data 1, 2, 3, 4, 5, split it into three partitions, multiply every input element by 2, and keep only the values greater than 6.
val rddA = sc.parallelize(List(1, 2, 3, 4, 5), 3)
//rddA: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
val rddB = rddA.map(_ * 2)
//rddB: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1]
val rddC = rddB.filter(_ > 6)
//rddC: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2]
rddC.collect()
//res0: Array[Int] = Array(8, 10)
Calling rddC.toDebugString prints the dependency (lineage) information:
res1: String =
(2) MapPartitionsRDD[2] at filter at <console>:25 []
 |  MapPartitionsRDD[1] at map at <console>:25 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
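The same information can also be read from the dependency objects themselves. A minimal spark-shell sketch, assuming the rddB and rddC defined above (exact RDD ids and console lines may differ):

// Inspect the dependency objects directly: each narrow transformation
// (map, filter) adds a OneToOneDependency on its parent RDD.
rddC.dependencies.foreach(d => println(d.getClass.getName))
//expected: org.apache.spark.OneToOneDependency
rddB.dependencies.foreach(d => println(d.getClass.getName))
//expected: org.apache.spark.OneToOneDependency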
Narrow dependencies come in two forms: OneToOneDependency and RangeDependency. RangeDependency is used only by org.apache.spark.rdd.UnionRDD. UnionRDD concatenates several RDDs into a single RDD: the relative order of each parent RDD's Partitions is preserved, only the starting position of each parent's Partitions inside the UnionRDD differs.

Common narrow-dependency operators: map, filter, union, join, mapPartitions, mapValues.
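To see a RangeDependency in practice, here is a small sketch, assuming the same spark-shell session and the rddA/rddB defined earlier:

// union keeps each parent's partitions in order; every parent contributes
// one RangeDependency recording where its partitions start in the UnionRDD.
val rddU = rddA.union(rddB)
rddU.dependencies.foreach(d => println(d.getClass.getName))
//expected: org.apache.spark.RangeDependency, printed once per parent
println(rddU.getNumPartitions)
//expected: 6, the 3 partitions of rddA followed by the 3 of rddB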
Wide dependencies correspond to Shuffle operations; each one introduces a new stage in the job. Common operators: groupByKey, join, partitionBy, reduce.

The word-count example below exercises both kinds of dependency: flatMap and map are narrow, while reduceByKey triggers a shuffle (see the lineage check after the code).
val path = "/user/spark/data/wc.txt"
val lines = sc.textFile(path, 3)
//view the data in each partition
// lines.mapPartitionsWithIndex((n, partition) => {
//   partition.map(x => (s"partition index ${n}", s"partition data ${x}"))
// }).foreach(println)
val words = lines.flatMap(_.split(","))
val wordPair = words.map(x => (x, 1))
val result = wordPair.reduceByKey(_ + _)
result.collect().foreach(println)
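A quick way to confirm the shuffle in this job is to print the lineage of result. A sketch in the same session (RDD ids and the input path will vary):

// reduceByKey adds a ShuffleDependency, so toDebugString splits the
// lineage into two indentation blocks, i.e. two stages.
println(result.toDebugString)
result.dependencies.foreach(d => println(d.getClass.getName))
//expected: org.apache.spark.ShuffleDependency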
If you found this article helpful, remember to follow the public account.