大數據技術之_19_Spark學習_05_Spark GraphX 應用解析 + Spark GraphX 概述、解析 + 計算模式 + Pregel API + 圖算法參考代碼 + PageRank

第1章 Spark GraphX 概述1.1 什麼是 Spark GraphX1.2 彈性分佈式屬性圖1.3 運行圖計算程序第2章 Spark GraphX 解析2.1 存儲模式2.1.1 圖存儲模式2.1.2 GraphX 存儲模式2.2 vertices、edges 以及 triplets2.2.1 vertices2.2.2 edges2.2.3 triplets2.3 圖的構建2.3.1 構建圖的方法2.3.2 構建圖的過程2.4 計算模式2.4.1 BSP 計算模式2.4.2 圖操做一覽2.4.3 基本信息操做2.4.4 轉換操做2.4.5 結構操做2.4.6 頂點關聯操做2.4.7 聚合操做2.4.8 緩存操做2.5 Pregel API2.5.1 pregel 計算模型2.5.2 pregel 實現最短路徑2.6 GraphX 實例第3章 圖算法3.1 PageRank 排名算法3.1.1 算法概述3.1.2 從入鏈數量到 PageRank3.1.3 PageRank 算法原理3.1.4 Spark GraphX 實現3.2 廣度優先遍歷(參考)3.3 單源最短路徑(參考)3.4 連通圖(參考)3.5 三角計數(參考)第4章 PageRank 實例css


第1章 Spark GraphX 概述

1.1 什麼是 Spark GraphX


  Spark GraphX 是一個分佈式圖處理框架,它是基於 Spark 平臺提供對圖計算和圖挖掘簡潔易用的而豐富的接口,極大的方便了對分佈式圖處理的需求。那麼什麼是圖,都計算些什麼?衆所周知社交網絡中人與人之間有不少關係鏈,例如 Twitter、Facebook、微博和微信等,數據中出現網狀結構關係都須要圖計算。
  GraphX 是一個新的 Spark API,它用於圖和分佈式圖(graph-parallel)的計算。GraphX 經過引入彈性分佈式屬性圖(Resilient Distributed Property Graph): 頂點和邊均有屬性的有向多重圖,來擴展Spark RDD。爲了支持圖計算,GraphX 開發了一組基本的功能操做以及一個優化過的 Pregel API。另外,GraphX 也包含了一個快速增加的圖算法和圖 builders 的集合,用以簡化圖分析任務。
  從社交網絡到語言建模,不斷增加的數據規模以及圖形數據的重要性已經推進了許多新的分佈式圖系統的發展。經過限制計算類型以及引入新的技術來切分和分配圖,這些系統能夠高效地執行復雜的圖形算法,比通常的分佈式數據計算(data-parallel,如 spark、MapReduce)快不少。

  分佈式圖(graph-parallel)計算和分佈式數據(data-parallel)計算相似,分佈式數據計算採用了一種 record-centric(以記錄爲中心)的集合視圖,而分佈式圖計算採用了一種 vertex-centric(以頂點爲中心)的圖視圖。 分佈式數據計算經過同時處理獨立的數據來得到併發的目的,分佈式圖計算則是經過對圖數據進行分區(即切分)來得到併發的目的。更準確的說,分佈式圖計算遞歸地定義特徵的轉換函數(這種轉換函數做用於鄰居特徵),經過併發地執行這些轉換函數來得到併發的目的。

  分佈式圖計算比分佈式數據計算更適合圖的處理,可是在典型的圖處理流水線中,它並不能很好地處理全部操做。例如,雖然分佈式圖系統能夠很好的計算 PageRank 等算法,可是它們不適合從不一樣的數據源構建圖或者跨過多個圖計算特徵。更準確的說,分佈式圖系統提供的更窄的計算視圖沒法處理那些構建和轉換圖結構以及跨越多個圖的需求。分佈式圖系統中沒法提供的這些操做須要數據在圖本體之上移動而且須要一個圖層面而不是單獨的頂點或邊層面的計算視圖。例如,咱們可能想限制咱們的分析到幾個子圖上,而後比較結果。 這不只須要改變圖結構,還須要跨多個圖計算。

  咱們如何處理數據取決於咱們的目標,有時同一原始數據可能會處理成許多不一樣表和圖的視圖,而且圖和表之間常常須要可以相互移動。以下圖所示:

  因此咱們的圖流水線必須經過組合 graph-parallel 和 data- parallel 來實現。可是這種組合必然會致使大量的數據移動以及數據複製,同時這樣的系統也很是複雜。例如,在傳統的圖計算流水線中,在 Table View 視圖下,可能須要 Spark 或者 Hadoop 的支持,在 Graph View 這種視圖下,可能須要 Prege 或者 GraphLab 的支持。也就是把圖和表分在不一樣的系統中分別處理。不一樣系統之間數據的移動和通訊會成爲很大的負擔。
  GraphX 項目將 graph-parallel 和 data-parallel 統一到一個系統中,並提供了一個惟一的組合 API。GraphX 容許用戶把數據當作一個圖和一個集合(RDD),而不須要數據移動或者複製。也就是說 GraphX 統一了 Graph View 和 Table View,能夠很是輕鬆的作 pipeline 操做。

 

1.2 彈性分佈式屬性圖


   GraphX 的核心抽象是彈性分佈式屬性圖,它是一個有向多重圖,帶有鏈接到每一個頂點和邊的用戶定義的對象。有向多重圖中多個並行的邊共享相同的源和目的頂點。支持並行邊的能力簡化了建模場景,相同的頂點可能存在多種關係(例如 co-worker 和 friend)。 每一個頂點用一個惟一的 64 位長的標識符(VertexID)做爲 key。GraphX 並無對頂點標識強加任何排序。一樣,邊擁有相應的源和目的頂點標識符。

  屬性圖擴展了 Spark RDD 的抽象,有 Table 和 Graph 兩種視圖,可是隻須要一份物理存儲。兩種視圖都有本身獨有的操做符,從而使咱們同時得到了操做的靈活性和執行的高效率。屬性圖以 vertex(VD) 和 edge(ED) 類型做爲參數類型,這些類型分別是頂點和邊相關聯的對象的類型。

  在某些狀況下,在一樣的圖中, 咱們可能但願擁有不一樣屬性類型的頂點。這能夠經過繼承完成。例如,將用戶和產品建模成一個二分圖,咱們能夠用以下方式:

 

class VertexProperty()
case class UserProperty(val nameStringextends VertexProperty
case class ProductProperty(val nameStringval priceDoubleextends VertexProperty
// The graph might then have the type:
var graphGraph[VertexPropertyString
null

  和 RDD 同樣,屬性圖是不可變的、分佈式的、容錯的。圖的值或者結構的改變須要生成一個新的圖來實現。注意,原始圖中不受影響的部分均可以在新圖中重用,用來減小存儲的成本。執行者使用一系列頂點分區方法來對圖進行分區。如 RDD 同樣,圖的每一個分區能夠在發生故障的狀況下被從新建立在不一樣的機器上。
  邏輯上,屬性圖對應於一對類型化的集合(RDD),這個集合包含每個頂點和邊的屬性。所以,圖的類中包含訪問圖中頂點和邊的成員變量。java

class Graph[VDED{
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}

VertexRDD[VD] 和 EdgeRDD[ED] 類是 RDD[(VertexID, VD)] 和 RDD[Edge[ED]] 的繼承和優化版本。VertexRDD[VD] 和 EdgeRDD[ED] 都提供了額外的圖計算功能並提供內部優化功能。以下圖所示:算法

源碼以下:sql

abstract class VertexRDD[VD](scSparkContextdepsSeq[Dependency[_]]) extends RDD[(VertexIdVD)](scdeps)

abstract class EdgeRDD[ED](scSparkContextdepsSeq[Dependency[_]]) extends RDD[Edge[ED]](scdeps)

  GraphX 的底層設計有如下幾個關鍵點:
  • 對 Graph 視圖的全部操做,最終都會轉換成其關聯的 Table 視圖的 RDD 操做來完成。這樣對一個圖的計算,最終在邏輯上,等價於一系列 RDD 的轉換過程。所以,Graph 最終具有了 RDD 的3個關鍵特性:Immutable、Distributed和Fault-Tolerant,其中最關鍵的是 Immutable(不變性)。邏輯上,全部圖的轉換和操做都產生了一個新圖;物理上,GraphX 會有必定程度的不變頂點和邊的複用優化,對用戶透明。
  • 兩種視圖底層共用的物理數據,由 RDD[Vertex-Partition] 和 RDD[EdgePartition] 這兩個 RDD 組成。點和邊實際都不是以表 Collection[tuple] 的形式存儲的,而是由 VertexPartition/EdgePartition 在內部存儲一個帶索引結構的分片數據塊,以加速不一樣視圖下的遍歷速度。不變的索引結構在 RDD 轉換過程當中是共用的,下降了計算和存儲開銷。
  • 圖的分佈式存儲採用點分割模式,並且使用 partitionBy 方法,由用戶指定不一樣的劃分策略(PartitionStrategy)。劃分策略會將邊分配到各個 EdgePartition,頂點分配到各個 VertexPartition,EdgePartition 也會緩存本地邊關聯點的 Ghost 副本。劃分策略的不一樣會影響到所須要緩存的 Ghost 副本數量,以及每一個 EdgePartition 分配的邊的均衡程度,須要根據圖的結構特徵選取最佳策略。目前有 EdgePartition2d、EdgePartition1d、RandomVertexCut 和 CanonicalRandomVertexCut 這四種策略。shell

1.3 運行圖計算程序

假設咱們想構造一個包括不一樣合做者的屬性圖。頂點屬性可能包含用戶名和職業。咱們能夠用描述合做者之間關係的字符串標註邊緣。apache

Step一、開始的第一步是引入 Spark 和 GraphX 到你的項目中,以下面所示:api

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

Step二、若是你沒有用到 Spark shell,你還將須要 SparkContext。
所得的圖形將具備類型簽名:val userGraph: Graph[(String, String), String]
有不少方式從一個原始文件、RDD 構造一個屬性圖。最通常的方法是利用 Graph object。下面的代碼從 RDD 集合生成屬性圖。數組

    // 建立 SparkConf() 並設置 App 名稱
    val conf = new SparkConf().setMaster("local[3]").setAppName("WC")
    // 建立 SparkContext,該對象是提交 spark App 的入口
    val sc = new SparkContext(conf)

    // Create an RDD for the vertices (頂點),這裏的頂點屬性是 一個二元組
    val users: RDD[(VertexId, (String, String))] =
      sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),
        (5L, ("franklin""prof")), (2L, ("istoica""prof"))))

    // Create an RDD for edges (邊),這裏的邊屬性是 String 類型
    val relationships: RDD[Edge[String]] =
      sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"),
        Edge(2L5L"colleague"), Edge(5L7L"pi")))

    val defaultUser = ("John Doe""Missing"// 缺省屬性
    // Build the initial Graph (圖)
    val graph = Graph(users, relationships, defaultUser)

Step三、在上面的例子中,咱們用到了 Edge 樣本類。邊有一個 srcId 和 dstId 分別對應於源和目標頂點的標示符。另外,Edge 類有一個 attr 成員用來存儲邊屬性。
咱們能夠分別用 graph.vertices 和 graph.edges 成員將一個圖解構爲相應的頂點和邊。瀏覽器

    // Count all users which are postdocs (過濾圖上的全部頂點,統計頂點的屬性的第二個值是 postdoc 的個數)
    val verticesCount = graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
    println(verticesCount)

    // Count all the edges where src > dst (統計圖上知足邊的 源頂點ID > 目標頂點ID 的個數)
    val edgeCount 
= graph.edges.filter(e => e.srcId > e.dstId).count
    println(edgeCount)

注意:graph.vertices 返回一個 VertexRDD[(String, String)],它繼承於 RDD[(VertexID, (String, String))]。因此咱們能夠用 scala 的 case 表達式解構這個元組。另外一方面,graph.edges 返回一個包含 Edge[String] 對象的 EdgeRDD。咱們也能夠用到 case 類的類型構造器,以下例所示。緩存

    graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

Step四、除了屬性圖的頂點和邊視圖,GraphX 也包含了一個三元組視圖,三元視圖邏輯上將頂點和邊的屬性保存爲一個 RDD[EdgeTriplet[VD, ED]],它包含 EdgeTriplet 類的實例。能夠經過下面的 Sql 表達式表示這個鏈接。

SELECT
    src.id ,
    dst.id ,
    src.attr ,
    e.attr ,
    dst.attr
FROM
    edges AS e
LEFT JOIN vertices AS src ,
 vertices AS dst ON e.srcId = src.Id
AND e.dstId = dst.Id

或者經過下面的圖來表示:

Step五、EdgeTriplet 類繼承於 Edge 類,而且加入了 srcAttr 和 dstAttr 成員,這兩個成員分別包含源和目的的屬性。咱們能夠用一個三元組視圖渲染字符串集合用來描述用戶之間的關係。

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

第2章 Spark GraphX 解析

2.1 存儲模式

2.1.1 圖存儲模式

PowerGraph 巨型圖的存儲整體上有邊分割點分割兩種存儲方式。
  1)邊分割(Edge-Cut):每一個頂點都存儲一次,但有的邊會被打斷分到兩臺機器上。這樣作的好處是節省存儲空間;壞處是對圖進行基於邊的計算時,對於一條兩個頂點被分到不一樣機器上的邊來講,要跨機器通訊傳輸數據,內網通訊流量大。
  2)點分割(Vertex-Cut):每條邊只存儲一次,都只會出如今一臺機器上。鄰居多的點會被複制到多臺機器上,增長了存儲開銷,同時會引起數據同步問題。好處是能夠大幅減小內網通訊量。


