Spark中的鍵值對操做-scala

1.PairRDD介紹
    Spark爲包含鍵值對類型的RDD提供了一些專有的操做。這些RDD被稱爲PairRDD。PairRDD提供了並行操做各個鍵或跨節點從新進行數據分組的操做接口。例如,PairRDD提供了reduceByKey()方法,能夠分別規約每一個鍵對應的數據,還有join()方法,能夠把兩個RDD中鍵相同的元素組合在一塊兒,合併爲一個RDD。
2.建立Pair RDD
    程序示例:對一個英語單詞組成的文本行,提取其中的第一個單詞做爲key,將整個句子做爲value,創建 PairRDD
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));
//獲取第一個單詞做爲鍵
val words =rdd.map(x=>(x.split(" ")(0),x));
words.collect().foreach(println);
輸出結果:
(this,this is a test)
(how,how are you)
(do,do you love me)
(can,can you tell me)

3.PairRDD的轉化操做
    PairRDD可使用全部標準RDD上可用的轉化操做。傳遞函數的規則也適用於PairRDD。因爲PairRDD中包含二元組,因此須要傳遞的函數應當操做而元素而不是獨立的元素。
                                       PairRDD的相關轉化操做以下表所示
針對兩個PairRDD的轉化操做 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
函數名 目的 示例 結果
substractByKey 刪掉RDD中鍵與other RDD
中的鍵相同的元素
rdd.subtractByKey(other) {(1,2)}
join 對兩個RDD進行內鏈接
rdd.join(other) {(3,(4,9)),(3,(6,9))}
rightOuterJoin 對兩個RDD進行鏈接操做,右外鏈接 rdd.rightOuterJoin(other) {(3,(4,9)),(3,(6,9))}
leftOuterJoin 對兩個RDD進行鏈接操做,左外鏈接 rdd.rightOuterJoin(other) {(1,(2,None)),(3,(4,9)),(3,(6,9))}
cogroup 將兩個RDD中擁有相同鍵的數據分組 rdd.cogroup(other) {1,([2],[]),(3,[4,6],[9])}
程序實例:
針對2 中程序生成的PairRDD,刪選掉長度超過20個字符的行。
val results=words.filter(value => value._2.length()<20);
results.foreach(println)
    RDD上有fold(),combine(),reduce()等行動操做,pair RDD上則有相應的針對鍵的轉化操做。
    (1)reduceByKey()與reduce()操做相似,它們都接收一個函數,並使用該函數對值進行合併。reduceByKey()會爲數據集中的每一個鍵進行並行的規約操做,每一個規約操做會將鍵相同的值合併起來。reduceBykey()最終返回一個由各鍵規約出來的結果值組成的新的RDD。
程序示例:用reduceByKey實現單詞計數
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me"));
val words =rdd.flatMap(line => line.split(" "));
val results=words.map(word => (word,1)).reduceByKey( {case(x,y) => x+y});
results.foreach(println)
輸出:
(are,1)
(this,1)
(is,1)
(you,3)
(can,1)
(a,1)
(love,1)
(do,1)
(how,1)
(tell,1)
(me,2)
(test,1)

  (2)foldByKey()與fold()操做相似,他們都使用一個與RDD和合並函數中的數據類型相同的零值做爲初始值。與fold()同樣,foldByKey()操做所使用的合併函數對零值與另外一個元素進行合併,結果仍爲該元素。
    程序示例:求對應key的value之和
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val results=nums.foldByKey(0)({case(x,y)=>x+y})
results.collect().foreach(println)
結果:
(1,4)
(2,10)
(3)
    combineByKey()是最爲經常使用的基於鍵進行聚合的函數。大多數基於鍵聚合的函數都是用它實現的。和aggregate()同樣,combineByKey()可讓用戶返回與輸入數據類型不一樣的返回值。combineByKey()會遍歷分區中的全部元素,所以,每一個元素的鍵要麼還麼有遇到過,要麼就和以前的某個元素的鍵相同。若是這是一個新的元素,combineByKey()會使用一個叫作 createCombiner()的函數來建立那個鍵對應的累加器的初始值。須要注意的是,這一過程會在每一個分區中第一次出現每一個鍵時發生,而不是在整個RDD中第一次出現一個鍵時發生。
    若是這是一個處理當前分區以前就已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併。
    因爲每一個分區都是獨立處理的,所以對於同一個鍵能夠有多個累加器。若是有兩個或者更多的分區都有對應一個鍵的累加器,就須要使用用戶提供的mergeCombiners()方法將各個分區的結果進行合併。
     如下程序示例使用combineBykey()求每一個鍵對應的平均值。
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val results=nums.combineByKey(
(v)=>(v,1),
(acc:(Int,Int),v) =>(acc._1+v,acc._2+1),
(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)
).map{case(key,value)=>(key,value._1/value._2.toFloat)}
results.collectAsMap().map(println)
結果:
(2,5.0)
(1,2.0)
成功求出每一個key對應value對應的平均值
*(4)並行度調優
    每一個RDD都有固定數目的分區,分區數決定了在RDD上執行操做時的並行度。
    在執行聚合或者分組操做時,能夠要求Spark使用給定的分區數。Spark始終嘗試根據集羣的大小推斷出一個有意義的默認值,可是你能夠經過對並行度進行調優來得到更好的性能表現。
    在Scala中,combineByKey()函數和reduceByKey()函數的最後一個可選的參數用於指定分區的數目,即numPartitions,使用以下:
val results=nums.reduceByKey({(x,y) =>x+y},2);
5.數據分組
(1)groupByKey()
    groupByKey()會使用RDD中的鍵來對數據進行分組。對於一個由類型K的鍵和類型V的值組成的RDD,獲得的RDD類型會是[K,Iterable[v]]。
    如下是程序示例,對PairRDD調用groupByKey()函數以後,返回的RDD類型是RDD [K,Iterable[v]]
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8)));
val group=nums.groupByKey();
val results=group.collect();
for(value <- results){
print(value._1+": ")
for(elem <- value._2)
print(elem+" ")
println()

}
輸出結果:
1: 1 3 
2: 2 8 
(2)cogroup()
    除了對單個RDD的數據進行分組,還可使用cogroup()函數對對個共享同一個鍵的RDD進行分組。對兩個鍵的類型均爲K而值得類型分別爲V和W的RDD進行cogroup()時,獲得結果的RDD類型爲[(K,(Iterable[V],Iterable[W]))]。若是其中一個RDD對於另外一個RDD中存在的某個鍵沒有對應的記錄,那麼對應的迭代器則爲空。
舉例:
val nums1 = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));
val nums2 = sc.parallelize(List(Tuple2(1,1),Tuple2(1,3),Tuple2(2,3)))
val results=nums1.cogroup(nums2)
for(tuple2 <- results.collect()){
print(tuple2._1+" [ ")
for(it <- tuple2._2._1)
print(it+" ")
print("] [ ")
for(it<-tuple2._2._2)
print(it+" ")
println("]")
}
輸出:
1 [ 1 3 ] [ 1 3 ]
3 [ 4 ] [ ]
2 [ 2 4 ] [ 3 ]
6.數據排序
在Scala中以字符串順序對正數進行自定義排序
(1)對RDD進行排序:
val nums =sc.parallelize(List(12,4,6,8,0,8));
//隱式轉換聲明排序的依據
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())
}
val results=nums.sortBy(value=>value);
results.collect().foreach(println)
(2)對PairRDD,按key的值進行排序
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4)));
//隱式轉換聲明排序的依據
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString())
}
val results=nums.sortByKey();
results.collect().foreach(println)
7.數據分區
(1)建立數據分區
    在分佈式程序中,通訊的代價很大,控制數據分佈以得到最少的網絡傳輸能夠極大地提高總體性能。Spark程序能夠經過控制RDD分區的方式來減小通訊消耗。只有當數據集屢次在諸如鏈接這種基於鍵的操做中,分區纔會有做用
    Spark中全部的鍵值對RDD均可以進行分區。系統會根據一個針對鍵的函數對元素進行分組。Spark能夠確保同一組的鍵出如今一個節點上。
    舉個簡單的例子,應用以下:內存中保存着很大的用戶信息表,由(UserID,UserInfo[])組成的RDD,UserInfo是用戶所訂閱的全部主題列表。該應用會週期性地將這張表和一個小文件進行組合,這個小文件中存這過去5分鐘發生的時間,其實就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用戶訪問的連接的主題。咱們須要對用戶訪問其未訂閱主題的頁面狀況進行統計。咱們可使用Spark的join()操做進行組合操做。將二者根據UserId鏈接以後,過濾出不在UserInfo[]中的LinkInfo,就是用戶訪問其未訂閱主題的狀況。
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1)
val events = sc.parallelize(list2)
val joined=userData.join(events)
val results=joined.filter({
case (id, (info, link)) =>
!info.contains(link)
}
).count()
println(results)
輸出:1
    這段代碼能夠正確運行,可是效率不高。由於每5分鐘就要進行一次join()操做,而咱們對數據集如何分區卻一無所知。默認狀況下,鏈接操做會將兩個數據集中的全部鍵的哈希值都求出來,將該哈希值相同的記錄經過網絡傳到同一臺機器上,而後在那臺機器上對全部鍵相同的記錄進行鏈接操做。由於userData表比每5分鐘出現的訪問日誌表events要大不少,因此要浪費時間進行額外的工做:在每次調用時都對userDAta表進行哈希值計算和跨節點數據混洗,雖然這些數據歷來不會變化。
    要解決此問題:在程序開始的時候,對userData表進行partitionBy()轉化操做,將這張錶轉化爲哈希分區。能夠經過向patitionBy傳遞一個spark.HashPartitioner對象來實現該操做。
    scala自定義分區方式:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)
    這樣之後在調用join()時,Spark就知道了該RDD是根據鍵的哈希值來分區的,這樣在調用join()時,Spark就會利用這一點,只會對events進行數據混洗操做,將events中特定userId的記錄發送到userData的對應分區所在的那臺機器上。這樣,須要網絡傳輸的數據就大大減少了,程序運行的速度也顯著提升。
    請注意,咱們還對userData 這個RDD進行了持久化操做默認狀況下,每個由轉化操做獲得的RDD都會在每次執行啓動操做時從新計算生成,將userData持久化以後,就能保證userData可以在訪問時被快速獲取。
    *進一步解釋數據分區帶來的好處:
    若是沒有將partitionBy()轉化操做的結果進行持久化,那麼後面每次用到這個RDD時都會重複對數據進行分區操做。不進行持久化會致使整個RDD譜系圖從新求值。那樣的話,partitionBy()帶來的好處就會抵消,致使重複對數據進行分區以及跨節點的混洗,和沒有指定分區方式時發生的狀況是十分類似的。
