Spark GraphX 快速入門

Spark GraphX 快速入門

(基於原文:http://blog.csdn.net/mach_learn/article/details/46501351 編輯)算法

概述

GraphX是Spark的一個新組件用於圖和並行圖計算。 GraphX經過引進一個新的圖抽象擴展了spark RDD:帶有頂點和邊屬性的有向多重圖。 爲了支持圖計算,GraphX 提供了不少基本的操做(像 subgraph, joinVertices, and aggregateMessages)和pregel的一個優化變種。 除此以外,GraphX 包含了一個正在增加的圖算法和圖構造的集合來簡化圖的分析任務。shell

從spark1.1 遷移

GraphX 在spark 1.3.1改變了部分用戶正在使用api:apache

爲了改進性能,引入了一個新版的 mapReduceTriplets 稱爲aggregateMessages,它取先前返回信息從 mapReduceTriplets 經過一個回調 EdgeContext 而不是經過返回值。咱們正在遺棄 mapReduceTriplets,鼓勵用戶查閱過分指南。api

在spark1.0和1.1,EdgeRDD的簽名切換從 EdgeRDD[ED] 到 EdgeRDD[ED, VD]來進行一些緩存優化。 咱們已經發現了一個更加優雅的解決方案,恢復了簽名到更加天然地EdgeRDD[ED]類型。數組

開始

開始spark GraphX,你首先須要將spark和GraphX導入你的工程,以下:緩存

import org.apache.spark._
import org.apache.spark.graphx._

// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD

若是你沒有使用spark shell,你須要本身創建一個SparkContext。網絡

屬性圖

屬性圖是一個有向的多重圖,用戶爲每個頂點(vertex)和邊(edge)定義對象。一個有向多重圖是一個有向圖,潛在的多重平行邊共享相同的源和目的頂點(vertex)。支持平行邊的能力簡化了相同頂點間有多重關係(例如,同時和朋友)的建模場景。每個頂點以64位長度標識(vertexId)做爲鍵。GraphX沒有對頂點標識符強加一個排序限制。一樣地,邊有對應的源和目的頂點標識符。數據結構

屬性圖經過頂點(VD)和邊(ED)類型參數化。這些類型分別指與頂點和邊相關的對象。架構

GraphX優化了頂點和邊類型表示,當它們使用原始數據類型(像 int,double等),使用特殊數組存儲它們下降了內存使用。app

在一些狀況下,同一個圖中頂點使用不一樣的屬性類型進行描述。這能經過繼承實現。例如,將用戶和產品建模爲一個二分圖,能夠用以下方式:

class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null

像RDDs,屬性圖是不變的、分佈式的和容錯的。圖的值或者結構的改變經過產生一個指望改變的新圖來完成。注意,原始圖的本質部分(不影響結構、屬性和索引)均可以在新圖中重用,用來減小這種固有的功能數據結構的成本。圖被分區經過executors使用一個範圍的頂點進行啓發式分區。像RDDs同樣,當發生故障時,圖的每個分區能被從新建立在不一樣的機器上。

邏輯上屬性圖對應一對類型化的RDDs集合,其編碼每個頂點和邊的屬性。所以,圖類包含圖的頂點和邊成員:

class Graph[VD, ED] { val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] }

VertexRDD[VD]和 EdgeRDD[ED]分別對應RDD[(VertexID, VD)]和RDD[Edge[ED]]版本的擴展和優化。VertexRDD[VD] 和 EdgeRDD[ED]提供了額外的功能在圖計算中,同時進行了內部優化。討論 VertexRDD 和 EdgeRDD API細節在vertex和edgeRDDs小節,如今暫且認爲簡單RDDs形式:RDD[(VertexID, VD)] 和 RDD[Edge[ED]]

屬性圖實例

假設咱們想構建一個包含不一樣合做者的屬性圖在圖工程中。頂點屬性可能包含用戶名和職業。咱們註釋邊使用字符串描述合做者之間的關係。

結果圖有以下類型簽名:

val userGraph: Graph[(String, String), String]

有不少種方式構建一個屬性圖從原始文件、RDDs、甚至合成生成器,這些在graph builders節將詳細介紹。或許最基本的方法是使用圖對象。例如,下面代碼展現了使用一系列RDDs集合構建一個圖:

// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