雖然兩種方法互有利弊,但如今是點分割佔上風,各類分佈式圖計算框架都將本身底層的存儲形式變成了點分割。主要緣由有如下兩個。
  1)磁盤價格降低,存儲空間再也不是問題,而內網的通訊資源沒有突破性進展,集羣計算時內網帶寬是寶貴的,時間比磁盤更珍貴。這點就相似於常見的 空間換時間的策略。
  2)在當前的應用場景中,絕大多數網絡都是「無尺度網絡」,遵循 冪律分佈,不一樣點的鄰居數量相差很是懸殊。而邊分割會使那些多鄰居的點所相連的邊大多數被分到不一樣的機器上,這樣的數據分佈會使得內網帶寬更加捉襟見肘,因而 邊分割存儲方式被漸漸拋棄了

 

2.1.2 GraphX 存儲模式

Graphx 借鑑 PowerGraph,使用的是 Vertex-Cut(點分割)方式存儲圖,用三個 RDD 存儲圖數據信息:
  VertexTable(id, data):id 爲頂點 id, data 爲頂點屬性
  EdgeTable(pid, src, dst, data):pid 爲分區 id ,src 爲源頂點 id ,dst 爲目的頂點 id,data 爲邊屬性
  RoutingTable(id, pid):id 爲頂點 id ,pid 爲分區 id
點分割存儲實現以下圖所示:

GraphX 在進行圖分割時,有幾種不一樣的分區(partition)策略,它經過 PartitionStrategy 專門定義這些策略。在 PartitionStrategy 中,總共定義了 EdgePartition2D、EdgePartition1D、RandomVertexCut 以及 CanonicalRandomVertexCut 這四種不一樣的分區策略。下面分別介紹這幾種策略。

RandomVertexCut

case object RandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    math.abs((src, dst).hashCode()) % numParts
  }
}

這個方法比較簡單,經過取源頂點和目標頂點 id 的哈希值來將邊分配到不一樣的分區。這個方法會產生一個隨機的邊分割,兩個頂點之間相同方向的邊會分配到同一個分區。

CanonicalRandomVertexCut

case object CanonicalRandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    if (src < dst) {
      math.abs((src, dst).hashCode()) % numParts
    } else {
      math.abs((dst, src).hashCode()) % numParts
    }
  }
}

這種分割方法和前一種方法沒有本質的不一樣。不一樣的是,哈希值的產生帶有肯定的方向(即兩個頂點中較小 id 的頂點在前)。兩個頂點之間全部的邊都會分配到同一個分區,而無論方向如何。

EdgePartition1D

case object EdgePartition1D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val mixingPrime: VertexId = 1125899906842597L
    (math.abs(src * mixingPrime) % numParts).toInt
  }
}

這種方法僅僅根據源頂點 id 來將邊分配到不一樣的分區。有相同源頂點的邊會分配到同一分區。

EdgePartition2D

case object EdgePartition2D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexId = 1125899906842597L
    if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
      // Use old method for perfect squared to ensure we get same results
      val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
      val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
      (col * ceilSqrtNumParts + row) % numParts
    } else {
      // Otherwise use new method
      val cols = ceilSqrtNumParts
      val rows = (numParts + cols - 1) / cols
      val lastColRows = numParts - rows * (cols - 1)
      val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
      val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
      col * rows + row
    }
  }
}

這種分割方法同時使用到了源頂點 id 和目的頂點 id。它使用稀疏邊鏈接矩陣的 2 維區分來將邊分配到不一樣的分區,從而保證頂點的備份數不大於 2 * sqrt(numParts) 的限制。這裏 numParts 表示分區數。 這個方法的實現分兩種狀況,即分區數能徹底開方和不能徹底開方兩種狀況。當分區數能徹底開方時,採用下面的方法:

val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts

當分區數不能徹底開方時,採用下面的方法。這個方法的最後一列容許擁有不一樣的行數。

val cols = ceilSqrtNumParts
val rows = (numParts + cols - 1) / cols
//最後一列容許不一樣的行數
val lastColRows = numParts - rows * (cols - 1)
val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
col * rows + row

下面舉個例子來講明該方法。假設咱們有一個擁有 12 個頂點的圖,要把它切分到 9 臺機器。咱們能夠用下面的稀疏矩陣來表示:

          __________________________________
     v0   | P0 *     | P1       | P2    *  |
     v1   |  ****    |  *       |          |
     v2   |  ******* |      **  |  ****    |
     v3   |  *****   |  *  *    |       *  |
          ----------------------------------
     v4   | P3 *     | P4 ***   | P5 **  * |
     v5   |  *  *    |  *       |          |
     v6   |       *  |      **  |  ****    |
     v7   |  * * *   |  *  *    |       *  |
          ----------------------------------
     v8   | P6   *   | P7    *  | P8  *   *|
     v9   |     *    |  *    *  |          |
     v10  |       *  |      **  |  *  *    |
     v11  | * <-E    |  ***     |       ** |
          ----------------------------------

上面的例子中 * 表示分配處處理器上的邊。E 表示鏈接頂點 v11 和 v1 的邊,它被分配到了處理器 P6 上。爲了得到邊所在的處理器,咱們將矩陣切分爲 sqrt(numParts) * sqrt(numParts) 塊。 注意:上圖中與頂點 v11 相鏈接的邊只出如今第一列的塊 (P0,P3,P6) 或者最後一行的塊 (P6,P7,P8) 中,這保證了 V11 的副本數不會超過 2 * sqrt(numParts) 份,在上例中即副本不能超過 6 份。在上面的例子中,P0 裏面存在不少邊,這會形成工做的不均衡。爲了提升均衡,咱們首先用頂點 id 乘以一個大的素數,而後再 shuffle 頂點的位置。乘以一個大的素數本質上不能解決不平衡的問題,只是減小了不平衡的狀況發生。

2.2 vertices、edges 以及 triplets

vertices、edges 以及 triplets 是 GraphX 中三個很是重要的概念。咱們在前文 GraphX 介紹中對這三個概念有初步的瞭解。

2.2.1 vertices

在 GraphX 中,vertices 對應着名稱爲 VertexRDD 的 RDD。這個 RDD 有頂點 id 和頂點屬性兩個成員變量。它的源碼以下所示:

abstract class VertexRDD[VD](scSparkContextdepsSeq[Dependency[_]]) extends RDD[(VertexIdVD)](scdeps)

從源碼中咱們能夠看到,VertexRDD 繼承自 RDD[(VertexId, VD)],這裏 VertexId 表示頂點 id,VD 表示頂點所帶的屬性的類別。這從另外一個角度也說明 VertexRDD 擁有頂點 id 和頂點屬性。

2.2.2 edges

在 GraphX 中,edges 對應着 EdgeRDD。這個 RDD 擁有三個成員變量,分別是源頂點 id、目標頂點 id以及邊屬性。它的源碼以下所示:

abstract class EdgeRDD[ED](scSparkContextdepsSeq[Dependency[_]]) extends RDD[Edge[ED]](scdeps)

從源碼中咱們能夠看到,EdgeRDD 繼承自 RDD[Edge[ED]],即類型爲 Edge[ED] 的 RDD。

2.2.3 triplets

在 GraphX 中,triplets 對應着 EdgeTriplet。它是一個三元組視圖,這個視圖邏輯上將頂點和邊的屬性保存爲一個 RDD[EdgeTriplet[VD, ED]]。能夠經過下面的 Sql 表達式表示這個三元視圖的含義:

SELECT
    src.id ,
    dst.id ,
    src.attr ,
    e.attr ,
    dst.attr
FROM
    edges AS e
LEFT JOIN vertices AS src ,
 vertices AS dst ON e.srcId = src.Id
AND e.dstId = dst.Id

一樣,也能夠經過下面圖解的形式來表示它的含義:


EdgeTriplet 的源代碼以下所示:
class EdgeTriplet[VDEDextends Edge[ED{
  //源頂點屬性
  var srcAttr: VD = _ // nullValue[VD]
  //目標頂點屬性
  var dstAttr: VD = _ // nullValue[VD]
  protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
    srcId = other.srcId
    dstId = other.dstId
    attr = other.attr
    this
  }
}

EdgeTriplet 類繼承自 Edge 類,咱們來看看這個父類:

case class Edge[@specialized(CharIntBooleanByteLongFloatDoubleED] (var srcIdVertexId 0, var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable

Edge 類中包含源頂點 id,目標頂點 id 以及邊的屬性。因此從源代碼中咱們能夠知道,triplets 既包含了邊屬性也包含了源頂點的 id 和屬性、目標頂點的 id 和屬性。

2.3 圖的構建

GraphX 的 Graph 對象是用戶操做圖的入口。前面的章節咱們介紹過,它包含了邊(edges)、頂點(vertices)以及 triplets 三部分,而且這三部分都包含相應的屬性,能夠攜帶額外的信息。

2.3.1 構建圖的方法

構建圖的入口方法有兩種,分別是根據邊構建和根據邊的兩個頂點構建。

根據邊構建圖(Graph.fromEdges)

def fromEdges[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
       defaultValue: VD,
       edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
       vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
  GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}

根據邊的兩個頂點數據構建(Graph.fromEdgeTuples)

def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
{
  val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
  val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  uniqueEdges match {
    case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
    case None => graph
  }
}

從上面的代碼咱們知道,無論是根據邊構建圖仍是根據邊的兩個頂點數據構建,最終都是使用 GraphImpl 來構建的,即調用了 GraphImpl 的 apply 方法。

2.3.2 構建圖的過程

構建圖的過程很簡單,分爲三步,它們分別是 構建邊EdgeRDD、構建頂點VertexRDD、生成Graph對象。下面分別介紹這三個步驟:

Step一、構建邊 EdgeRDD
從源代碼看 構建邊EdgeRDD 也分爲三步,下圖的例子詳細說明了這些步驟。


• 1 從文件中加載信息,轉換成 tuple 的形式,即 (srcId, dstId)
val rawEdgesRdd: RDD[(Long, Long)] =
  sc.textFile(input).filter(s => s != "0,0").repartition(partitionNum).map {
    case line =>
      val ss = line.split(",")
      val src = ss(0).toLong
      val dst = ss(1).toLong
      if (src < dst)
        (src, dst)
      else
        (dst, src)
  }.distinct()

• 2 入口,調用 Graph.fromEdgeTuples(rawEdgesRdd)
源數據爲分割的兩個點 ID,把源數據映射成 Edge(srcId, dstId, attr) 對象, attr 默認爲 1。這樣元數據就構建成了 RDD[Edge[ED]],以下面的代碼:

val edges = rawEdges.map(p => Edge(p._1, p._2, 1))

• 3 將 RDD[Edge[ED]] 進一步轉化成 EdgeRDDImpl[ED, VD]
第二步構建完 RDD[Edge[ED]] 以後,GraphX 經過調用 GraphImpl 的 apply 方法來構建 Graph。

val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
def apply[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
       defaultVertexAttr: VD,
       edgeStorageLevel: StorageLevel,
       vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

在 apply 調用 fromEdgeRDD 以前,代碼會調用 EdgeRDD.fromEdges(edges) 將 RDD[Edge[ED]] 轉化成 EdgeRDDImpl[ED, VD]。

def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
  val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
    val builder = new EdgePartitionBuilder[ED, VD]
    iter.foreach { e =>
      builder.add(e.srcId, e.dstId, e.attr)
    }
    Iterator((pid, builder.toEdgePartition))
  }
  EdgeRDD.fromEdgePartitions(edgePartitions)
}

程序遍歷 RDD[Edge[ED]] 的每一個分區,並調用 builder.toEdgePartition 對分區內的邊做相應的處理。

def toEdgePartition: EdgePartition[ED, VD] = {
  val edgeArray = edges.trim().array
  new Sorter(Edge.edgeArraySortDataFormat[ED])
    .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
  val localSrcIds 
new Array[Int](edgeArray.size)
  val localDstIds = new Array[Int](edgeArray.size)
  val data = new Array[ED](edgeArray.size)
  val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  val local2global = new PrimitiveVector[VertexId]
  var vertexAttrs = Array.empty[VD]
  //採用列式存儲的方式,節省了空間
  if (edgeArray.length > 0) {
    index.update(edgeArray(0).srcId, 0)
    var currSrcId: VertexId = edgeArray(0).srcId
    var currLocalId = -1
    var i = 0
    while (i < edgeArray.size) {
      val srcId = edgeArray(i).srcId
      val dstId = edgeArray(i).dstId
      localSrcIds(i) 
= global2local.changeValue(srcId,
        { currLocalId += 1; local2global += srcId; currLocalId }, identity)
      localDstIds(i) = global2local.changeValue(dstId,
        { currLocalId += 1; local2global += dstId; currLocalId }, identity)
      data(i) = edgeArray(i).attr
      //相同頂點srcId中第一個出現的srcId與其下標
      if (srcId != currSrcId) {
        currSrcId = srcId
        index.update(currSrcId, i)
      }
      i += 1
    }
    vertexAttrs = new Array[VD](currLocalId + 1)
  }
  new EdgePartition(
    localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
    None)
}

• toEdgePartition 的第一步就是對邊進行排序。
  按照 srcId 從小到大排序。排序是爲了遍歷時順序訪問,加快訪問速度。採用數組而不是 Map,是由於數組是連續的內存單元,具備原子性,避免了 Map 的 hash 問題,訪問速度快。
• toEdgePartition 的第二步就是填充 localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs。
  數組 localSrcIds, localDstIds 中保存的是經過 global2local.changeValue(srcId/dstId) 轉換而成的分區本地索引。能夠經過 localSrcIds、localDstIds 數組中保存的索引位從 local2global 中查到具體的 VertexId。
  global2local 是一個簡單的,key 值非負的快速 hash map:GraphXPrimitiveKeyOpenHashMap,保存 vertextId 和本地索引的映射關係。global2local 中包含當前 partition 全部 srcId、dstId 與本地索引的映射關係。
• data 就是當前分區的 attr 屬性數組。
  咱們知道相同的 srcId 可能對應不一樣的 dstId。按照 srcId 排序以後,相同的 srcId 會出現多行,如上圖中的 index desc 部分。index 中記錄的是相同 srcId 中第一個出現的 srcId 與其下標。
• local2global 記錄的是全部的 VertexId 信息的數組。形如:srcId, dstId, srcId, dstId, srcId, dstId, srcId, dstId。其中會包含相同的 srcId。即:當前分區全部 vertextId 的順序實際值。
  咱們能夠經過根據本地下標取 VertexId,也能夠根據 VertexId 取本地下標,取相應的屬性。

// 根據本地下標取 VertexId
localSrcIds/localDstIds -> index -> local2global -> VertexId
// 根據 VertexId 取本地下標,取屬性
VertexId -> global2local -> index -> data -> attr object

構建頂點 VertexRDD
緊接着上面構建邊 RDD 的代碼,咱們看看方法 fromEdgeRDD 的實現。

private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
         edges: EdgeRDDImpl[ED, VD],
         defaultVertexAttr: VD,
         edgeStorageLevel: StorageLevel,
         vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
  val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
    .withTargetStorageLevel(vertexStorageLevel)
  fromExistingRDDs(vertices, edgesCached)
}

從上面的代碼咱們能夠知道,GraphX 使用 VertexRDD.fromEdges 構建頂點 VertexRDD,固然咱們把邊 RDD 做爲參數傳入。

def fromEdges[VD: ClassTag](
                             edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
  // 1 建立路由表
  val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
  // 2 根據路由表生成分區對象vertexPartitions
  val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
    val routingTable =
      if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
    Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
  }, preservesPartitioning 
true)
  // 3 建立VertexRDDImpl對象
  new VertexRDDImpl(vertexPartitions)
}

