GraphX的圖運算操做

GraphX內置了許多函數實現圖運算操做。ide

mapVertices

mapVertices的功能是transform each vertex attribute in the graph using the map function。即對已有圖的頂點屬性作轉換。函數

def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2)
    (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

其中,VD2爲轉換後的頂點屬性類型,map定義了轉換函數ui

案例

myGraph.vertices.collect().foreach(println)
(4,d)
(1,a)
(5,e)
(2,b)
(3,c)

myGraph.mapVertices[Int]((vertexId, _) => if (vertexId < 2L) 0 else 1).vertices.collect().foreach(println)
(4,1)
(1,0)
(5,1)
(2,1)
(3,1)

mapEdges

mapEdges對已有圖的邊屬性作轉換,transforms each edge attribute in the graph using the map function。scala

// (1)
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
    mapEdges((pid, iter) => iter.map(map))
  }
// (2)
  def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
    : Graph[VD, ED2]

ED2爲轉換後的邊屬性類型,Edge[ED]只包含邊的屬性值和與邊相連的頂點的VertexId,不包含頂點的屬性值。第一個mapEdges在內部調用了第二個mapEdges,第二個mapEdges中的map函數,以一個分區的全部Edge做爲輸入,對邊的屬性進行轉換。code

案例

myGraph.edges.collect().foreach(println)
Edge(1,2,is-friends-with)
Edge(2,3,is-friends-with)
Edge(3,4,is-friends-with)
Edge(3,5,Wrote-status)
Edge(4,5,Likes-status)

myGraph.mapEdges(e => if (e.attr == "is-friends-with") 0 else 1).edges.collect().foreach(println)
Edge(1,2,0)
Edge(2,3,0)
Edge(3,4,0)
Edge(3,5,1)
Edge(4,5,1)

mapTriplets

def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
    mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
  }
  def mapTriplets[ED2: ClassTag](
      map: EdgeTriplet[VD, ED] => ED2,
      tripletFields: TripletFields): Graph[VD, ED2] = {
    mapTriplets((pid, iter) => iter.map(map), tripletFields)
  }

關於mapTriplets的功能,官方的描述是Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well. If adjacent vertex values are not required, consider using mapEdges instead。mapTriplets一樣是對已有圖的邊屬性進行轉換,只不過EdgeTriplet包含與邊相鄰的頂點的屬性值,而Edge不包含。orm

案例

myGraph.mapTriplets(et => if (et.attr == "is-friends-with" && et.srcAttr == "a") 0 else 1).edges.collect().foreach(println)
Edge(1,2,0)
Edge(2,3,1)
Edge(3,4,1)
Edge(3,5,1)
Edge(4,5,1)

值得一提的是,mapVertices的map函數是以單個頂點做爲輸入,而mapEdgesmapTriplets的map函數是以一個分區的全部邊做爲輸入,這是由於GraphX使用點切分的方式存儲圖。ip

joinVertices

有時候,咱們但願將額外的屬性合併到一個圖的頂點屬性中去,可使用joinVertice操做。ci

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
    : Graph[VD, ED] = {
    val uf = (id: VertexId, data: VD, o: Option[U]) => {
      o match {
        case Some(u) => mapFunc(id, data, u)
        case None => data
      }
    }
    graph.outerJoinVertices(table)(uf)
  }

其中,table爲額外的屬性,mapFunc定義瞭如何將額外的屬性和頂點的已有屬性進行合併。使用joinVertice會返回一個新的帶有頂點屬性的圖。在執行mapFunc時,頂點的VertexId、頂點的屬性值會與該頂點對應的額外屬性進行匹配,由case None => data可知,若是一個頂點沒有對應的額外屬性,則會保留該頂點的原有屬性值。get

案例

已有圖的頂點屬性爲頂點名稱(a,b,c,...),需求是將地點屬性修改成頂點的「出度」。源碼

myGraph.joinVertices(outDegrees)((_,_, d) => d.toString).vertices.collect().foreach(println)

(4,1)
(1,1)
(5,e)
(2,1)
(3,2)