在上面的實例中,咱們用到了Edge樣本類。Edges 有一個srcId 和 dstId 對應原頂點和目的頂點標識符。除此以外,Edge類有一個attr 成員存儲邊屬性。

咱們可使用graph.vertices和graph.edges解構出一個圖對應的頂點和邊。

val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

注意:graph.vertices返回一個VertexRDD[(String, String)],其擴展自RDD[(VertexID, (String, String))],這樣咱們可使用Scala case表達式來解構元祖。在另外一方面,graph.edges返回一個EdgeRDD 包含Edge[String]對象。咱們也可使用case類類型的構造器,以下所示:

graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count

除了屬性圖的頂點和邊視圖。GraphX 也暴露了一個triplet 視圖。triplet視圖邏輯上鍊接了頂點和邊屬性產生一個 RDD[EdgeTriplet[VD, ED]],其包含EdgeTriplet類。join能夠表達在下面SQL表達式:

SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id

或者生動的表示爲:

EdgeTriplet類擴展了Edge類經過增長srcAttr 和dstAttr 成員,它們包含源和目的頂點屬性。咱們可使用一個圖的 triplet 視圖來提供一些字符串描述用戶之間的關係。

val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))

圖操做

像RDDs有基本的操做,如 map、filter和reduceByKey,屬性圖也有一些基本的操做,這些操做採用用戶自定義函數,產生轉換屬性和解構的新圖。在Graph中定義的核心操做是已經被優化的實現,組合核心操做的便捷操做定義在GraphOps中。然而,因爲Scala的隱士轉換在GraphOps中的操做可在Graph的成員中自動得到。例如,咱們能夠計算每個頂點的入度(定義在GraphOps),以下所示:

val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees

區別核心graph操做和GraphOps的緣由是在未來支持不一樣的圖表述。每個圖表述必須提供核心操做實現,重複使用在GraphOps中有用的一些操做。

操做列表概要

如下是一個定義在 Graph 和 GraphOps函數快速摘要,爲簡單起見都做爲 Graph 的成員。注意:一些函數簽名已經被簡化(像默認參數和類型約束被移除),一些高級的函數沒有列出,若是須要請參考api文檔。

/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic  ============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexID, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
      (mapFunc: (VertexID, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexID, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}

屬性操做

像RDD map操做,屬性圖包括下面操做:

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

這裏每個操做產生一個新圖,其頂點和邊被用戶定義的map函數修改了。

注意:在每個實例圖結構不受影響。這是這些操做的關鍵特徵,這容許結果圖重複利用原始圖的結構索引。下面的代碼片斷邏輯上是等同的,可是第一個沒有保存結構索引,其不會從GraphX系統優化中獲益:

val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)

代替,使用mapVertices保護結構索引:

val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

這些操做常常用來初始化圖爲了進行特殊計算或者排除不須要的屬性。例如,給定一個圖,它的出度做爲頂點屬性(以後描述如何構建這樣一個圖),咱們初始化它爲PageRank:

// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)

結構操做

當前,GraphX僅僅支持一個簡單的經常使用結構操做,未來會不斷完善。下面是基本結構操做列表:

class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

reverse操做返回一個新圖,其全部的邊方向反向。有時這是有用的,例如,嘗試計算反轉的PageRank。由於反轉操做沒有修改頂點或者邊屬性或者改變邊數量,這可以高效的實現沒有數據移動或者複製。

subgraph操做利用頂點和邊判斷,返回圖包含知足判斷的頂點,知足邊判斷的頂點,知足頂點判斷的鏈接頂點。subgraph 操做能夠用在一些情景,限制感興趣的圖頂點和邊,刪除損壞鏈接。例如,在下面代碼中,咱們能夠移除損壞鏈接:

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
    triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
  ).collect.foreach(println(_))
  • 注意: 在上面的實例中僅僅頂點判斷被提到。subgraph 操做默認是true ,若是頂點和邊判斷沒有被提到時。

mask操做構建了一個subgraph 經過返回圖,其包含頂點和邊也被髮如今輸入圖中。這能夠聯合subgraph操做使用來限制一個圖在其餘相關圖屬性的基礎上。例如,咱們可使用丟失頂點的圖運行鏈接組件,而後限制有效子圖的返回。

// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)

groupEdges操做合併了多重圖的並行邊(例如,頂點之間的重複邊)。在一些數字應用程序中,並行邊能被增長(權重融合)到一個邊,所以減小了圖的大小。

