Apache Spark GraphX Programming Guide 中的例子
import org.apache.spark.graphx._

// Vertex table: (vertexId, (username, role)).
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
  (3L, ("rxin", "student")),
  (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")),
  (2L, ("istoica", "prof"))))

// Edge table: directed edges labelled with the relationship type.
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(3L, 7L, "collab"),
  Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"),
  Edge(5L, 7L, "pi")))

// Build the property graph and run a fixed 3 iterations of PageRank.
val graph = Graph(users, relationships)
val rankedGraph = graph.staticPageRank(3)
實際的例子:「別踩白快」(biecaibaikuai) App 的傳輸數據分析
import org.apache.spark.graphx._

// Load the raw TSV transfer logs from HDFS and keep only records for the
// "biecaibaikuai" app (the literal appears somewhere in the line).
val transfer = sc.textFile("hdfs://LDKJSERVER1046:8020/user/flume/transfer/*/*/*/*.tsv")
  .filter(line => line.contains("biecaibaikuai"))

// Parse each line into the six fields we care about. Rows that do not have
// exactly 35 tab-separated fields are dropped here via flatMap + Option.
// (The original returned Unit for malformed rows — widening the RDD to Any —
// and then filtered on `!= ()` followed by an asInstanceOf downcast; this
// keeps the element type as a concrete tuple throughout.)
val structuredTransferRDD: RDD[(String, String, String, String, String, String)] =
  transfer.flatMap { line =>
    val trunks = line.split("\t")
    if (trunks.length == 35)
      Some((trunks(6), trunks(7), trunks(3), trunks(5), trunks(12), trunks(13)))
    else
      None
  }

// Rebalance into 100 partitions, cache, and materialize once.
val repartitionedRDD = structuredTransferRDD.repartition(100)
repartitionedRDD.cache()
repartitionedRDD.count

// Derive numeric vertex ids from the first two (String) fields.
// NOTE(review): String.hashCode.toLong is used as the VertexId — hash
// collisions would merge distinct entities into one vertex; acceptable for
// exploratory analysis, but use a proper id assignment for production.
val mappedRDD = repartitionedRDD.map(arg =>
  ((arg._1, arg._1.hashCode.toLong),
   (arg._2, arg._2.hashCode.toLong),
   arg._3, arg._4, arg._5, arg._6))

// Vertex RDD: (id, originalString), deduplicated across both endpoints.
val vertexs = mappedRDD
  .flatMap(arg => Array((arg._1._2, arg._1._1), (arg._2._2, arg._2._1)))
  .distinct
vertexs.cache()
vertexs.count

// Edge RDD: (srcId, dstId, label), deduplicated before wrapping in Edge.
val edges = mappedRDD
  .map(arg => (arg._1._2, arg._2._2, arg._3))
  .distinct
  .map(arg => Edge(arg._1, arg._2, arg._3))
edges.cache()
edges.count

// Build the graph and run 3 iterations of static PageRank.
val graph = Graph(vertexs, edges)
val rankedGraph = graph.staticPageRank(3)
rankedGraph.vertices.first
// 0.15 is the default PageRank reset probability, i.e. the rank of a vertex
// with no in-links; this picks a vertex that received some rank mass.
// NOTE(review): exact Double comparison (!= 0.15) is fragile in general but
// matches the original behavior here.
rankedGraph.vertices.filter(arg => arg._2 != 0.15).first