Spark的分區機制的應用及PageRank算法的實現

佩奇排名(PageRank),又稱 網頁排名谷歌左側排名,是一種由 搜索引擎根據 網頁之間相互的 超連接計算的技術,而做爲網頁排名的要素之一,以 Google公司創辦人 拉里·佩奇(Larry Page)之姓來命名。 Google用它來體現網頁的相關性和重要性,在 搜索引擎優化操做中是常常被用來評估網頁優化的成效因素之一。

概念

Spark中有一個很重要的特性是對數據集在節點間的分區進行控制,由於在分佈式系統中,通訊的代價是很大的,所以控制數據分佈以得到最少的網絡傳輸能夠極大地提高總體性能,Spark程序能夠經過控制RDD分區方式來減小通訊開銷。分區適用於那種基於相似join操做基於鍵的操做,而且一方的RDD數據是比較少變更且須要屢次掃描的狀況,這個時候能夠對這個RDD作一個分區,最經常使用的是用Hash來進行分區,好比能夠對RDD分100個區,此時spark會用每一個鍵的hash值對100取模,而後把相同結果的放到同一個節點上。html

Spark分區的講解

如今用一個例子(來自《Learning Spark: Lightning-Fast Big Data Analysis》一書)來講明一下:算法

// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

上面的例子中,有兩個RDD,userData的鍵值對是(UserID, UserInfo),UserInfo包含了一個該用戶訂閱的主題的列表,該程序會週期性地將這張表與一個小文件進行組合,這個小文件中存着過去五分鐘內某個網站各用戶的訪問狀況,由(UserID, LinkInfo)。如今,咱們須要對用戶訪問其未訂閱主題的頁面進行統計。能夠經過Spark的join()操做來完成這個功能,其中須要把UserInfo和LinkInfo的有序對根據UserID進行分組,如上代碼。緩存

能夠看出,由於每次調用processNewLogs()時都須要執行一次join()操做,可是數據具體的shuffle對咱們來講倒是不可控的,也就是咱們不知道spark是如何進行分區的。spark默認在執行join()的時候會將兩個RDD的鍵的hash值都算出來,而後將該hash值經過網絡傳輸到同一個節點上進行相同鍵值的記錄的鏈接操做,以下圖所示:網絡

由於userData這個RDD裏面的數據是幾乎不會變更的,或者說是極少會變更的,且它的內容也比events大不少,因此每次都要對它進行shuffle的話,是沒有必要且浪費時間的,實際上只須要進行一次shuffle就能夠了。分佈式

因此,能夠經過預先分區來解決這個問題:在進行join()以前,對userData使用partitionBy()轉化操做,把它變成一個哈希分區的RDD:ide

val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                 .partitionBy(new HashPartitioner(100))   // Create 100 partitions
                 .persist()

調用partitionBy()以後,spark就能夠預先知道這個RDD是已經進行過哈希分區的了,等到執行join()之時,它就會利用這一點:只對events進行shuffle,將events中特定UserID的記錄發送到userData對應分區的機器節點上去。這樣的話,就減小了大量的重複的網絡通訊,程序性能也會大大提升。改進後的程序的執行過程以下:oop

還有一點,大家可能注意到了新代碼裏最後還調用了一個persist()方法,這是另外一個小優化點:對於那些數據不常變更且數據量較大的RDD,在進行諸如join()這種鏈接操做的時候儘可能用persist()來作緩存,以提升性能。另外,分區數目的設置也有講究,分區數目決定了這個任務在執行鏈接操做時的並行度,因此通常來講這個數目應該和集羣中的總核心數保持一致。性能

最後,可能有人會問,能不能對events也進行分區進一步提升程序性能?這是沒有必要的,由於events RDD是本地變量,每次執行都會更新,因此對它進行分區沒有意義,即使對這種一次性變量進行分區,spark依然須要進行一次shuffle,因此,這是沒有必要的。優化

使用分區來加快PageRank算法

PageRank算法是一種從RDD分區獲益的更復雜的算法,下面咱們用它爲例來進一步講解Spark分區的使用。網站

若是有不清楚的PageRank算法的具體實現的能夠參考我之前的一篇文章:hadoop下基於mapreduce實現pagerank算法

PageRank是一個迭代算法,所以它是一個能從RDD分區中得到性能加速的很好的例子,先上代碼:

// Assume that our neighbor list was saved as a Spark objectFile
val links = sc.objectFile[(String, Seq[String])]("links")
              .partitionBy(new HashPartitioner(100))
              .persist()

// Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD
// will have the same partitioner as links
var ranks = links.mapValues(v => 1.0)

// Run 10 iterations of PageRank
for (i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}

// Write out the final ranks
ranks.saveAsTextFile("ranks")

這個算法維護兩個RDD,一個的鍵值對是(pageID, linkList),包含了每一個頁面的出鏈指向的相鄰頁面列表(由pageID組成);另外一個的鍵值對是(pageID, rank),包含了每一個頁面的當前權重值。算法流程以下:

  1. 將每一個頁面的權重值初始化爲1.0;
  2. 在每次迭代中,對頁面p,向其每一個出鏈指向的頁面加上一個rank(p)/neighborsSize(p)的貢獻值contributionReceived;
  3. 將每一個頁面的權重值設置爲:0.15 + 0.85 *contributionReceived。

不斷迭代步驟2和3,過程當中算法會逐漸收斂於每一個頁面的實際PageRank值,實際運行之時大概迭代10+次以上便可。

算法將ranksRDD的每一個元素的值設置爲1.0,而後在每次迭代中不斷更新ranks變量:首先對ranksRDD和靜態的linksRDD進行一次join()操做,來獲取每一個頁面ID對應的相鄰頁面列表和當前的權重值,而後使用flatMap建立出『contributions』來記錄每一個頁面對各相鄰頁面的貢獻值。而後再把這些貢獻值按照pageID分別累加起來,把該頁面的權重值設爲0.15 + 0.85 * contributionsReceived。

接下來分析下上述代碼作的的一些優化點:

  1. linksRDD在每次迭代中都會和ranks發生鏈接操做,因爲links是一個靜態RDD(數據幾乎不會變更),因此在一開始能夠對它進行分區以減小網絡shuffle,下降網絡通訊的開銷。並且,linksRDD的字節數通常來講也會比ranks大不少,由於這個RDD包含了每一個頁面的出鏈指向的頁面列表,相似於一個笛卡爾積的數量級。因此經過預先分區能夠得到比原算法的普通MapReduce實現更好的性能;
  2. 用persist()方法緩存RDD,使得在每次迭代裏均可以複用,進一步提升性能;
  3. 第一次建立ranks時,使用mapValues()而不是map(),保留了父RDD(links)的分區方式(由於map操做理論上可能會修改鍵值致使父RDD的分區不可用,因此map操做不保留父RDD的分區),這樣第一次的join()操做的開銷也會更小;
  4. 在循環體中,調用reduceByKey()後使用mapValues();由於reduceByKey()的結果已是哈希分區的了,這樣一來,下一次循環中將映射操做的結果再次與links進行鏈接時就會更加高效。

參考

https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

相關文章
相關標籤/搜索