構建頂點 VertexRDD 的過程分爲三步,如上代碼中的註釋。它的構建過程以下圖所示:

建立路由表
爲了能經過點找到邊,每一個點須要保存點到邊的信息,這些信息保存在 RoutingTablePartition 中。

private[graphx] def createRoutingTables(edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
  // 將edge partition中的數據轉換成RoutingTableMessage類型,
  val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
    Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
}

上述程序首先將邊分區中的數據轉換成 RoutingTableMessage 類型,即 tuple(VertexId,Int) 類型。

def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
: Iterator[RoutingTableMessage] 
= {
  val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
  edgePartition.iterator.foreach { e =>
    map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
  }
  map.iterator.map { vidAndPosition =>
    val vid = vidAndPosition._1
    val position = vidAndPosition._2
    toMessage(vid, pid, position)
  }
}
//`30-0`比特位表示邊分區`ID`,`32-31`比特位表示標誌位
private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage 
= {
  val positionUpper2 = position << 30
  val pidLower30 = pid & 0x3FFFFFFF
  (vid, positionUpper2 | pidLower30)
}

根據代碼,咱們能夠知道程序使用 int 的 32-31 比特位表示標誌位,即 01: isSrcId ,10: isDstId。30-0 比特位表示邊分區 ID。這樣作能夠節省內存。RoutingTableMessage 表達的信息是:頂點id和它相關聯的邊的分區id是放在一塊兒的,因此任什麼時候候,咱們均可以經過 RoutingTableMessage 找到頂點關聯的邊。

根據路由表生成分區對象

private[graphx] def createRoutingTables(
                                         edges: EdgeRDD[_], vertexPartitioner: Partitioner)
: RDD[RoutingTablePartition] 
= {
  // 將edge partition中的數據轉換成RoutingTableMessage類型,
  val numEdgePartitions = edges.partitions.size
  vid2pid.partitionBy(vertexPartitioner).mapPartitions(
    iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
    preservesPartitioning = true)
}

咱們將第 1 步生成的 vid2pid 按照 HashPartitioner 從新分區。咱們看看 RoutingTablePartition.fromMsgs 方法。

def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
: RoutingTablePartition 
= {
  val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
  val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  for (msg <- iter) {
    val vid = vidFromMessage(msg)
    val pid = pidFromMessage(msg)
    val position = positionFromMessage(msg)
    pid2vid(pid) += vid
    srcFlags(pid) +
= (position & 0x1) != 0
    dstFlags(pid) += (position & 0x2) != 0
  }
  new RoutingTablePartition(pid2vid.zipWithIndex.map {
    case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
  })
}

  該方法從 RoutingTableMessage 獲取數據,將 vid, 邊pid, isSrcId/isDstId 從新封裝到 pid2vid,srcFlags,dstFlags 這三個數據結構中。它們表示當前頂點分區中的點在邊分區的分佈。想象一下,從新分區後,新分區中的點可能來自於不一樣的邊分區,因此一個點要找到邊,就須要先肯定邊的分區號 pid, 而後在肯定的邊分區中肯定是 srcId 仍是 dstId, 這樣就找到了邊。新分區中保存 vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)) 這樣的記錄。這裏轉換爲 toBitSet 保存是爲了節省空間。
  根據上文生成的 routingTables,從新封裝路由表裏的數據結構爲 ShippableVertexPartition。ShippableVertexPartition 會合並相同重複點的屬性 attr 對象,補全缺失的 attr 對象。

def apply[VD: ClassTag](
                         iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
                         mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
  val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
  // 合併頂點
  iter.foreach { pair =>
    map.setMerge(pair._1, pair._2, mergeFunc)
  }
  // 不全缺失的屬性值
  routingTable.iterator.foreach { vid =>
    map.changeValue(vid, defaultVal, identity)
  }
  new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
}
//ShippableVertexPartition定義
ShippableVertexPartition[VD: ClassTag](
val index: VertexIdToIndexMap,
val values: Array[VD],
val mask: BitSet,
val routingTable: RoutingTablePartition)

map 就是映射 vertexId->attr,index 就是頂點集合,values 就是頂點集對應的屬性集,mask 指頂點集的 BitSet。

生成 Graph 對象
使用上述構建的 edgeRDD 和 vertexRDD,使用 new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]])) 就能夠生成 Graph 對象。ReplicatedVertexView 是點和邊的視圖,用來管理運送 (shipping) 頂點屬性到 EdgeRDD 的分區。當頂點屬性改變時,咱們須要運送它們到邊分區來更新保存在邊分區的頂點屬性。
注意:在 ReplicatedVertexView 中不要保存一個對邊的引用,由於在屬性運送等級升級後,這個引用可能會發生改變。

class ReplicatedVertexView[VDClassTagEDClassTag](var edgesEdgeRDDImpl[EDVD], var hasSrcIdBoolean false, var hasDstId: Boolean = false)

2.4 計算模式

2.4.1 BSP 計算模式

  目前基於圖的並行計算框架已經有不少,好比來自 Google 的 Pregel、來自 Apache 開源的圖計算框架 Giraph/HAMA 以及最爲著名的 GraphLab,其中 Pregel、HAMA 和 Giraph 都是很是相似的,都是基於 BSP(Bulk Synchronous Parallell)模式。 Bulk Synchronous Parallell,即總體同步並行。
  在 BSP 中,一次計算過程由一系列全局超步組成,每個超步由併發計算、通訊和同步三個步驟組成。同步完成,標誌着這個超步的完成及下一個超步的開始。 BSP 模式的準則是批量同步(bulk synchrony),其獨特之處在於超步(superstep) 概念的引入。一個 BSP 程序同時具備水平和垂直兩個方面的結構。從垂直上看,一個 BSP 程序由一系列串行的超步(superstep) 組成,如圖所示:


  從水平上看,在一個超步中,全部的進程並行執行局部計算。一個超步可分爲三個階段,如圖所示:

  • 本地計算階段,每一個處理器只對存儲在本地內存中的數據進行本地計算。
  • 全局通訊階段,對任何非本地數據進行操做。
  • 柵欄同步階段,等待全部通訊行爲的結束。

 

BSP 模型有以下幾個特色:
  一、將計算劃分爲一個一個的超步(superstep),有效避免死鎖。
  二、將處理器和路由器分開,強調了計算任務和通訊任務的分開,而路由器僅僅完成點到點的消息傳遞,不提供組合、複製和廣播等功能,這樣作既掩蓋具體的互連網絡拓撲,又簡化了通訊協議。
  三、採用障礙同步的方式、以硬件實現的全局同步是可控的粗粒度級,提供了執行緊耦合同步式並行算法的有效方式

2.4.2 圖操做一覽

正如 RDDs 有基本的操做 map、filter 和 reduceByKey 同樣,屬性圖也有基本的集合操做,這些操做採用用戶自定義的函數併產生包含轉換特徵和結構的新圖。定義在 Graph 中的核心操做是通過優化的實現。表示爲核心操做的組合的便捷操做定義在 GraphOps 中。然而,由於有 Scala 的隱式轉換,定義在 GraphOps 中的操做能夠做爲 Graph 的成員自動使用。例如,咱們能夠經過下面的方式計算每一個頂點(定義在 GraphOps 中)的入度。

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

區分核心圖操做和 GraphOps 的緣由是爲了在未來支持不一樣的圖表示。每一個圖表示都必須提供核心操做的實現並重用不少定義在 GraphOps 中的有用操做。

2.4.3 基本信息操做

如下是定義在 Graph 和 GraphOps 中(爲了簡單起見,表現爲圖的成員)的功能的快速瀏覽。注意,某些函數簽名已經簡化(如默認參數和類型的限制已刪除),一些更高級的功能已經被刪除,因此請參閱 API 文檔瞭解官方的操做列表。

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val users: VertexRDD[(String, String)] = VertexRDD[(String, String)](sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")), (5L, ("franklin""prof")), (2L, ("istoica""prof")))))

val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))

val graph = Graph(users, relationships)

圖屬性操做總結

/** 圖屬性操做總結 */
class Graph[VDED{
  // 圖信息操做
  // 獲取邊的數量
  val numEdges: Long
  // 獲取頂點的數量
  val numVertices: Long
  // 獲取全部頂點的入度
  val inDegrees: VertexRDD[Int]
  // 獲取全部頂點的出度
  val outDegrees: VertexRDD[Int]
  // 獲取全部頂點入度與出度之和
  val degrees: VertexRDD[Int]
  // 獲取全部頂點的集合
  val vertices: VertexRDD[VD]
  // 獲取全部邊的集合
  val edges: EdgeRDD[ED]
  // 獲取全部triplets表示的集合
  val triplets: RDD[EdgeTriplet[VD, ED]]

  // 緩存操做
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  def cache(): Graph[VD, ED]

  // 取消緩存
  def unpersist(blocking: Boolean = true): Graph[VD, ED]

  // 圖從新分區
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  // 頂點和邊屬性轉換
  def mapVertices[VD2](map: (VertexID, VD) 
=> VD2): Graph[VD2, ED]

  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]

  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2]

  // 修改圖結構
  // 反轉圖
  def reverse: Graph[VD, ED]

  // 獲取子圖
  def subgraph(
                epred: EdgeTriplet[VD, ED] => Boolean = (x => true)
,
                vpred: (VertexID, VD) 
=> Boolean = ((v, d) => true))
  : Graph[VD, ED]

  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]

  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Join RDDs with the graph
  def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]

  def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
                               (mapFunc: (VertexID, VD, Option[U]) => VD2)
  : Graph[VD2, ED]

  // Aggregate information about adjacent triplets
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]

  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]

  def aggregateMessages[Msg: ClassTag](
                                        sendMsg: EdgeContext[VD, ED, Msg] => Unit,
                                        mergeMsg: (Msg, Msg)
 
=> Msg,
                                        tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A]

  // Iterative graph-parallel computation
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
    vprog: (VertexID, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED]

  // Basic graph algorithms
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

  def connectedComponents(): Graph[VertexID, ED]

  def triangleCount(): Graph[Int, ED]

  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}

2.4.4 轉換操做

GraphX 中的轉換操做主要有 mapVertices、mapEdges 和 mapTriplets 三個,它們在 Graph 文件中定義,在 GraphImpl 文件中實現。下面分別介紹這三個方法。

mapVertices
mapVertices 用來更新頂點屬性。從圖的構建那章咱們知道,頂點屬性保存在邊分區中,因此咱們須要改變的是邊分區中的屬性。
對當前圖每個頂點應用提供的 map 函數來修改頂點的屬性,返回一個新的圖。

override def mapVertices[VD2: ClassTag]
(f: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  if (eq != null) {
    vertices.cache()
    // 使用方法 f 處理 vertices
    val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
    // 得到兩個不一樣 vertexRDD 的不一樣
    val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
    // 更新 ReplicatedVertexView
    val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
      .updateVertices(changedVerts)
    new GraphImpl(newVerts, newReplicatedVertexView)
  } else {
    GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
  }
}

上面的代碼中,當 VD 和 VD2 類型相同時,咱們能夠重用沒有發生變化的點,不然須要從新建立全部的點。咱們分析 VD 和 VD2 相同的狀況,分四步處理。
•(1)使用方法 f 處理 vertices,得到新的 VertexRDD
•(2)使用在 VertexRDD 中定義的 diff 方法求出新 VertexRDD 和源 VertexRDD 的不一樣

