scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE") rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24
後面詳解
scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
測試:
scala> val rdd = sc.parallelize(Array(1,2,3,4,5)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24 scala> rdd.map(_+1).collect res16: Array[Int] = Array(2, 3, 4, 5, 6) scala> rdd.map((_,1)).collect res17: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd.mapPartitions(x=>x.map(_*2)) res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27
scala> res3.collect res4: Array[Int] = Array(2, 4, 6, 8)
測試:
scala> rdd.partitions.size res18: Int = 4 scala> rdd.mapPartitions(x => Iterator(x.mkString("|"))) res20: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at mapPartitions at <console>:27 scala> rdd.mapPartitions(x => Iterator(x.mkString("|"))).collect res21: Array[String] = Array(1, 2, 3, 4|5)
scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2)使每一個元素跟所在分區號形成一個元組,組成一個新的 RDD
scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))) indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26
scala> indexRdd.collect res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))
測試:
val rdd = sc.makeRDD(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24 scala> rdd.partitions.size res1: Int = 4 scala> rdd.mapPartitions mapPartitions mapPartitionsWithIndex scala> rdd.mapPartitionsWithIndex((x,y) => Iterator(x+":"+y.mkString("|"))).collect res2: Array[String] = Array(0:1, 1:2, 2:3, 3:4|5) scala> sc.makeRDD(Array(1,2,3,4),5).mapPartitionsWithIndex((x,y) => Iterator(x+":"+y.mkString("|"))).collect res3: Array[String] = Array(0:, 1:1, 2:2, 3:3, 4:4)
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26
scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
測試:
scala> val rdd = sc.makeRDD(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24 scala> rdd.flatMap(x => Array(x + 1)) res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at flatMap at <console>:27 scala> rdd.flatMap(x => Array(x + 1)).collect res2: Array[Int] = Array(2, 3, 4, 5, 6)
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13,14, 15, 16))
scala> val rdd = sc.parallelize(1 to 4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> val group = rdd.groupBy(_%2) group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
scala> group.collect res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
測試:
scala> val rdd = sc.makeRDD(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24 scala> rdd.filter(_%2 == 0).collect res3: Array[Int] = Array(2, 4)
scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> rdd.collect() res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> var sample1 = rdd.sample(true,0.4,2) sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26
scala> sample1.collect() res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
scala> var sample2 = rdd.sample(false,0.2,3) sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26
scala> sample2.collect() res17: Array[Int] = Array(1, 9)
測試:
scala> val rdd = sc.parallelize(1 to 20) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24 scala> rdd.sample(true,0.3,2) res4: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[5] at sample at <console>:27 scala> rdd.sample(true,0.3,2).collect res6: Array[Int] = Array(1, 2, 3, 3, 4, 13, 13, 14, 15, 16, 17) scala> rdd.sample(false,0.3,2).collect res7: Array[Int] = Array(1, 3, 12, 15, 16, 19, 20)
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1)) distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
scala> val unionRDD = distinctRdd.distinct() unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26
scala> unionRDD.collect() res20: Array[Int] = Array(1, 9, 5, 6, 2)
scala> val unionRDD = distinctRdd.distinct(2) unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26
scala> unionRDD.collect() res21: Array[Int] = Array(6, 2, 1, 9, 5)
測試:
scala> val rdd = sc.parallelize(Array(1,1,2)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24 scala> rdd.distinct.collect res9: Array[Int] = Array(1, 2)
scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24
scala> rdd.partitions.size res20: Int = 4
scala> val coalesceRDD = rdd.coalesce(3) coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26
scala> coalesceRDD.partitions.size res21: Int = 3
scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24
scala> rdd.partitions.size res22: Int = 4
scala> val rerdd = rdd.repartition(2) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26
scala> rerdd.partitions.size res23: Int = 2
測試:
scala> rdd.partitions.size res14: Int = 4 scala> rdd.repartition(2).partitions.size res15: Int = 2
// Excerpt from Spark's RDD source: repartition(numPartitions) just delegates to
// coalesce(numPartitions, shuffle = true), i.e. it is coalesce with the shuffle forced on.
// The implicit `ord` parameter is not referenced by this body.
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }
scala> val rdd = sc.parallelize(List(2,1,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd.sortBy(x => x).collect() res11: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sortBy(x => x%3).collect() res12: Array[Int] = Array(3, 4, 1, 2)
測試:
scala> rdd.sortBy(_*2) res16: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[24] at sortBy at <console>:28 scala> rdd.sortBy(_*2).collect res17: Array[Int] = Array(1, 1, 2) scala> rdd.sortBy(_*2,false).collect res18: Array[Int] = Array(2, 1, 1)
repartitionAndSortWithinPartitions 函數是 repartition 函數的變種(只適用於 key-value 類型的 RDD),與 repartition 函數不一樣的是,
repartitionAndSortWithinPartitions 會在給定的 partitioner 所劃分的每個分區內部按 key 進行排序。由於排序是在 shuffle 過程中完成的,它的性能比先調用 repartition、再在每個分區內排序要高。
Shell 腳本:
#!/bin/sh
# Demo script for RDD.pipe: RDD.pipe starts this script once per partition,
# which is why the REPL output contains one "AA" header per partition.
# Every element of the partition arrives as one stdin line and is echoed
# back with a ">>>" prefix.
echo "AA"
# IFS= and -r keep leading/trailing whitespace and backslashes intact;
# the `|| [ -n ... ]` clause also handles a final line with no trailing newline.
while IFS= read -r LINE || [ -n "$LINE" ]; do
  # Quoted expansion: no word splitting or glob expansion of the input text.
  printf '%s\n' ">>>${LINE}"
done
[lxl@hadoop102 spark]$ chmod 777 pipe.sh
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect() res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2) rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect() res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)
測試:
scala> val rdd = sc.parallelize(Array(1,1,2)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect res26: Array[String] = Array(AA, AA, >>>1, AA, >>>1, AA, >>>2)
scala> sc.parallelize(1 to 3,1).pipe("/opt/module/spark/pipe.sh").collect
res27: Array[String] = Array(AA, >>>1, >>>2, >>>3)
scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(5 to 10) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> val rdd3 = rdd1.union(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28
scala> rdd3.collect() res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
測試:
scala> val rdd = sc.parallelize(Array(1,1,2))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd.union(sc.parallelize(2 to 5)).collect res19: Array[Int] = Array(1, 1, 2, 2, 3, 4, 5)
scala> val rdd = sc.parallelize(3 to 8) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(1 to 5) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24
scala> rdd.subtract(rdd1).collect() res27: Array[Int] = Array(8, 6, 7)
測試:
scala> val rdd = sc.parallelize(Array(1,1,2))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd.subtract(sc.parallelize(2 to 5)).collect res21: Array[Int] = Array(1, 1)
scala> val rdd1 = sc.parallelize(1 to 7) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(5 to 10) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> val rdd3 = rdd1.intersection(rdd2) rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28
scala> rdd3.collect() res19: Array[Int] = Array(5, 6, 7)
測試:
scala> val rdd = sc.parallelize(Array(1,1,2)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24 scala> rdd.intersection(sc.parallelize(2 to 5)).collect res22: Array[Int] = Array(2)
scala> val rdd1 = sc.parallelize(1 to 3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(2 to 5) rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at <console>:24
scala> rdd1.cartesian(rdd2).collect() res17: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5))
測試:
scala> val rdd = sc.parallelize(Array(1,1,2)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24 scala> rdd.cartesian(sc.parallelize(2 to 5)).collect res23: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5))
scala> val rdd1 = sc.parallelize(Array(1,2,3),3) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(Array("a","b","c"),3) rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:24
scala> rdd1.zip(rdd2).collect res1: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
scala> rdd2.zip(rdd1).collect res2: Array[(String, Int)] = Array((a,1), (b,2), (c,3))
scala> val rdd3 = sc.parallelize(Array("a","b","c"),2) rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd1.zip(rdd3).collect java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2) at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.rdd.RDD.partitions(RDD.scala:250) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965) at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.collect(RDD.scala:935) ... 48 elided
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24
scala> rdd.partitions.size res24: Int = 4
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2)) rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26
scala> rdd2.partitions.size res25: Int = 2
測試:
scala> val rdd = sc.parallelize(1 to 20) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24 scala> import org.apache.spark._ scala> rdd.map((_,1)).partitionBy(new org.apache.spark.HashPartitioner(4)) //導包後紅色字段代碼能夠省略 res12: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[13] at partitionBy at <console>:28 scala> rdd.map((_,1)).partitionBy(new org.apache.spark.HashPartitioner(4)).partitions.size res13: Int = 4
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2))) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24
scala> val reduce = rdd.reduceByKey((x,y) => x+y) reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26
scala> reduce.collect() res29: Array[(String, Int)] = Array((female,6), (male,7))
測試:
scala> val rdd = sc.parallelize(Array(1,1,2)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> rdd.map((_,1)).reduceByKey(_+_) res0: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[2] at reduceByKey at <console>:27 scala> rdd.map((_,1)).reduceByKey(_+_).collect res1: Array[(Int, Int)] = Array((1,2), (2,1)) scala> rdd.map((_,1)).reduceByKey((x,y) => x+y).collect res2: Array[(Int, Int)] = Array((1,2), (2,1))
scala> val words = Array("one", "two", "two", "three", "three", "three") words: Array[String] = Array(one, two, two, three, three, three) scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26
scala> val group = wordPairsRDD.groupByKey() group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28
scala> group.collect() res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
scala> group.map(t => (t._1, t._2.sum)) res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31
scala> res2.collect() res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
測試:
scala> val rdd = sc.parallelize(Array(1,1,2)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> rdd.map((_,1)).groupByKey.collect res3: Array[(Int, Iterable[Int])] = Array((1,CompactBuffer(1, 1)), (2,CompactBuffer(1)))
參數:(zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U)
zeroValue:每一個分區中每一個 key 的初始值;
seqOp:分區內使用,用初始值逐步疊代合併該分區內同一個 key 的 value;
combOp:分區間使用,合併各個分區中同一個 key 的結果。
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_) agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[1] at aggregateByKey at <console>:26
scala> agg.collect() res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))
測試:
scala> val scores = Array(("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)) scores: Array[(String, Int)] = Array((Fred,88), (Fred,95), (Fred,91), (Wilma,93), (Wilma,95), (Wilma,98)) scala> val input = sc.parallelize(scores) input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:26 scala> input.aggregateByKey((0,0))((u,v) => (u._1+v,u._2+1),(u1,u2) => (u1._1+u2._1,u1._2+u2._2)) res6: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[11] at aggregateByKey at <console>:29 scala> input.aggregateByKey((0,0))((u,v) => (u._1+v,u._2+1),(u1,u2) => (u1._1+u2._1,u1._2+u2._2)).collect res7: Array[(String, (Int, Int))] = Array((Wilma,(286,3)), (Fred,(274,3)))
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[91] at parallelize at <console>:24
scala> val agg = rdd.foldByKey(0)(_+_) agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[92] at foldByKey at <console>:26
scala> agg.collect() res61: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
參數:(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
createCombiner:分區內使用,第一次遇到某個 key 時,將它的 value 轉換成初始的累加器 C;
mergeValue:分區內使用,非第一次遇到某個 key 時,將 value 合併進該 key 已有的累加器;
mergeCombiners:將多個分區中同一個 key 的累加器進行合併。
scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2) input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26
scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1+acc2._1,acc1._2+acc2._2)) combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[5] at combineByKey at <console>:28
scala> combine.collect res5: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)} result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[54] at map at <console>:30
scala> result.collect() res33: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))
測試:
scala> val scores = Array(("Fred", 88), ("Fred", 95), ("Fred", 91), ("Wilma", 93), ("Wilma", 95), ("Wilma", 98)) scores: Array[(String, Int)] = Array((Fred,88), (Fred,95), (Fred,91), (Wilma,93), (Wilma,95), (Wilma,98)) scala> val input = sc.parallelize(scores) input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:26 scala> input.combineByKey(x => (x,1),(a,b) => (b+a._1,a._2+1),(c1,c2) => (c1._1+c2._1,c1._2+c2._2)).collect <console>:29: error: missing parameter type input.combineByKey(x => (x,1),(a,b) => (b+a._1,a._2+1),(c1,c2) => (c1._1+c2._1,c1._2+c2._2)).collect ^ <console>:29: error: missing parameter type input.combineByKey(x => (x,1),(a,b) => (b+a._1,a._2+1),(c1,c2) => (c1._1+c2._1,c1._2+c2._2)).collect ^ <console>:29: error: missing parameter type input.combineByKey(x => (x,1),(a,b) => (b+a._1,a._2+1),(c1,c2) => (c1._1+c2._1,c1._2+c2._2)).collect ^ scala> input.combineByKey(x => (x,1),(a:(Int,Int),b) => (b+a._1,a._2+1),(c1:(Int,Int),c2:(Int,Int)) => (c1._1+c2._1,c1._2+c2._2)).collect res5: Array[(String, (Int, Int))] = Array((Wilma,(286,3)), (Fred,(274,3)))
scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> rdd.sortByKey(true).collect() res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
scala> rdd.sortByKey(false).collect() res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
測試:
scala> rdd.collect res12: Array[Int] = Array(1, 1, 2) scala> rdd.map((_,1)) res9: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[13] at map at <console>:27 scala> rdd.map((_,1)).collect res10: Array[(Int, Int)] = Array((1,1), (1,1), (2,1)) scala> rdd.map((_,1)).sortByKey(true).collect res11: Array[(Int, Int)] = Array((1,1), (1,1), (2,1))
scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c"))) rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24
scala> rdd3.mapValues(_+"|||").collect() res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
測試:
scala> rdd.collect res12: Array[Int] = Array(1, 1, 2) scala> rdd.map((_,1)).mapValues(_*2).collect res13: Array[(Int, Int)] = Array((1,2), (1,2), (2,2))
scala> rdd.map((_,1)).mapValues(_*2).collect //也可使用模式匹配 res13: Array[(Int, Int)] = Array((1,2), (1,2), (2,2))
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> rdd.join(rdd1).collect() res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
測試:
scala> val rdd1 = sc.parallelize(1 to 10).map((_,1)) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[57] at map at <console>:25 scala> val rdd2 = sc.parallelize(5 to 15).map((_,1)) rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[59] at map at <console>:25 scala> rdd1.join(rdd2).collect res28: Array[(Int, (Int, Int))] = Array((8,(1,1)), (9,(1,1)), (5,(1,1)), (6,(1,1)), (10,(1,1)), (7,(1,1)))
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6))) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24
scala> rdd.cogroup(rdd1).collect() res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
測試:
scala> val rdd1 = sc.parallelize(Array(0,1,1,3,3)).map((_,1)) rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[76] at map at <console>:25 scala> val rdd2 = sc.parallelize(Array(0,2,1,4,3)).map((_,1)) rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[78] at map at <console>:25 scala> rdd1.cogroup(rdd2).collect res34: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(1))), (0,(CompactBuffer(1),CompactBuffer(1))), (1,(CompactBuffer(1, 1),CompactBuffer(1))), (2,(CompactBuffer(),CompactBuffer(1))), (3,(CompactBuffer(1, 1),CompactBuffer(1))))
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
// Requirement: for every province, find the TOP3 ads by number of clicks.
// Each input line is space-separated: "TS Province City User AD",
// e.g. "1516609143867 6 7 64 16".
object Practice {