(2)獲取數據分區的方式
接(1)中程序:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book")))
val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book"))
val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)
println(userData.partitioner)
  RDD的屬性partitioner就是存儲了對應的分區方式
(3)從分區中獲益的操做
    Spark中的不少操做都引入了根據鍵跨結點進行混洗的過程。全部這些操做都會從數據分區中獲益。可以從數據分區中獲益的操做有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及lockup()。
    對於像reduceByKey()這樣只做用於單個RDD的操做,運行在未分區的RDD的時候或致使每一個鍵全部對應值都在每臺機器上進行本地計算,只須要把本地最終歸約出的結果值從各工做節點傳回主節點,因此本來的網絡開銷就不太大。而對於諸如cogroup()和join()這樣的二元操做,預先進行數據分區會致使其中至少一個RDD(使用已知分區器的那個RDD)不發生數據混洗。若是兩個RDD使用一樣的分區方式,而且它們還緩存在一樣的機器上(好比一個RDD是經過mapValues()從另外一個RDD中建立出來的,這兩個RDD就會擁有相同的鍵和分區方式),或者其中一個RDD尚未計算出來,那麼跨節點數據混洗就不會發生了。
(4)影響分區方式的操做
    全部會爲生成的結果RDD設好分區方式的操做:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(若是父RDD有分區方式的話),filter()(若是父RDD有分區方式的話)。其餘全部操做生成的結果都不會存在特定的分區方式。
注意:     
    對於二元操做,輸出數據的分區方式取決於父RDD的分區方式。默認狀況下,結果會採用哈希分區,分區的數量和操做的並行度是同樣的。若是其中一個父RDD已經設置過度區方式,那麼結果就會採用那種分區方式;若是兩個父RDD都設置過度區方式,結果RDD會採用第一個RDD的分區方式。