override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
  val otherPartition = other match {
    case other: VertexRDD[_] if this.partitioner == other.partitioner =>
      other.partitionsRDD
    case _ =>
      VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
  }
  val newPartitionsRDD = partitionsRDD.zipPartitions(
    otherPartition, preservesPartitioning = true
  ) { (thisIter, otherIter) =>
    val thisPart = thisIter.next()
    val otherPart = otherIter.next()
    Iterator(thisPart.diff(otherPart))
  }
  this.withPartitionsRDD(newPartitionsRDD)
}

這個方法首先處理新生成的 VertexRDD 的分區,若是它的分區和源 VertexRDD 的分區一致,那麼直接取出它的 partitionsRDD,不然從新分區後取出它的 partitionsRDD。 針對新舊兩個 VertexRDD 的全部分區,調用 VertexPartitionBaseOps 中的 diff 方法求得分區的不一樣。

def diff(other: Self[VD]): Self[VD] = {
  // 首先判斷
  if (self.index != other.index) {
    diff(createUsingIndex(other.iterator))
  } else {
    val newMask = self.mask & other.mask
    var i = newMask.nextSetBit(0)
    while (i >= 0) {
      if (self.values(i) == other.values(i)) {
        newMask.unset(i)
      }
      i = newMask.nextSetBit(i + 1)
    }
    this.withValues(other.values).withMask(newMask)
  }
}

該方法隱藏兩個 VertexRDD 中相同的頂點信息,獲得一個新的 VertexRDD。
•(3)更新 ReplicatedVertexView

def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = {
  // 生成一個 VertexAttributeBlock
  val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId)
    .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format(
      hasSrcId, hasDstId))
    .partitionBy(edges.partitioner.get)
  // 生成新的邊 RDD
  val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
    (ePartIter, shippedVertsIter) => ePartIter.map {
      case (pid, edgePartition) =>
        (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
    }
  })
  new ReplicatedVertexView(newEdges, hasSrcId, hasDstId)
}

updateVertices 方法返回一個新的 ReplicatedVertexView,它更新了邊分區中包含的頂點屬性。咱們看看它的實現過程。首先看 shipVertexAttributes 方法的調用。調用 shipVertexAttributes 方法會生成一個 VertexAttributeBlock,VertexAttributeBlock 包含當前分區的頂點屬性,這些屬性能夠在特定的邊分區使用。

def shipVertexAttributes(
                          shipSrc: Boolean, shipDst: Boolean)
: Iterator[(PartitionID, VertexAttributeBlock[VD])
= {
  Iterator.tabulate(routingTable.numEdgePartitions) { pid =>
    val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64
    val vids = new PrimitiveVector[VertexId](initialSize)
    val attrs = new PrimitiveVector[VD](initialSize)
    var i = 0
    routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid =>
      if (isDefined(vid)) {
        vids += vid
        attrs += this(vid)
      }
      i += 1
    }
    // (邊分區id, VertexAttributeBlock (頂點id, 屬性))
    (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array))
  }
}

得到新的頂點屬性以後,咱們就能夠調用 updateVertices 更新邊中頂點的屬性了,以下面代碼所示:

edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))
//更新EdgePartition的屬性
def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = {
  val newVertexAttrs = new Array[VD](vertexAttrs.length)
  System.arraycopy(vertexAttrs, 0, newVertexAttrs, 0, vertexAttrs.length)
  while (iter.hasNext) {
    val kv = iter.next()
    //global2local得到頂點的本地index
    newVertexAttrs(global2local(kv._1)) = kv._2
  }
  new EdgePartition(
    localSrcIds, localDstIds, data, index, global2local, local2global, newVertexAttrs,
    activeSet)
}

例子:將頂點的2個屬性合併爲1個屬性(即將字符串合併)。

scala> graph.vertices.collect.foreach(println _)
(5,(franklin,prof))
(2,(istoica,prof))
(3,(rxin,student))
(7,(jgonzal,postdoc))

scala> graph.mapVertices{ case (vid, (attr1,attr2)) => attr1 + attr2 }   或者  graph.mapVertices((VertexId, VD) => VD._1 + VD._2)
res20: org.apache.spark.graphx.Graph[String,String] = org.apache.spark.graphx.impl.GraphImpl@4c819eab

scala> res20.vertices.collect.foreach(println _)
(5,franklinprof)
(2,istoicaprof)
(3,rxinstudent)
(7,jgonzalpostdoc)

mapEdges
mapEdges 用來更新邊屬性。對當前圖每一條邊應用提供的 map 函數來修改邊的屬性,返回一個新圖。

override def mapEdges[ED2: ClassTag](f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = {
  val newEdges = replicatedVertexView.edges
    .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator)))
  new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}

相比於 mapVertices,mapEdges 顯然要簡單得多,它只須要根據方法 f 生成新的 EdgeRDD,而後再初始化便可。
例子:將邊的屬性都加一個前綴。

scala> graph.edges.collect.foreach(println _)
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)

scala> graph.mapEdges(edge => "name:" + edge.attr)
res29: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@72da828b

scala> res29.edges.collect.foreach(println _)
Edge(3,7,name:collab)
Edge(5,3,name:advisor)
Edge(2,5,name:colleague)
Edge(5,7,name:pi)

mapTriplets
mapTriplets 用來更新邊屬性。

override def mapTriplets[ED2: ClassTag](
    f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2],
    tripletFields: TripletFields): Graph[VD, ED2] = {
  vertices.cache()
  replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
  val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) =>
    part.map(f(pid, part.tripletIterator(tripletFields.useSrc, tripletFields.useDst)))
  }
  new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}

這段代碼中,replicatedVertexView 調用 upgrade 方法修改當前的 ReplicatedVertexView,使調用者能夠訪問到指定級別的邊信息(如僅僅能夠讀源頂點的屬性)。

def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) {
  // 判斷傳遞級別
  val shipSrc = includeSrc && !hasSrcId
  val shipDst = includeDst && !hasDstId
  if (shipSrc || shipDst) 
{
    val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] =
      vertices.shipVertexAttributes(shipSrc, shipDst)
        .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
          includeSrc, includeDst, shipSrc, shipDst))
        .partitionBy(edges.partitioner.get)
    val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
      (ePartIter, shippedVertsIter) => ePartIter.map {
        case (pid, edgePartition) =>
          (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
      }
    })
    edges = newEdges
    hasSrcId = includeSrc
    hasDstId = includeDst
  }
}

最後,用 f 處理邊,生成新的 RDD,最後用新的數據初始化圖。
例子:邊屬性添加前綴。

scala> graph.edges.collect.foreach(println _)
Edge(3,7,collab)
Edge(5,3,advisor)
Edge(2,5,colleague)
Edge(5,7,pi)

scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets
res37: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[80] at mapPartitions at GraphImpl.scala:48

scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets.collect
res39: Array[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),name:collab), ((5,(franklin,prof)),(3,(rxin,student)),name:advisor), ((2,(istoica,prof)),(5,(franklin,prof)),name:colleague), ((5,(franklin,prof)),(7,(jgonzal,postdoc)),name:pi))

scala> graph.mapTriplets(tri => "name:" + tri.attr).triplets.collect.foreach(println _)
((3,(rxin,student)),(7,(jgonzal,postdoc)),name:collab)
((5,(franklin,prof)),(3,(rxin,student)),name:advisor)
((2,(istoica,prof)),(5,(franklin,prof)),name:colleague)
((5,(franklin,prof)),(7,(jgonzal,postdoc)),name:pi)

2.4.5 結構操做

當前的 GraphX 僅僅支持一組簡單的經常使用結構性操做。下面是基本的結構性操做列表。

class Graph[VDED{
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD)
 
=> Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

下面分別介紹這四種函數的原理:

reverse
reverse 操做返回一個新的圖,這個圖的邊的方向都是反轉的。例如,這個操做能夠用來計算反轉的 PageRank。由於反轉操做沒有修改頂點或者邊的屬性或者改變邊的數量,因此咱們能夠在不移動或者複製數據的狀況下有效地實現它。

override def reverse: Graph[VD, ED] = {
  new GraphImpl(vertices.reverseRoutingTables(), replicatedVertexView.reverse())
}
def reverse(): ReplicatedVertexView[VD, ED] = {
  val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse)
  new ReplicatedVertexView(newEdges, hasDstId, hasSrcId)
}
// EdgePartition 中的 reverse
def reverse: EdgePartition[ED, VD] = {
  val builder = new ExistingEdgePartitionBuilder[ED, VD](
    global2local, local2global, vertexAttrs, activeSet, size)
  var i = 0
  while (i < size) {
    val localSrcId = localSrcIds(i)
    val localDstId = localDstIds(i)
    val srcId = local2global(localSrcId)
    val dstId = local2global(localDstId)
    val attr = data(i)
    // 將源頂點和目標頂點換位置
    builder.add(dstId, srcId, localDstId, localSrcId, attr)
    i += 1
  }
  builder.toEdgePartition
}

例子:圖的入度和出度轉換。

subgraph
subgraph 操做利用頂點和邊的判斷式(predicates),返回的圖僅僅包含知足頂點判斷式的頂點、知足邊判斷式的邊以及知足頂點判斷式的 triple。subgraph 操做能夠用於不少場景,如獲取 感興趣的頂點和邊組成的圖或者獲取清除斷開鏈接後的圖。

override def subgraph(
                       epred: EdgeTriplet[VD, ED] => Boolean = x => true,
                       vpred: (VertexId, VD)
 
=> Boolean = (a, b) => true): Graph[VD, ED] = {
  vertices.cache()
  // 過濾 vertices,重用 partitione和索引
  val newVerts = vertices.mapVertexPartitions(_.filter(vpred))
  // 過濾 triplets
  replicatedVertexView.upgrade(vertices, truetrue)
  val newEdges = replicatedVertexView.edges.filter(epred, vpred)
  new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
}

// 該代碼顯示,subgraph 方法的實現分兩步:先過濾 VertexRDD,而後再過濾 EdgeRDD。如上,過濾 VertexRDD 比較簡單,咱們重點看過濾 EdgeRDD 的過程。
def filter(
            epred: EdgeTriplet[VD, ED] => Boolean,
            vpred: (VertexId, VD)
 
=> Boolean): EdgeRDDImpl[ED, VD] = {
  mapEdgePartitions((pid, part) => part.filter(epred, vpred))
}

// EdgePartition 中的 filter 方法
def filter(
            epred: EdgeTriplet[VD, ED] => Boolean,
            vpred: (VertexId, VD)
 
=> Boolean): EdgePartition[ED, VD] = {
  val builder = new ExistingEdgePartitionBuilder[ED, VD](
    global2local, local2global, vertexAttrs, activeSet)
  var i = 0
  while (i < size) {
    // The user sees the EdgeTriplet, so we can't reuse it and must create one per edge.
    val localSrcId = localSrcIds(i)
    val localDstId = localDstIds(i)
    val et = new EdgeTriplet[VD, ED]
    et.srcId = local2global(localSrcId)
    et.dstId = local2global(localDstId)
    et.srcAttr = vertexAttrs(localSrcId)
    et.dstAttr = vertexAttrs(localDstId)
    et.attr = data(i)
    if (vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) {
      builder.add(et.srcId, et.dstId, localSrcId, localDstId, et.attr)
    }
    i += 1
  }
  builder.toEdgePartition
}

由於用戶能夠看到 EdgeTriplet 的信息,因此咱們不能重用 EdgeTriplet,須要從新建立一個,而後在用 epred 函數處理。
例子:

scala> graph.subgraph(Triplet => Triplet.attr.startsWith("c"), (VertexId, VD) => VD._2.startsWith("pro"))
res3: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@49db5438
scala> res3.vertices.collect
res4: Array[(org.apache.spark.graphx.VertexId, (String, String))] = Array((2,(istoica,prof)), (5,(franklin,prof)))
scala> res3.edges.collect
res5: Array[org.apache.spark.graphx.Edge[String]] = Array(Edge(2,5,colleague))

mask
mask 操做構造一個子圖,相似於交集,這個子圖包含輸入圖中包含的頂點和邊。它的實現很簡單,頂點和邊均作 inner join 操做便可。這個操做能夠和 subgraph 操做相結合,基於另一個相關圖的特徵去約束一個圖。

override def mask[VD2: ClassTag, ED2: ClassTag] (
      other: Graph[VD2, ED2]): Graph[VD, ED] = {
  val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v }
  val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v }
  new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges))
}

groupEdges
groupEdges 操做合併多重圖中的並行邊(如頂點對之間重複的邊),並傳入一個函數來合併兩個邊的屬性。在大量的應用程序中,並行的邊能夠合併(它們的權重合並)爲一條邊從而下降圖的大小。

override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = {
  val newEdges = replicatedVertexView.edges.mapEdgePartitions(
    (pid, part) => part.groupEdges(merge))
  new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges))
}

def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = {
  val builder = new ExistingEdgePartitionBuilder[ED, VD](
    global2local, local2global, vertexAttrs, activeSet)
  var currSrcId: VertexId = null.asInstanceOf[VertexId]
  var currDstId: VertexId = null.asInstanceOf[VertexId]
  var currLocalSrcId = -1
  var currLocalDstId = -1
  var currAttr: ED = null.asInstanceOf[ED]
  // 迭代處理全部的邊
  var i = 0
  while (i < size) {
    // 若是源頂點和目的頂點都相同
    if (i > 0 && currSrcId == srcIds(i) && currDstId == dstIds(i)) {
      // 合併屬性
      currAttr = merge(currAttr, data(i))
    } else {
      // This edge starts a new run of edges
      if (i > 0) {
        // 添加到 builder 中
        builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
      }
      // Then start accumulating for a new run
      currSrcId = srcIds(i)
      currDstId = dstIds(i)
      currLocalSrcId = localSrcIds(i)
      currLocalDstId = localDstIds(i)
      currAttr = data(i)
    }
    i += 1
  }
  if (size > 0) {
    builder.add(currSrcId, currDstId, currLocalSrcId, currLocalDstId, currAttr)
  }
  builder.toEdgePartition
}