發現,VertexId爲1,2,3,4的頂點都正確實現了需求,但VertexId爲5的頂點,沒有對應的額外屬性值(頂點5的出度爲0,在outDegrees中沒有記錄),於是頂點5的屬性值沒有改變,若是想要實現將出度爲0的頂點的屬性值修改成0,有兩種方法,

  1. 在outDegrees中增長出度爲0的點的記錄;
  2. 使用outerJoinVertices操做。

outerJoinVertices

從joinVertices的源碼能夠發現,joinVertices是outerJoinVertices的一個特例。

案例

已有圖的頂點屬性爲頂點名稱(a,b,c,...),需求是將地點屬性修改成頂點的「出度」。

// 方法1
val outDegrees: VertexRDD[Int] = myGraph.aggregateMessages[Int](_.sendToSrc(1), _ + _)
myGraph.outerJoinVertices(outDegrees)((_,_, d) => d).vertices.collect().foreach(println)

(4,Some(1))
(1,Some(1))
(5,None)
(2,Some(1))
(3,Some(2))
//方法2
val outDegrees: VertexRDD[Int] = myGraph.aggregateMessages[Int](_.sendToSrc(1), _ + _)
myGraph.outerJoinVertices(outDegrees)((_,_, d) => d.getOrElse(0)).vertices.collect().foreach(println)

(4,1)
(1,1)
(5,0)
(2,1)
(3,2)

能夠發現,調用outerJoinVertices,頂點在匹配其額外屬性時,會將其匹配的額外屬性轉換爲Option類型,所以能夠經過getOrElse方法指定默認值的方式,爲出度爲0且沒有記錄在outDegrees中的頂點設置正確的額外屬性值。

aggregateMessages

aggregateMessages的主要功能是向鄰邊發消息,而後合併鄰邊收到的消息。

def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A] = {
    aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
  }

其中,A爲消息的類型,aggregateMessages的返回值爲VertexRDD[A]

sendMsg

sendMsg函數用來發送消息,它以EdgeContext做爲參數,沒有返回值。EdgeContextEdgeTriplet有些相似,成員變量都包含:srcId、dstId、srcAttr、dstAttr和attr。可是EdgeContext抽象類提升了兩個發送消息的方法:sendToSrcsendToDst,sendToSrc將類型爲A的信息發送給源頂點,sendToDst將類型爲A 的信息發送給目標頂點,toEdgeTriplet方法實現了從EdgeContext到EdgeTriplet的轉換。

/**
 * Represents an edge along with its neighboring vertices and allows sending messages along the
 * edge. Used in [[Graph#aggregateMessages]].
 */
abstract class EdgeContext[VD, ED, A] {
  /** The vertex id of the edge's source vertex. */
  def srcId: VertexId
  /** The vertex id of the edge's destination vertex. */
  def dstId: VertexId
  /** The vertex attribute of the edge's source vertex. */
  def srcAttr: VD
  /** The vertex attribute of the edge's destination vertex. */
  def dstAttr: VD
  /** The attribute associated with the edge. */
  def attr: ED

  /** Sends a message to the source vertex. */
  def sendToSrc(msg: A): Unit
  /** Sends a message to the destination vertex. */
  def sendToDst(msg: A): Unit

  /** Converts the edge and vertex properties into an [[EdgeTriplet]] for convenience. */
  def toEdgeTriplet: EdgeTriplet[VD, ED] = {
    val et = new EdgeTriplet[VD, ED]
    et.srcId = srcId
    et.srcAttr = srcAttr
    et.dstId = dstId
    et.dstAttr = dstAttr
    et.attr = attr
    et
  }
}12

mergeMsg

mergeMsg函數用來合併消息。每一個頂點收到的全部消息都會被彙集起來傳遞給mergeMsg函數,mergeMsg對其合併獲得最終結果。

案例

統計圖中頂點的「出度」。

myGraph.aggregateMessages[Int](_.sendToSrc(1), _+_).collect().foreach(println)

(4,1)
(1,1)
(2,1)
(3,2)
相關文章
相關標籤/搜索