摘要:面試
RDD:彈性分佈式數據集,是一種特殊集合 ‚ 支持多種來源 ‚ 有容錯機制 ‚ 能夠被緩存 ‚ 支持並行操做,一個RDD表明一個分區裏的數據集數據庫
RDD有兩種操做算子:數組
Transformation(轉換):Transformation屬於延遲計算,當一個RDD轉換成另外一個RDD時並無當即進行轉換,僅僅是記住了數據集的邏輯操做緩存
Ation(執行):觸發Spark做業的運行,真正觸發轉換算子的計算架構
基礎轉換操做:app
1.map(func):數據集中的每一個元素通過用戶自定義的函數轉換造成一個新的RDD,新的RDD叫MappedRDDdom
(例1)分佈式
object Map { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("map") val sc = new SparkContext(conf) val rdd = sc.parallelize(1 to 10) //建立RDD val map = rdd.map(_*2) //對RDD中的每一個元素都乘於2 map.foreach(x => print(x+" ")) sc.stop() } }
輸出:函數
2 4 6 8 10 12 14 16 18 20
(RDD依賴圖:紅色塊表示一個RDD區,黑色塊表示該分區集合,下同)學習
2.flatMap(func):與map相似,但每一個元素輸入項均可以被映射到0個或多個的輸出項,最終將結果」扁平化「後輸出
(例2)
//...省略sc
val rdd = sc.parallelize(1 to 5) val fm = rdd.flatMap(x => (1 to x)).collect() fm.foreach( x => print(x + " "))
輸出:
1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
若是是map函數其輸出以下:
Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)
(RDD依賴圖)
3.mapPartitions(func):相似與map,map做用於每一個分區的每一個元素,但mapPartitions做用於每一個分區工
func的類型:Iterator[T] => Iterator[U]
假設有N個元素,有M個分區,那麼map的函數的將被調用N次,而mapPartitions被調用M次,當在映射的過程當中不斷的建立對象時就可使用mapPartitions比map的效率要高不少,好比當向數據庫寫入數據時,若是使用map就須要爲每一個元素建立connection對象,但使用mapPartitions的話就須要爲每一個分區建立connetcion對象
(例3):輸出有女性的名字:
object MapPartitions { //定義函數 def partitionsFun(/*index : Int,*/iter : Iterator[(String,String)]) : Iterator[String] = { var woman = List[String]() while (iter.hasNext){ val next = iter.next() next match { case (_,"female") => woman = /*"["+index+"]"+*/next._1 :: woman case _ => } } return woman.iterator } def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("mappartitions") val sc = new SparkContext(conf) val l = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")) val rdd = sc.parallelize(l,2) val mp = rdd.mapPartitions(partitionsFun) /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/ mp.collect.foreach(x => (print(x +" "))) //將分區中的元素轉換成Aarray再輸出 } }
輸出:
kpop lucy
其實這個效果能夠用一條語句完成
1
val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1)
之因此不那麼作是爲了演示函數的定義
(RDD依賴圖)
4.mapPartitionsWithIndex(func):與mapPartitions相似,不一樣的時函數多了個分區索引的參數
func類型:(Int, Iterator[T]) => Iterator[U]
(例4):將例3橙色的註釋部分去掉便是
輸出:(帶了分區索引)
[0]kpop [1]lucy
5.sample(withReplacement,fraction,seed):以指定的隨機種子隨機抽樣出數量爲fraction的數據,withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣
(例5):從RDD中隨機且有放回的抽出50%的數據,隨機種子值爲3(便可能以1 2 3的其中一個起始值)
//省略 val rdd = sc.parallelize(1 to 10) val sample1 = rdd.sample(true,0.5,3) sample1.collect.foreach(x => print(x + " ")) sc.stop
6.union(ortherDataset):將兩個RDD中的數據集進行合併,最終返回兩個RDD的並集,若RDD中存在相同的元素也不會去重
//省略sc val rdd1 = sc.parallelize(1 to 3) val rdd2 = sc.parallelize(3 to 5) val unionRDD = rdd1.union(rdd2) unionRDD.collect.foreach(x => print(x + " ")) sc.stop
輸出:
1 2 3 3 4 5
7.intersection(otherDataset):返回兩個RDD的交集
//省略sc val rdd1 = sc.parallelize(1 to 3) val rdd2 = sc.parallelize(3 to 5) val unionRDD = rdd1.intersection(rdd2) unionRDD.collect.foreach(x => print(x + " ")) sc.stop
輸出:
3 4
8.distinct([numTasks]):對RDD中的元素進行去重
//省略sc val list = List(1,1,2,5,2,9,6,1) val distinctRDD = sc.parallelize(list) val unionRDD = distinctRDD.distinct() unionRDD.collect.foreach(x => print(x + " "))
輸出:
1 6 9 5 2
9.cartesian(otherDataset):對兩個RDD中的全部元素進行笛卡爾積操做
//省略 val rdd1 = sc.parallelize(1 to 3) val rdd2 = sc.parallelize(2 to 5) val cartesianRDD = rdd1.cartesian(rdd2) cartesianRDD.foreach(x => println(x + " "))
輸出:
(1,2) (1,3) (1,4) (1,5) (2,2) (2,3) (2,4) (2,5) (3,2) (3,3) (3,4) (3,5)
(RDD依賴圖)
10.coalesce(numPartitions,shuffle):對RDD的分區進行從新分區,shuffle默認值爲false,當shuffle=false時,不能增長分區數
目,但不會報錯,只是分區個數仍是原來的
(例9:)shuffle=false
//省略 val rdd = sc.parallelize(1 to 16,4) val coalesceRDD = rdd.coalesce(3) //當suffle的值爲false時,不能增長分區數(即分區數不能從5->7) println("從新分區後的分區個數:"+coalesceRDD.partitions.size)
輸出:
從新分區後的分區個數:3 //分區後的數據集 List(1, 2, 3, 4) List(5, 6, 7, 8) List(9, 10, 11, 12, 13, 14, 15, 16)
(例9.1:)shuffle=true
//...省略 val rdd = sc.parallelize(1 to 16,4) val coalesceRDD = rdd.coalesce(7,true) println("從新分區後的分區個數:"+coalesceRDD.partitions.size) println("RDD依賴關係:"+coalesceRDD.toDebugString)
輸出:
從新分區後的分區個數:5 RDD依賴關係:(5) MapPartitionsRDD[4] at coalesce at Coalesce.scala:14 [] | CoalescedRDD[3] at coalesce at Coalesce.scala:14 [] | ShuffledRDD[2] at coalesce at Coalesce.scala:14 [] +-(4) MapPartitionsRDD[1] at coalesce at Coalesce.scala:14 [] | ParallelCollectionRDD[0] at parallelize at Coalesce.scala:13 [] //分區後的數據集 List(10, 13) List(1, 5, 11, 14) List(2, 6, 12, 15) List(3, 7, 16) List(4, 8, 9)
(RDD依賴圖:coalesce(3,flase))
(RDD依賴圖:coalesce(3,true))
11.repartition(numPartition):是函數coalesce(numPartition,true)的實現,效果和例9.1的coalesce(numPartition,true)的同樣
12.glom():將RDD的每一個分區中的類型爲T的元素轉換換數組Array[T]
//省略 val rdd = sc.parallelize(1 to 16,4) val glomRDD = rdd.glom() //RDD[Array[T]] glomRDD.foreach(rdd => println(rdd.getClass.getSimpleName)) sc.stop
輸出:
int[] //說明RDD中的元素被轉換成數組Array[Int]
13.randomSplit(weight:Array[Double],seed):根據weight權重值將一個RDD劃分紅多個RDD,權重越高劃分獲得的元素較多的概率就越大
//省略sc val rdd = sc.parallelize(1 to 10) val randomSplitRDD = rdd.randomSplit(Array(1.0,2.0,7.0)) randomSplitRDD(0).foreach(x => print(x +" ")) randomSplitRDD(1).foreach(x => print(x +" ")) randomSplitRDD(2).foreach(x => print(x +" ")) sc.stop
輸出:
2 4 3 8 9 1 5 6 7 10
歡迎工做一到五年的Java工程師朋友們加入Java架構開發:jq.qq.com/?_wv=1027&k…
本羣提供免費的學習指導 架構資料 以及免費的解答
不懂得問題均可以在本羣提出來 以後還會有職業生涯規劃以及面試指導
同時你們能夠多多關注一下小編 你們一塊兒學習進步