Spark函數詳解系列之RDD基本轉換

摘要:面試

 RDD:彈性分佈式數據集,是一種特殊集合 ‚ 支持多種來源 ‚ 有容錯機制 ‚ 能夠被緩存 ‚ 支持並行操做,一個RDD表明一個分區裏的數據集數據庫

 RDD有兩種操做算子:數組

     Transformation(轉換):Transformation屬於延遲計算,當一個RDD轉換成另外一個RDD時並無當即進行轉換,僅僅是記住了數據集的邏輯操做緩存

     Ation(執行):觸發Spark做業的運行,真正觸發轉換算子的計算架構

基礎轉換操做:app

1.map(func):數據集中的每一個元素通過用戶自定義的函數轉換造成一個新的RDD,新的RDD叫MappedRDDdom

(例1)分佈式

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)  //建立RDD
    val map = rdd.map(_*2)             //對RDD中的每一個元素都乘於2
    map.foreach(x => print(x+" "))
    sc.stop()
  }
}

輸出:函數

2 4 6 8 10 12 14 16 18 20

 

 

 

 

 

(RDD依賴圖:紅色塊表示一個RDD區,黑色塊表示該分區集合,下同)學習

 

 

 

 

 

2.flatMap(func):與map相似,但每一個元素輸入項均可以被映射到0個或多個的輸出項,最終將結果」扁平化「後輸出

(例2)

//...省略sc

   val rdd = sc.parallelize(1 to 5)
   val fm = rdd.flatMap(x => (1 to x)).collect()
   fm.foreach( x => print(x + " "))

輸出:

1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

若是是map函數其輸出以下:

Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)

 (RDD依賴圖)

 

 

 

 

 

 

3.mapPartitions(func):相似與map,map做用於每一個分區的每一個元素,但mapPartitions做用於每一個分區工

func的類型:Iterator[T] => Iterator[U]

假設有N個元素,有M個分區,那麼map的函數的將被調用N次,而mapPartitions被調用M次,當在映射的過程當中不斷的建立對象時就可使用mapPartitions比map的效率要高不少,好比當向數據庫寫入數據時,若是使用map就須要爲每一個元素建立connection對象,但使用mapPartitions的話就須要爲每一個分區建立connetcion對象

(例3):輸出有女性的名字:

object MapPartitions {
//定義函數
  def partitionsFun(/*index : Int,*/iter : Iterator[(String,String)]) : Iterator[String] = {
    var woman = List[String]()
    while (iter.hasNext){
      val next = iter.next()
      next match {
        case (_,"female") => woman = /*"["+index+"]"+*/next._1 :: woman
        case _ =>
      }
    }
    return  woman.iterator
  }
 
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
    val sc = new SparkContext(conf)
    val l = List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female"))
    val rdd = sc.parallelize(l,2)
    val mp = rdd.mapPartitions(partitionsFun)
    /*val mp = rdd.mapPartitionsWithIndex(partitionsFun)*/
    mp.collect.foreach(x => (print(x +" ")))   //將分區中的元素轉換成Aarray再輸出
  }
}

輸出:

kpop lucy

其實這個效果能夠用一條語句完成

1

val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1) 

之因此不那麼作是爲了演示函數的定義

 

 

 

 

  (RDD依賴圖)

 

 

 

 

 

4.mapPartitionsWithIndex(func):與mapPartitions相似,不一樣的時函數多了個分區索引的參數

func類型:(Int, Iterator[T]) => Iterator[U]

(例4):將例3橙色的註釋部分去掉便是

輸出:(帶了分區索引)

[0]kpop [1]lucy

 

5.sample(withReplacement,fraction,seed):以指定的隨機種子隨機抽樣出數量爲fraction的數據,withReplacement表示是抽出的數據是否放回,true爲有放回的抽樣,false爲無放回的抽樣

(例5):從RDD中隨機且有放回的抽出50%的數據,隨機種子值爲3(便可能以1 2 3的其中一個起始值)

//省略
    val rdd = sc.parallelize(1 to 10)
    val sample1 = rdd.sample(true,0.5,3)
    sample1.collect.foreach(x => print(x + " "))
    sc.stop

 

6.union(ortherDataset):將兩個RDD中的數據集進行合併,最終返回兩個RDD的並集,若RDD中存在相同的元素也不會去重

