鍵值對RDD一般用來進行聚合計算,Spark爲包含鍵值對類型的RDD提供了一些專有的操做。這些RDD被稱爲pair RDD。pair RDD提供了並行操做各個鍵或跨節點從新進行數據分組的操做接口。java
Spark中建立pair RDD的方法:存儲鍵值對的數據格式會在讀取時直接返回由其鍵值對數據組成的pair RDD,還可使用map()函數將一個普通的RDD轉爲pair RDD。python
理解combineByKey處理數據流程,首先須要知道combineByKey的createCombiner()函數用來建立那個鍵對應的累加器的初始值,mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併。mergeCombiners()方法將各個分區的結果進行合併。apache
使用combineByKey進行單詞計數的例子:網絡
import org.apache.spark.{SparkConf, SparkContext}
object word {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("word")
val sc = new SparkContext(conf)
val input = sc.parallelize(List(("coffee",1),("coffee",2),("panda",3),("coffee",9)))
val counts = input.combineByKey(
(v) => (v,1),
(acc:(Int,Int) ,v) => (acc._1 + v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int)) => (acc1._1 + acc2._1,acc1._2 + acc2._2)
)
counts.foreach(println)
}
}
輸出結果:函數
這個例子中的數據流示意圖以下:spa
簡單說過程就是,將輸入鍵值對數據進行分區,每一個分區先根據鍵計算相應的值以及鍵出現的次數。而後對不一樣分區進行合併得出最後的結果。scala
4.groupByKey()使用RDD中的鍵來對數據進行分組,對於一個由類型K的鍵和類型V的值組成的RDD,所獲得的結果RDD類型會是[K, Iterable[V] ]3d
例如:code
import org.apache.spark.{SparkConf, SparkContext} object word { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("word") val sc = new SparkContext(conf) val input = sc.parallelize(List("scala spark scala core scala python java spark scala")) val words = input.flatMap(line => line.split(" ")).map(word => (word,1)) val counts = words.groupByKey() counts.foreach(println) } }
輸出:對象
五、cogroup函數對多個共享同一個鍵的RDD進行分組,對兩個鍵類型均爲K而值類型分別爲V和W的RDD進行cogroup時,獲得的結果RDD類型爲[(K,(Iterable[V],Iterable[W]))]
六、join(other)這樣的鏈接是內鏈接,只有在兩個pair RDD中都存在的鍵才輸出。若一個輸入對應的鍵有多個值時,生成的pair RDD會包括來自兩個輸入RDD的每一組相對應的記錄。理解這句話看下面的例子:
val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) val other = sc.parallelize(List((3,9))) val joins = rdd.join(other)
輸出結果:
七、leftOuterJoin(other)左外鏈接和rightOuterJoin(other)右外鏈接都會根據鍵鏈接兩個RDD,可是容許結果中存在其中的一個pair RDD所缺失的鍵。
val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) val other = sc.parallelize(List((3,9))) val join1 = rdd.rightOuterJoin(other)
輸出結果:
val rdd = sc.parallelize(List((1,2),(3,4),(3,6))) val other = sc.parallelize(List((3,9))) val join2 = rdd.leftOuterJoin(other)
輸出結果:
八、sortByKey()函數接收一個叫作ascending的參數,表示想要讓結果升序排序仍是降序排序。
val input = sc.parallelize(List("scala spark scala core scala python java spark scala")) val words = input.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey((x,y)=>x+y) val counts = words.sortByKey()
輸出結果:
val num = sc.parallelize(List((1,2),(3,4),(3,6))) println(num.collectAsMap().mkString(" "))
輸出結果:
Spark程序能夠經過控制RDD分區方式來減小通訊開銷。
運行下面這段代碼,用來查看用戶查閱了本身訂閱的主題的頁面的數量,結果返回3:
val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題
val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問狀況
val userData = sc.parallelize(list1)
val events = sc.parallelize(list2)
userData.persist()
val joined = userData.join(events)
val results = joined.filter({
case (id, (info, link)) =>
info.contains(link)
}
).count()
println(results)
上面這段代碼中,用到了join操做,會將兩個數據集中的全部鍵的哈希值都求出來,將該哈希值相同的記錄經過網絡傳到同一臺機器上,而後在那臺機器上對全部鍵相同的記錄進行鏈接操做。
假如userdata表很大很大,並且幾乎是不怎麼變化的,那麼每次都對userdata表進行哈希值計算和跨節點的數據混洗,就會產生不少的額外開銷。
以下:
解決這一產生額外開銷的方法就是,對userdata表使用partitionBy()轉化操做,將這張錶轉爲哈希分區。修改後的代碼以下:
val list1 =List(Tuple2("Mike",List("sports","math")),Tuple2("Jack",List("travel","book")))//UserID用戶ID,UserInfo用戶訂閱的主題 val list2= List(Tuple2("Mike","sports"),Tuple2("Mike","stock"),Tuple2("Jack","travel"),Tuple2("Jack","book"))//UserID,LinkInfo用戶訪問狀況 val userData = sc.parallelize(list1) val events = sc.parallelize(list2) userData.partitionBy(new DomainNamePartitioner(10)).persist() val joined = userData.join(events) val results = joined.filter({ case (id, (info, link)) => info.contains(link) } ).count() println(results)
構建userData時調用了partitionBy(),在調用join()時,Spark只會對events進行數據混洗操做,將events中特定UserID的記錄發送到userData的對應分區所在的那臺機器上。這樣,經過網絡傳輸的數據就大大減小,程序運行速度也能夠顯著提高。partitionBy()是一個轉化操做,所以它的返回值是一個新的RDD。
新的數據處理過程以下:
scala可使用RDD的partitioner屬性來獲取RDD的分區方式,它會返回一個scala.Option對象。
能夠從數據分區中獲益的操做有cogroup() , groupWith() , join() , leftOuterJoin() , rightOuterJoin() , groupByKey() , reduceByKey() , combineByKey()以及lookup()。
實現自定義分區器,須要繼承org.apache.spark.Partitioner類並實現下面的三個方法: