Spark GraphX

1、GraphX介紹

1.1 GraphX應用背景

Spark GraphX是一個分佈式圖處理框架,它是基於Spark平臺提供對圖計算和圖挖掘簡潔易用的而豐富的接口,極大的方便了對分佈式圖處理的需求。php

衆所周知·,社交網絡中人與人之間有不少關係鏈,例如Twitter、Facebook、微博和微信等,這些都是大數據產生的地方都須要圖計算,如今的圖處理基本都是分佈式的圖處理,而並不是單機處理。Spark GraphX因爲底層是基於Spark來處理的,因此自然就是一個分佈式的圖處理系統。html

圖的分佈式或者並行處理實際上是把圖拆分紅不少的子圖,而後分別對這些子圖進行計算,計算的時候能夠分別迭代進行分階段的計算,即對圖進行並行計算。下面咱們看一下圖計算的簡單示例:java

clip_image002

從圖中咱們能夠看出:拿到Wikipedia的文檔之後,能夠變成Link Table形式的視圖,而後基於Link Table形式的視圖能夠分析成Hyperlinks超連接,最後咱們可使用PageRank去分析得出Top Communities。在下面路徑中的Editor Graph到Community,這個過程能夠稱之爲Triangle Computation,這是計算三角形的一個算法,基於此會發現一個社區。從上面的分析中咱們能夠發現圖計算有不少的作法和算法,同時也發現圖和表格能夠作互相的轉換。node

1.2  GraphX的框架

設計GraphX時,點分割和GAS都已成熟,在設計和編碼中針對它們進行了優化,並在功能和性能之間尋找最佳的平衡點。如同Spark自己,每一個子模塊都有一個核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴展了Spark RDD的抽象,有Table和Graph兩種視圖,而只須要一份物理存儲。兩種視圖都有本身獨有的操做符,從而得到了靈活操做和執行效率。python

clip_image004

如同Spark,GraphX的代碼很是簡潔。GraphX的核心代碼只有3千多行,而在此之上實現的Pregel模式,只要短短的20多行。GraphX的代碼結構總體下圖所示,其中大部分的實現,都是圍繞Partition的優化進行的。這在某種程度上說明了點分割的存儲和相應的計算優化,的確是圖計算框架的重點和難點。git

1.3 發展歷程

l早在0.5版本,Spark就帶了一個小型的Bagel模塊,提供了相似Pregel的功能。固然,這個版本還很是原始,性能和功能都比較弱,屬於實驗型產品。github

l到0.8版本時,鑑於業界對分佈式圖計算的需求日益見漲,Spark開始獨立一個分支Graphx-Branch,做爲獨立的圖計算模塊,借鑑GraphLab,開始設計開發GraphX。算法

l在0.9版本中,這個模塊被正式集成到主幹,雖然是Alpha版本,但已能夠試用,小麪包圈Bagel告別舞臺。1.0版本,GraphX正式投入生產使用。express

clip_image006

值得注意的是,GraphX目前依然處於快速發展中,從0.8的分支到0.9和1.0,每一個版本代碼都有很多的改進和重構。根據觀察,在沒有改任何代碼邏輯和運行環境,只是升級版本、切換接口和從新編譯的狀況下,每一個版本有10%~20%的性能提高。雖然和GraphLab的性能還有必定差距,但憑藉Spark總體上的一體化流水線處理,社區熱烈的活躍度及快速改進速度,GraphX具備強大的競爭力。apache

二、GraphX實現分析

如同Spark自己,每一個子模塊都有一個核心抽象。GraphX的核心抽象是Resilient Distributed Property Graph,一種點和邊都帶屬性的有向多重圖。它擴展了Spark RDD的抽象,有Table和Graph兩種視圖,而只須要一份物理存儲。兩種視圖都有本身獨有的操做符,從而得到了靈活操做和執行效率。

clip_image008

GraphX的底層設計有如下幾個關鍵點。

對Graph視圖的全部操做,最終都會轉換成其關聯的Table視圖的RDD操做來完成。這樣對一個圖的計算,最終在邏輯上,等價於一系列RDD的轉換過程。所以,Graph最終具有了RDD的3個關鍵特性:Immutable、Distributed和Fault-Tolerant,其中最關鍵的是Immutable(不變性)。邏輯上,全部圖的轉換和操做都產生了一個新圖;物理上,GraphX會有必定程度的不變頂點和邊的複用優化,對用戶透明。

 兩種視圖底層共用的物理數據,由RDD[Vertex-Partition]和RDD[EdgePartition]這兩個RDD組成。點和邊實際都不是以表Collection[tuple]的形式存儲的,而是由VertexPartition/EdgePartition在內部存儲一個帶索引結構的分片數據塊,以加速不一樣視圖下的遍歷速度。不變的索引結構在RDD轉換過程當中是共用的,下降了計算和存儲開銷。

clip_image010

圖的分佈式存儲採用點分割模式,並且使用partitionBy方法,由用戶指定不一樣的劃分策略(PartitionStrategy)。劃分策略會將邊分配到各個EdgePartition,頂點Master分配到各個VertexPartition,EdgePartition也會緩存本地邊關聯點的Ghost副本。劃分策略的不一樣會影響到所須要緩存的Ghost副本數量,以及每一個EdgePartition分配的邊的均衡程度,須要根據圖的結構特徵選取最佳策略。目前有EdgePartition2d、EdgePartition1d、RandomVertexCut和CanonicalRandomVertexCut這四種策略。

2.1 存儲模式

2.1.1 圖存儲模式

巨型圖的存儲整體上有邊分割和點分割兩種存儲方式。2013年,GraphLab2.0將其存儲方式由邊分割變爲點分割,在性能上取得重大提高,目前基本上被業界普遍接受並使用。

l邊分割(Edge-Cut):每一個頂點都存儲一次,但有的邊會被打斷分到兩臺機器上。這樣作的好處是節省存儲空間;壞處是對圖進行基於邊的計算時,對於一條兩個頂點被分到不一樣機器上的邊來講,要跨機器通訊傳輸數據,內網通訊流量大。

l點分割(Vertex-Cut):每條邊只存儲一次,都只會出如今一臺機器上。鄰居多的點會被複制到多臺機器上,增長了存儲開銷,同時會引起數據同步問題。好處是能夠大幅減小內網通訊量。

clip_image012

雖然兩種方法互有利弊,但如今是點分割佔上風,各類分佈式圖計算框架都將本身底層的存儲形式變成了點分割。主要緣由有如下兩個。

1.磁盤價格降低,存儲空間再也不是問題,而內網的通訊資源沒有突破性進展,集羣計算時內網帶寬是寶貴的,時間比磁盤更珍貴。這點就相似於常見的空間換時間的策略。

2.在當前的應用場景中,絕大多數網絡都是「無尺度網絡」,遵循冪律分佈,不一樣點的鄰居數量相差很是懸殊。而邊分割會使那些多鄰居的點所相連的邊大多數被分到不一樣的機器上,這樣的數據分佈會使得內網帶寬更加捉襟見肘,因而邊分割存儲方式被漸漸拋棄了。

2.1.2 GraphX存儲模式

Graphx借鑑PowerGraph,使用的是Vertex-Cut(點分割)方式存儲圖,用三個RDD存儲圖數據信息:

lVertexTable(id, data):id爲Vertex id,data爲Edge data

lEdgeTable(pid, src, dst, data):pid爲Partion id,src爲原定點id,dst爲目的頂點id

lRoutingTable(id, pid):id爲Vertex id,pid爲Partion id

點分割存儲實現以下圖所示:

clip_image014

2.2 計算模式

2.2.1 圖計算模式

目前基於圖的並行計算框架已經有不少,好比來自Google的Pregel、來自Apache開源的圖計算框架Giraph/HAMA以及最爲著名的GraphLab,其中Pregel、HAMA和Giraph都是很是相似的,都是基於BSP(Bulk Synchronous Parallell)模式。

Bulk Synchronous Parallell,即總體同步並行,它將計算分紅一系列的超步(superstep)的迭代(iteration)。從縱向上看,它是一個串行模式,而從橫向上看,它是一個並行的模式,每兩個superstep之間設置一個柵欄(barrier),即總體同步點,肯定全部並行的計算都完成後再啓動下一輪superstep。

clip_image015

每個超步(superstep)包含三部份內容:

1.計算compute:每個processor利用上一個superstep傳過來的消息和本地的數據進行本地計算;

2.消息傳遞:每個processor計算完畢後,將消息傳遞個與之關聯的其它processors

3.總體同步點:用於總體同步,肯定全部的計算和消息傳遞都進行完畢後,進入下一個superstep。

2.2.2GraphX計算模式

如同Spark同樣,GraphX的Graph類提供了豐富的圖運算符,大體結構以下圖所示。能夠在官方GraphX Programming Guide中找到每一個函數的詳細說明,本文僅講述幾個須要注意的方法。

clip_image017

2.2.2.1 圖的緩存

每一個圖是由3個RDD組成,因此會佔用更多的內存。相應圖的cache、unpersist和checkpoint,更須要注意使用技巧。出於最大限度複用邊的理念,GraphX的默認接口只提供了unpersistVertices方法。若是要釋放邊,調用g.edges.unpersist()方法才行,這給用戶帶來了必定的不便,但爲GraphX的優化提供了便利和空間。參考GraphX的Pregel代碼,對一個大圖,目前最佳的實踐是:

clip_image018

大致之意是根據GraphX中Graph的不變性,對g作操做並賦回給g以後,g已不是原來的g了,並且會在下一輪迭代使用,因此必須cache。另外,必須先用prevG保留住對原來圖的引用,並在新圖產生後,快速將舊圖完全釋放掉。不然,十幾輪迭代後,會有內存泄漏問題,很快耗光做業緩存空間。

2.2.2.2 鄰邊聚合

mrTriplets(mapReduceTriplets)是GraphX中最核心的一個接口。Pregel也基於它而來,因此對它的優化能很大程度上影響整個GraphX的性能。mrTriplets運算符的簡化定義是:

clip_image019

它的計算過程爲:map,應用於每個Triplet上,生成一個或者多個消息,消息以Triplet關聯的兩個頂點中的任意一個或兩個爲目標頂點;reduce,應用於每個Vertex上,將發送給每個頂點的消息合併起來。

mrTriplets最後返回的是一個VertexRDD[A],包含每個頂點聚合以後的消息(類型爲A),沒有接收到消息的頂點不會包含在返回的VertexRDD中。

在最近的版本中,GraphX針對它進行了一些優化,對於Pregel以及全部上層算法工具包的性能都有重大影響。主要包括如下幾點。

1. Caching for Iterative mrTriplets & Incremental Updates for Iterative mrTriplets:在不少圖分析算法中,不一樣點的收斂速度變化很大。在迭代後期,只有不多的點會有更新。所以,對於沒有更新的點,下一次mrTriplets計算時EdgeRDD無需更新相應點值的本地緩存,大幅下降了通訊開銷。

2.Indexing Active Edges:沒有更新的頂點在下一輪迭代時不須要向鄰居從新發送消息。所以,mrTriplets遍歷邊時,若是一條邊的鄰居點值在上一輪迭代時沒有更新,則直接跳過,避免了大量無用的計算和通訊。

3.Join Elimination:Triplet是由一條邊和其兩個鄰居點組成的三元組,操做Triplet的map函數經常只需訪問其兩個鄰居點值中的一個。例如,在PageRank計算中,一個點值的更新只與其源頂點的值有關,而與其所指向的目的頂點的值無關。那麼在mrTriplets計算中,就不須要VertexRDD和EdgeRDD的3-way join,而只須要2-way join。

全部這些優化使GraphX的性能逐漸逼近GraphLab。雖然還有必定差距,但一體化的流水線服務和豐富的編程接口,能夠彌補性能的微小差距。

2.2.2.3 進化的Pregel模式

GraphX中的Pregel接口,並不嚴格遵循Pregel模式,它是一個參考GAS改進的Pregel模式。定義以下:

clip_image020

這種基於mrTrilets方法的Pregel模式,與標準Pregel的最大區別是,它的第2段參數體接收的是3個函數參數,而不接收messageList。它不會在單個頂點上進行消息遍歷,而是將頂點的多個Ghost副本收到的消息聚合後,發送給Master副本,再使用vprog函數來更新點值。消息的接收和發送都被自動並行化處理,無需擔憂超級節點的問題。

常見的代碼模板以下所示:

clip_image021

能夠看到,GraphX設計這個模式的用意。它綜合了Pregel和GAS二者的優勢,即接口相對簡單,又保證性能,能夠應對點分割的圖存儲模式,勝任符合冪律分佈的天然圖的大型計算。另外,值得注意的是,官方的Pregel版本是最簡單的一個版本。對於複雜的業務場景,根據這個版本擴展一個定製的Pregel是很常見的作法。

2.2.2.4 圖算法工具包

GraphX也提供了一套圖算法工具包,方便用戶對圖進行分析。目前最新版本已支持PageRank、數三角形、最大連通圖和最短路徑等6種經典的圖算法。這些算法的代碼實現,目的和重點在於通用性。若是要得到最佳性能,能夠參考其實現進行修改和擴展知足業務需求。另外,研讀這些代碼,也是理解GraphX編程最佳實踐的好方法。

3、GraphX實例

3.1  圖例演示

3.1.1 例子介紹

下圖中有6我的,每一個人有名字和年齡,這些人根據社會關係造成8條邊,每條邊有其屬性。在如下例子演示中將構建頂點、邊和圖,打印圖的屬性、轉換操做、結構操做、鏈接操做、聚合操做,並結合實際要求進行演示。

clip_image023

3.1.2 程序代碼

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.graphx._

import org.apache.spark.rdd.RDD 

object GraphXExample {

  def main(args: Array[String]) {

    //屏蔽日誌

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 

    //設置運行環境

    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local")

    val sc = new SparkContext(conf) 

    //設置頂點和邊,注意頂點和邊都是用元組定義的Array

    //頂點的數據類型是VD:(String,Int)

    val vertexArray = Array(

      (1L, ("Alice", 28)),

      (2L, ("Bob", 27)),

      (3L, ("Charlie", 65)),

      (4L, ("David", 42)),

      (5L, ("Ed", 55)),

      (6L, ("Fran", 50))

    )

    //邊的數據類型ED:Int

    val edgeArray = Array(

      Edge(2L, 1L, 7),

      Edge(2L, 4L, 2),

      Edge(3L, 2L, 4),

      Edge(3L, 6L, 3),

      Edge(4L, 1L, 1),

      Edge(5L, 2L, 2),

      Edge(5L, 3L, 8),

      Edge(5L, 6L, 3)

    ) 

    //構造vertexRDD和edgeRDD

    val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)

    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray) 

    //構造圖Graph[VD,ED]

    val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD) 

    //***********************************************************************************

    //***************************  圖的屬性    ****************************************

    //**********************************************************************************    

    println("***********************************************")

    println("屬性演示")

    println("**********************************************************")

    println("找出圖中年齡大於30的頂點:")

    graph.vertices.filter { case (id, (name, age)) => age > 30}.collect.foreach {

      case (id, (name, age)) => println(s"$name is $age")

    } 

    //邊操做:找出圖中屬性大於5的邊

    println("找出圖中屬性大於5的邊:")

graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

    println 

    //triplets操做,((srcId, srcAttr), (dstId, dstAttr), attr)

    println("列出邊屬性>5的tripltes:")

    for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) {

      println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")

    }

    println 

    //Degrees操做

    println("找出圖中最大的出度、入度、度數:")

    def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {

      if (a._2 > b._2) a else b

    }

 println("max of outDegrees:" + graph.outDegrees.reduce(max) + " max of inDegrees:" + graph.inDegrees.reduce(max) + " max of Degrees:" + graph.degrees.reduce(max))

    println   

    //***********************************************************************************

    //***************************  轉換操做    ****************************************

    //**********************************************************************************  

    println("**********************************************************")

    println("轉換操做")

    println("**********************************************************")

    println("頂點的轉換操做,頂點age + 10:")

    graph.mapVertices{ case (id, (name, age)) => (id, (name, age+10))}.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))

    println

    println("邊的轉換操做,邊的屬性*2:")

    graph.mapEdges(e=>e.attr*2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

    println  

      //***********************************************************************************

    //***************************  結構操做    ****************************************

    //**********************************************************************************  

    println("**********************************************************")

    println("結構操做")

    println("**********************************************************")

    println("頂點年紀>30的子圖:")

    val subGraph = graph.subgraph(vpred = (id, vd) => vd._2 >= 30)

    println("子圖全部頂點:")

    subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))

    println

    println("子圖全部邊:")

    subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))

    println   

      //***********************************************************************************

    //***************************  鏈接操做    ****************************************

    //**********************************************************************************  

    println("**********************************************************")

    println("鏈接操做")

    println("**********************************************************")

    val inDegrees: VertexRDD[Int] = graph.inDegrees

    case class User(name: String, age: Int, inDeg: Int, outDeg: Int) 

    //建立一個新圖,頂點VD的數據類型爲User,並從graph作類型轉換

    val initialUserGraph: Graph[User, Int] = graph.mapVertices { case (id, (name, age)) => User(name, age, 0, 0)} 

    //initialUserGraph與inDegrees、outDegrees(RDD)進行鏈接,並修改initialUserGraph中inDeg值、outDeg值

    val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {

      case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)

    }.outerJoinVertices(initialUserGraph.outDegrees) {

      case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg,outDegOpt.getOrElse(0))

    } 

    println("鏈接圖的屬性:")

userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg}  outDeg: ${v._2.outDeg}"))

    println 

    println("出度和入讀相同的人員:")

    userGraph.vertices.filter {

      case (id, u) => u.inDeg == u.outDeg

    }.collect.foreach {

      case (id, property) => println(property.name)

    }

    println 

      //***********************************************************************************

    //***************************  聚合操做    ****************************************

    //**********************************************************************************  

    println("**********************************************************")

    println("聚合操做")

    println("**********************************************************")

    println("找出年紀最大的追求者:")

    val oldestFollower: VertexRDD[(String, Int)] = userGraph.mapReduceTriplets[(String, Int)](

      // 將源頂點的屬性發送給目標頂點,map過程

      edge => Iterator((edge.dstId, (edge.srcAttr.name, edge.srcAttr.age))),

      // 獲得最大追求者,reduce過程

      (a, b) => if (a._2 > b._2) a else b

    ) 

    userGraph.vertices.leftJoin(oldestFollower) { (id, user, optOldestFollower) =>

      optOldestFollower match {

        case None => s"${user.name} does not have any followers."

        case Some((name, age)) => s"${name} is the oldest follower of ${user.name}."

      }

    }.collect.foreach { case (id, str) => println(str)}

    println 

     //***********************************************************************************

    //***************************  實用操做    ****************************************

    //**********************************************************************************

    println("**********************************************************")

    println("聚合操做")

    println("**********************************************************")

    println("找出5到各頂點的最短:")

    val sourceId: VertexId = 5L // 定義源點

    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(

      (id, dist, newDist) => math.min(dist, newDist),

      triplet => {  // 計算權重

        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {

          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))

        } else {

          Iterator.empty

        }

      },

      (a,b) => math.min(a,b) // 最短距離

    )

    println(sssp.vertices.collect.mkString("\n")) 

    sc.stop()

  }

}

3.1.3 運行結果

在IDEA(如何使用IDEA參見第3課《3.Spark編程模型(下)--IDEA搭建及實戰》)中首先對GraphXExample.scala代碼進行編譯,編譯經過後進行執行,執行結果以下:

**********************************************************

屬性演示

**********************************************************

找出圖中年齡大於30的頂點:

David is 42

Fran is 50

Charlie is 65

Ed is 55

找出圖中屬性大於5的邊:

2 to 1 att 7

5 to 3 att 8 

列出邊屬性>5的tripltes:

Bob likes Alice

Ed likes Charlie 

找出圖中最大的出度、入度、度數:

max of outDegrees:(5,3) max of inDegrees:(2,2) max of Degrees:(2,4) 

**********************************************************

轉換操做

**********************************************************

頂點的轉換操做,頂點age + 10

4 is (David,52)

1 is (Alice,38)

6 is (Fran,60)

3 is (Charlie,75)

5 is (Ed,65)

2 is (Bob,37) 

邊的轉換操做,邊的屬性*2

2 to 1 att 14

2 to 4 att 4

3 to 2 att 8

3 to 6 att 6

4 to 1 att 2

5 to 2 att 4

5 to 3 att 16

5 to 6 att 6 

**********************************************************

結構操做

**********************************************************

頂點年紀>30的子圖:

子圖全部頂點:

David is 42

Fran is 50

Charlie is 65

Ed is 55

 

子圖全部邊:

3 to 6 att 3

5 to 3 att 8

5 to 6 att 3 

**********************************************************

鏈接操做

**********************************************************

鏈接圖的屬性:

David inDeg: 1  outDeg: 1

Alice inDeg: 2  outDeg: 0

Fran inDeg: 2  outDeg: 0

Charlie inDeg: 1  outDeg: 2

Ed inDeg: 0  outDeg: 3

Bob inDeg: 2  outDeg: 2 

出度和入讀相同的人員:

David

Bob

 **********************************************************

聚合操做

**********************************************************

找出年紀最大的追求者:

Bob is the oldest follower of David.

David is the oldest follower of Alice.

Charlie is the oldest follower of Fran.

Ed is the oldest follower of Charlie.

Ed does not have any followers.

Charlie is the oldest follower of Bob. 

**********************************************************

實用操做

**********************************************************

找出5到各頂點的最短:

(4,4.0)

(1,5.0)

(6,3.0)

(3,8.0)

(5,0.0)

(2,2.0)

clip_image025

3.2 PageRank 演示

3.2.1 例子介紹

PageRank, 即網頁排名,又稱網頁級別、Google 左側排名或佩奇排名。它是Google 創始人拉里· 佩奇和謝爾蓋· 布林於1997 年構建早期的搜索系統原型時提出的連接分析算法。目前不少重要的連接分析算法都是在PageRank 算法基礎上衍生出來的。PageRank 是Google 用於用來標識網頁的等級/ 重要性的一種方法,是Google 用來衡量一個網站的好壞的惟一標準。在揉合了諸如Title 標識和Keywords 標識等全部其它因素以後,Google 經過PageRank 來調整結果,使那些更具「等級/ 重要性」的網頁在搜索結果中令網站排名得到提高,從而提升搜索結果的相關性和質量。

clip_image027

3.2.2 測試數據

在這裏測試數據爲頂點數據graphx-wiki-vertices.txt和邊數據graphx-wiki-edges.txt,能夠在本系列附帶資源/data/class9/目錄中找到這兩個數據文件,其中格式爲:

l  頂點爲頂點編號和網頁標題

clip_image029

l  邊數據由兩個頂點構成

clip_image031

3.2.3 程序代碼

import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.graphx._

import org.apache.spark.rdd.RDD

 

object PageRank {

  def main(args: Array[String]) {

    //屏蔽日誌

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) 

    //設置運行環境

    val conf = new SparkConf().setAppName("PageRank").setMaster("local")

    val sc = new SparkContext(conf) 

    //讀入數據文件

    val articles: RDD[String] = sc.textFile("/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-vertices.txt")

    val links: RDD[String] = sc.textFile("/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-edges.txt") 

    //裝載頂點和邊

    val vertices = articles.map { line =>

      val fields = line.split('\t')

      (fields(0).toLong, fields(1))

    } 

    val edges = links.map { line =>

      val fields = line.split('\t')

      Edge(fields(0).toLong, fields(1).toLong, 0)

    } 

    //cache操做

    //val graph = Graph(vertices, edges, "").persist(StorageLevel.MEMORY_ONLY_SER)

    val graph = Graph(vertices, edges, "").persist()

    //graph.unpersistVertices(false) 

    //測試

    println("**********************************************************")

    println("獲取5個triplet信息")

    println("**********************************************************")

    graph.triplets.take(5).foreach(println(_)) 

    //pageRank算法裏面的時候使用了cache(),故前面persist的時候只能使用MEMORY_ONLY

    println("**********************************************************")

    println("PageRank計算,獲取最有價值的數據")

    println("**********************************************************")

    val prGraph = graph.pageRank(0.001).cache() 

    val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {

      (v, title, rank) => (rank.getOrElse(0.0), title)

    } 

    titleAndPrGraph.vertices.top(10) {

      Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)

    }.foreach(t => println(t._2._2 + ": " + t._2._1)) 

    sc.stop()

  }

}

3.2.4 運行結果

在IDEA中首先對PageRank.scala代碼進行編譯,編譯經過後進行執行,執行結果以下:

**********************************************************

獲取5個triplet信息

**********************************************************

((146271392968588,Computer Consoles Inc.),(7097126743572404313,Berkeley Software Distribution),0)

((146271392968588,Computer Consoles Inc.),(8830299306937918434,University of California, Berkeley),0)

((625290464179456,List of Penguin Classics),(1735121673437871410,George Berkeley),0)

((1342848262636510,List of college swimming and diving teams),(8830299306937918434,University of California, Berkeley),0)

((1889887370673623,Anthony Pawson),(8830299306937918434,University of California, Berkeley),0) 

**********************************************************

PageRank計算,獲取最有價值的數據

**********************************************************

University of California, Berkeley: 1321.111754312097

Berkeley, California: 664.8841977233583

Uc berkeley: 162.50132743397873

Berkeley Software Distribution: 90.4786038848606

Lawrence Berkeley National Laboratory: 81.90404939641944

George Berkeley: 81.85226118457985

Busby Berkeley: 47.871998218019655

Berkeley Hills: 44.76406979519754

Xander Berkeley: 30.324075347288037

Berkeley County, South Carolina: 28.908336483710308

clip_image033

4、參考資料

(1)《GraphX:基於Spark的彈性分佈式圖計算系統》 http://lidrema.blog.163.com/blog/static/20970214820147199643788/

(2)《快刀初試:Spark GraphX在淘寶的實踐》 http://www.csdn.net/article/2014-08-07/2821097

概述

  • GraphX是 Spark中用於圖(如Web-Graphs and Social Networks)和圖並行計算(如 PageRank and Collaborative Filtering)的API,能夠認爲是GraphLab(C++)和Pregel(C++)在Spark(Scala)上的重寫及優化,跟其餘分佈式 圖計算框架相比,GraphX最大的貢獻是,在Spark之上提供一站式數據解決方案,能夠方便且高效地完成圖計算的一整套流水做業。
  • Graphx是Spark生態中的很是重要的組件,融合了圖並行以及數據並行的優點,雖然在單純的計算機段的性能相比不如GraphLab等計算框架,可是若是從整個圖處理流水線的視角(圖構建,圖合併,最終結果的查詢)看,那麼性能就很是具備競爭性了。 
    這裏寫圖片描述

圖計算應用場景

    「圖計算」是以「圖論」爲基礎的對現實世界的一種「圖」結構的抽象表達,以及在這種數據結構上的計算模式。一般,在圖計算中,基本的數據結構表達就是:G = (V,E,D) V = vertex (頂點或者節點) E = edge (邊) D = data (權重)。 
    圖數據結構很好的表達了數據之間的關聯性,所以,不少應用中出現的問題均可以抽象成圖來表示,以圖論的思想或者以圖爲基礎創建模型來解決問題。 
下面是一些圖計算的應用場景: 
PageRank讓連接來」投票」 
基於GraphX的社區發現算法FastUnfolding分佈式實現 
http://bbs.pinggu.org/thread-3614747-1-1.html 
社交網絡分析 
如基於Louvian社區發現的新浪微博社交網絡分析 
社交網絡最適合用圖來表達和計算了,圖的「頂點」表示社交中的人,「邊」表示人與人之間的關係。 
基於三角形計數的關係衡量 
基於隨機遊走的用戶屬性傳播 
推薦應用 
如淘寶推薦商品,騰訊推薦好友等等(一樣是基於社交網絡這個大數據,能夠很好構建一張大圖) 
淘寶應用 
度分佈、二跳鄰居數、連通圖、多圖合併、能量傳播模型 
全部的關係均可以從「圖」的角度來看待和處理,但到底一個關係的價值多大?健康與否?適合用於什麼場景? 
快刀初試:Spark GraphX在淘寶的實踐 
http://www.csdn.net/article/2014-08-07/2821097

Spark中圖的創建及圖的基本操做

圖的構建

          首先利用「頂點」和「邊」RDD創建一個簡單的屬性圖,經過這個例子,瞭解完整的GraphX圖構建的基本流程。 
          以下圖所示,頂點的屬性包含用戶的姓名和職業,帶標註的邊表示不一樣用戶之間的關係。 
這裏寫圖片描述

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object myGraphX {

  def main(args:Array[String]){

    // Create the context 
    val sparkConf = new SparkConf().setAppName("myGraphPractice").setMaster("local[2]")
    val sc=new SparkContext(sparkConf) 

    // 頂點RDD[頂點的id,頂點的屬性值]
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
    // 邊RDD[起始點id,終點id,邊的屬性(邊的標註,邊的權重等)]
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
    // 默認(缺失)用戶
    //Define a default user in case there are relationship with missing user
    val defaultUser = ("John Doe", "Missing")

    //使用RDDs創建一個Graph(有許多創建Graph的數據來源和方法,後面會詳細介紹)
    val graph = Graph(users, relationships, defaultUser)     
  }
}

          上面是一個簡單的例子,說明如何創建一個屬性圖,那麼創建一個圖主要有哪些方法呢?咱們先看圖的定義:

object Graph {
  def apply[VD, ED](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null)
    : Graph[VD, ED]

  def fromEdges[VD, ED](
      edges: RDD[Edge[ED]],
      defaultValue: VD): Graph[VD, ED]

  def fromEdgeTuples[VD](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]

}
  •           由上面的定義咱們能夠看到,GraphX主要有三種方法能夠創建圖: 

          (1)在構造圖的時候,會自動使用apply方法,所以前面那個例子中實際上就是使用apply方法。至關於Java/C++語言的構造函數。有三個參數,分別是:vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null),前兩個必須有,最後一個可選擇。「頂點「和」邊「的RDD來自不一樣的數據源,與Spark中其餘RDD的創建並無區別。 
          這裏再舉讀取文件,產生RDD,而後利用RDD創建圖的例子:

(1)讀取文件,創建頂點和邊的RRD,而後利用RDD創建屬性圖

//讀入數據文件
val articles: RDD[String] = sc.textFile("E:/data/graphx/graphx-wiki-vertices.txt")
val links: RDD[String] = sc.textFile("E:/data/graphx/graphx-wiki-edges.txt")

//裝載「頂點」和「邊」RDD
val vertices = articles.map { line =>
    val fields = line.split('\t')
      (fields(0).toLong, fields(1))
    }//注意第一列爲vertexId,必須爲Long,第二列爲頂點屬性,能夠爲任意類型,包括Map等序列。

val edges = links.map { line =>
    val fields = line.split('\t')
      Edge(fields(0).toLong, fields(1).toLong, 1L)//起始點ID必須爲Long,最後一個是屬性,能夠爲任意類型
    }
//創建圖
val graph = Graph(vertices, edges, "").persist()//自動使用apply方法創建圖

(2)Graph.fromEdges方法:這種方法相對而言最爲簡單,只是由」邊」RDD創建圖,由邊RDD中出現全部「頂點」(不管是起始點src仍是終點dst)自動產生頂點vertextId,頂點的屬性將被設置爲一個默認值。 
      Graph.fromEdges allows creating a graph from only an RDD of edges, automatically creating any vertices mentioned by edges and assigning them the default value. 
          舉例以下:

//讀入數據文件 
val records: RDD[String] = sc.textFile("/microblogPCU/microblogPCU/follower_followee")   
//微博數據:000000261066,郜振博585,3044070630,redashuaicheng,1929305865,1994,229,3472,male,first
// 第三列是粉絲Id:3044070630,第五列是用戶Id:1929305865
val followers=records.map {case x => val fields=x.split(",")
          Edge(fields(2).toLong, fields(4).toLong,1L )       
      }    
val graph=Graph.fromEdges(followers, 1L)

(3)Graph.fromEdgeTuples方法 
          Graph.fromEdgeTuples allows creating a graph from only an RDD of edge tuples, assigning the edges the value 1, and automatically creating any vertices mentioned by edges and assigning them the default value. It also supports deduplicating the edges; to deduplicate, pass Some of a PartitionStrategy as the uniqueEdges parameter (for example, uniqueEdges = Some(PartitionStrategy.RandomVertexCut)). A partition strategy is necessary to colocate identical edges on the same partition so they can be deduplicated.

          除了三種方法,還能夠用GraphLoader構建圖。以下面GraphLoader.edgeListFile: 
(4)GraphLoader.edgeListFile創建圖的基本結構,而後Join屬性 
(a)首先創建圖的基本結構: 
          利用GraphLoader.edgeListFile函數從邊List文件中創建圖的基本結構(全部「頂點」+「邊」),且頂點和邊的屬性都默認爲1。

object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}

使用方法以下:

val graph=GraphLoader.edgeListFile(sc, "/data/graphx/followers.txt") 
//文件的格式以下:
//2 1
//4 1
//1 2 依次爲第一個頂點和第二個頂點

(b)而後讀取屬性文件,得到RDD後和(1)中獲得的基本結構圖join在一塊兒,就能夠組合成完整的屬性圖。

三種視圖及操做

  Spark中圖有如下三種視圖能夠訪問,分別經過graph.vertices,graph.edges,graph.triplets來訪問。 
這裏寫圖片描述

          在Scala語言中,能夠用case語句進行形式簡單、功能強大的模式匹配

//假設graph頂點屬性(String,Int)-(name,age),邊有一個權重(int)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
用case匹配能夠很方便訪問頂點和邊的屬性及id
graph.vertices.map{
      case (id,(name,age))=>//利用case進行匹配
        (age,name)//能夠在這裏加上本身想要的任何轉換
    }

graph.edges.map{
      case Edge(srcid,dstid,weight)=>//利用case進行匹配
        (dstid,weight*0.01)//能夠在這裏加上本身想要的任何轉換
    }

          也能夠經過下標訪問

graph.vertices.map{
      v=>(v._1,v._2._1,v._2._2)//v._1,v._2._1,v._2._2分別對應Id,name,age
}

graph.edges.map {
      e=>(e.attr,e.srcId,e.dstId)
}

graph.triplets.map{
      triplet=>(triplet.srcAttr._1,triplet.dstAttr._2,triplet.srcId,triplet.dstId)
    }

     能夠不用graph.vertices先提取頂點再map的方法,也能夠經過graph.mapVertices直接對頂點進行map,返回是相同結構的另外一個Graph,訪問屬性的方法和上述方法是如出一轍的。以下:

graph.mapVertices{
      case (id,(name,age))=>//利用case進行匹配
        (age,name)//能夠在這裏加上本身想要的任何轉換
}

graph.mapEdges(e=>(e.attr,e.srcId,e.dstId))

graph.mapTriplets(triplet=>(triplet.srcAttr._1))

Spark GraphX中的圖的函數大全

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph 
 //圖的基本信息統計 ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]

  // Views of the graph as collections 
 // 圖的三種視圖 =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  // Transform vertex and edge attributes 
 // 基本的轉換操做 ==========================================================
  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
 : Graph[VD, ED2]

  // Modify the graph structure 
 //圖的結構操做(僅給出四種基本的操做,子圖提取是比較重要的操做) ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
 epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
 vpred: (VertexID, VD) => Boolean = ((v, d) => true))
 : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Join RDDs with the graph 
  // 兩種聚合方式,能夠完成各類圖的聚合操做  ======================================================================
  def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
 (mapFunc: (VertexID, VD, Option[U]) => VD2)

  // Aggregate information about adjacent triplets 
  //圖的鄰邊信息聚合,collectNeighborIds都是效率不高的操做,優先使用aggregateMessages,這也是GraphX最重要的操做之一。
  =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
  def aggregateMessages[Msg: ClassTag](
 sendMsg: EdgeContext[VD, ED, Msg] => Unit,
 mergeMsg: (Msg, Msg) => Msg,
 tripletFields: TripletFields = TripletFields.All)
 : VertexRDD[A]

  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
 vprog: (VertexID, VD, A) => VD,
 sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
 mergeMsg: (A, A) => A)
 : Graph[VD, ED]

  // Basic graph algorithms 
  //圖的算法API(目前給出了三類四個API)  ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexID, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}

結構操做

Structural Operators 
      Spark2.0版本中,僅僅有四種最基本的結構操做,將來將開發更多的結構操做。

class Graph[VD, ED] { def reverse: Graph[VD, ED] def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] }

子圖subgraph

      子圖(subgraph)是圖論的基本概念之一。子圖是指節點集和邊集分別是某一圖的節點集的子集和邊集的子集的圖。 
  Spark API–subgraph利用EdgeTriplet(epred)或/和頂點(vpred)知足必定條件,來提取子圖。利用這個操做可使頂點和邊被限制在感興趣的範圍內,好比刪除失效的連接。 
        The subgraph operator takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate (evaluate to true) and edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate. The subgraph operator can be used in number of situations to restrict the graph to the vertices and edges of interest or eliminate broken links. For example in the following code we remove broken links:

//假設graph有以下的頂點和邊 頂點RDD(id,(name,age) 邊上有一個Int權重(屬性)
(4,(David,42))(6,(Fran,50))(2,(Bob,27)) (1,(Alice,28))(3,(Charlie,65))(5,(Ed,55))
Edge(5,3,8)Edge(2,1,7)Edge(3,2,4) Edge(5,6,3)Edge(3,6,3)

//可使用如下三種操做方法獲取知足條件的子圖
//方法1,對頂點進行操做
val subGraph1=graph.subgraph(vpred=(id,attr)=>attr._2>30) //vpred=(id,attr)=>attr._2>30 頂點vpred第二個屬性(age)>30歲
subGraph1.vertices.foreach(print)
println
subGraph1.edges.foreach {print}
println
輸出結果:
頂點:(4,(David,42))(6,(Fran,50))(3,(Charlie,65))(5,(Ed,55))
邊:Edge(3,6,3)Edge(5,3,8)Edge(5,6,3)

//方法2--對EdgeTriplet進行操做
val subGraph2=graph.subgraph(epred=>epred.attr>2)
//epred(邊)的屬性(權重)大於2
輸出結果:
頂點:(4,(David,42))(6,(Fran,50))(2,(Bob,27))(1,(Alice,28)) (3,(Charlie,65))(5,(Ed,55))
邊:Edge(5,3,8)Edge(5,6,3)Edge(2,1,7)Edge(3,2,4) Edge(3,6,3)
//也能夠定義以下的操做
val subGraph2=graph.subgraph(epred=>pred.srcAttr._2<epred.dstAttr._2))
//起始頂點的年齡小於終點頂點年齡
頂點:1,(Alice,28))(4,(David,42))(3,(Charlie,65))(6,(Fran,50)) (2,(Bob,27))(5,(Ed,55))
邊 :Edge(5,3,8)Edge(2,1,7)Edge(2,4,2)

//方法3--對頂點和邊Triplet兩種同時操做「,」號隔開epred和vpred
val subGraph3=graph.subgraph(epred=>epred.attr>3,vpred=(id,attr)=>attr._2>30) 輸出結果: 頂點:(3,(Charlie,65))(5,(Ed,55))(4,(David,42))(6,(Fran,50)) 邊:Edge(5,3,8)

圖的基本信息統計-度計算

度分佈:這是一個圖最基礎和重要的指標。度分佈檢測的目的,主要是瞭解圖中「超級節點」的個數和規模,以及全部節點度的分佈曲線。超級節點的存在對各類傳播算法都會有重大的影響(不管是正面助力仍是反面阻力),所以要預先對這些數據量有個預估。藉助GraphX最基本的圖信息接口degrees: VertexRDD[Int](包括inDegrees和outDegrees),這個指標能夠輕鬆計算出來,並進行各類各樣的統計(摘自《快刀初試:Spark GraphX在淘寶的實踐》。

//-----------------度的Reduce,統計度的最大值-----------------
def max(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={
            if (a._2>b._2) a  else b }

val totalDegree=graph.degrees.reduce((a,b)=>max(a, b)) val inDegree=graph.inDegrees.reduce((a,b)=>max(a,b)) val outDegree=graph.outDegrees.reduce((a,b)=>max(a,b)) print("max total Degree = "+totalDegree) print("max in Degree = "+inDegree) print("max out Degree = "+outDegree) //小技巧:如何知道ab的類型爲(VertexId,Int)? //當你敲完graph.degrees.reduce((a,b)=>,再將鼠標點到a和b上查看, //就會發現a和b是(VertexId,Int),固然reduce後的返回值也是(VertexId,Int) //這樣就很清楚本身該如何定義max函數了 //平均度 val sumOfDegree=graph.degrees.map(x=>(x._2.toLong)).reduce((a,b)=>a+b) val meanDegree=sumOfDegree.toDouble/graph.vertices.count().toDouble print("meanDegree "+meanDegree) println //------------------使用RDD自帶的統計函數進行度分佈分析-------- //度的統計分析 //最大,最小 val degree2=graph.degrees.map(a=>(a._2,a._1)) //graph.degrees是VertexRDD[Int],即(VertexID,Int)。 //經過上面map調換成map(a=>(a._2,a._1)),即RDD[(Int,VetexId)] //這樣下面就能夠將度(Int)看成鍵值(key)來操做了, //包括下面的min,max,sortByKey,top等等,由於這些函數都是對第一個值也就是key操做的 //max degree print("max degree = " + (degree2.max()._2,degree2.max()._1)) println //min degree print("min degree =" +(degree2.min()._2,degree2.min()._1)) println //top(N) degree"超級節點" print("top 3 degrees:\n") degree2.sortByKey(true, 1).top(3).foreach(x=>print(x._2,x._1)) println /*輸出結果: * max degree = (2,4)//(Vetext,degree) * min degree =(1,2) * top 3 degrees: * (2,4)(5,3)(3,3) */ 

相鄰聚合—消息聚合

       相鄰聚合(Neighborhood Aggregation) 
       圖分析任務的一個關鍵步驟是彙總每一個頂點附近的信息。例如咱們可能想知道每一個用戶的追隨者的數量或者每一個用戶的追隨者的平均年齡。許多迭代圖算法(如PageRank,最短路徑和連通體) 屢次聚合相鄰頂點的屬性。 
       聚合消息(aggregateMessages) 
 GraphX中的核心聚合操做是 aggregateMessages,它主要功能是向鄰邊發消息,合併鄰邊收到的消息,返回messageRDD。這個操做將用戶定義的sendMsg函數應用到圖的每一個邊三元組(edge triplet),而後應用mergeMsg函數在其目的頂點聚合這些消息。

class Graph[VD, ED] { def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit,//(1)--sendMsg:向鄰邊發消息,至關與MR中的Map函數 mergeMsg: (Msg, Msg) => Msg,//(2)--mergeMsg:合併鄰邊收到的消息,至關於Reduce函數 tripletFields: TripletFields = TripletFields.All)//(3)可選項,TripletFields.Src/Dst/All : VertexRDD[Msg]//(4)--返回messageRDD }

(1)sendMsg: 
        將sendMsg函數看作map-reduce過程當中的map函數,向鄰邊發消息,應用到圖的每一個邊三元組(edge triplet),即函數的左側爲每一個邊三元組(edge triplet)。 
    The user defined sendMsg function takes an EdgeContext, which exposes the source and destination attributes along with the edge attribute and functions (sendToSrc, and sendToDst) to send messages to the source and destination attributes. Think of sendMsg as the map function in map-reduce.

//關鍵數據結構EdgeContext源碼解析

package org.apache.spark.graphx

/** * Represents an edge along with its neighboring vertices and allows sending messages along the * edge. Used in [[Graph#aggregateMessages]]. */
abstract class EdgeContext[VD, ED, A] {//三個類型分別是:頂點、邊、自定義發送消息的類型(返回值的類型)
  /** The vertex id of the edge's source vertex. */
  def srcId: VertexId
  /** The vertex id of the edge's destination vertex. */
  def dstId: VertexId
  /** The vertex attribute of the edge's source vertex. */
  def srcAttr: VD
  /** The vertex attribute of the edge's destination vertex. */
  def dstAttr: VD
  /** The attribute associated with the edge. */
  def attr: ED

  /** Sends a message to the source vertex. */
  def sendToSrc(msg: A): Unit
  /** Sends a message to the destination vertex. */
  def sendToDst(msg: A): Unit

  /** Converts the edge and vertex properties into an [[EdgeTriplet]] for convenience. */
  def toEdgeTriplet: EdgeTriplet[VD, ED] = {
    val et = new EdgeTriplet[VD, ED]
    et.srcId = srcId
    et.srcAttr = srcAttr
    et.dstId = dstId
    et.dstAttr = dstAttr
    et.attr = attr
    et
  }
}

(2)mergeMsg : 
        用戶自定義的mergeMsg函數指定兩個消息到相同的頂點並保存爲一個消息。能夠將mergeMsg函數看作map-reduce過程當中的reduce函數。

    The user defined mergeMsg function takes two messages destined to the same vertex and yields a single message. Think of mergeMsg as the reduce function in map-reduce.

這裏寫代碼片

(3)TripletFields可選項 
        它指出哪些數據將被訪問(源頂點特徵,目的頂點特徵或者二者同時,即有三種可選擇的值:TripletFields.Src,TripletFieldsDst,TripletFields.All。 
      所以這個參數的做用是通知GraphX僅僅只須要EdgeContext的一部分參與計算,是一個優化的鏈接策略。例如,若是咱們想計算每一個用戶的追隨者的平均年齡,咱們僅僅只須要源字段。 因此咱們用TripletFields.Src表示咱們僅僅只須要源字段。 
     takes an optional tripletsFields which indicates what data is accessed in the EdgeContext (i.e., the source vertex attribute but not the destination vertex attribute). The possible options for the tripletsFields are defined in TripletFields and the default value is TripletFields.All which indicates that the user defined sendMsg function may access any of the fields in the EdgeContext. The tripletFields argument can be used to notify GraphX that only part of the EdgeContext will be needed allowing GraphX to select an optimized join strategy. For example if we are computing the average age of the followers of each user we would only require the source field and so we would use TripletFields.Src to indicate that we only require the source field

(4)返回值: 
    The aggregateMessages operator returns a VertexRDD[Msg] containing the aggregate message (of type Msg) destined to each vertex. Vertices that did not receive a message are not included in the returned VertexRDD.

//假設已經定義好以下圖:
//頂點:[Id,(name,age)]
//(4,(David,18))(1,(Alice,28))(6,(Fran,40))(3,(Charlie,30))(2,(Bob,70))(5,Ed,55))
//邊:Edge(4,2,2)Edge(2,1,7)Edge(4,5,8)Edge(2,4,2)Edge(5,6,3)Edge(3,2,4)
// Edge(6,1,2)Edge(3,6,3)Edge(6,2,8)Edge(4,1,1)Edge(6,4,3)(4,(2,110))

//定義一個相鄰聚合,統計比本身年紀大的粉絲數(count)及其平均年齡(totalAge/count)
val olderFollowers=graph.aggregateMessages[(Int,Int)](
//方括號內的元組(Int,Int)是函數返回值的類型,也就是Reduce函數(mergeMsg )右側獲得的值(count,totalAge)
        triplet=> {
          if(triplet.srcAttr._2>triplet.dstAttr._2){            
              triplet.sendToDst((1,triplet.srcAttr._2))
          }
        },//(1)--函數左側是邊三元組,也就是對邊三元組進行操做,有兩種發送方式sendToSrc和 sendToDst
        (a,b)=>(a._1+b._1,a._2+b._2),//(2)至關於Reduce函數,a,b各表明一個元組(count,Age)
        //對count和Age不斷相加(reduce),最終獲得總的count和totalAge
        TripletFields.All)//(3)可選項,TripletFields.All/Src/Dst
olderFollowers.collect().foreach(println)
輸出結果:
(4,(2,110))//頂點Id=4的用戶,有2個年齡比本身大的粉絲,同年齡是110歲
(6,(1,55))
(1,(2,110))

//計算平均年齡
val averageOfOlderFollowers=olderFollowers.mapValues((id,value)=>value match{
      case (count,totalAge) =>(count,totalAge/count)//因爲不是全部頂點都有結果,因此用match-case語句
    })    

averageOfOlderFollowers.foreach(print)  
輸出結果:
(1,(2,55))(4,(2,55))(6,(1,55))//Id=1的用戶,有2個粉絲,平均年齡是55歲

Spark Join鏈接操做

         許多狀況下,須要將圖與外部獲取的RDDs進行鏈接。好比將一個額外的屬性添加到一個已經存在的圖上,或者將頂點屬性從一個圖導出到另外一圖中(在本身編寫圖計算API 時,每每須要屢次進行aggregateMessages和Join操做,所以這兩個操做能夠說是Graphx中很是重要的操做,須要很是熟練地掌握,在本文最後的實例中,有更多的例子可供學習) 
         In many cases it is necessary to join data from external collections (RDDs) with graphs. For example, we might have extra user properties that we want to merge with an existing graph or we might want to pull vertex properties from one graph into another.

有兩個join API可供使用:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
 : Graph[VD, ED]

  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
 : Graph[VD2, ED]
}

         兩個鏈接方式差異很是大。下面分別來講明

joinVertices鏈接

          返回值的類型就是graph頂點屬性的類型,不能新增,也不能夠減小(即不能改變原始graph頂點屬性類型和個數)。 
         常常會遇到這樣的情形,」一個額外的費用(extraCost)增長到老的費用(oldCost)中」,oldCost爲graph的頂點屬性值,extraCost來自外部RDD,這時候就要用到joinVertices: 
         extraCosts: RDD[(VertexID, Double)]//額外的費用 
         graph:Graph[Double,Long]//oldCost 
         val totlCosts = graph.joinVertices(extraCosts)( (id, oldCost, extraCost) => oldCost + extraCost) 
         //extraCost和oldCost數據類型一致,且返回時無需改變原始graph頂點屬性的類型。

再舉一個例子:

// 假設graph的頂點以下[id,(user_name,initial_energy)]
//(6,(Fran,0))(2,(Bob,3))(4,(David,3))(3,(Charlie,1))(1,(Alice,2))(5,(Ed,2))

// graph邊以下:
//Edge(2,1,1)Edge(2,4,1)Edge(4,1,1)Edge(5,2,1)Edge(5,3,1)Edge(5,6,1)Edge(3,2,1)Edge(3,6,1)

// 每一個src向dst鄰居發送生命值爲2能量
val energys=graph.aggregateMessages[Long](
            triplet=>triplet.sendToDst(2), (a,b)=>a+b)      

// 輸出結果:
// (1,4)(4,2)(3,2)(6,4)(2,4)
val energys_name=graph.joinVertices(energys){
              case(id,(name,initialEnergy),energy)=>(name,initialEnergy+energy)
              }
//輸出結果:
// (3,(Charlie,3))(1,(Alice,6))(5,(Ed,2))(4,(David,5))(6,(Fran,4))(2,(Bob,7))

// 咱們注意到,若是energys:RDD中沒有graph某些頂點對應的值,則graph不進行任何改變,如(5,(Ed,2))。

         從上面的例子咱們知道:將外部RDD joinvertices到graph中,對應於graph某些頂點,RDD中無對應的屬性,則保留graph原有屬性值不進行任何改變。 
         而與之相反的是另外一種狀況,對應於graph某一些頂點,RDD中的值不止一個,這種狀況下將只有一個值在join時起做用。能夠先使用aggregateUsingIndex的進行reduce操做,而後再join graph。

val nonUniqueCosts: RDD[(VertexID, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b) val joinedGraph = graph.joinVertices(uniqueCosts)( (id, oldCost, extraCost) => oldCost + extraCost)

         If the RDD contains more than one value for a given vertex only one will be used. It is therefore recommended that the input RDD be made unique using the following which will also pre-index the resulting values to substantially accelerate the subsequent join.

(2)outerJoinVertices

         更爲經常使用,使用起來也更加自由的是outerJoinVertices,至於爲何後面會詳細分析。 
         The more general outerJoinVertices behaves similarly to joinVertices except that the user defined map function is applied to all vertices and can change the vertex property type. Because not all vertices may have a matching value in the input RDD the map function takes an Option type.

         從下面函數的定義咱們注意到,與前面JoinVertices不一樣之處在於map函數右側類型是VD2,再也不是VD,所以不受原圖graph頂點屬性類型VD的限制,在outerJoinVertices中使用者能夠隨意定義本身想要的返回類型,從而能夠徹底改變圖的頂點屬性值的類型和屬性的個數。

class Graph[VD, ED] {

  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
 : Graph[VD2, ED]

}

用上面例子中的graph和energys數據:

val graph_energy_total=graph.outerJoinVertices(energys){
      case(id,(name,initialEnergy),Some(energy))=>(name,initialEnergy,energy,initialEnergy+energy)
      case(id,(name,initialEnergy),None)=>(name,initialEnergy,0,initialEnergy)
    }

// 輸出結果:
// (3,(Charlie,1,2,3))(1,(Alice,2,4,6))(5,(Ed,2,0,2))
// (4,(David,3,2,5))(6,(Fran,0,4,4))(2,(Bob,3,4,7))

Spark Scala幾個語法問題

(1)遇到null怎麼處理? 
可參考【Scala】使用Option、Some、None,避免使用null 
http://www.jianshu.com/p/95896d06a94d

         大多數語言都有一個特殊的關鍵字或者對象來表示一個對象引用的是「無」,在Java,它是null。 
         Scala鼓勵你在變量和函數返回值可能不會引用任何值的時候使用Option類型。在沒有值的時候,使用None,這是Option的一個子類。若是有值能夠引用,就使用Some來包含這個值。Some也是Option的子類。 
         經過模式匹配分離可選值,若是匹配的值是Some的話,將Some裏的值抽出賦給x變量。舉一個綜合的例子:

def showCapital(x: Option[String]) = x match {
    case Some(s) => s
    case None => "?"
}

/*
Option用法:Scala推薦使用Option類型來表明一些可選值。使用Option類型,讀者一眼就能夠看出這種類型的值可能爲None。
如上面:x: Option[String])參數,就是由於參數多是String,也可能爲null,這樣程序不會在爲null時拋出異常
*/

Spark中,常用在map中使用case語句進行匹配None和Some,再舉一個例子

//假設graph.Vertice:(id,(name,weight))以下:
//(4,(David,Some(2)))(3,(Charlie,Some(2)))(6,(Fran,Some(4)))(2,(Bob,Some(4)))(1,(Alice,Some(4)))(5,(Ed,None))
//id=5時,weight=None,其餘的爲Some

val weights=graph.vertices.map{
      case (id,(name,Some(weight)))=>(id,weight)
      case (id,(name,None))=>(id,0)
    }    
weights.foreach(print)
println

//輸出結果以下(id,weight):
//(3,2)(6,4)(2,4)(4,2)(1,4)(5,0)

在上面的例子中,其實咱們也能夠選用另一個方法,getOrElse。這個方法在這個Option是Some的實例時返回對應的值,而在是None的實例時返函數參數。 
上面例子能夠用下面的語句得到一樣的結果:

val weights=graph.vertices.map{
      attr=>(attr._1,attr._2._2.getOrElse(0))
      //若是attr._2._2!=None,返回attr._2._2(weight)的值,
      //不然(即attr._2._2==None),返回本身設置的函數參數(0)
    }

//輸出一樣的結果:
//(id,weight)
(4,2)(6,4)(2,4)(3,2)(1,4)(5,0)

圖算法工具包

1.數三角形

TriangleCount主要用途之一是用於社區發現,以下圖所示: 
這裏寫圖片描述 
例如說在微博上你關注的人也互相關注,你們的關注關係中就會有不少三角形,這說明社區很強很穩定,你們的聯繫都比較緊密;若是說只是你一我的關注不少人,這說明你的社交羣體是很是小的。(摘自《大數據Spark企業級實戰》一書)

graph.triangleCount().vertices.foreach(x=>print(x+"\n"))
    /*輸出結果 * (1,1)//頂點1有1個三角形 * (3,2)//頂點3有2個三角形 * (5,2) * (4,1) * (6,1) * (2,2) */

2.連通圖

        現實生活中存在各類各樣的網絡,諸如人際關係網、交易網、運輸網等等。對這些網絡進行社區發現具備極大的意義,如在人際關係網中,能夠發現出具備不一樣興趣、背景的社會團體,方便進行不一樣的宣傳策略;在交易網中,不一樣的社區表明不一樣購買力的客戶羣體,方便運營爲他們推薦合適的商品;在資金網絡中,社區有多是潛在的洗錢團伙、刷鑽聯盟,方便安所有門進行相應處理;在類似店鋪網絡中,社區發現能夠檢測出商幫、價格聯盟等,對商家進行指導等等。總的來看,社區發如今各類具體的網絡中都能有重點的應用場景,圖1展現了基於圖的拓撲結構進行社區發現的例子。

這裏寫圖片描述

        檢測連通圖能夠弄清一個圖有幾個連通部分及每一個連通部分有多少頂點。這樣能夠將一個大圖分割爲多個小圖,並去掉零碎的連通部分,從而能夠在多個小子圖上進行更加精細的操做。目前,GraphX提供了ConnectedComponents和StronglyConnected-Components算法,使用它們能夠快速計算出相應的連通圖。 
        連通圖能夠進一步演化變成社區發現算法,而該算法優劣的評判標準之一,是計算模塊的Q值,來查看所謂的modularity狀況。 
         若是一個有向圖中的每對頂點均可以從經過路徑可達,那麼就稱這個圖是強連通的。一個 strongly connected component就是一個有向圖中最大的強連通子圖。下圖中就有三個強連通子圖: 
這裏寫圖片描述

//連通圖
def connectedComponents(maxIterations: Int): Graph[VertexId, ED]
def connectedComponents(): Graph[VertexId, ED]

//強連通圖
//numIter:the maximum number of iterations to run for
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
//連通圖計算社區發現
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.log4j.{Level, Logger}

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object myConnectComponent {
  def main(args:Array[String]){    

    val sparkConf = new SparkConf().setAppName("myGraphPractice").setMaster("local[2]")
    val sc=new SparkContext(sparkConf) 
    //屏蔽日誌
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val graph=GraphLoader.edgeListFile(sc, "/spark-2.0.0-bin-hadoop2.6/data/graphx/followers.txt")    

    graph.vertices.foreach(print)
    println
    graph.edges.foreach(print)  
    println

    val cc=graph.connectedComponents().vertices
    cc.foreach(print)
    println 
    /*輸出結果 * (VertexId,cc) * (4,1)(1,1)(6,1)(3,1)(2,1)(7,1) */

    //強連通圖-stronglyConnectedComponents
    val maxIterations=10//the maximum number of iterations to run for
    val cc2=graph.stronglyConnectedComponents(maxIterations).vertices
    cc2.foreach(print)
    println 


    val path2="/spark-2.0.0-bin-hadoop2.6/data/graphx/users.txt"
    val users=sc.textFile(path2).map{//map 中包含多行 必須使用{}    
      line=>val fields=line.split(",")
      (fields(0).toLong,fields(1))//(id,name) 多行書寫 最後一行纔是返回值 且與上行splitsplit(",")之間要有換行
    }    
    users.collect().foreach { println}
    println
    /*輸出結果 (VertexId,name) * (1,BarackObama) * (2,ladygaga) * ... */


    val joint=cc.join(users)
    joint.collect().foreach { println}
    println

    /*輸出結果 * (VertexId,(cc,name)) * (4,(1,justinbieber)) * (6,(3,matei_zaharia)) */

    val name_cc=joint.map{
      case (VertexId,(cc,name))=>(name,cc)
    }
    name_cc.foreach(print)   
    /* * (name,cc) * (BarackObama,1)(jeresig,3)(odersky,3)(justinbieber,1)(matei_zaharia,3)(ladygaga,1) */

  }  

}

3.PageRank讓連接來」投票」

        一個頁面的「得票數」由全部鏈向它的頁面的重要性來決定,到一個頁面的超連接至關於對該頁投一票。一個頁面的PageRank是由全部鏈向它的頁面(「鏈入頁面」)的重要性通過遞歸算法獲得的。一個有較多鏈入的頁面會有較高的等級,相反若是一個頁面沒有任何鏈入頁面,那麼它沒有等級。 
這裏寫圖片描述

Spark Graphx實例直接參考: 
http://www.cnblogs.com/shishanyuan/p/4747793.html

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
//兩個參數
//tol:the tolerance allowed at convergence (smaller => more accurate).
//tol越小計算結果越精確,可是會花更長的時間
//resetProb:the random reset probability (alpha)
//返回一個圖,頂點的屬性是PageRank(Double);邊的屬性是規範化的權重(Double)

Run a dynamic version of PageRank returning a graph with vertex attributes containing the PageRank and edge attributes containing the normalized edge weight.
val prGraph = graph.pageRank(tol=0.001).cache()

pregel

     在迭代計算中,釋放內存是必要的,在新圖產生後,須要快速將舊圖完全釋放掉,不然,十幾輪迭代後,會有內存泄漏問題,很快耗光做業緩存空間。可是直接使用Spark提供的API cache、unpersist和checkpoint,很是須要使用技巧。因此Spark官方文檔建議:對於迭代計算,建議使用Pregal API,它可以正確的釋放中間結果,這樣就不須要本身費心去操做了。 
     In iterative computations, uncaching may also be necessary for best performance.However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results. 
        圖是自然的迭代數據結構,頂點的屬性值依賴於鄰居的屬性值,而鄰居們的屬性值一樣也依賴於他們各自鄰居屬性值(即鄰居的鄰居)。許多重要的圖算法迭代式的從新計算頂點的屬性直到達到預設的迭代條件。這些迭代的圖算法被抽象成一系列圖並行操做。 
     Graphs are inherently recursive data structures as properties of vertices depend on properties of their neighbors which in turn depend on properties of their neighbors. As a consequence many important graph algorithms iteratively recompute the properties of each vertex until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed to express these iterative algorithms. GraphX exposes a variant of the Pregel API. 
      
     At a high level the Pregel operator in GraphX is a bulk-synchronous parallel messaging abstraction constrained to the topology of the graph. The Pregel operator executes in a series of super steps in which vertices receive the sum of their inbound messages from the previous super step, compute a new value for the vertex property, and then send messages to neighboring vertices in the next super step. Unlike Pregel, messages are computed in parallel as a function of the edge triplet and the message computation has access to both the source and destination vertex attributes. Vertices that do not receive a message are skipped within a super step. The Pregel operators terminates iteration and returns the final graph when there are no messages remaining.

Note, unlike more standard Pregel implementations, vertices in GraphX can only send messages to neighboring vertices and the message construction is done in parallel using a user defined messaging function. These constraints allow additional optimization within GraphX.

//Graphx 中pregel 所用到的主要優化:
1.   Caching for Iterative mrTriplets & Incremental Updates for Iterative mrTriplets :在
不少圖分析算法中,不一樣點的收斂速度變化很大。在迭代後期,只有不多的點會有更新。所以,對於沒有更新的點,下一
次 mrTriplets 計算時 EdgeRDD 無需更新相應點值的本地緩存,大幅下降了通訊開銷。

2. Indexing Active Edges :沒有更新的頂點在下一輪迭代時不須要向鄰居從新發送消息。所以, mrTriplets 
遍歷邊時,若是一條邊的鄰居點值在上一輪迭代時沒有更新,則直接跳過,避免了大量無用的計算和通訊。

3. Join Elimination : Triplet 是由一條邊和其兩個鄰居點組成的三元組,操做 Triplet 的 map 函數經常只
需訪問其兩個鄰居點值中的一個。例如,在 PageRank 計算中,一個點值的更新只與其源頂點的值有關,而與其所指向
的目的頂點的值無關。那麼在 mrTriplets 計算中,就不須要 VertexRDD 和 EdgeRDD 的 3-way join ,而只需
要 2-way join 。

全部這些優化使 GraphX 的性能逐漸逼近 GraphLab 。雖然還有必定差距,但一體化的流水線服務和豐富的編程接口,
能夠彌補性能的微小差距。

//pregel 操做計算過程分析:
class GraphOps[VD, ED] {
  def pregel[A]
      //包含兩個參數列表
      //第一個參數列表包含配置參數初始消息、最大迭代數、發送消息的邊的方向(默認是沿邊方向出)。
      //VD:頂點的數據類型。
      //ED:邊的數據類型
      //A:Pregel message的類型。
      //graph:輸入的圖
      //initialMsg:在第一次迭代的時候頂點收到的消息。

maxIterations:迭代的次數

      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)

      //第二個參數列表包含用戶 自定義的函數用來接收消息(vprog)、計算消息(sendMsg)、合併消息(mergeMsg)。 
      //vprog:用戶定義的頂點程序運行在每個頂點中,負責接收進來的信息,和計算新的頂點值。
      //在第一次迭代的時候,全部的頂點程序將會被默認的defaultMessage調用,
      //在次輪迭代中,頂點程序只有接收到message纔會被調用。 
      (vprog: (VertexId, VD, A) => VD,//vprog:
      //sendMsg:用戶提供的函數,應用於邊緣頂點在當前迭代中接收message
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      //用戶提供定義的函數,將兩個類型爲A的message合併爲一個類型爲A的message
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {

    // Receive the initial message at each vertex
    // 在第一次迭代的時候,全部的頂點都會接收到initialMsg消息,
    // 在次輪迭代的時候,若是頂點沒有接收到消息,verteProgram(vprog)就不會被調用。
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()

    // 使用mapReduceTriplets compute the messages(即map和reduce message,不斷減小messages)
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      g = g.joinVertices(messages)(vprog).cache()
      val oldMessages = messages

      // Send new messages, skipping edges where neither side received a message. We must cache
      // messages so it can be materialized on the next line, allowing us to uncache the previous
      // iteration.
      messages = g.mapReduceTriplets(
        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}
整個過程不是很容易理解,更詳細的計算過程分析能夠參考:
Spark的Graphx學習筆記--Pregel:http://www.ithao123.cn/content-3510265.html

總之,把握住整個迭代過程: 
vertexProgram(vprog)在第一次在初始化的時候,會在全部頂點上運行,以後,只有接收到消息的頂點纔會運行vertexProgram,重複這個步驟直到迭代條件。

//計算最短路徑代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object myPregal {
  def main(args:Array[String]){

    //設置運行環境
    val conf = new SparkConf().setAppName("myGraphPractice").setMaster("local[4]")
    val sc=new SparkContext(conf)

    //屏蔽日誌
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)    

    val vertexArray = Array(
      (1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),
      (5L, ("Ed", 55)),(6L, ("Fran", 50))
    )
    //邊的數據類型ED:Int
    val edgeArray = Array(
      Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),Edge(5L, 2L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3)
    )

    //構造vertexRDD和edgeRDD
    val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

    //構造圖Graph[VD,ED]
    val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)    


    val sourceId:VertexId=5//定義源點
    val initialGraph=graph.mapVertices((id,_)=>if (id==sourceId) 0 else Double.PositiveInfinity)  
    //pregel函數有兩個參數列表
    val shorestPath=initialGraph.pregel(initialMsg=Double.PositiveInfinity,
                                        maxIterations=100,                                  
                                        activeDirection=EdgeDirection.Out)(

                                 //1-頂點屬性迭代更新方式,與上一次迭代後保存的屬性相比,取較小值
                                 //(將從源點到頂點的最小距離放在頂點屬性中) 
                                 (id,dist,newDist)=>math.min(dist,newDist), 

                                 //2-Send Message,在全部能到達目的點的鄰居中,計算鄰居頂點屬性+邊屬性
                                 //即(鄰居-源點的距離+鄰居-目的點的距離,並將這個距離放在迭代器中
                                 triplet=>{
                                   if(triplet.srcAttr+triplet.attr<triplet.dstAttr){
                                     Iterator((triplet.dstId,triplet.srcAttr+triplet.attr))
                                   }else{
                                     Iterator.empty
                                   }
                                 }, 

                                 //3-Merge Message,至關於Reduce函數
                                 //對全部能達到目的點的鄰居發送的消息,進行min-reduce
                                 //鄰居中最終reduce後最小的結果,做爲newDist,發送至目的點,
                                 //至此,目的點中有新舊兩個dist了,在下一次迭代開始的時候,步驟1中就能夠進行更新迭代了
                                 (a,b)=>math.min(a,b))

    shorestPath.vertices.map(x=>(x._2,x._1)).top(30).foreach(print)  

    /*outprint(shorest distance,vertexId) * 8.0,3)(5.0,1)(4.0,4)(3.0,6)(2.0,2)(0.0,5) */ 
  }  
}

應用實例1:Louvain算法社區發現

      實例來自《Spark最佳實踐》陳歡等著 一書,整個這個實例可參考原書便可。 
      源代碼來自https://github.com/Sotera/spark-distributed-louvain-modularity git clone後就可使用了。 
      可是2.0版本Spark源碼須要進行修改,由於老的聚合函數不能再使用了,須要修改爲消息聚合函數,《Spark最佳實踐》一書中已經進行了修改,惋惜這本書沒有給出完整的修改後代碼,後面我會貼出修改的後的代碼,替換git上的相應部分就可使用了。

      社區發現算法可供參考的資料也比較多,算法也比較多。 
http://blog.csdn.net/peghoty/article/details/9286905

關鍵概念–模塊度(Modularity ) 
      不少的社區發現算法都是基於模塊度設計的,模塊度用於衡量社區劃分結構的合理性。 
      用某種算法劃分結果的內聚性與隨機劃分結果的內聚性的差值,對劃分結果進行評估。 
  模塊度是評估一個社區網絡劃分好壞的度量方法,它的物理含義是社區內節點的連邊數與隨機狀況下的邊數只差,它的取值範圍是 [−1/2,1),其定義以下:

Q=12mi,j[Aijkikj2m]δ(ci,cj)Q=12m∑i,j[Aij−kikj2m]δ(ci,cj)

δ(u,v)={1when u==v0 elseδ(u,v)={0 else1when u==v

  其中, AijAij 節點i和節點j之間邊的權重,網絡不是帶權圖時,全部邊的權重能夠看作是1; ki=jAijki=∑jAij 表示全部與節點i相連的邊的權重之和(度數); cici 表示節點i所屬的社區; m=12ijAijm=12∑ijAij 表示全部邊的權重之和(邊的數目)。

  公式中Aijkikj2m=Aijkikj2mAij−kikj2m=Aij−kikj2m,節點j鏈接到任意一個節點的機率是kj2mkj2m,如今節點i有kiki的度數,所以在隨機狀況下節點i與j的邊爲kikj2mkikj2m
  模塊度的公式定義能夠做以下簡化: 
  

Q=12mi,j[Aijkikj2m]δ(ci,cj)  =12m[i,jAijikijkj2m]δ(ci,cj)  =12mc[Σin(Σtot)22m]Q=12m∑i,j[Aij−kikj2m]δ(ci,cj)  =12m[∑i,jAij−∑iki∑jkj2m]δ(ci,cj)  =12m∑c[Σin−(Σtot)22m]

其中ΣinΣin表示社區c內的邊的權重之和,ΣtotΣtot表示與社區c內的節點相連的邊的權重之和。

  上面的公式還能夠進一步簡化成:

Q=c[Σin2m(Σtot2m)2]=c[ecac2]Q=∑c[Σin2m−(Σtot2m)2]=∑c[ec−ac2]

  這樣模塊度也能夠理解是社區內部邊的權重減去全部與社區節點相連的邊的權重和,對無向圖更好理解,即社區內部邊的度數減去社區內節點的總度數。

  基於模塊度的社區發現算法,大都是以最大化模塊度Q爲目標。

Louvain算法流程 
Louvain算法的思想很簡單:

  1)將圖中的每一個節點當作一個獨立的社區,次數社區的數目與節點個數相同;

  2)對每一個節點i,依次嘗試把節點i分配到其每一個鄰居節點所在的社區,計算分配前與分配後的模塊度變化ΔQ ΔQ,並記錄ΔQ ΔQ最大的那個鄰居節點,若是maxΔQ>0 maxΔQ>0,則把節點i分配ΔQ ΔQ最大的那個鄰居節點所在的社區,不然保持不變;

  3)重複2),直到全部節點的所屬社區再也不變化;

  4)對圖進行壓縮,將全部在同一個社區的節點壓縮成一個新節點,社區內節點之間的邊的權重轉化爲新節點的環的權重,社區間的邊權重轉化爲新節點間的邊權重;

  5)重複1)直到整個圖的模塊度再也不發生變化。 
  從流程來看,該算法可以產生層次性的社區結構,其中計算耗時較多的是最底一層的社區劃分,節點按社區壓縮後,將大大縮小邊和節點數目,而且計算節點i分配到其鄰居j的時模塊度的變化只與節點i、j的社區有關,與其餘社區無關,所以計算很快。

代碼修改 
因爲版本的問題,Spark2.0中再也不使用不穩定的mapReduceTriplets函數,替換爲aggregateMessages。

(第1處修改) def createLouvainGraph[VD: ClassTag](graph: Graph[VD,Long]) : Graph[VertexState,Long]與 def compressGraph(graph:Graph[VertexState,Long],debug:Boolean=true) : Graph[VertexState,Long]函數中: //老版本 val nodeWeightMapFunc = (e:EdgeTriplet[VD,Long]) =>Iterator((e.srcId,e.attr), (e.dstId,e.attr)) val nodeWeightReduceFunc = (e1:Long,e2:Long) => e1+e2 val nodeWeights = graph.mapReduceTriplets(nodeWeightMapFunc,nodeWeightReduceFunc) //修改成: val nodeWeights = graph.aggregateMessages[Long](triplet=> (triplet.sendToSrc(triplet.attr),triplet.sendToDst(triplet.attr)), (a,b)=>a+b) (第2處修改)def louvain(sc:SparkContext...)函數中sendMsg函數: //老版本 private def sendMsg(et:EdgeTriplet[VertexState,Long]) = { val m1 = (et.dstId,Map((et.srcAttr.community,et.srcAttr.communitySigmaTot)->et.attr)) val m2 = (et.srcId,Map((et.dstAttr.community,et.dstAttr.communitySigmaTot)->et.attr)) Iterator(m1, m2) } //修改成 //import scala.collection.immutable.Map private def sendMsg(et:EdgeContext[VertexState,Long,Map[(Long,Long),Long]]) = { et.sendToDst(Map((et.srcAttr.community,et.srcAttr.communitySigmaTot)->et.attr)) et.sendToSrc(Map((et.dstAttr.community,et.dstAttr.communitySigmaTot)->et.attr)) } 

