Spark筆記:複雜RDD的API的理解(上)

  本篇接着講解RDD的API,講解那些不是很容易理解的API,同時本篇文章還將展現如何將外部的函數引入到RDD的API裏使用,最後經過對RDD的API深刻學習,咱們還講講一些和RDD開發相關的scala語法。算法

1)  aggregate(zeroValue)(seqOp,combOp)數組

   該函數的功能和reduce函數同樣,也是對數據進行聚合操做,不過aggregate能夠返回和原RDD不一樣的數據類型,使用時候還要提供初始值。數據結構

  咱們來看看下面的用法,代碼以下:函數

    val rddInt: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5), 1)

    val rddAggr1: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
    println("====aggregate 1====:" + rddAggr1.toString()) // (15,5)

  該方法是將有數字組成的RDD的數值進行求和,同時還要統計元素的個數,這樣咱們就能夠計算出一個平均值,這點在實際運算中是很是有用的。學習

  假如讀者不太懂scala語言,或者就算懂那麼一點點scala語法,該API的使用仍是讓人很難理解的,這個x是什麼東西,這個y又是什麼東西,爲何它們放在一塊兒這麼運算就能夠獲得預期結果呢?spa

  其實aggregate方法使用了scala裏元組的結構,元組是scala裏很具特點的數據結構,咱們看看下面的代碼:scala

    val tuple2Param1:Tuple2[String,Int] = Tuple2("x01",12)// 標準定義二元組
    val tuple2Param2:(String,Int) = ("x02",29)// 字面量定義二元組
    
    /* 結果: x01:12*/
    println("====tuple2Param1====:" + tuple2Param1._1 + ":" + tuple2Param1._2)
    /* 結果: x02:29 */
    println("====tuple2Param2====:" + tuple2Param2._1 + ":" + tuple2Param2._2)
    
    val tuple6Param1:Tuple6[String,Int,Int,Int,Int,String] = Tuple6("xx01",1,2,3,4,"x1x")// 標準定義6元組
    val tuple6Param2:(String,Int,Int,Int,Int,String) = ("xx02",1,2,3,4,"x2x")// 字面量定義6元組
    
    /* 結果: xx01:1:2:3:4:x1x */
    println("====tuple6Param1====:" + tuple6Param1._1 + ":" + tuple6Param1._2 + ":" + tuple6Param1._3 + ":" + tuple6Param1._4 + ":" + tuple6Param1._5 + ":" + tuple6Param1._6)
    /* 結果: xx02:1:2:3:4:x2x */
    println("====tuple6Param2====:" + tuple6Param2._1 + ":" + tuple6Param2._2 + ":" + tuple6Param2._3 + ":" + tuple6Param2._4 + ":" + tuple6Param2._5 + ":" + tuple6Param2._6)

  元組在scala裏使用Tuple來構造,不過實際運用中咱們會給Tuple帶上數字後綴,例如Tuple2就是二元組它包含兩個元素,Tuple6是6元組它包含6個元素,元組看起來很像數組,可是數組只能存儲相同數據類型的數據結構,而元組是能夠存儲不一樣數據類型的數據結構,元組裏元素訪問使用_1,_2這樣的形式,第一個元素是從1開始標記的,這點和數組是不一樣的。實際使用中咱們不多使用Tuple構造元組,而是使用字面量定義方式(參見代碼註釋),由此咱們能夠看出spark裏鍵值對RDD其實就是使用二元組來表示鍵值對數據結構,回到aggregate方法,它的運算也是經過二元組這種數據結構完成的。blog

  下面咱們來看看aggregate的運算過程,這裏我將aggregate方法裏的算子都使用外部函數,代碼以下所示:開發

  def aggrFtnOne(par: ((Int, Int), Int)): (Int, Int) = {
    /*
       *aggregate的初始值爲(0,0):
        ====aggrFtnOne Param===:((0,0),1)
	====aggrFtnOne Param===:((1,1),2)
	====aggrFtnOne Param===:((3,2),3)
	====aggrFtnOne Param===:((6,3),4)
	====aggrFtnOne Param===:((10,4),5)*/
    /*
       *aggregate的初始值爲(1,1):
        ====aggrFtnOne Param===:((1,1),1)
        ====aggrFtnOne Param===:((2,2),2)
        ====aggrFtnOne Param===:((4,3),3)
        ====aggrFtnOne Param===:((7,4),4)
        ====aggrFtnOne Param===:((11,5),5)
       * */
    println("====aggrFtnOne Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2, par._1._2 + 1)
    ret
  }

  def aggrFtnTwo(par: ((Int, Int), (Int, Int))): (Int, Int) = {
    /*aggregate的初始值爲(0,0):::::((0,0),(15,5))*/
    /*aggregate的初始值爲(1,1):::::((1,1),(16,6))*/
    println("====aggrFtnTwo Param===:" + par.toString())
    val ret: (Int, Int) = (par._1._1 + par._2._1, par._1._2 + par._2._2)
    ret
  }

    val rddAggr2: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => aggrFtnOne(x, y), (x, y) => aggrFtnTwo(x, y)) // 參數能夠省略元組的括號
    println("====aggregate 2====:" + rddAggr2.toString()) // (15,5)

    val rddAggr3: (Int, Int) = rddInt.aggregate((1, 1))((x, y) => aggrFtnOne((x, y)), (x, y) => aggrFtnTwo((x, y))) // 參數使用元組的括號
    println("====aggregate 3====:" + rddAggr3.toString()) // (17,7)

  由以上代碼咱們就能夠清晰看到aggregate方法的實際運算過程了。spark

  aggrFtnOne方法的參數格式是((Int, Int), Int),這個複合二元組裏第二個元素纔是實際的值,而第一個元素就是咱們給出的初始化值,第一個元素裏的第一個值就是咱們實際求和的值,裏面第二個元素就是累計記錄元素個數的值。

  aggrFtnTwo方法的參數裏的二元組第一個元素仍是初始化值,第二個元素則是aggrFtnOne計算的結果,這樣咱們就能夠計算出咱們要的結果。

  做爲對比我將初始化參數改成(1,1)二元組,最終結果在求和運算以及計算元素個數上都會加2,這是由於初始化值兩次參入求和所致的,由上面代碼咱們能夠很清晰的看到緣由所在。

  若是咱們想要結果二元組裏第一個元素求積那麼初始化值就不能是(0,0),而應該是(1,0),理解了原理咱們就很清晰知道初始值該如何設定了,具體代碼以下:

    val rddAggr4: (Int, Int) = rddInt.aggregate((1, 0))((x, y) => (x._1 * y, x._2 + 1), (x, y) => (x._1 * y._1, x._2 + y._2))
    println("====aggregate 4====:" + rddAggr4.toString()) // (120,5) 

  

2) fold(zero)(func)

 該函數和reduce函數功能同樣,只不過使用時候須要加上初始化值。

 代碼以下所示:

  def foldFtn(par: (Int, Int)): Int = {
    /*fold初始值爲0:
        =====foldFtn Param====:(0,1)
        =====foldFtn Param====:(1,2)
        =====foldFtn Param====:(3,3)
        =====foldFtn Param====:(6,4)
        =====foldFtn Param====:(10,5)
        =====foldFtn Param====:(0,15)
       * */
    /*
       * fold初始值爲1:
        =====foldFtn Param====:(1,1)
        =====foldFtn Param====:(2,2)
        =====foldFtn Param====:(4,3)
        =====foldFtn Param====:(7,4)
        =====foldFtn Param====:(11,5)
        =====foldFtn Param====:(1,16)
       * */
    println("=====foldFtn Param====:" + par.toString())
    val ret: Int = par._1 + par._2
    ret
  }

    val rddFold2: Int = rddInt.fold(0)((x, y) => foldFtn(x, y)) // 參數能夠省略元組的括號
    println("====fold 2=====:" + rddFold2) // 15

    val rddFold3: Int = rddInt.fold(1)((x, y) => foldFtn((x, y))) // 參數使用元組的括號
    println("====fold 3====:" + rddFold3) // 17

  咱們發現當初始化值爲1時候,求和增長的不是1而是2,緣由就是fold計算時候爲了湊齊一個完整的二元組,在第一個元素計算以及最後一個元素計算時候都會讓初始化值湊數組成二元組,所以初始值會被使用兩遍求和,所以實際結果就不是增長1,而是增長2了。

  做爲對比咱們看看reduce實際運算過程,代碼以下:

  def reduceFtn(par:(Int,Int)):Int = {
    /*
     * ======reduceFtn Param=====:1:2
			 ======reduceFtn Param=====:3:3
       ======reduceFtn Param=====:6:4
       ======reduceFtn Param=====:10:5
     */
    println("======reduceFtn Param=====:" + par._1 + ":" + par._2)
    par._1 + par._2
  }

    val rddReduce1:Int = rddInt.reduce((x,y) => x + y)
    println("====rddReduce 1====:" + rddReduce1)// 15
    
    val rddReduce2:Int = rddInt.reduce((x,y) => reduceFtn(x,y))
    println("====rddReduce 2====:" + rddReduce2)// 15

  

3) combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)]

  combineByKey做用是使用不一樣的返回類型合併具備相同鍵的值,combineByKey適用鍵值對RDD,普通RDD是沒有這個方法。

  有上面定義咱們看到combineByKey會通過三輪運算,前一個運算步驟結果就是下一個運算步驟的參數,咱們看下面的代碼:

  def combineFtnOne(par:Int):(Int,Int) = {
    /*
     * ====combineFtnOne Param====:2
       ====combineFtnOne Param====:5
       ====combineFtnOne Param====:8
       ====combineFtnOne Param====:3
     */
    println("====combineFtnOne Param====:" + par)
    val ret:(Int,Int) = (par,1)
    ret
  }
  
  def combineFtnTwo(par:((Int,Int),Int)):(Int,Int) = {
    /*
      ====combineFtnTwo Param====:((2,1),12)
      ====combineFtnTwo Param====:((8,1),9) 
     * */
    println("====combineFtnTwo Param====:" + par.toString())
    val ret:(Int,Int) = (par._1._1 + par._2,par._1._2 + 1)
    ret
  }
  
  def combineFtnThree(par:((Int,Int),(Int,Int))):(Int,Int) = {
    /*
     * 無結果打印
     */
    println("@@@@@@@@@@@@@@@@@@")
    println("====combineFtnThree Param===:" + par.toString())
    val ret:(Int,Int) = (par._1._1 + par._2._1,par._1._2 + par._2._2)
    ret
  }

    val rddPair: RDD[(String, Int)] = sc.parallelize(List(("x01", 2), ("x02", 5), ("x03", 8), ("x04", 3), ("x01", 12), ("x03", 9)), 1)
    
    /* def combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] */    
    val rddCombine1:RDD[(String,(Int,Int))] = rddPair.combineByKey(x => (x, 1), (com: (Int, Int), x) => (com._1 + x, com._2 + 1), (com1: (Int, Int), com2: (Int, Int)) => (com1._1 + com2._1, com1._2 + com2._2))
    println("====combineByKey 1====:" + rddCombine1.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1))
    
    val rddCombine2:RDD[(String,(Int,Int))] = rddPair.combineByKey(x => combineFtnOne(x), (com: (Int, Int), x) => combineFtnTwo(com,x), (com1: (Int, Int), com2: (Int, Int)) => combineFtnThree(com1,com2))
    println("=====combineByKey 2====:" + rddCombine2.collect().mkString(",")) // (x02,(5,1)),(x03,(17,2)),(x01,(14,2)),(x04,(3,1))

  這個算法和上面aggregate求和方法很像,不過combineByKey很奇怪,它第三個算子彷佛並無被執行,第二個算子打印的信息也不齊備,不過我認爲它們都執行了,只不過有些語句沒有打印出來,至於緣由爲什麼,我之後再研究下吧。

  本篇就寫到這裏吧,其他內容我在下篇裏講解了。

相關文章
相關標籤/搜索