First start spark-shell, then create an RDD by parallelizing a local collection:

scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:15

Check the RDD's partition count:

scala> rdd1.partitions.length
res0: Int = 1

Specify the partition count at creation time:

scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:15

Check the partition count again:

scala> rdd1.partitions.length
res1: Int = 3
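When no partition count is given, parallelize falls back to sc.defaultParallelism, which is derived from the master setting (in the session above it evidently resolved to 1). A minimal sketch, assuming a standalone Scala app rather than the shell, with a hypothetical local[4] master:

import org.apache.spark.{SparkConf, SparkContext}

// Outside spark-shell the SparkContext is built by hand; "local[4]"
// is an assumed example master that provides 4 cores.
val conf = new SparkConf().setAppName("partitions-demo").setMaster("local[4]")
val sc   = new SparkContext(conf)
val rdd  = sc.parallelize(List(63,45,89,23,144,777,888))
println(rdd.partitions.length)   // expected: 4 under this master setting
sc.stop()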
Multiply every element of rdd1 by 2, then sort:

scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.map(x => x*2).collect
rdd2: Array[Int] = Array(2, 4, 200, 6, 8)
scala> val rdd3 = rdd1.map(_*2).sortBy(x => x,true).collect
rdd3: Array[Int] = Array(2, 4, 6, 8, 200)

Select the elements greater than 50:

scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.filter(_>50).collect
rdd2: Array[Int] = Array(100)
scala> val rdd2 = rdd1.filter(x => x>50).collect
rdd2: Array[Int] = Array(100)

Select the even numbers:

scala> val rdd2 = rdd1.filter(_%2==0).collect
rdd2: Array[Int] = Array(2, 100, 4)
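map, filter and sortBy are lazy transformations; nothing executes until an action such as collect triggers the job. A small sketch chaining the steps above into one expression:

// Lazy transformations chained; collect() is the action that runs the job.
val result = sc.parallelize(List(1,2,100,3,4))
  .map(_ * 2)       // double each element
  .filter(_ > 50)   // keep only the large values
  .sortBy(x => x)   // ascending sort (the default)
  .collect()        // Array(200)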
scala> val rdd1 = sc.parallelize(Array("w u","j i a","d o n g")) 按空格分隔 scala> rdd1.map(_.split(" ")).collect res28: Array[Array[String]] = Array(Array(w, u), Array(j, i, a), Array(d, o, n, g)) 分隔並壓平 scala> val rdd2 = rdd1.flatMap(_.split(" ")) scala> rdd2.collect res5: Array[String] = Array(w, u, j, i, a, d, o, n, g) scala> val rdd1 = sc.parallelize(List(List("w u","j i a","d o n g"),List("j i a n g","r u i"))) scala> val rdd2 = rdd1.map(_.map(_.split(" "))).collect rdd2: Array[List[Array[String]]] = Array(List(Array(w, u), Array(j, i, a), Array(d, o, n, g)), List(Array(j, i, a, n, g), Array(r, u, i))) scala> val rdd2 = rdd1.map(_.flatMap(_.split(" "))).collect rdd2: Array[List[String]] = Array(List(w, u, j, i, a, d, o, n, g), List(j, i, a, n, g, r, u, i)) scala> val rdd2 = rdd1.flatMap(_.flatMap(_.split(" "))).collect rdd2: Array[String] = Array(w, u, j, i, a, d, o, n, g, j, i, a, n, g, r, u, i)
Exercise 3 (union, intersection, distinct)
scala> val rdd1 = sc.parallelize(List(1,2,3,4))
scala> val rdd2 = sc.parallelize(List(5,6,4,3))

Union (duplicates are kept):

scala> val rdd3 = rdd1.union(rdd2).collect
rdd3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4, 3)

Intersection:

scala> val rdd4 = rdd1.intersection(rdd2).collect
rdd4: Array[Int] = Array(4, 3)

Deduplicate:

scala> val rdd5 = rdd1.union(rdd2).distinct.collect
rdd5: Array[Int] = Array(1, 2, 3, 4, 5, 6)
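Worth noting: union performs no shuffle; it simply concatenates the two RDDs, so duplicates survive and the partition counts add up. A quick sketch to verify in the shell:

// union concatenates partitions: 2 + 3 = 5, duplicates preserved.
val a = sc.parallelize(List(1,2,3,4), 2)
val b = sc.parallelize(List(5,6,4,3), 3)
a.union(b).partitions.length   // 5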
Multiply every element of rdd1 by 2, then sort:

scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
scala> val rdd2 = rdd1.map(_*2).sortBy(x => x,true) // why can't a bare underscore be used inside sortBy?
scala> rdd2.collect
res21: Array[Int] = Array(2, 4, 6, 8, 200)

(A bare underscore fails because Scala expands a lone _ to the enclosing expression: sortBy(_, true) becomes x => rdd.sortBy(x, true) rather than sortBy(x => x, true), and the compiler reports a missing parameter type. Write x => x explicitly; _.toString works because the underscore is part of a larger expression.)

Note the difference between the following two cases: x => x+"" sorts by each element's string form, so the order is lexicographic, while x => "x" gives every element the same constant key, leaving the order unchanged.

scala> rdd1.sortBy(x=>x+"",true).collect
res22: Array[Int] = Array(1, 100, 2, 3, 4)
scala> rdd1.sortBy(x=>"x",true).collect
res23: Array[Int] = Array(1, 2, 100, 3, 4)

Sorting as strings (here using the rdd1 from the first example, List(63,45,89,23,144,777,888)):

scala> val rdd2 = rdd1.sortBy(x=>x+"",true).collect
rdd2: Array[Int] = Array(144, 23, 45, 63, 777, 888, 89)
scala> val rdd2 = rdd1.sortBy(x=>x.toString,true).collect // or sortBy(_.toString,true)
rdd2: Array[Int] = Array(144, 23, 45, 63, 777, 888, 89)
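sortBy also accepts ascending = false for a descending sort (part of the standard RDD API). A one-line sketch on the same data:

// The second argument of sortBy is the ascending flag.
sc.parallelize(List(1,2,100,3,4)).sortBy(x => x, false).collect()
// Array(100, 4, 3, 2, 1)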
scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:15 scala> val rdd2 = rdd1.groupByKey() rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at <console>:17 scala> val rdd2 = rdd1.groupByKey().collect rdd2: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(50, 90)), (class2,CompactBuffer(80, 70))) scala> rdd2.foreach(score => {println(score._1);score._2.foreach(singlescore => println(singlescore))}) class1 50 90 class2 80 70 scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:15 scala> val rdd2 = rdd1.reduceByKey(_+_).collect rdd2: Array[(String, Int)] = Array((class1,140), (class2,150)) scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3))) scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7))) scala> val rdd3 = rdd1.union(rdd2) 按key進行聚合 scala> val rdd4 = rdd3.reduceByKey(_+_) scala> rdd4.collect res23: Array[(String, Int)] = Array((tom,9), (jerry,11), (shuke,7), (kitty,3)) 按value的降序排序 scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1)) scala> rdd5.collect res24: Array[(String, Int)] = Array((jerry,11), (tom,9), (shuke,7), (kitty,3)) scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:15 scala> val rdd2 = rdd1.sortByKey().collect rdd2: Array[(String, Int)] = Array((class1,50), (class1,90), (class2,80), (class2,70)) scala> rdd2.foreach(score => println(score._1+":"+score._2)) class1:50 class1:90 class2:80 class2:70
scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3))) scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7))) scala> val rdd3 = rdd1.join(rdd2).collect rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9))) scala> rdd2.join(rdd1).collect res5: Array[(String, (Int, Int))] = Array((tom,(8,1)), (jerry,(9,2))) scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2))) scala> val rdd3 = rdd1.join(rdd2).collect rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9))) scala> val rdd3 = rdd1.leftOuterJoin(rdd2).collect rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (jerry,(2,Some(9))), (kitty,(3,None))) scala> val rdd3 = rdd1.rightOuterJoin(rdd2).collect rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (tom,(Some(1),2)), (jerry,(Some(2),9)), (shuke,(None,7))) scala> val rdd3 = rdd1.union(rdd2).collect rdd3: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2)) scala> val rdd3 = rdd1.union(rdd2) scala> val rdd4 = rdd3.groupByKey scala> rdd4.collect res11: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3) 求每一個單詞出現的次數 scala> val rdd5 = rdd3.groupByKey.map(x=>(x._1,x._2.sum)) scala> rdd5.collect res12: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3)) scala> rdd3.groupByKey.mapValues(_.sum).collect res14: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3))
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
scala> val rdd2 = rdd1.reduce(_+_)
rdd2: Int = 15
scala> rdd1.count
res15: Long = 5

Take the 2 largest elements:

scala> rdd1.top(2)
res17: Array[Int] = Array(5, 4)

Take the first 2 elements:

scala> rdd1.take(2)
res18: Array[Int] = Array(1, 2)

Take the first element:

scala> rdd1.first
res20: Int = 1

Take the 3 smallest, in ascending order:

scala> rdd1.takeOrdered(3)
res22: Array[Int] = Array(1, 2, 3)
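top(n) returns the n largest elements in descending order, while takeOrdered(n) returns the n smallest in ascending order; both take an implicit Ordering, so reversing it turns one into the other:

// takeOrdered with a reversed Ordering behaves like top.
rdd1.takeOrdered(2)(Ordering[Int].reverse)   // Array(5, 4), same as rdd1.top(2)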
scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"))) scala> val rdd2 = sc.parallelize(Array((1,100),(2,97),(3,100))) scala> val rdd3 = rdd1.cogroup(rdd2) scala> rdd3.collect res2: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(100))), (3,(CompactBuffer(c),CompactBuffer(100))), (2,(CompactBuffer(b),CompactBuffer(97))))