使用新浪微博數據進行分析 
詳細分析能夠參考《Spark最佳實踐一書》

參考文獻

(1)Spark 官方文檔 
http://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api 
(2)大數據Spark企業級實戰 王家林 
(3)GraphX迭代的瓶頸與分析 
http://blog.csdn.net/pelick/article/details/50630003 
(4)基於Spark的圖計算框架 GraphX 入門介紹 
http://www.open-open.com/lib/view/open1420689305781.html 
(5)Spark入門實戰系列–9.Spark圖計算GraphX介紹及實例 
http://www.cnblogs.com/shishanyuan/p/4747793.html 
(6)快刀初試:Spark GraphX在淘寶的實踐 
http://www.csdn.net/article/2014-08-07/2821097 
(7)基於GraphX的社區發現算法FastUnfolding分佈式實現 
http://bbs.pinggu.org/thread-3614747-1-1.html 
(8)關於圖計算和graphx的一些思考 
http://www.tuicool.com/articles/3MjURj 
(9)用 LDA 作主題模型:當 MLlib 邂逅 GraphX 
http://blog.jobbole.com/86130/ 
(10)Spark的Graphx學習筆記–Pregel 
http://www.ithao123.cn/content-3510265.html 
(11)Spark最佳實踐 陳歡 林世飛 
(12)社區發現(Community Detection)算法 
http://blog.csdn.net/peghoty/article/details/9286905

相關文章
相關標籤/搜索