Spark圖處理GraphX學習筆記!web
1、什麼是GraphX?算法
Graphx利用了Spark這樣了一個並行處理框架來實現了圖上的一些可並行化執行的算法。sql
算法是否可以並行化與Spark自己無關數據結構
算法並行化與否的自己,須要經過數學來證實框架
已經證實的可並行化算法,利用Spark來實現會是一個錯的選擇,由於Graphx支持pregel的圖計算模型dom
2、Graphx包含哪些組件和基本框架?ide
graph中重要的成員變量分別爲函數
verticespost
edges學習
triplets
爲何要引入triplets呢,主要是和Pregel這個計算模型相關,在triplets中,同時記錄着edge和vertex. 具體代碼就不羅列了。
函數分紅幾大類
對全部頂點或邊的操做,但不改變圖結構自己,如mapEdges, mapVertices
子圖,相似於集合操做中的filter subGraph
圖的分割,即paritition操做,這個對於Spark計算來講,很關鍵,正是由於有了不一樣的Partition,纔有了並行處理的可能, 不一樣的PartitionStrategy,其收益不一樣。最容易想到的就是利用Hash來將整個圖分紅多個區域。
outerJoinVertices 頂點的外鏈接操做
圖的經常使用算法是集中抽象到GraphOps這個類中,在Graph裏做了隱式轉換,將Graph轉換爲GraphOps,具體有下列12個算子:
collectNeighborIds
collectNeighbors
collectEdges
joinVertices
filter
pickRandomVertex
pregel
pageRank
staticPageRank
connectedComponents
triangleCount
stronglyConnectedComponents
RDD是Spark體系的核心,那麼Graphx中引入了哪些新的RDD呢,有倆,分別爲
VertexRDD
EdgeRDD
較之EdgeRdd,VertexRDD更爲重要,其上的操做也不少,主要集中於Vertex之上屬性的合併,說到合併就不得不扯到關係代數和集合論,因此在VertexRdd中能看到許多相似於sql中的術語,如
leftJoin
innerJoin
在進行數學計算的時候,圖用線性代數中的矩陣來表示,那麼如何進行存儲呢?
學數據結構的時候,老師確定說過好多的辦法,再也不囉嗦了。
不過在大數據的環境下,若是圖很巨大,表示頂點和邊的數據不足以放在一個文件中怎麼辦? 用HDFS
加載的時候,一臺機器的內存不足以容下怎麼辦? 延遲加載,在真正須要數據時,將數據分發到不一樣機器中,採用級聯方式。
通常來講,咱們會將全部與頂點相關的內容保存在一個文件中vertexFile,全部與邊相關的信息保存在另外一個文件中edgeFile。
生成某一個具體的圖時,用edge就能夠表示圖中頂點的關聯關係,同時圖的結構也表示出來了。
下面是Spark官方示例,用2個Array構造了一個Graph。
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graphLoader是graphx中專門用於圖的加載和生成,最重要的函數就是edgeListFile。
//以頂點劃分,分紅4個分區
val graph = GraphLoader.edgeListFile(sc,"hdfs://192.168.0.10:9000/input/graph/web-Google.txt",minEdgePartitions = 4)
5、GraphX應用舉例
一行代碼:
val rank = graph.pageRank(0.01).vertices
用RDD實現:
// Connect to the Spark clusterval sc = new SparkContext("spark://master.amplab.org", "research") // Load my user data and parse into tuples of user id and attribute list val users = (sc.textFile("graphx/data/users.txt") .map(line => line.split(",")) .map( parts => (parts.head.toLong, parts.tail) )) // Parse the edge data which is already in userId -> userId format val followerGraph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt") // Attach the user attributes val graph = followerGraph.outerJoinVertices(users) { case (uid, deg, Some(attrList)) => attrList // Some users may not have attributes so we set them as empty case (uid, deg, None) => Array.empty[String] } // Restrict the graph to users with usernames and names val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2) // Compute the PageRank // Get the attributes of the top pagerank users val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) { case (uid, attrList, Some(pr)) => (pr, attrList.toList) case (uid, attrList, None) => (0.0, attrList.toList) } println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))