在圖構建那章咱們說明過,存儲的邊按照源頂點 id 排過序,因此上面的代碼能夠經過一次迭代完成對全部相同邊的處理。

應用舉例

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),
    (5L, ("franklin""prof")), (2L, ("istoica""prof")),
    (4L, ("peter""student"))))

// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L7L"collab"),    Edge(5L3L"advisor"),
    Edge(2L5L"colleague"), Edge(5L7L"pi"),
    Edge(4L0L"student"),   Edge(5L0L"colleague")))

// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe""Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users

// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field

// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")

// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

2.4.6 頂點關聯操做

在許多狀況下,有必要將外部數據加入到圖中。例如,咱們可能有額外的用戶屬性須要合併到已有的圖中或者咱們可能想從一個圖中取出頂點特徵加入到另一個圖中。這些任務能夠用 join 操做完成。主要的 join 操做以下所示。

class Graph[VDED{
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
  : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
  : Graph[VD2, ED]
}

joinVertices 操做 join 輸入 RDD 和頂點,返回一個新的帶有頂點特徵的圖。這些特徵是經過在鏈接頂點的結果上使用用戶定義的 map 函數得到的。沒有匹配的頂點保留其原始值。下面詳細地來分析這兩個函數。

joinVertices
joinVertices 來 join 相同 ID 的頂點數據。

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}

咱們能夠看到,joinVertices 的實現是經過 outerJoinVertices 來實現的。這是由於 join 原本就是 outer join 的一種特例。
例子:

scala> graph.vertices.collect.foreach(println _)
(5,(franklin,prof))
(2,(istoica,prof))
(3,(rxin,student))
(7,(jgonzal,postdoc))

scala> val join = sc.parallelize(Array((3L"123")))
join: org.apache.spark.rdd.RDD[(Long, String)] = ParallelCollectionRDD[137] at parallelize at <console>:31
scala> graph.joinVertices(join)((VertexId, VD, U) => (VD._1, VD._2 + U))
res33: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@4e5b8728
scala> res33.vertices.collect.foreach(println _)
(7,(jgonzal,postdoc))
(2,(istoica,prof))
(3,(rxin,student123))
(5,(franklin,prof))

outerJoinVertices
跟 JOIN 相似,只不過 table 中沒有的頂點默認值爲 None。

override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD2)
(implicit eq: VD =:= VD2 = null): Graph[VD2, ED] = {
  if (eq != null) {
    vertices.cache()
    // updateF preserves type, so we can use incremental replication
    val newVerts = vertices.leftJoin(other)(updateF).cache()
    val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
    val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
      .updateVertices(changedVerts)
    new GraphImpl(newVerts, newReplicatedVertexView)
  } else {
    // updateF does not preserve type, so we must re-replicate all vertices
    val newVerts = vertices.leftJoin(other)(updateF)
    GraphImpl(newVerts, replicatedVertexView.edges)
  }
}

經過以上的代碼咱們能夠看到,若是 updateF 不改變類型,咱們只須要建立改變的頂點便可,不然咱們要從新建立全部的頂點。咱們討論不改變類型的狀況。 這種狀況分三步。
•(1)修改頂點屬性值

val newVerts = vertices.leftJoin(other)(updateF).cache()

這一步會用頂點 RDD join 傳入的 RDD,而後用 updateF 做用 joinRDD 中的全部頂點,改變它們的值。
•(2)找到發生改變的頂點

val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)

•(3)更新 newReplicatedVertexView 中邊分區中的頂點屬性。

val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]].updateVertices(changedVerts)

例子:

scala> graph.vertices.collect.foreach(println _)
(5,(franklin,prof))
(2,(istoica,prof))
(3,(rxin,student))
(7,(jgonzal,postdoc))

scala> graph.outerJoinVertices(join)((VertexId, VD, U) => (VD._1, VD._2 + U))
res35: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@7c542a14
scala> res35.vertices.collect.foreach(println _)
(7,(jgonzal,postdocNone))
(2,(istoica,profNone))
(3,(rxin,studentSome(123)))
(5,(franklin,profNone))

2.4.7 聚合操做

GraphX 中提供的聚合操做有 aggregateMessages、collectNeighborIds 和 collectNeighbors 三個,其中 aggregateMessages 在 GraphImpl 中實現,collectNeighborIds 和 collectNeighbors 在 GraphOps 中實現。下面分別介紹這幾個方法。

aggregateMessages

aggregateMessages 接口:aggregateMessages 是 GraphX 最重要的 API,用於替換 mapReduceTriplets。目前 mapReduceTriplets 最終也是經過 aggregateMessages 來實現的。它主要功能是向鄰邊發消息,合併鄰邊收到的消息,返回 messageRDD。

aggregateMessages 的接口以下:

def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}

該接口有三個參數,分別爲發消息函數、合併消息函數以及發消息的方向。
• sendMsg: 發消息函數

private def sendMsg(ctx: EdgeContext[KCoreVertex, Int, Map[Int, Int]]): Unit = {
  ctx.sendToDst(Map(ctx.srcAttr.preKCore -> -1, ctx.srcAttr.curKCore -> 1))
  ctx.sendToSrc(Map(ctx.dstAttr.preKCore -> -1, ctx.dstAttr.curKCore -> 1))
}

• mergeMsg:合併消息函數
該函數用於在 Map 階段每一個 edge 分區中每一個點收到的消息合併,而且它還用於 reduce 階段,合併不一樣分區的消息。合併 vertexId 相同的消息。
• tripletFields:定義發消息的方向

aggregateMessages 處理流程:aggregateMessages 方法分爲 Map 和 Reduce 兩個階段,下面咱們分別就這兩個階段說明。

Map 階段
從入口函數進入 aggregateMessagesWithActiveSet 函數,該函數首先使用 VertexRDD[VD] 更新 replicatedVertexView, 只更新其中 vertexRDD 中 attr 對象。如構建圖中介紹的,replicatedVertexView 是點和邊的視圖,點的屬性有變化,要更新邊中包含的點的 attr。

replicatedVertexView.upgrade(vertices, tripletFields.useSrc, tripletFields.useDst)
val view = activeSetOpt match {
  case Some((activeSet, _)=>
    // 返回只包含活躍頂點的 replicatedVertexView
    replicatedVertexView.withActiveSet(activeSet)
  case None =>
    replicatedVertexView
}

程序而後會對 replicatedVertexView 的 edgeRDD 作 mapPartitions 操做,全部的操做都在每一個邊分區的迭代中完成,以下面的代碼:

val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap {
  case (pid, edgePartition) =>
    // 選擇 scan 方法
    val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat
    activeDirectionOpt match {
      case Some(EdgeDirection.Both) =>
        if (activeFraction < 0.8) {
          edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
            EdgeActiveness.Both)
        } else {
          edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
            EdgeActiveness.Both)
        }
      case Some(EdgeDirection.Either) =>
        edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
          EdgeActiveness.Either)
      case Some(EdgeDirection.Out) =>
        if (activeFraction < 0.8) {
          edgePartition.aggregateMessagesIndexScan(sendMsg, mergeMsg, tripletFields,
            EdgeActiveness.SrcOnly)
        } else {
          edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
            EdgeActiveness.SrcOnly)
        }
      case Some(EdgeDirection.In) =>
        edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
          EdgeActiveness.DstOnly)
      case _ => // None
        edgePartition.aggregateMessagesEdgeScan(sendMsg, mergeMsg, tripletFields,
          EdgeActiveness.Neither)
    }
})

在分區內,根據 activeFraction 的大小選擇是進入 aggregateMessagesEdgeScan 仍是 aggregateMessagesIndexScan 處理。aggregateMessagesEdgeScan 會順序地掃描全部的邊, 而 aggregateMessagesIndexScan 會先過濾源頂點索引,而後再掃描。咱們重點去分析 aggregateMessagesEdgeScan。

def aggregateMessagesEdgeScan[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields,
      activeness: EdgeActiveness): Iterator[(VertexId, A)] = {
  var ctx = new AggregatingEdgeContext[VD, ED, A](mergeMsg, aggregates, bitset)
  var i = 0
  while (i < size) {
    val localSrcId = localSrcIds(i)
    val srcId = local2global(localSrcId)
    val localDstId = localDstIds(i)
    val dstId = local2global(localDstId)
    val srcAttr = if (tripletFields.useSrc) vertexAttrs(localSrcId) else null.asInstanceOf[VD]
    val dstAttr = if (tripletFields.useDst) vertexAttrs(localDstId) else null.asInstanceOf[VD]
    ctx.set(srcId, dstId, localSrcId, localDstId, srcAttr, dstAttr, data(i))
    sendMsg(ctx)
    i += 1
  }
}

該方法由兩步組成,分別是得到頂點相關信息,以及發送消息。

• 獲取頂點相關信息
在前文介紹 edge partitio n時,咱們知道它包含 localSrcIds, localDstIds, data, index, global2local, local2global, vertexAttrs 這幾個重要的數據結構。其中 localSrcIds, localDstIds 分別表示源頂點、目的頂點在當前分區中的索引。因此咱們能夠遍歷 localSrcIds,根據其下標去 localSrcIds 中拿到 srcId 在全局 local2global 中的索引,最後拿到 srcId。經過 vertexAttrs 拿到頂點屬性。經過 data 拿到邊屬性。

• 發送消息
發消息前會根據接口中定義的 tripletFields,拿到發消息的方向。發消息的過程就是遍歷到一條邊,向 localSrcIds/localDstIds 中添加數據,若是 localSrcIds/localDstIds 中已經存在該數據,則執行合併函數 mergeMsg。

override def sendToSrc(msg: A) {
  send(_localSrcId, msg)
}
override def sendToDst(msg: A) {
  send(_localDstId, msg)
}
@inline private def send(localId: Int, msg: A) {
  if (bitset.get(localId)) {
    aggregates(localId) = mergeMsg(aggregates(localId), msg)
  } else {
    aggregates(localId) = msg
    bitset.set(localId)
  }
}

每一個點之間在發消息的時候是獨立的,即:點單純根據方向,向以相鄰點的以 localId 爲下標的數組中插數據,互相獨立,能夠並行運行。Map 階段最後返回消息 RDD messages: RDD[(VertexId, VD2)]
Map 階段的執行流程以下例所示:

Reduce 階段
Reduce 階段的實現就是調用下面的代碼:

vertices.aggregateUsingIndex(preAgg, mergeMsg)
override def aggregateUsingIndex[VD2: ClassTag](
                                                 messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = {
  val shuffled = messages.partitionBy(this.partitioner.get)
  val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
    thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
  }
  this.withPartitionsRDD[VD2](parts)
}

面的代碼經過兩步實現。
• (1)對 messages 從新分區,分區器使用 VertexRDD 的 partitioner。而後使用 zipPartitions 合併兩個分區。
• (2)對等合併 attr, 聚合函數使用傳入的 mergeMs g函數。

def aggregateUsingIndex[VD2: ClassTag](
               iter: Iterator[Product2[VertexId, VD2]],
                reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
  val newMask = new BitSet(self.capacity)
  val newValues = new Array[VD2](self.capacity)
  iter.foreach { product =>
    val vid = product._1
    val vdata = product._2
    val pos = self.index.getPos(vid)
    if (pos >= 0) {
      if (newMask.get(pos)) {
        newValues(pos) = reduceFunc(newValues(pos), vdata)
      } else { // otherwise just store the new value
        newMask.set(pos)
        newValues(pos) = vdata
      }
    }
  }
  this.withValues(newValues).withMask(newMask)
}

根據傳參,咱們知道上面的代碼迭代的是 messagePartition,並非每一個節點都會收到消息,因此 messagePartition 集合最小,迭代速度會快。
這段代碼表示,咱們根據 vetexId 從 index 中取到其下標 pos,再根據下標,從 values 中取到 attr,存在 attr 就用 mergeMsg 合併 attr,不存在就直接賦值。
Reduce 階段的過程以下圖所示:

舉例
  
下面的例子計算比用戶年齡大的追隨者(即 followers)的平均年齡。

// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.  Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )

// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst(1, triplet.srcAttr)
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)

// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )

// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))

collectNeighbors
該方法的做用是收集每一個頂點的鄰居頂點的頂點 id 和頂點屬性。須要指定方向。

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]] = {
  val nbrs = edgeDirection match {
    case EdgeDirection.Either =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => {
          ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
          ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
        },
        (a, b) => a ++ b, TripletFields.All)
    case EdgeDirection.In =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr))),
        (a, b) => a ++ b, TripletFields.Src)
    case EdgeDirection.Out =>
      graph.aggregateMessages[Array[(VertexId, VD)]](
        ctx => ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr))),
        (a, b) => a ++ b, TripletFields.Dst)
    case EdgeDirection.Both =>
      throw new SparkException("collectEdges does not support EdgeDirection.Both. Use" +
        "EdgeDirection.Either instead.")
  }
  graph.vertices.leftJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[(VertexId, VD)])
  }
}

從上面的代碼中,第一步是根據 EdgeDirection 來肯定調用哪一個 aggregateMessages 實現聚合操做。咱們用知足條件 EdgeDirection.Either 的狀況來講明。能夠看到 aggregateMessages 的方式消息的函數爲:

ctx => {
  ctx.sendToSrc(Array((ctx.dstId, ctx.dstAttr)))
  ctx.sendToDst(Array((ctx.srcId, ctx.srcAttr)))
},

這個函數在處理每條邊時都會同時向源頂點和目的頂點發送消息,消息內容分別爲(目的頂點 id,目的頂點屬性)、(源頂點 id,源頂點屬性)。爲何會這樣處理呢? 咱們知道,每條邊都由兩個頂點組成,對於這個邊,我須要向源頂點發送目的頂點的信息來記錄它們之間的鄰居關係,同理向目的頂點發送源頂點的信息來記錄它們之間的鄰居關係。

Merge 函數是一個集合合併操做,它合併同同一個頂點對應的全部目的頂點的信息。以下所示:

(a, b) => a ++ b

經過 aggregateMessages 得到包含鄰居關係信息的 VertexRDD 後,把它和現有的 vertices 做 join 操做,獲得每一個頂點的鄰居消息。

collectNeighborIds
該方法的做用是收集每一個頂點的鄰居頂點的頂點 id。它的實現和 collectNeighbors 很是相同。須要指定方向。

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
  val nbrs =
    if (edgeDirection == EdgeDirection.Either) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.Out) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToSrc(Array(ctx.dstId)),
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.In) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToDst(Array(ctx.srcId)),
        _ ++ _, TripletFields.None)
    } else {
      throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
        "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
    }
  graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[VertexId])
  }
}

和 collectNeighbors 的實現不一樣的是,aggregateMessages 函數中的 sendMsg 函數只發送頂點Id到源頂點和目的頂點。其它的實現基本一致。

ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) }

2.4.8 緩存操做

  在 Spark 中,RDD 默認是不緩存的。爲了不重複計算,當須要屢次利用它們時,咱們必須顯示地緩存它們。GraphX 中的圖也有相同的方式。當利用到圖屢次時,確保首先訪問 Graph.cache() 方法。
  在迭代計算中,爲了得到最佳的性能,不緩存多是必須的。默認狀況下,緩存的 RDD 和圖會一直保留在內存中直到由於內存壓力迫使它們以LRU的順序刪除。對於迭代計算,先前的迭代的中間結果將填充到緩存 中。雖然它們最終會被刪除,可是保存在內存中的不須要的數據將會減慢垃圾回收。只有中間結果不須要,不緩存它們是更高效的。然而,由於圖是由多個 RDD 組成的,正確的不持久化它們是困難的。對於迭代計算,咱們建議使用 Pregel API,它能夠正確的不持久化中間結果。
  GraphX 中的緩存操做有 cache, persist, unpersist 和 unpersistVertices。它們的接口分別是:

def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersist(blocking: Boolean = true): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

2.5 Pregel API

  圖自己是遞歸數據結構,頂點的屬性依賴於它們鄰居的屬性,這些鄰居的屬性又依賴於本身鄰居的屬性。因此許多重要的圖算法都是迭代的從新計算每一個頂點的屬性,直到知足某個肯定的條件。一系列的圖併發(graph-parallel)抽象已經被提出來用來表達這些迭代算法。GraphX 公開了一個相似 Pregel 的操做,它是普遍使用的 Pregel 和 GraphLab 抽象的一個融合。
  GraphX 中實現的這個更高級的 Pregel 操做是一個約束到圖拓撲的批量同步(bulk-synchronous)並行消息抽象。Pregel 操做者執行一系列的超步(super steps),在這些步驟中,頂點從以前的超步中接收進入 (inbound) 消息的總和,爲頂點屬性計算一個新的值,而後在之後的超步中發送消息到鄰居頂點。不像 Pregel 而更像 GraphLab,消息經過邊 triplet 的一個函數被並行計算,消息的計算既會訪問源頂點特徵也會訪問目的頂點特徵。在超步中,沒有收到消息的頂點會被跳過。當沒有消息遺留時,Pregel 操做中止迭代並返回最終的圖。
  注意:與標準的 Pregel 實現不一樣的是,GraphX 中的頂點僅僅能發送信息給鄰居頂點,而且能夠利用用戶自定義的消息函數並行地構造消息。這些限制容許對 GraphX 進行額外的優化。
  下面的代碼是 pregel 的具體實現。

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
  (graph: Graph[VD, ED],
   initialMsg: A,
   maxIterations: Int = Int.MaxValue,
   activeDirection: EdgeDirection = EdgeDirection.Either)
  (vprog: (VertexId, VD, A) => VD,
   sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
   mergeMsg: (A, A) => A)
  : Graph[VD, ED] =
{
  var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  // 計算消息
  var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
  var activeMessages = messages.count()
  // 迭代
  var prevG: Graph[VD, ED] = null
  var i = 0
  while (activeMessages > 0 && i < maxIterations) {
    // 接收消息並更新頂點
    prevG = g
    g = g.joinVertices(messages)(vprog).cache()
    val oldMessages = messages
    // 發送新消息
    messages = g.mapReduceTriplets(
      sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
    activeMessages = messages.count()
    i += 1
  }
  g
}

2.5.1 pregel 計算模型

Pregel 計算模型中有三個重要的函數,分別是 vertexProgram、sendMessage 和 messageCombiner。
  • vertexProgram:用戶定義的頂點運行程序。它做用於每個頂點,負責接收進來的信息,並計算新的頂點值。
  • sendMsg:發送消息。
  • mergeMsg:合併消息。

咱們具體分析它的實現。根據代碼能夠知道,這個實現是一個迭代的過程。在開始迭代以前,先完成一些初始化操做:

var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 計算消息
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()

程序首先用 vprog 函數處理圖中全部的頂點,生成新的圖。而後用生成的圖調用聚合操做(mapReduceTriplets,實際的實現是咱們前面章節講到的 aggregateMessagesWithActiveSet 函數)獲取聚合後的消息。 activeMessages 指 messages 這個 VertexRDD中的頂點數。
下面就開始迭代操做了。在迭代內部,分爲二步。
• (1)接收消息,並更新頂點

g = g.joinVertices(messages)(vprog).cache()
// joinVertices 的定義
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}

這一步其實是使用 outerJoinVertices 來更新頂點屬性。outerJoinVertices 在關聯操做中有詳細介紹。

• (2)發送新消息

messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()

注意:在上面的代碼中,mapReduceTriplets 多了一個參數 Some((oldMessages, activeDirection))。這個參數的做用是:它使咱們在發送新的消息時,會忽略掉那些兩端都沒有接收到消息的邊,減小計算量。

2.5.2 pregel 實現最短路徑

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// 初始化圖
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a,b) => math.min(a,b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))

上面的例子中,Vertex Program 函數定義以下:

(id, dist, newDist) => math.min(dist, newDist)

這個函數的定義顯而易見,當兩個消息來的時候,取它們當中路徑的最小值。同理 Merge Message 函數也是一樣的含義。
Send Message 函數中,會首先比較 triplet.srcAttr + triplet.attr 和 triplet.dstAttr,即比較加上邊的屬性後,這個值是否小於目的節點的屬性,若是小於,則發送消息到目的頂點。

2.6 GraphX 實例

下圖中有6我的,每一個人有名字和年齡,這些人根據社會關係造成 8 條邊,每條邊有其屬性。在如下例子演示中將構建頂點、邊和圖,打印圖的屬性、轉換操做、結構操做、鏈接操做、聚合操做,並結合實際要求進行演示。

程序代碼以下:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.{Edge, _}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Practice extends App {

  // 屏蔽日誌
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  //設定一個 SparkConf
  val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
  val sc = new SparkContext(conf)

  // 初始化頂點集合
  val vertexArray = Array(
    (1L, ("Alice"28)),
    (2L, ("Bob"27)),
    (3L, ("Charlie"65)),
    (4L, ("David"42)),
    (5L, ("Ed"55)),
    (6L, ("Fran"50))
  )
  // 建立頂點的 RDD 表示
  val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)

  // 初始化邊的集合
  val edgeArray = Array(
    Edge(2L1L7),
    Edge(2L4L2),
    Edge(3L2L4),
    Edge(3L6L3),
    Edge(4L1L1),
    Edge(2L5L2),
    Edge(5L3L8),
    Edge(5L6L3)
  )

  // 建立邊的 RDD 表示
  val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)

  // 建立一個圖
  val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

  //***************************  圖的屬性    ****************************************
  println("屬性演示")
  println("**********************************************************")
  println("找出圖中年齡大於30的頂點:")
  graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
    case (id, (name, age)) => println(s"$name is $age")
  }

  println
  println("找出圖中屬性大於 5 的邊:")
  graph.edges.filter(e => e.attr > 5).collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  println

  // triplets 操做,((srcId, srcAttr), (dstId, dstAttr), attr)
  println("列出邊屬性 >5 的 tripltes:")
  for (triplet <- graph.triplets.filter(t => t.attr > 5).collect) 
{
    println(s"${triplet.srcAttr._1} likes ${triplet.dstAttr._1}")
  }
  println

  // degrees 操做
  println("找出圖中最大的出度、入度、度數:")
  def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
    if (a._2 > b._2) a else b
  }

  println("max of outDegrees:" + graph.outDegrees.reduce(max) + " max of inDegrees:" + graph.inDegrees.reduce(max) + " max of Degrees:" + graph.degrees.reduce(max))
  println

  //***************************  轉換操做    ****************************************
  println("轉換操做")
  println("**********************************************************")
  println("頂點的轉換操做,頂點age + 10:")
  graph.mapVertices { case (id, (name, age)) => (id, (name, age + 10)) }.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
  println
  println("邊的轉換操做,邊的屬性*2:")
  graph.mapEdges(e => e.attr * 2).edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  println
  println("三元組的轉換操做,邊的屬性爲端點的age相加:")
  graph.mapTriplets(tri => tri.srcAttr._2 * tri.dstAttr._2).triplets.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  println

  //***************************  結構操做    ****************************************
  println("結構操做")
  println("**********************************************************")
  println("頂點年紀 >30 的子圖:")
  val subGraph 
= graph.subgraph(vpred = (id, vd) => vd._2 >= 30)
  println("子圖全部頂點:")
  subGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
  println
  println("子圖全部邊:")
  subGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  println
  println("反轉整個圖:")
  val reverseGraph 
= graph.reverse
  println("子圖全部頂點:")
  reverseGraph.vertices.collect.foreach(v => println(s"${v._2._1} is ${v._2._2}"))
  println
  println("子圖全部邊:")
  reverseGraph.edges.collect.foreach(e => println(s"${e.srcId} to ${e.dstId} att ${e.attr}"))
  println

  //***************************  鏈接操做    ****************************************
  println("鏈接操做")
  println("**********************************************************")
  val inDegrees: VertexRDD[Int] 
= graph.inDegrees

  case class User(name: String, age: Int, inDeg: Int, outDeg: Int)

  // 建立一個新圖,頂類點VD的數據型爲 User,並從 graph 作類型轉換
  val initialUserGraph: Graph[User, Int] 
= graph.mapVertices { case (id, (name, age)) => User(name, age, 00) }

  // initialUserGraph 與 inDegrees、outDegrees(RDD)進行鏈接,並修改 initialUserGraph 中 inDeg 值、outDeg 值
  val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
    case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
  }.outerJoinVertices(initialUserGraph.outDegrees) {
    case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
  }

  println("鏈接圖的屬性:")
  userGraph.vertices.collect.foreach(v => println(s"${v._2.name} inDeg: ${v._2.inDeg}  outDeg: ${v._2.outDeg}"))
  println

  println("出度和入讀相同的人員:")
  userGraph.vertices.filter 
{
    case (id, u) => u.inDeg == u.outDeg
  }.collect.foreach {
    case (id, property) => println(property.name)
  }
  println

  //***************************  聚合操做    ****************************************
  println("聚合操做")
  println("**********************************************************")
  println("collectNeighbors:獲取當前節點source節點的id和屬性")
  graph.collectNeighbors(EdgeDirection.In).collect.foreach(v => {
    println(s"id: ${v._1}");
    for (arr <- v._2) {
      println(s"      ${arr._1} (name: ${arr._2._1}  age: ${arr._2._2})")
    }
  })

  println("aggregateMessages版本:")
  graph.aggregateMessages[Array[(VertexId, (String, Int))]](ctx => ctx.sendToDst(Array((ctx.srcId.toLong, (ctx.srcAttr._1, ctx.srcAttr._2)))), _ ++ _).collect.foreach(v => {
    println(s"id: ${v._1}");
    for (arr <- v._2) {
      println(s"    ${arr._1} (name: ${arr._2._1}  age: ${arr._2._2})")
    }
  })

  println("聚合操做")
  println("**********************************************************")
  println("找出年紀最大的追求者:")

  val oldestFollower: VertexRDD[(String, Int)] = userGraph.aggregateMessages[(String, Int)](
    // 將源頂點的屬性發送給目標頂點,map 過程
    ctx => ctx.sendToDst((ctx.srcAttr.name, ctx.srcAttr.age)),
    // 獲得最大追求者,reduce 過程
    (a, b) => if (a._2 > b._2) a else b
  )

  userGraph.vertices.leftJoin(oldestFollower) { (id, user, optOldestFollower) =>
    optOldestFollower match {
      case None => s"${user.name} does not have any followers."
      case Some((name, age)=> s"${name} is the oldest follower of ${user.name}."
    }
  }.collect.foreach { case (id, str) => println(str) }
  println

  //***************************  實用操做    ****************************************
  println("聚合操做")
  println("**********************************************************")

  val sourceId: VertexId = 5L // 定義源點
  val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

  initialGraph.triplets.collect().foreach(println)

  println("找出5到各頂點的最短距離:")
  val sssp = initialGraph.pregel(Double.PositiveInfinity, Int.MaxValue, EdgeDirection.Out)(
    (id, dist, newDist) => {
      println("||||" + id);
      math.min(dist, newDist)
    },
    triplet => { // 計算權重
      println(">>>>" + triplet.srcId)
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        // 發送成功
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        // 發送不成功
        Iterator.empty
      }
    },
    (a, b) => math.min(a, b) // 當前節點全部輸入的最短距離
  )
  sssp.triplets.collect().foreach(println)

  println(sssp.vertices.collect.mkString("\n"))

  sc.stop()
}