join操做

在不少狀況下,須要將外部數據集合(RDDs)添加到圖中。例如,咱們可能有額外的用戶屬性,咱們想把它融合到一個存在圖中或者咱們可能想拉數據屬性從一個圖到另外一個圖。這些任務可使用join操做來實現。下面咱們列出了關鍵的join操做:

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}

joinVertices操做鏈接vertices 和輸入RDD,返回一個新圖,其頂點屬性經過應用用戶定義map函數到joined vertices結果上得到的。在RDD頂點沒有一個匹配值保留其原始值。

注意:若是RDD對一個給定頂點包含超過一個值,僅僅有一個將會使用。所以,建議輸入RDD保持惟一性,這可使用下面方法,預索引結果值,加快join執行速度。

val nonUniqueCosts: RDD[(VertexID, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id, oldCost, extraCost) => oldCost + extraCost)

更加通常的 outerJoinVertices 行爲和joinVertices類似除了用戶定義的map函數被應用到全部頂點和能夠改變頂點類型。由於不是全部的頂點有一個匹配值在輸入RDD,map函數使用了一個Option類型。例如,咱們能夠設置一個圖對PageRank經過初始化頂點屬性使用出度:

val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}

你可能已經覺察到了柯里函數模式的多參數列表(例如f(a)(b))被使用在上面的實例中。當咱們能有等同寫f(a)(b)爲 f(a,b),這將意味着類型接口b將不會依賴於a。所以用戶須要提供類型註釋對用戶自定義函數:

val joinedGraph = graph.joinVertices(uniqueCosts, (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)

相鄰聚合(Neighborhood Aggregation)

在圖分析任務中一個關鍵步驟就是彙集每個頂點的鄰居信息。例如,咱們想知道每個用戶的追隨者數量或者追隨者的平均年齡。一些迭代的圖算法(像PageRank,最短路徑和聯通組件)反覆的彙集相鄰頂點的屬性(像當前pagerank值,源的最短路徑,最小可到達的頂點id)。

爲了改善原始彙集操做的性能,將graph.mapReduceTriplets 改成新的graph.AggregateMessages。固然API的改變很小,下面提供了過分嚮導。

信息彙集(Aggregate Messages (aggregateMessages))

在GraphX中核心的彙集操做是aggregateMessages。這個操做應用了一個用戶定義的sendMsg函數到圖中的每個邊 triplet,而後用mergeMsg函數在目的節點彙集這些信息。

class Graph[VD, ED] { def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[Msg] }

用戶定義一個 sendMsg 函數使用 EdgeContext,其暴露了源和目的屬性,及它們相關的邊屬性,函數(sendToSrc, and sendToDst)發送信息到源和目的屬性。考慮 sendMsg 做爲map-reduce中的map函數。用戶定義的mergeMsg函數使用到相同頂點的兩個信息,將它們計算產出一條信息。考慮mergeMsg 做爲map-reduce的reduce函數。aggregateMessages函數返回一個VertexRDD[Msg],其包含了到達每個頂點的融合信息(Msg類型)。沒有接收一個信息的頂點不被包含在返回的VertexRDD中。

除此以外,aggregateMessages使用了一個選項tripletsFields,其代表在EdgeContext中什麼數據能夠被訪問(例如,有源頂點屬性沒有目的頂點屬性)。tripletsFields 可能的選項被定義在TripletsFields中,默認值爲 TripletFields.All,其代表用戶定義的sendMsg 函數能夠訪問EdgeContext的任何屬性。tripletFields 參數通知GraphX僅僅須要EdgeContext的一部分,容許GraphX 選擇一個優化的鏈接策略。例如,若是咱們計算每個用戶追隨者的平均年齡,咱們僅僅要求源屬性便可,因此咱們使用 TripletFields.Src 來代表咱們僅僅使用源屬性。

在以前的GraphX版本中,咱們使用字節碼檢測來推斷 TripletFields ,然而咱們已經發現字節碼檢測是稍微不可靠,因此代替先前方式使用更加明確的用戶控制。

在下面的實例中,咱們使用 aggregateMessages操做來計算每個用戶更年長追隨者的平均年齡。

// Import random graph generation library import org.apache.spark.graphx.util.GraphGenerators // Create a graph with "age" as the vertex property. Here we use a random graph for simplicity. val graph: Graph[Double, Int] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble ) // Compute the number of older followers and their total age val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)]( triplet => { // Map Function if (triplet.srcAttr > triplet.dstAttr) { // Send message to destination vertex containing counter and age triplet.sendToDst(1, triplet.srcAttr) } }, // Add counter and age (a, b) => (a._1 + b._1, a._2 + b.2) // Reduce Function ) // Divide total age by number of older followers to get average age of older followers val avgAgeOfOlderFollowers: VertexRDD[Double] = olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } ) // Display the results avgAgeOfOlderFollowers.collect.foreach(println())

當messages (以及消息總數)是常量大小(例如, float和addition代替lists和鏈接(concatenation)),aggregateMessages 操做效果最好。 6.4.5.2 Map Reduce Triplets Transition Guide (Legacy)

在早的GraphX版本中咱們計算鄰居聚合使用mapReduceTriplets操做:

class Graph[VD, ED] { def mapReduceTriplets[Msg]( map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)], reduce: (Msg, Msg) => Msg) : VertexRDD[Msg] }