  // Input file used when no path is passed as the first program argument.
  private val DefaultInputPath = "E:\\IDEAWorkSpace\\SparkTest\\src\\main\\resources\\agent.log"

  // Number of ads kept per province.
  private val TopN = 3

  /** Extracts `(province, ad)` from one log line.
    *
    * @param line raw line: TS Province City User AD (whitespace-separated)
    * @return `Some((province, ad))`, or `None` for blank/short lines, which would
    *         otherwise fail the whole job with an ArrayIndexOutOfBoundsException
    */
  private def parseClick(line: String): Option[(String, String)] = {
    val fields = line.trim.split("\\s+")
    // fields: 0 = TS, 1 = Province, 2 = City, 3 = User, 4 = AD
    if (fields.length >= 5) Some((fields(1), fields(4))) else None
  }

  /** Prints, for each province, its TopN ads ordered by click count (descending).
    *
    * @param args optional; args(0) overrides the input path (defaults to DefaultInputPath)
    */
  def main(args: Array[String]): Unit = {
    // 1. Spark configuration. Only fall back to local mode when no master was
    //    supplied (e.g. by spark-submit), so the job can also run on a cluster.
    val sparkConf = new SparkConf().setAppName("Test").setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(sparkConf)
    try {
      // 2. Read the raw log: TS, Province, City, User, AD
      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val lines: RDD[String] = sc.textFile(inputPath)

      // 3. Finest-grained aggregation key: ((Province, AD), 1); malformed lines are skipped.
      val provinceAdAndOne: RDD[((String, String), Int)] =
        lines.flatMap(line => parseClick(line).toList).map(provinceAd => (provinceAd, 1))

      // 4. Total clicks of every ad in every province: ((Province, AD), sum)
      val provinceAdToSum: RDD[((String, String), Int)] = provinceAdAndOne.reduceByKey(_ + _)

      // 5. Re-key by province: (Province, (AD, sum))
      val provinceToAdSum: RDD[(String, (String, Int))] = provinceAdToSum.map {
        case ((province, ad), sum) => (province, (ad, sum))
      }

      // 6. Gather all ads of the same province: (Province, Iterable((AD1, sum1), (AD2, sum2), ...))
      val provinceGroup: RDD[(String, Iterable[(String, Int)])] = provinceToAdSum.groupByKey()

      // 7. Sort each province's ads by click total, descending, and keep the first TopN.
      val provinceAdTop3: RDD[(String, List[(String, Int)])] =
        provinceGroup.mapValues(adSums => adSums.toList.sortBy(_._2)(Ordering[Int].reverse).take(TopN))

      // 8. Pull the results to the driver and print them.
      provinceAdTop3.collect().foreach(println)
    } finally {
      // 9. Always release the Spark connection, even when the job fails.
      sc.stop()
    }
  }
}