Spark學習筆記3:鍵值對操做

鍵值對RDD一般用來進行聚合計算,Spark爲包含鍵值對類型的RDD提供了一些專有的操做。這些RDD被稱爲pair RDD。pair RDD提供了並行操做各個鍵或跨節點從新進行數據分組的操做接口。java

Spark中建立pair RDD的方法:存儲鍵值對的數據格會在讀取時直接返回由其鍵值對數據組成的pair RDD,還可使用map()函數將一個普通的RDD轉爲pair RDD。python

  • Pair RDD的轉化操做
  1. reduceByKey()  與reduce相似 ,接收一個函數,並使用該函數對值進行合併,爲每一個數據集中的每一個鍵進行並行的歸約操做。返回一個由各鍵和對應鍵歸約出來的結果值組成的新的RDD。例如 :上一章中單詞計數的例子:val counts  =  words.map(word => (word,1)).reduceByKey{ case (x,y) => x + y}
  2. foldByKey()與fold()相似,都使用一個與RDD和合並函數中的數據類型相同的零值最爲初始值。val counts  =  words.map(word => (word,1)).foldByKey{ case (x,y) => x + y}
  3. combineByKey()是最爲經常使用的基於鍵進行聚合的函數,能夠返回與輸入類型不一樣的返回值。

  理解combineByKey處理數據流程,首先須要知道combineByKey的createCombiner()函數用來建立那個鍵對應的累加器的初始值,mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併。mergeCombiners()方法將各個分區的結果進行合併。apache

使用combineByKey進行單詞計數的例子:網絡

import org.apache.spark.{SparkConf, SparkContext}

object word {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("word")
val sc = new SparkContext(conf)
val input = sc.parallelize(List(("coffee",1),("coffee",2),("panda",3),("coffee",9)))
val counts = input.combineByKey(
(v) => (v,1),
(acc:(Int,Int) ,v) => (acc._1 + v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1 + acc2._1,acc1._2 + acc2._2)
)
counts.foreach(println)
}
}

 輸出結果:函數

 

這個例子中的數據流示意圖以下:spa

 

 簡單說過程就是,將輸入鍵值對數據進行分區,每一個分區先根據鍵計算相應的值以及鍵出現的次數。而後對不一樣分區進行合併得出最後的結果。scala

  4.groupByKey()使用RDD中的鍵來對數據進行分組,對於一個由類型K的鍵和類型V的值組成的RDD,所獲得的結果RDD類型會是[K, Iterable[V] ]3d

 例如:code

import org.apache.spark.{SparkConf, SparkContext}

object word {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("word")
    val sc = new SparkContext(conf)
    val input = sc.parallelize(List("scala spark scala core scala python java spark scala"))
    val words = input.flatMap(line => line.split(" ")).map(word => (word,1))
    val counts = words.groupByKey()
    counts.foreach(println)
  }
}

  輸出:對象

   五、cogroup函數對多個共享同一個鍵的RDD進行分組,對兩個鍵類型均爲K而值類型分別爲V和W的RDD進行cogroup時,獲得的結果RDD類型爲[(K,(Iterable[V],Iterable[W]))] 

  六、join(other)這樣的鏈接是內鏈接,只有在兩個pair RDD中都存在的鍵才輸出。若一個輸入對應的鍵有多個值時,生成的pair RDD會包括來自兩個輸入RDD的每一組相對應的記錄。理解這句話看下面的例子:

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val joins = rdd.join(other)

  輸出結果:

  七、leftOuterJoin(other)左外鏈接和rightOuterJoin(other)右外鏈接都會根據鍵鏈接兩個RDD,可是容許結果中存在其中的一個pair RDD所缺失的鍵。

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val join1 = rdd.rightOuterJoin(other)

  輸出結果:

 

val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
val other = sc.parallelize(List((3,9)))
val join2 = rdd.leftOuterJoin(other)

  輸出結果: 

 

  八、sortByKey()函數接收一個叫作ascending的參數,表示想要讓結果升序排序仍是降序排序。

val input = sc.parallelize(List("scala spark scala core scala python java spark scala"))
    val words = input.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((x,y)=>x+y)
    val counts = words.sortByKey()

  輸出結果:

     

  •  Pair RDD的行動操做
  1. countByKey() 對每一個鍵對應的元素分別計數。
  2. collectAsMap()將結果以映射表的形式返回,注意後面的value會覆蓋前面的。
    val num = sc.parallelize(List((1,2),(3,4),(3,6)))
    println(num.collectAsMap().mkString(" "))
    

    輸出結果:

  3. lookup(key)返回給定鍵對應的全部值。 
  • 數據分區

  Spark程序能夠經過控制RDD分區方式來減小通訊開銷。

運行下面這段代碼,用來查看用戶查閱了本身訂閱的主題的頁面的數量,結果返回3:

val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問狀況
val userData = sc.parallelize(list1)
val events = sc.parallelize(list2)
userData.persist()
val joined = userData.join(events)
val results = joined.filter({
case (id, (info, link)) =>
info.contains(link)
}
).count()
println(results)

  上面這段代碼中,用到了join操做,會將兩個數據集中的全部鍵的哈希值都求出來,將該哈希值相同的記錄經過網絡傳到同一臺機器上,而後在那臺機器上對全部鍵相同的記錄進行鏈接操做。

  假如userdata表很大很大,並且幾乎是不怎麼變化的,那麼每次都對userdata表進行哈希值計算和跨節點的數據混洗,就會產生不少的額外開銷。

以下:

解決這一產生額外開銷的方法就是,對userdata表使用partitionBy()轉化操做,將這張錶轉爲哈希分區。修改後的代碼以下:

    val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
    val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問狀況
    val userData = sc.parallelize(list1)
    val events = sc.parallelize(list2)
    userData.partitionBy(new DomainNamePartitioner(10)).persist()
    val joined = userData.join(events)
    val results = joined.filter({
      case (id, (info, link)) =>
        info.contains(link)
    }
    ).count()
    println(results)

  構建userData時調用了partitionBy(),在調用join()時,Spark只會對events進行數據混洗操做,將events中特定UserID的記錄發送到userData的對應分區所在的那臺機器上。這樣,經過網絡傳輸的數據就大大減小,程序運行速度也能夠顯著提高。partitionBy()是一個轉化操做,所以它的返回值是一個新的RDD。

  新的數據處理過程以下:

  scala可使用RDD的partitioner屬性來獲取RDD的分區方式,它會返回一個scala.Option對象。

  能夠從數據分區中獲益的操做有cogroup() , groupWith() , join() , leftOuterJoin() , rightOuterJoin() , groupByKey() , reduceByKey() , combineByKey()以及lookup()。

  實現自定義分區器,須要繼承org.apache.spark.Partitioner類並實現下面的三個方法:

  • numPartitions: Int :返回建立出來的分區數
  • getPartition(key: Any):Int : 返回給定鍵的分區編號(0 到 numPartitions - 1)
  • equals() : Java判斷相等的方法,Spark用這個方法來檢查分區器對象是否和其餘分區器實例相同,這樣Spark才能夠判斷兩個RDD的分區方式是否相同。
相關文章
相關標籤/搜索