大數據教程:Transformation和Action算子演示
1、Transformation算子演示
val conf = new SparkConf().setAppName("Test").setMaster("local")數據庫
val sc = new SparkContext(conf) //經過並行化生成rdd val rdd = sc.parallelize(List(5,6,4,7,3,8,2,9,10)) //map:對rdd裏面每個元乘以2而後排序 val rdd2: RDD[Int] = rdd.map(_ * 2) //collect以數組的形式返回數據集的全部元素(是Action算子) println(rdd2.collect().toBuffer) //filter:該RDD由通過func函數計算後返回值爲true的輸入元素組成 val rdd3: RDD[Int] = rdd2.filter(_ > 10) println(rdd3.collect().toBuffer) val rdd4 = sc.parallelize(Array("a b c","b c d")) //flatMap:將rdd4中的元素進行切分後壓平 val rdd5: RDD[String] = rdd4.flatMap(_.split(" ")) println(rdd5.collect().toBuffer) //假如: List(List(" a,b" ,"b c"),List("e c"," i o")) //壓平 flatMap(_.flatMap(_.split(" "))) //sample隨機抽樣 //withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣 //fraction抽樣比例例如30% 即0.3 可是這個值是一個浮動的值不許確 //seed用於指定隨機數生成器種子 默認參數不傳 val rdd5_1 = sc.parallelize(1 to 10) val sample = rdd.sample(false,0.5) println(sample.collect().toBuffer) //union:求並集 val rdd6 = sc.parallelize(List(5,6,7,8)) val rdd7 = sc.parallelize(List(1,2,5,6)) val rdd8 = rdd6 union rdd7 println(rdd8.collect.toBuffer) //intersection:求交集 val rdd9 = rdd6 intersection rdd7 println(rdd9.collect.toBuffer) //distinct:去重出重複 println(rdd8.distinct.collect.toBuffer) //join相同的key會被合併 val rdd10_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2))) val rdd10_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10))) val rdd10_3 = rdd10_1 join rdd10_2 println(rdd10_3.collect().toBuffer) //左鏈接和右鏈接 //除基準值外是Option類型,由於可能存在空值因此使用Option val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2 //以左邊爲基準沒有是null val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2 //以右邊爲基準沒有是null println(rdd10_4.collect().toList) println(rdd10_5.collect().toBuffer) val rdd11_1 = sc.parallelize(List(("tom",1),("jerry" ,3),("kitty",2))) val rdd11_2 = sc.parallelize(List(("jerry" ,2),("tom",2),("dog",10))) //笛卡爾積 val rdd11_3 = rdd11_1 cartesian rdd11_2 println(rdd11_3.collect.toBuffer)
//根據傳入的參數進行分組數組
val rdd11_5_1 = rdd11_4.groupBy(_._1) println(rdd11_5_1.collect().toList) //按照相同key進行分組,而且能夠制定分區 val rdd11_5_2 = rdd11_4.groupByKey println(rdd11_5_2.collect().toList) //根據相同key進行分組[分組的話須要二元組] //cogroup 和 groupBykey的區別 //cogroup不須要對數據先進行合併就以進行分組 獲得的結果是 同一個key 和不一樣數據集中的數據集合 //groupByKey是須要先進行合併而後在根據相同key進行分組 val rdd11_6: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2 println(rdd11_6)
2、Action算子演示
val conf = new SparkConf().setAppName("Test").setMaster("local[*]")函數
val sc = new SparkContext(conf) /* Action 算子*/ //集合函數 val rdd1 = sc.parallelize(List(2,1,3,6,5),2) val rdd1_1 = rdd1.reduce(_+_) println(rdd1_1) //以數組的形式返回數據集的全部元素 println(rdd1.collect().toBuffer) //返回RDD的元素個數 println(rdd1.count()) //取出對應數量的值 默認降序, 若輸入0 會返回一個空數組 println(rdd1.top(3).toBuffer) //順序取出對應數量的值 println(rdd1.take(3).toBuffer) //順序取出對應數量的值 默認生序 println(rdd1.takeOrdered(3).toBuffer) //獲取第一個值 等價於 take(1) println(rdd1.first()) //將處理事後的數據寫成文件(存儲在HDFS或本地文件系統) //rdd1.saveAsTextFile("dir/file1") //統計key的個數並生成map k是key名 v是key的個數 val rdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2) val rdd2_1: collection.Map[String, Long] = rdd2.countByKey() println(rdd2_1) //遍歷數據 rdd1.foreach(x => println(x)) /*其餘算子*/ //統計value的個數 可是會將集合中的一個元素看作是一個vluae val value: collection.Map[(String, Int), Long] = rdd2.countByValue println(value) //filterByRange:對RDD中的元素進行過濾,返回指定範圍內的數據 val rdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1))) val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")//包括開始和結束的 println(rdd3_1.collect.toList) //flatMapValues對參數進行扁平化操做,是value的值 val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4"))) println( rdd3_2.flatMapValues(_.split(" ")).collect.toList) //foreachPartition 循環的是分區數據 // foreachPartiton通常應用於數據的持久化,存入數據庫,能夠進行分區的數據存儲 val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) rdd4.foreachPartition(x => println(x.reduce(_+_))) //keyBy 以傳入的函數返回值做爲key ,RDD中的元素爲value 新的元組 val rdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3) val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length) println(rdd5_1.collect.toList) //keys獲取全部的key values 獲取全部的values println(rdd5_1.keys.collect.toList) println(rdd5_1.values.collect.toList) //collectAsMap 將須要的二元組轉換成Map val map: collection.Map[String, Int] = rdd2.collectAsMap() println(map)