//省略sc
   val rdd1 = sc.parallelize(1 to 3)
   val rdd2 = sc.parallelize(3 to 5)
   val unionRDD = rdd1.union(rdd2)
   unionRDD.collect.foreach(x => print(x + " "))
   sc.stop 

輸出:

1 2 3 3 4 5

 

7.intersection(otherDataset):返回兩個RDD的交集

//省略sc
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.intersection(rdd2)
unionRDD.collect.foreach(x => print(x + " "))
sc.stop 

輸出:

3 4

 

8.distinct([numTasks]):對RDD中的元素進行去重

//省略sc
val list = List(1,1,2,5,2,9,6,1)
val distinctRDD = sc.parallelize(list)
val unionRDD = distinctRDD.distinct()
unionRDD.collect.foreach(x => print(x + " "))

輸出:

1 6 9 5 2

 

9.cartesian(otherDataset):對兩個RDD中的全部元素進行笛卡爾積操做

//省略
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.foreach(x => println(x + " ")) 

輸出:

(1,2)
(1,3)
(1,4)
(1,5)
(2,2)
(2,3)
(2,4)
(2,5)
(3,2)
(3,3)
(3,4)
(3,5)

 (RDD依賴圖)

 

 

 

 

 

 

 

10.coalesce(numPartitions,shuffle):對RDD的分區進行從新分區,shuffle默認值爲false,當shuffle=false時,不能增長分區數

目,但不會報錯,只是分區個數仍是原來的

(例9:)shuffle=false

//省略 
val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(3) //當suffle的值爲false時,不能增長分區數(即分區數不能從5->7)
println("從新分區後的分區個數:"+coalesceRDD.partitions.size) 

輸出:

從新分區後的分區個數:3
//分區後的數據集
List(1, 2, 3, 4)
List(5, 6, 7, 8)
List(9, 10, 11, 12, 13, 14, 15, 16)

 

(例9.1:)shuffle=true

//...省略
val rdd = sc.parallelize(1 to 16,4)
val coalesceRDD = rdd.coalesce(7,true)
println("從新分區後的分區個數:"+coalesceRDD.partitions.size)
println("RDD依賴關係:"+coalesceRDD.toDebugString)

輸出:

從新分區後的分區個數:5
RDD依賴關係:(5) MapPartitionsRDD[4] at coalesce at Coalesce.scala:14 []
| CoalescedRDD[3] at coalesce at Coalesce.scala:14 []
| ShuffledRDD[2] at coalesce at Coalesce.scala:14 []
+-(4) MapPartitionsRDD[1] at coalesce at Coalesce.scala:14 []
| ParallelCollectionRDD[0] at parallelize at Coalesce.scala:13 []
//分區後的數據集
List(10, 13)
List(1, 5, 11, 14)
List(2, 6, 12, 15)
List(3, 7, 16)
List(4, 8, 9)

 (RDD依賴圖:coalesce(3,flase))

 

 

 

 

 

 (RDD依賴圖:coalesce(3,true))

 

 

 

 

 

 

11.repartition(numPartition):是函數coalesce(numPartition,true)的實現,效果和例9.1的coalesce(numPartition,true)的同樣

12.glom():將RDD的每一個分區中的類型爲T的元素轉換換數組Array[T]

//省略
val rdd = sc.parallelize(1 to 16,4)
val glomRDD = rdd.glom() //RDD[Array[T]]
glomRDD.foreach(rdd => println(rdd.getClass.getSimpleName))
sc.stop 

輸出:

int[] //說明RDD中的元素被轉換成數組Array[Int]

 

 

 

 

13.randomSplit(weight:Array[Double],seed):根據weight權重值將一個RDD劃分紅多個RDD,權重越高劃分獲得的元素較多的概率就越大

//省略sc
val rdd = sc.parallelize(1 to 10)
val randomSplitRDD = rdd.randomSplit(Array(1.0,2.0,7.0))
randomSplitRDD(0).foreach(x => print(x +" "))
randomSplitRDD(1).foreach(x => print(x +" "))
randomSplitRDD(2).foreach(x => print(x +" "))
sc.stop 

輸出:

2 4
3 8 9
1 5 6 7 10

歡迎工做一到五年的Java工程師朋友們加入Java架構開發:jq.qq.com/?_wv=1027&k…

本羣提供免費的學習指導 架構資料 以及免費的解答

不懂得問題均可以在本羣提出來 以後還會有職業生涯規劃以及面試指導

同時你們能夠多多關注一下小編 你們一塊兒學習進步

相關文章
相關標籤/搜索