佩奇排名(PageRank),又稱 網頁排名、 谷歌左側排名,是一種由 搜索引擎根據 網頁之間相互的 超連接計算的技術,而做爲網頁排名的要素之一,以 Google公司創辦人 拉里·佩奇(Larry Page)之姓來命名。 Google用它來體現網頁的相關性和重要性,在 搜索引擎優化操做中是常常被用來評估網頁優化的成效因素之一。
Spark中有一個很重要的特性是對數據集在節點間的分區進行控制,由於在分佈式系統中,通訊的代價是很大的,所以控制數據分佈以得到最少的網絡傳輸能夠極大地提高總體性能,Spark程序能夠經過控制RDD分區方式來減小通訊開銷。分區適用於那種基於相似join操做基於鍵的操做,而且一方的RDD數據是比較少變更且須要屢次掃描的狀況,這個時候能夠對這個RDD作一個分區,最經常使用的是用Hash來進行分區,好比能夠對RDD分100個區,此時spark會用每一個鍵的hash值對100取模,而後把相同結果的放到同一個節點上。html
如今用一個例子(來自《Learning Spark: Lightning-Fast Big Data Analysis》一書)來講明一下:算法
// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS. // This distributes elements of userData by the HDFS block where they are found, // and doesn't provide Spark with any way of knowing in which partition a // particular UserID is located. val sc = new SparkContext(...) val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist() // Function called periodically to process a logfile of events in the past 5 minutes; // we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs. def processNewLogs(logFileName: String) { val events = sc.sequenceFile[UserID, LinkInfo](logFileName) val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs val offTopicVisits = joined.filter { case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components !userInfo.topics.contains(linkInfo.topic) }.count() println("Number of visits to non-subscribed topics: " + offTopicVisits) }
上面的例子中,有兩個RDD,userData的鍵值對是(UserID, UserInfo),UserInfo包含了一個該用戶訂閱的主題的列表,該程序會週期性地將這張表與一個小文件進行組合,這個小文件中存着過去五分鐘內某個網站各用戶的訪問狀況,由(UserID, LinkInfo)。如今,咱們須要對用戶訪問其未訂閱主題的頁面進行統計。能夠經過Spark的join()操做來完成這個功能,其中須要把UserInfo和LinkInfo的有序對根據UserID進行分組,如上代碼。緩存
能夠看出,由於每次調用processNewLogs()時都須要執行一次join()操做,可是數據具體的shuffle對咱們來講倒是不可控的,也就是咱們不知道spark是如何進行分區的。spark默認在執行join()的時候會將兩個RDD的鍵的hash值都算出來,而後將該hash值經過網絡傳輸到同一個節點上進行相同鍵值的記錄的鏈接操做,以下圖所示:網絡
由於userData這個RDD裏面的數據是幾乎不會變更的,或者說是極少會變更的,且它的內容也比events大不少,因此每次都要對它進行shuffle的話,是沒有必要且浪費時間的,實際上只須要進行一次shuffle就能夠了。分佈式
因此,能夠經過預先分區來解決這個問題:在進行join()以前,對userData使用partitionBy()轉化操做,把它變成一個哈希分區的RDD:ide
val sc = new SparkContext(...) val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...") .partitionBy(new HashPartitioner(100)) // Create 100 partitions .persist()
調用partitionBy()以後,spark就能夠預先知道這個RDD是已經進行過哈希分區的了,等到執行join()之時,它就會利用這一點:只對events進行shuffle,將events中特定UserID的記錄發送到userData對應分區的機器節點上去。這樣的話,就減小了大量的重複的網絡通訊,程序性能也會大大提升。改進後的程序的執行過程以下:oop
還有一點,大家可能注意到了新代碼裏最後還調用了一個persist()方法,這是另外一個小優化點:對於那些數據不常變更且數據量較大的RDD,在進行諸如join()這種鏈接操做的時候儘可能用persist()來作緩存,以提升性能。另外,分區數目的設置也有講究,分區數目決定了這個任務在執行鏈接操做時的並行度,因此通常來講這個數目應該和集羣中的總核心數保持一致。性能
最後,可能有人會問,能不能對events也進行分區進一步提升程序性能?這是沒有必要的,由於events RDD是本地變量,每次執行都會更新,因此對它進行分區沒有意義,即使對這種一次性變量進行分區,spark依然須要進行一次shuffle,因此,這是沒有必要的。優化
PageRank算法是一種從RDD分區獲益的更復雜的算法,下面咱們用它爲例來進一步講解Spark分區的使用。網站
若是有不清楚的PageRank算法的具體實現的能夠參考我之前的一篇文章:hadoop下基於mapreduce實現pagerank算法
PageRank是一個迭代算法,所以它是一個能從RDD分區中得到性能加速的很好的例子,先上代碼:
// Assume that our neighbor list was saved as a Spark objectFile val links = sc.objectFile[(String, Seq[String])]("links") .partitionBy(new HashPartitioner(100)) .persist() // Initialize each page's rank to 1.0; since we use mapValues, the resulting RDD // will have the same partitioner as links var ranks = links.mapValues(v => 1.0) // Run 10 iterations of PageRank for (i <- 0 until 10) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size)) } ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v) } // Write out the final ranks ranks.saveAsTextFile("ranks")
這個算法維護兩個RDD,一個的鍵值對是(pageID, linkList),包含了每一個頁面的出鏈指向的相鄰頁面列表(由pageID組成);另外一個的鍵值對是(pageID, rank),包含了每一個頁面的當前權重值。算法流程以下:
不斷迭代步驟2和3,過程當中算法會逐漸收斂於每一個頁面的實際PageRank值,實際運行之時大概迭代10+次以上便可。
算法將ranksRDD的每一個元素的值設置爲1.0,而後在每次迭代中不斷更新ranks變量:首先對ranksRDD和靜態的linksRDD進行一次join()操做,來獲取每一個頁面ID對應的相鄰頁面列表和當前的權重值,而後使用flatMap建立出『contributions』來記錄每一個頁面對各相鄰頁面的貢獻值。而後再把這些貢獻值按照pageID分別累加起來,把該頁面的權重值設爲0.15 + 0.85 * contributionsReceived。
接下來分析下上述代碼作的的一些優化點:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html