8.示例程序-PageRank
     PageRank算法是一種從RDD分區中獲益的更復雜的算法,咱們以它爲例進行分析。PageRank算法用來根據外部文檔指向一個文檔的連接,對集合中每一個文檔的重要程度賦一個度量值。該算法能夠用於對網頁進行排序,固然,也能夠用於排序科技文章或社交網絡中有影響的用戶。
    算法會維護兩個數據集,一個由(pageID,linklist[])組成,包含每一個頁面的連接到的頁面的列表;另外一個由(pageID,rank)元素組成,包含每一個頁面的當前排序值。它按如下步驟進行計算:
     ① 將每一個頁面的排序值初始化爲1.0
          ②在每次迭代中,向每一個有直接連接的頁面,發送一個值爲rank(p)/numNeighbors(p)(出鏈數目)   的貢獻量
        ③將每一個頁面的排序值設置爲0.15+0.85*contributionsReceived
           最後兩步會重複幾個循環,在此過程當中,算法會逐漸收斂於每一個頁面的實際PageRank值。在實際操做中,收斂一般須要進行十個迭代。
下面用Scala來實現PageRank算法:
/*
#如下是url的內容:
www.baidu.com www.hao123.com
www.baidu.com www.2345.com
www.baidu.com www.zhouyang.com
www.hao123.com www.baidu.com
www.hao123.com www.zhouyang.com
www.zhouyang.com www.baidu.com
*/
val inputs =sc.textFile("C:\\url.txt")
//url,[urls]
val links =inputs.map(x=>(x.split(" ")(0),x.split(" ")(1)))
.distinct()
.groupByKey()
.cache()
//url,rank
var ranks =links.mapValues(value =>1.0)
for(i<-0 until 10){

val contribs =links.join(ranks).flatMap({
case(pageid,(links,rank))=>
//url Double
links.map(dest=>(dest,rank/links.size))
})
//reduce and add the contribs
ranks=contribs.reduceByKey((x,y)=>x+y).mapValues(v => 0.15+0.85*v)
}
ranks.collect().foreach(println)
結果:
(www.hao123.com,0.3685546839262259)
(www.baidu.com,0.761571325242544)
(www.2345.com,0.3685546839262259)
(www.zhouyang.com,0.5269013026650011)
9.Scala設置自定義分區方式
    Spark容許你經過自定義Partitioner對象來控制RDD的分區方式,這樣可讓你利用領域知識進一步減小通訊消耗。
    舉個例子,假設咱們要在一個網頁的集合上運行前一屆中的PageRank算法。在這裏,每一個頁面的ID是頁面的URL。當咱們使用簡單的哈希函數進行分區時,擁有類似的URL的頁面好比 http://www.baidu.com/news 與 http://www.baidu.com/map 可能被分在徹底不一樣的節點上。可是咱們知道,同一個域名下的網頁更有可能相互鏈接。因爲PageRank須要在每次迭代中從每一個頁面向它全部相鄰的頁面發送一條消息,因襲把這些頁面分組在同一個分區中會更好。可使用自定義的分區器來實現僅根據域名而不是整個URL進行分區。
    要實現先自定義Partitioner,須要繼承Partitioner類並實現其下述方法:
    override def numPartitions: Int = ???
    返回建立的分區數量
    override def getPartition(key: Any): Int = ???
    返回給定鍵的數量
          override def equals(other:Any):Boolean = ???
    Spark須要這個方法來檢查分區器對象是否與其餘分區器實例相同,這樣Spark才能判斷兩個RDD的分區方式是否相同。

class DomainNamePartitioner (numParts:Int) extends Partitioner{
override def numPartitions: Int = numParts
//根據hashCodenumPartitions取餘來獲得Partition,由於返回的必須是非負數,因此對於hashCode爲負的狀況作了特殊處理
override def getPartition(key: Any): Int = {
val domain = new URL(key.toString).getHost();
val code=(domain.hashCode%numPartitions)
if(code<0){
code+numPartitions
}else{
code
}
}

override def equals(other:Any):Boolean = other
match {
//這個實例是DomainNamePartitioner的實例,而且numPartitions相同,返回true
case dnp:DomainNamePartitioner =>
dnp.numPartitions==numPartitions
//不然,返回false
case _ => false
}
}





















相關文章
相關標籤/搜索