mapReduceTriplets 操做應用用戶定義的map函數到每個triplet ,使用用戶定義的reduce函數聚合產生 messages。。然而,咱們發現用戶返回迭代器是昂貴的,它抑制了咱們應用額外優化(例如,本地頂點的從新編號)的能力。在 aggregateMessages 中咱們引進了EdgeContext,其暴露triplet屬性,也明確了函數發送信息的源和目的頂點。除此以外,咱們移除了字節碼檢測,取而代之的是要求用戶指明哪一個triplet屬性被須要。

下面的代碼塊使用 mapReduceTriplets:

val graph: Graph[Int, Float] = ... def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = { Iterator((triplet.dstId, "Hi")) } def reduceFun(a: Int, b: Int): Int = a + b val result = graph.mapReduceTriplets[String](msgFun, reduceFun)

使用aggregateMessages重寫爲:

val graph: Graph[Int, Float] = ... def msgFun(triplet: EdgeContext[Int, Float, String]) { triplet.sendToDst("Hi") } def reduceFun(a: Int, b: Int): Int = a + b val result = graph.aggregateMessages[String](msgFun, reduceFun)

計算度(Degree)信息

一個普通的聚合任務是計算每個頂點的度:每個頂點邊的數量。在有向圖的狀況下,它常常知道入度,出度和每一個頂點的總度。 GraphOps 類包含了每個頂點的一系列的度的計算。例如:在下面將計算最大入度,出度和總度:

// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

鄰居收集

在一些情形下,經過收集每個頂點的鄰居頂點和它的屬性來表達計算是更加容易的。這容易完成經過使用 collectNeighborIds 和 collectNeighbors 操做。

class GraphOps[VD, ED] {
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}

這些操做代價比較高,因爲複製信息和要求大量的通訊。儘量直接使用aggregateMessages 操做完成相同的計算。

緩存和取消緩存

在spark中,RDDs默認沒有持久化在內存中。當屢次使用它們時,爲了不重複計算,它們必須被明確緩存。GraphX 中的圖也是相同的方式。當使用一個圖屢次時,首先確認調用Graph.cache()。

在迭代計算中,爲了最好的性能,uncaching 也多是須要的。默認,緩存的RDDs和圖將會保留在內存中直到內存不足,迫使它們以LRU順序被驅除。對於迭代計算,從過去相關迭代產生的中間結果將被緩存,即便最終被驅除,不須要的數據存儲在內存中將會減緩垃圾回收。取消不須要的中間結果的緩存將會更加高效。這涉及每次迭代物化(緩存和強迫)一個圖和RDD,取消全部其餘數據集緩存,僅僅使用物化數據集在未來迭代中。然而,由於圖由多個RDDs組成,正確解除他們的持久化是比較難的。對迭代計算咱們推薦使用 Pregel API,其能正確的解除中間結果的持久化。

Pregel API

Graphs 本質上就是遞歸的數據結構,頂點的屬性依賴於他們鄰居的屬性,鄰居屬性依次依賴於他們鄰居的屬性。所以,一些重要的圖算法迭代的重複計算每個頂點的屬性直到固定條件獲得知足。一些列的圖並行抽象已經被提出來知足這些迭代算法。GraphX 提供了一個變種的Pregel API。

