GraphX內置了許多函數實現圖運算操做。ide
mapVertices
的功能是transform each vertex attribute in the graph using the map function。即對已有圖的頂點屬性作轉換。函數
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2) (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
其中,VD2
爲轉換後的頂點屬性類型,map
定義了轉換函數ui
myGraph.vertices.collect().foreach(println) (4,d) (1,a) (5,e) (2,b) (3,c) myGraph.mapVertices[Int]((vertexId, _) => if (vertexId < 2L) 0 else 1).vertices.collect().foreach(println) (4,1) (1,0) (5,1) (2,1) (3,1)
mapEdges
對已有圖的邊屬性作轉換,transforms each edge attribute in the graph using the map function。scala
// (1) def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = { mapEdges((pid, iter) => iter.map(map)) } // (2) def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]) : Graph[VD, ED2]
ED2
爲轉換後的邊屬性類型,Edge[ED]
只包含邊的屬性值和與邊相連的頂點的VertexId,不包含頂點的屬性值。第一個mapEdges
在內部調用了第二個mapEdges,第二個mapEdges中的map函數,以一個分區的全部Edge做爲輸入,對邊的屬性進行轉換。code
myGraph.edges.collect().foreach(println) Edge(1,2,is-friends-with) Edge(2,3,is-friends-with) Edge(3,4,is-friends-with) Edge(3,5,Wrote-status) Edge(4,5,Likes-status) myGraph.mapEdges(e => if (e.attr == "is-friends-with") 0 else 1).edges.collect().foreach(println) Edge(1,2,0) Edge(2,3,0) Edge(3,4,0) Edge(3,5,1) Edge(4,5,1)
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = { mapTriplets((pid, iter) => iter.map(map), TripletFields.All) } def mapTriplets[ED2: ClassTag]( map: EdgeTriplet[VD, ED] => ED2, tripletFields: TripletFields): Graph[VD, ED2] = { mapTriplets((pid, iter) => iter.map(map), tripletFields) }
關於mapTriplets
的功能,官方的描述是Transforms each edge attribute using the map function, passing it the adjacent vertex attributes as well. If adjacent vertex values are not required, consider using mapEdges
instead。mapTriplets一樣是對已有圖的邊屬性進行轉換,只不過EdgeTriplet
包含與邊相鄰的頂點的屬性值,而Edge
不包含。orm
myGraph.mapTriplets(et => if (et.attr == "is-friends-with" && et.srcAttr == "a") 0 else 1).edges.collect().foreach(println) Edge(1,2,0) Edge(2,3,1) Edge(3,4,1) Edge(3,5,1) Edge(4,5,1)
值得一提的是,mapVertices
的map函數是以單個頂點做爲輸入,而mapEdges
和mapTriplets
的map函數是以一個分區的全部邊做爲輸入,這是由於GraphX使用點切分的方式存儲圖。ip
有時候,咱們但願將額外的屬性合併到一個圖的頂點屬性中去,可使用joinVertice
操做。ci
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD) : Graph[VD, ED] = { val uf = (id: VertexId, data: VD, o: Option[U]) => { o match { case Some(u) => mapFunc(id, data, u) case None => data } } graph.outerJoinVertices(table)(uf) }
其中,table
爲額外的屬性,mapFunc
定義瞭如何將額外的屬性和頂點的已有屬性進行合併。使用joinVertice
會返回一個新的帶有頂點屬性的圖。在執行mapFunc時,頂點的VertexId、頂點的屬性值會與該頂點對應的額外屬性進行匹配,由case None => data
可知,若是一個頂點沒有對應的額外屬性,則會保留該頂點的原有屬性值。get
已有圖的頂點屬性爲頂點名稱(a,b,c,...),需求是將地點屬性修改成頂點的「出度」。源碼
myGraph.joinVertices(outDegrees)((_,_, d) => d.toString).vertices.collect().foreach(println) (4,1) (1,1) (5,e) (2,1) (3,2)
發現,VertexId爲1,2,3,4的頂點都正確實現了需求,但VertexId爲5的頂點,沒有對應的額外屬性值(頂點5的出度爲0,在outDegrees中沒有記錄),於是頂點5的屬性值沒有改變,若是想要實現將出度爲0的頂點的屬性值修改成0,有兩種方法,
outerJoinVertices
操做。從joinVertices的源碼能夠發現,joinVertices是outerJoinVertices
的一個特例。
已有圖的頂點屬性爲頂點名稱(a,b,c,...),需求是將地點屬性修改成頂點的「出度」。
// 方法1 val outDegrees: VertexRDD[Int] = myGraph.aggregateMessages[Int](_.sendToSrc(1), _ + _) myGraph.outerJoinVertices(outDegrees)((_,_, d) => d).vertices.collect().foreach(println) (4,Some(1)) (1,Some(1)) (5,None) (2,Some(1)) (3,Some(2)) //方法2 val outDegrees: VertexRDD[Int] = myGraph.aggregateMessages[Int](_.sendToSrc(1), _ + _) myGraph.outerJoinVertices(outDegrees)((_,_, d) => d.getOrElse(0)).vertices.collect().foreach(println) (4,1) (1,1) (5,0) (2,1) (3,2)
能夠發現,調用outerJoinVertices,頂點在匹配其額外屬性時,會將其匹配的額外屬性轉換爲Option
類型,所以能夠經過getOrElse
方法指定默認值的方式,爲出度爲0且沒有記錄在outDegrees中的頂點設置正確的額外屬性值。
aggregateMessages的主要功能是向鄰邊發消息,而後合併鄰邊收到的消息。
def aggregateMessages[A: ClassTag]( sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A] = { aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None) }
其中,A
爲消息的類型,aggregateMessages的返回值爲VertexRDD[A]
sendMsg函數用來發送消息,它以EdgeContext
做爲參數,沒有返回值。EdgeContext
與EdgeTriplet
有些相似,成員變量都包含:srcId、dstId、srcAttr、dstAttr和attr。可是EdgeContext抽象類提升了兩個發送消息的方法:sendToSrc
和sendToDst
,sendToSrc將類型爲A的信息發送給源頂點,sendToDst將類型爲A 的信息發送給目標頂點,toEdgeTriplet
方法實現了從EdgeContext到EdgeTriplet的轉換。
/** * Represents an edge along with its neighboring vertices and allows sending messages along the * edge. Used in [[Graph#aggregateMessages]]. */ abstract class EdgeContext[VD, ED, A] { /** The vertex id of the edge's source vertex. */ def srcId: VertexId /** The vertex id of the edge's destination vertex. */ def dstId: VertexId /** The vertex attribute of the edge's source vertex. */ def srcAttr: VD /** The vertex attribute of the edge's destination vertex. */ def dstAttr: VD /** The attribute associated with the edge. */ def attr: ED /** Sends a message to the source vertex. */ def sendToSrc(msg: A): Unit /** Sends a message to the destination vertex. */ def sendToDst(msg: A): Unit /** Converts the edge and vertex properties into an [[EdgeTriplet]] for convenience. */ def toEdgeTriplet: EdgeTriplet[VD, ED] = { val et = new EdgeTriplet[VD, ED] et.srcId = srcId et.srcAttr = srcAttr et.dstId = dstId et.dstAttr = dstAttr et.attr = attr et } }12
mergeMsg函數用來合併消息。每一個頂點收到的全部消息都會被彙集起來傳遞給mergeMsg函數,mergeMsg對其合併獲得最終結果。
統計圖中頂點的「出度」。
myGraph.aggregateMessages[Int](_.sendToSrc(1), _+_).collect().foreach(println) (4,1) (1,1) (2,1) (3,2)