運行結果截圖:

運行結果以下:

屬性演示
**********************************************************
找出圖中年齡大於30的頂點:
David is 42
Ed is 55
Fran is 50
Charlie is 65

找出圖中屬性大於 5 的邊:
2 to 1 att 7
5 to 3 att 8

列出邊屬性 >5 的 tripltes:
Bob likes Alice
Ed likes Charlie

找出圖中最大的出度、入度、度數:
max of outDegrees:(2,3) max of inDegrees:(6,2) max of Degrees:(2,4)

轉換操做
**********************************************************
頂點的轉換操做,頂點age + 10
4 is (David,52)
1 is (Alice,38)
5 is (Ed,65)
6 is (Fran,60)
2 is (Bob,37)
3 is (Charlie,75)

邊的轉換操做,邊的屬性*2
2 to 1 att 14
2 to 4 att 4
3 to 2 att 8
3 to 6 att 6
2 to 5 att 4
4 to 1 att 2
5 to 3 att 16
5 to 6 att 6

三元組的轉換操做,邊的屬性爲端點的age相加:
2 to 1 att 756
2 to 4 att 1134
3 to 2 att 1755
3 to 6 att 3250
2 to 5 att 1485
4 to 1 att 1176
5 to 3 att 3575
5 to 6 att 2750

結構操做
**********************************************************
頂點年紀 >30 的子圖:
子圖全部頂點:
David is 42
Ed is 55
Fran is 50
Charlie is 65

子圖全部邊:
3 to 6 att 3
5 to 3 att 8
5 to 6 att 3

反轉整個圖:
子圖全部頂點:
David is 42
Alice is 28
Ed is 55
Fran is 50
Bob is 27
Charlie is 65

子圖全部邊:
1 to 2 att 7
4 to 2 att 2
2 to 3 att 4
6 to 3 att 3
1 to 4 att 1
5 to 2 att 2
3 to 5 att 8
6 to 5 att 3

鏈接操做
**********************************************************
鏈接圖的屬性:
David inDeg: 1  outDeg: 1
Alice inDeg: 2  outDeg: 0
Ed inDeg: 1  outDeg: 2
Fran inDeg: 2  outDeg: 0
Bob inDeg: 1  outDeg: 3
Charlie inDeg: 1  outDeg: 2

出度和入讀相同的人員:
David

聚合操做
**********************************************************
collectNeighbors:獲取當前節點source節點的id和屬性
id: 4
      2 (name: Bob  age: 27)
id: 1
      2 (name: Bob  age: 27)
      4 (name: David  age: 42)
id: 5
      2 (name: Bob  age: 27)
id: 6
      3 (name: Charlie  age: 65)
      5 (name: Ed  age: 55)
id: 2
      3 (name: Charlie  age: 65)
id: 3
      5 (name: Ed  age: 55)
aggregateMessages版本:
id: 4
    2 (name: Bob  age: 27)
id: 1
    2 (name: Bob  age: 27)
    4 (name: David  age: 42)
id: 5
    2 (name: Bob  age: 27)
id: 6
    3 (name: Charlie  age: 65)
    5 (name: Ed  age: 55)
id: 2
    3 (name: Charlie  age: 65)
id: 3
    5 (name: Ed  age: 55)
聚合操做
**********************************************************
找出年紀最大的追求者:
Bob is the oldest follower of David.
David is the oldest follower of Alice.
Bob is the oldest follower of Ed.
Charlie is the oldest follower of Fran.
Charlie is the oldest follower of Bob.
Ed is the oldest follower of Charlie.

聚合操做
**********************************************************
((2,Infinity),(1,Infinity),7)
((2,Infinity),(4,Infinity),2)
((3,Infinity),(2,Infinity),4)
((3,Infinity),(6,Infinity),3)
((2,Infinity),(5,0.0),2)
((4,Infinity),(1,Infinity),1)
((5,0.0),(3,Infinity),8)
((5,0.0),(6,Infinity),3)
找出5到各頂點的最短距離:
||||6
||||1
||||3
||||4
||||5
||||2
>>>>5
>>>>2
>>>>3
>>>>2
>>>>4
>>>>5
>>>>3
>>>>2
||||3
||||6
>>>>3
>>>>3
||||2
>>>>2
>>>>2
>>>>2
||||1
||||4
>>>>4
||||1
((2,12.0),(1,15.0),7)
((2,12.0),(4,14.0),2)
((3,8.0),(2,12.0),4)
((3,8.0),(6,3.0),3)
((2,12.0),(5,0.0),2)
((4,14.0),(1,15.0),1)
((5,0.0),(3,8.0),8)
((5,0.0),(6,3.0),3)
(4,14.0)
(1,15.0)
(5,0.0)
(6,3.0)
(2,12.0)
(3,8.0)

第3章 圖算法

3.1 PageRank 排名算法

3.1.1 算法概述

PageRank,即網頁排名,又稱網頁級別Google 左側排名佩奇排名。是Google創始人拉里·佩奇和謝爾蓋·布林於 1997 年構建早期的搜索系統原型時提出的連接分析算法,在揉合了諸如 Title 標識和 Keywords 標識等全部其它因素以後,Google 經過 PageRank 來調整結果,使那些更具「等級/重要性」的網頁在搜索結果中令網站排名得到提高,從而提升搜索結果的相關性和質量。

3.1.2 從入鏈數量到 PageRank

PageRank 的計算基於如下兩個基本假設:
  • 數量假設:在 Web 圖模型中,若是一個頁面節點接收到的其餘網頁指向的入鏈數量越多,那麼這個頁面越重要。
  • 質量假設:指向頁面 A 的入鏈質量不一樣,質量高的頁面會經過連接向其餘頁面傳遞更多的權重。因此越是質量高的頁面指向頁面 A,則頁面 A 越重要。
利用以上兩個假設,PageRank 算法剛開始賦予每一個網頁相同的重要性得分,經過迭代遞歸計算來更新每一個頁面節點的 PageRank 得分,直到得分穩定爲止。 PageRank 計算得出的結果是網頁的重要性評價,這和用戶輸入的查詢是沒有任何關係的,即算法是主題無關的

3.1.3 PageRank 算法原理

PageRank 的計算充分利用了兩個假設:數量假設和質量假設。步驟以下:
  • 1)在初始階段:網頁經過連接關係構建起 Web 圖,每一個頁面設置相同的 PageRank 值,經過若干輪的計算,會獲得每一個頁面所得到的最終 PageRank 值。隨着每一輪的計算進行,網頁當前的 PageRank 值會不斷獲得更新。
  • 2)在一輪中更新頁面 PageRank 得分的計算方法:在一輪更新頁面 PageRank 得分的計算中,每一個頁面將其當前的 PageRank 值平均分配到本頁面包含的出鏈上,這樣每一個連接即得到了相應的權值。而每一個頁面將全部指向本頁面的入鏈所傳入的權值求和,便可獲得新的 PageRank 得分。當每一個頁面都得到了更新後的 PageRank 值,就完成了一輪 PageRank 計算。

基本思想
若是網頁 T 存在一個指向網頁 A 的鏈接,則代表 T 的全部者認爲 A 比較重要,從而把 T 的一部分重要性得分賦予 A。這個重要性得分值爲:PR(T)/L(T)
  其中 PR(T) 爲 T 的 PageRank 值,L(T) 爲 T 的出鏈數。
  則 A 的 PageRank 值爲一系列相似於 T 的頁面重要性得分值的累加。
一個頁面的得票數由全部鏈向它的頁面的重要性來決定,到一個頁面的超連接至關於對該頁投一票。一個頁面的 PageRank 是由全部鏈向它的頁面(鏈入頁面)的重要性通過遞歸算法獲得的。一個有較多鏈入的頁面會有較高的等級,相反若是一個頁面沒有任何鏈入頁面,那麼它沒有等級。
咱們設向量 B 爲第1、第2、…、第 N 個網頁的網頁排名


矩陣 A 表明網頁之間的權重輸出關係,其中 amn 表明第 m 個網頁向第 n 個網頁的輸出權重。

輸出權重計算較爲簡單:假設 m 一共有 10 個出鏈,指向 n 的一共有2個,那麼 m 向 n 輸出的權重就爲 2/10。
如今問題變爲:A 是已知的,咱們要經過計算獲得 B。
假設 Bi 是第 i 次迭代的結果,那麼

初始假設全部網頁的排名都是 1/N (N爲網頁總數量),即

經過上述迭代計算,最終 Bi 會收斂,即 Bi 無限趨近於 B,此時 B = B × A。

 

具體示例
假設有網頁 A、B、C、D,它們之間的連接關係以下圖所示


計算 B1 以下:

不斷迭代,計算結果以下:

 

第 1次迭代: 0.125, 0.333, 0.083, 0.458 
第 2次迭代: 0.042, 0.500, 0.042, 0.417 
第 3次迭代: 0.021, 0.431, 0.014, 0.535 
第 4次迭代: 0.007, 0.542, 0.007, 0.444 
第 5次迭代: 0.003, 0.447, 0.002, 0.547 
第 6次迭代: 0.001, 0.549, 0.001, 0.449 
第 7次迭代: 0.001, 0.449, 0.000, 0.550 
第 8次迭代: 0.000, 0.550, 0.000, 0.450 
第 9次迭代: 0.000, 0.450, 0.000, 0.550 
第10次迭代: 0.000, 0.550, 0.000, 0.450 
... ...

咱們能夠發現,A 和 C 的權重變爲 0,而 B 和 D 的權重也趨於在 0.5 附近擺動。從圖中也能夠觀察出:A 和 C 之間有互相連接,但它們又把權重輸出給了 B 和 D,而 B 和 D之間互相連接,並不向 A 或 C 輸出任何權重,因此長此以往權重就都轉移到 B 和 D 了。

PageRank 的改進
上面是最簡單正常的狀況,考慮一下兩種特殊狀況:


第一種狀況是,B 存在導向本身的連接,迭代計算過程是:

 

第 1次迭代: 0.125, 0.583, 0.083, 0.208 
第 2次迭代: 0.042, 0.833, 0.042, 0.083 
第 3次迭代: 0.021, 0.931, 0.014, 0.035 
第 4次迭代: 0.007, 0.972, 0.007, 0.014 
第 5次迭代: 0.003, 0.988, 0.002, 0.006 
第 6次迭代: 0.001, 0.995, 0.001, 0.002 
第 7次迭代: 0.001, 0.998, 0.000, 0.001 
第 8次迭代: 0.000, 0.999, 0.000, 0.000 
第 9次迭代: 0.000, 1.000, 0.000, 0.000 
第10次迭代: 0.000, 1.000, 0.000, 0.000 
... ...

咱們發現最終 B 權重變爲1,其它全部網頁的權重都變爲了0。
第二種狀況是 B 是孤立於其它網頁的,既沒有入鏈也沒有出鏈,迭代計算過程是:

第 1次迭代: 0.125, 0.000, 0.125, 0.250 
第 2次迭代: 0.063, 0.000, 0.063, 0.125 
第 3次迭代: 0.031, 0.000, 0.031, 0.063 
第 4次迭代: 0.016, 0.000, 0.016, 0.031 
第 5次迭代: 0.008, 0.000, 0.008, 0.016 
第 6次迭代: 0.004, 0.000, 0.004, 0.008 
第 7次迭代: 0.002, 0.000, 0.002, 0.004 
第 8次迭代: 0.001, 0.000, 0.001, 0.002 
第 9次迭代: 0.000, 0.000, 0.000, 0.001 
第10次迭代: 0.000, 0.000, 0.000, 0.000 
... ...

咱們發現全部網頁權重都變爲了0。
出現這種狀況是由於上面的數學模型出現了問題,該模型認爲上網者從一個網頁瀏覽下一個網頁都是經過頁面的超連接。想象一下正常的上網情景,其實咱們在看完一個網頁後,可能直接在瀏覽器輸入一個網址,而不經過上一個頁面的超連接。
咱們假設每一個網頁被用戶經過直接訪問方式的機率是相等的,即 1/N,N 爲網頁總數,設矩陣 e 以下:


設用戶經過頁面超連接瀏覽下一網頁的機率爲 α,則直接訪問的方式瀏覽下一個網頁的機率爲 1 - α,改進上一節的迭代公式爲:

一般狀況下設 α 爲0.8,上一節」具體示例」的計算變爲以下:

迭代過程以下:
第 1次迭代: 0.150, 0.317, 0.117, 0.417 
第 2次迭代: 0.097, 0.423, 0.090, 0.390 
第 3次迭代: 0.086, 0.388, 0.076, 0.450 
第 4次迭代: 0.080, 0.433, 0.073, 0.413 
第 5次迭代: 0.079, 0.402, 0.071, 0.447 
第 6次迭代: 0.079, 0.429, 0.071, 0.421 
第 7次迭代: 0.078, 0.408, 0.071, 0.443 
第 8次迭代: 0.078, 0.425, 0.071, 0.426 
第 9次迭代: 0.078, 0.412, 0.071, 0.439 
第10次迭代: 0.078, 0.422, 0.071, 0.428 
第11次迭代: 0.078, 0.414, 0.071, 0.437 
第12次迭代: 0.078, 0.421, 0.071, 0.430 
第13次迭代: 0.078, 0.415, 0.071, 0.436 
第14次迭代: 0.078, 0.419, 0.071, 0.431 
第15次迭代: 0.078, 0.416, 0.071, 0.435 
第16次迭代: 0.078, 0.419, 0.071, 0.432 
第17次迭代: 0.078, 0.416, 0.071, 0.434 
第18次迭代: 0.078, 0.418, 0.071, 0.432 
第19次迭代: 0.078, 0.417, 0.071, 0.434 
第20次迭代: 0.078, 0.418, 0.071, 0.433 
... ...

修正 PageRank 計算公式
因爲存在一些出鏈爲 0,也就是那些不連接任何其餘網頁的網,也稱爲孤立網頁,使得不少網頁能被訪問到。所以須要對 PageRank 公式進行修正,即在簡單公式的基礎上增長了阻尼係數(damping factor)q, q 通常取值 q=0.85。
其意義是,在任意時刻,用戶到達某頁面後並繼續向後瀏覽的機率。1- q= 0.15 就是用戶中止點擊,隨機跳到新 URL 的機率)的算法被用到了全部頁面上,估算頁面可能被上網者放入書籤的機率。
最後,即全部這些被換算爲一個百分比再乘上一個係數 q。因爲下面的算法,沒有頁面的 PageRank 會是 0。因此,Google 經過數學系統給了每一個頁面一個最小值。

這個公式就是 S. Brin 和 L. Page 在《The Anatomy of a Large- scale Hypertextual Web Search Engine Computer Networks and ISDN Systems 》定義的公式。
因此一個頁面的 PageRank 是由其餘頁面的 PageRank 計算獲得。Google 不斷的重複計算每一個頁面的 PageRank。若是給每一個頁面一個隨機 PageRank 值(非0),那麼通過不斷的重複計算,這些頁面的 PR 值會趨向於正常和穩定。這就是搜索引擎使用它的緣由。

首先求完整的公式

3.1.4 Spark GraphX 實現

import org.apache.spark.graphx.GraphLoader

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))

3.2 廣度優先遍歷(參考)

val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")

val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else
  Double.PositiveInfinity)

val vprog = { (id: VertexId, attr: Double, msg: Double) => math.min(attr,msg) }

val sendMessage = { (triplet: EdgeTriplet[Double, Int]) =>
  var iter:Iterator[(VertexId, Double)] = Iterator.empty
  val isSrcMarked = triplet.srcAttr != Double.PositiveInfinity
  val isDstMarked = triplet.dstAttr != Double.PositiveInfinity
  if(!(isSrcMarked && isDstMarked))
{
    if(isSrcMarked){
      iter = Iterator((triplet.dstId,triplet.srcAttr+1))
    }else{
      iter = Iterator((triplet.srcId,triplet.dstAttr+1))
    }
  }
  iter
}

val reduceMessage = { (a: Double, b: Double) => math.min(a,b) }

val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(vprog, sendMessage, reduceMessage)

println(bfs.vertices.collect.mkString("\n"))

3.3 單源最短路徑(參考)

import scala.reflect.ClassTag

import org.apache.spark.graphx._

/**
  * Computes shortest paths to the given set of landmark vertices, returning a graph where each
  * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
  */

object ShortestPaths {
  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
  type SPMap = Map[VertexId, Int]

  private def makeMap(x: (VertexId, Int)*) = Map(x: _*)

  private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
    }.toMap

  /**
    * Computes shortest paths to the given set of landmark vertices.
    *
    * @tparam ED the edge attribute type (not used in the computation)
    *
    * @param graph the graph for which to compute the shortest paths
    * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
    * landmark.
    *
    * @return a graph where each vertex attribute is a map containing the shortest-path distance to
    * each reachable landmark vertex.
    */

  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) makeMap(vid -> 0else makeMap()
    }

    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)= {
      val newAttr = incrementMap(edge.dstAttr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}

3.4 連通圖(參考)

import scala.reflect.ClassTag

import org.apache.spark.graphx._

/** Connected components algorithm. */
object ConnectedComponents {
  /**
    * Compute the connected component membership of each vertex and return a graph with the vertex
    * value containing the lowest vertex id in the connected component containing that vertex.
    *
    * @tparam VD the vertex attribute type (discarded in the computation)
    * @tparam ED the edge attribute type (preserved in the computation)
    * @param graph the graph for which to compute the connected components
    * @param maxIterations the maximum number of iterations to run for
    * @return a graph with vertex attributes containing the smallest vertex in each
    *         connected component
    */

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
                                      maxIterations: Int): Graph[VertexId, ED] = {
    require(maxIterations > 0, s"Maximum of iterations must be greater than 0," +
      s" but got ${maxIterations}")

    val ccGraph = graph.mapVertices { case (vid, _) => vid }
    def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)= {
      if (edge.srcAttr < edge.dstAttr) {
        Iterator((edge.dstId, edge.srcAttr))
      } else if (edge.srcAttr > edge.dstAttr) {
        Iterator((edge.srcId, edge.dstAttr))
      } else {
        Iterator.empty
      }
    }
    val initialMessage = Long.MaxValue
    val pregelGraph = Pregel(ccGraph, initialMessage,
      maxIterations, EdgeDirection.Either)(
      vprog = (id, attr, msg) => math.min(attr, msg),
      sendMsg = sendMessage,
      mergeMsg = (a, b) => math.min(a, b))
    ccGraph.unpersist()
    pregelGraph
  } // end of connectedComponents

  /**
    * Compute the connected component membership of each vertex and return a graph with the vertex
    * value containing the lowest vertex id in the connected component containing that vertex.
    *
    * @tparam VD the vertex attribute type (discarded in the computation)
    * @tparam ED the edge attribute type (preserved in the computation)
    * @param graph the graph for which to compute the connected components
    * @return a graph with vertex attributes containing the smallest vertex in each
    *         connected component
    */

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
    run(graph, Int.MaxValue)
  }
}

3.5 三角計數(參考)

import scala.reflect.ClassTag

import org.apache.spark.graphx._

/**
  * Compute the number of triangles passing through each vertex.
  *
  * The algorithm is relatively straightforward and can be computed in three steps:
  *
  * <ul>
  * <li> Compute the set of neighbors for each vertex</li>
  * <li> For each edge compute the intersection of the sets and send the count to both vertices.</li>
  * <li> Compute the sum at each vertex and divide by two since each triangle is counted twice.</li>
  * </ul>
  *
  * There are two implementations.  The default `TriangleCount.run` implementation first removes
  * self cycles and canonicalizes the graph to ensure that the following conditions hold:
  * <ul>
  * <li> There are no self edges</li>
  * <li> All edges are oriented src > dst</li>
  * <li> There are no duplicate edges</li>
  * </ul>
  * However, the canonicalization procedure is costly as it requires repartitioning the graph.
  * If the input data is already in "canonical form" with self cycles removed then the
  * `TriangleCount.runPreCanonicalized` should be used instead.
  *
  * {{{
  * val canonicalGraph = graph.mapEdges(e => 1).removeSelfEdges().canonicalizeEdges()
  * val counts = TriangleCount.runPreCanonicalized(canonicalGraph).vertices
  * }}}
  *
  */

object TriangleCount {

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // Transform the edge data something cheap to shuffle and then canonicalize
    val canonicalGraph = graph.mapEdges(e => true).removeSelfEdges().convertToCanonicalEdges()
    // Get the triangle counts
    val counters = runPreCanonicalized(canonicalGraph).vertices
    // Join them bath with the original graph
    graph.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
      optCounter.getOrElse(0)
    }
  }


  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // Construct set representations of the neighborhoods
    val nbrSets: VertexRDD[VertexSet] =
      graph.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
        val set = new VertexSet(nbrs.length)
        var i = 0
        while (i < nbrs.length) {
          // prevent self cycle
          if (nbrs(i) != vid) {
            set.add(nbrs(i))
          }
          i += 1
        }
        set
      }

    // join the sets with the graph
    val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) {
      (vid, _, optSet) => optSet.getOrElse(null)
    }

    // Edge function computes intersection of smaller vertex with larger vertex
    def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
      val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
        (ctx.srcAttr, ctx.dstAttr)
      } else {
        (ctx.dstAttr, ctx.srcAttr)
      }
      val iter = smallSet.iterator
      var counter: Int = 0
      while (iter.hasNext) {
        val vid = iter.next()
        if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
          counter += 1
        }
      }
      ctx.sendToSrc(counter)
      ctx.sendToDst(counter)
    }

    // compute the intersection along edges
    val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
    // Merge counters with the graph and divide by two since each triangle is counted twice
    graph.outerJoinVertices(counters) { (_, _, optCounter: Option[Int]) =>
      val dblCount = optCounter.getOrElse(0)
      // This algorithm double counts each triangle so the final count should be even
      require(dblCount % 2 == 0"Triangle count resulted in an invalid number of triangles.")
      dblCount / 2
    }
  }
}

第4章 PageRank 實例

採用的數據是 wiki 數據中含有 Berkeley 標題的網頁之間鏈接關係,數據爲兩個文件:graphx-wiki-vertices.txt 和 graphx-wiki-edges.txt,能夠分別用於圖計算的頂點和邊。
Step一、上傳數據

[atguigu@hadoop102 hadoop-2.7.2]$ pwd
/opt/module/hadoop-2.7.2
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put /opt/software/graphx-wiki-edges.txt /
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put /opt/software/graphx-wiki-vertices.txt /

Step二、RDD 加載數據轉換 Edges

scala> val erdd = sc.textFile("hdfs://hadoop102:9000/graphx-wiki-edges.txt")
erdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/graphx-wiki-edges.txt MapPartitionsRDD[88] at textFile at <console>:26

scala> val edges = erdd.map(x => { val para = x.split("\t"); Edge(para(0).trim.toLong, para(1).trim.toLong,0) })
edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[Int]] = MapPartitionsRDD[89] at map at <console>:28

Step三、RDD 加載數據轉換 vertices

scala> val vrdd = sc.textFile("hdfs://hadoop102:9000/graphx-wiki-vertices.txt")
vrdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/graphx-wiki-vertices.txt MapPartitionsRDD[91] at textFile at <console>:26

scala> val vertices = vrdd.map(x => { val para = x.split("\t"); (para(0).trim.toLong, para(1).trim) })
vertices: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[92] at map at <console>:28

Step四、構建 Graph

scala> val graph = Graph(vertices, edges)
graph: org.apache.spark.graphx.Graph[String,Int] = org.apache.spark.graphx.impl.GraphImpl@a0ff17d

Step五、運行配置 RageRank

scala> val prGraph = graph.pageRank(0.001).cache()
prGraph: org.apache.spark.graphx.Graph[Double,Double] = org.apache.spark.graphx.impl.GraphImpl@45e9b508

Step六、輸出 RageRank 結果

scala> val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {(v, title, rank) => (rank.getOrElse(0.0), title)}
titleAndPrGraph: org.apache.spark.graphx.Graph[(Double, String),Int] = org.apache.spark.graphx.impl.GraphImpl@6bb0284d

scala> titleAndPrGraph.vertices.top(10) { Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1) }.foreach(t => println(t._2._2 + ": " + t._2._1))
University of California, Berkeley: 1321.1117543121227
Berkeley, California: 664.8841977233989
Uc berkeley: 162.5013274339786
Berkeley Software Distribution: 90.47860388486127
Lawrence Berkeley National Laboratory: 81.90404939642022
George Berkeley: 81.85226118458043
Busby Berkeley: 47.87199821801991
Berkeley Hills: 44.76406979519929
Xander Berkeley: 30.32407534728813
Berkeley County, South Carolina: 28.908336483710315

示例代碼以下:

package com.atguigu.graphx

import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx._
import org.apache.spark.{SparkConf, SparkContext}

object PageRank extends App {

  // 屏蔽日誌
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

  // 設定一個 SparkConf
  val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
  val sc = new SparkContext(conf)

  val erdd = sc.textFile("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\graphx-wiki-edges.txt")
  val edges = erdd.map(x => {
    val para = x.split("\t");
    Edge(para(0).trim.toLong, para(1).trim.toLong, 0)
  })

  val vrdd = sc.textFile("D:\\learn\\JetBrains\\workspace_idea\\spark\\doc\\graphx-wiki-vertices.txt")
  val vertices = vrdd.map(x => {
    val para = x.split("\t");
    (para(0).trim.toLong, para(1).trim)
  })

  val graph = Graph(vertices, edges)

  println("**********************************************************")
  println("PageRank 計算,獲取最有價值的數據")
  println("**********************************************************")

  val prGraph = graph.pageRank(0.001).cache()

  val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) { (v, title, rank) => (rank.getOrElse(0.0), title) }

  titleAndPrGraph.vertices.top(10) {
    Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
  }.foreach(t => println(t._2._2 + ": " + t._2._1))

  sc.stop()
}

輸出結果以下:

**********************************************************
PageRank 計算,獲取最有價值的數據
**********************************************************
University of California, Berkeley: 1321.1117543121227
Berkeley, California: 664.8841977233989
Uc berkeley: 162.5013274339786
Berkeley Software Distribution: 90.47860388486127
Lawrence Berkeley National Laboratory: 81.90404939642022
George Berkeley: 81.85226118458043
Busby Berkeley: 47.87199821801991
Berkeley Hills: 44.76406979519929
Xander Berkeley: 30.32407534728813
Berkeley County, South Carolina: 28.908336483710315
相關文章
相關標籤/搜索