在GraphX中,更高級的Pregel操做是一個約束到圖拓撲的批量同步(bulk-synchronous)並行消息抽象。Pregel操做執行一系列高級步驟,頂點從過去的超級步驟接收他們流入信息總和,對頂點屬性計算一個新值,發送信息到鄰居節點在下一個高級步驟。不像Pregel,信息做爲邊triplet函數被平行計算,信息計算訪問源和目的頂點屬性。沒有接收信息的頂點在一個高級步驟中被跳過。當沒有保留信息時,pregel終止迭代並返回最終圖。

注意: 不像更加標準的Pregel實現,GraphX 的頂點僅僅發送消息到鄰居頂點,使用用戶定義的消息函數並行構建消息。這些限制容許GraphX額外優化。

下面是Pregel操做的類型簽名和它的實現概述(注意,graph.cache調用被移除)

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    // Receive the initial message at each vertex
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    // compute the messages
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    // Loop until no messages remain or maxIterations is achieved
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages: -----------------------------------------------------------------------
      // Run the vertex program on all vertices that receive messages
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      // Merge the new vertex values back into the graph
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      // Send Messages: ------------------------------------------------------------------------------
      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
      // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
      // on edges in the activeDir of vertices in newVerts
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

注意: Pregel使用兩個參數列表(像graph.pregel(list1)(list2))。第一個參數列表包含配置參數包括初始化信息,最大迭代次數和發送信息邊方向(默認沿着out邊)。第二個參數列表包含用戶自定義函數,對應接收信息(頂點程序Vprog),計算信息(sendMsg)和組合信息(mergeMsg)。

咱們可使用Pregel操做表達計算,像下面的單元最短路徑實例。

import org.apache.spark.graphx._
// Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Int, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a,b) => math.min(a,b) // Merge Message
  )
println(sssp.vertices.collect.mkString("\n"))

圖構建(Graph Builders)

GraphX 提供了一些方法來構建一個圖,從一個RDD的頂點和邊或者硬盤上。默認狀況下,沒有圖構建者從新將圖的邊分區;取而代之,邊留住他們默認的分區(像hdfs原始塊)。Graph.groupEdges 要求圖從新分區,由於它假定相同的邊在同一個分區,全部你必須在調用groupEdges以前調用Graph.partitionBy 。

object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}

GraphLoader.edgeListFile提供了一種方式加載硬盤上邊的列表。它解析下面的鄰接對(起始頂點id和目的頂點id)列表,跳過#開始的行註釋:

# This is a comment
2 1
4 1
1 2

它從指定的邊建立一個圖,自動建立邊涉及的頂點。全部的頂點和邊屬性默認爲1。canonicalOrientation參數容許重定向邊在正方向(srcid 6.7 頂點和邊RDDs

GraphX 公開了存儲在圖中頂點和邊的RDD視圖。然而,由於GraphX 使用優化的數據結構存儲頂點和邊,這些數據結構提供了額外的功能,頂點和邊被返回爲VertexRDD 和 EdgeRDD。這一節咱們溫習這些類型的額外有用的功能。

VertexRDDs

VertexRDD[A]繼承 RDD[(VertexID, A)] ,而且增長了限制:每個VertexID 僅出現一次。除此以外,VertexRDD[A] 表明每個頂點的屬性爲A。在內部,這被實現經過存儲頂點屬性在一個可重複使用的hash-map數據結構。所以,若是兩個 VertexRDDs 從相同的基 VertexRDD 得到(例,經過filter或者mapValues),他們能夠在一個常數時間進行join,沒有hash評估。爲了評估這些索引數據結構,VertexRDD 公開了下面額外的功能:

class VertexRDD[VD] extends RDD[(VertexID, VD)] {
  // Filter the vertex set but preserves the internal index
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
  def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
  // Remove vertices from this set that appear in the other set
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  // Join operators that take advantage of the internal indexing to accelerate joins (substantially)
  def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
  def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}

注意:例如,filter 操做怎樣返回一個 VertexRDD。Filter 實際上的實現使用了一個 BitSet ,所以能夠重複使用索引和保留了快速join其餘VertexRDDs的能力。一樣地,mapValues操做不容許map 函數改變VertexID ,所以相同HashMap數據結構被重複使用。當join兩個來自相同HashMap 的 VertexRDDs,leftJoin和innerJoin 都能使用,join使用線性掃描而不是代價很高的點查找。

aggregateUsingIndex 操做是有用的,對從RDD[(VertexID, A)]演變的VertexRDD高效架構。概念上,若是已經構建了一系列頂點的VertexRDD[B],其是一些RDD[(VertexID, A)]的超頂點集,而後咱們能夠在聚合和隨後的索引RDD[(VertexID, A)]中重複使用索引。

val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

EdgeRDDs

EdgeRDD[ED]繼承RDD[Edge[ED]],使用不一樣的分區策略(定義在PartitionStrategy)組織到塊中。在每個分區中,邊屬性和鄰接結構被分別存儲,確保屬性值變化時能夠最大化的重複使用。

在EdgeRDD 中有三個額外的函數:

// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Revere the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

在最多應用程序中,咱們已經發現EdgeRDD 操做經過圖操做來實現或者操做定義在基類RDD中。

優化表示

分佈式圖的GraphX表示的詳細優化描述超出本嚮導的範疇,一些高水平的理解可能幫助對擴展算法的設計和API的最佳使用。GraphX 對分佈式圖分區採用了vertex-cut的方法:

代替沿着邊拆分圖,GraphX 沿着vertices 分片,這種方式能夠減小通訊和存儲開銷。邏輯上,這指分配邊到機器上用時容許頂點跨多臺機器。這種分配邊的確切方法依賴於 PartitionStrategy,在不一樣啓發式方式中有不一樣的權衡。用戶能夠在從新分區圖的不一樣策略和 Graph.partitionBy操做之間選擇。默認的分區策略是使用圖構造的邊初始分區。然而,用戶能夠容易的切換到2D-partitioning或者其餘啓發式的分區策略。

一旦邊被分區,對高效的並行圖計算的挑戰就是高效的鏈接頂點屬性到邊。由於典型的現實圖有更多的邊比頂點,咱們移動頂點屬性到邊。由於不是全部的分區包含邊鄰接的全部頂點,咱們內在的維持一個路由表,當對triplets 和aggregateMessages實現鏈接請求時,路由表標識廣播頂點位置。

圖算法

GraphX 包含一些列的圖算法來簡化分析任務。算法被包含在org.apache.spark.graphx.lib包裏面,能被Graph經過GraphOps直接訪問。這部分描述算法和算法如何使用。

PageRank

PageRank 測量在圖中每個頂點的重要性,假設一條u到v的表明u對v重要性的一個支持。例如,若是一個Twitter用戶被其餘用戶瀏覽,這個用戶排名將會升高。

GraphX 自帶了靜態和動態的PageRank 實現,做爲PageRank對象的方法。靜態的PageRank 運行固定的迭代次數,然而動態的PageRank 運行知道排名收斂(例如,超過設定容忍值中止迭代)。 GraphOps容許直接調用這些算法做爲graph的方法。

GraphX 也包含了一個社會網絡數據集實例,咱們能夠在上面運行PageRank 。一個用戶的集合在graphx/data/users.txt中給出,用戶之間的關係在 graphx/data/followers.txt中給出。咱們計算每個用戶的PageRank 以下:

// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))

Connected Components

連通圖算法使用最小編號的頂點標記圖的連通體。例如,在一個社會網絡,連通圖近似聚類。GraphX 在 ConnectedComponents 對象中包含一個算法實現,咱們計算連通圖實例,數據集和 PageRank部分同樣:

// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))

Triangle Counting

當頂點有兩個鄰接頂點而且它們之間有邊相連,它就是三角形的一部分。GraphX 在 TriangleCount對象中實現了一個三角形計數算法,其肯定經過每個頂點的三角形數量,提供了一個集羣的測量。咱們計算社交網絡三角形的數量,數據集一樣使用PageRank部分數據集。注意:三角形數量要求邊是標準方向(srcId < dstId),圖使用Graph.partitionBy進行分區。

// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  (username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))

Examples

假設咱們想從一些文本文件構建一個圖,約束圖爲重要的人際關係和用戶,在子圖運行page-rank ,而後返回頂點用戶相關的屬性。咱們使用GraphX作這些事情僅僅須要幾行代碼:

// Connect to the Spark cluster
val sc = new SparkContext("spark://master.amplab.org", "research")

// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("graphx/data/users.txt")
  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")

// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  // Some users may not have attributes so we set them as empty
  case (uid, deg, None) => Array.empty[String]
}

// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)

// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
相關文章
相關標籤/搜索