例如如今有這些數據:sql
10010 95555 2016-11-11 15:55:54 10010 95556 2016-11-11 15:55:54 10010 95557 2016-11-11 15:55:54 10086 95555 2016-11-11 15:55:54 10086 95558 2016-11-11 15:55:54 10000 95555 2016-11-11 15:55:54 10000 95558 2016-11-11 15:55:54
第一列表明是用戶這個手機號,第二列表明是用戶的朋友的手機號,而後計算用戶與用戶之間有幾個共同好友號碼apache
用sparkgraph代碼以下json
package spark_graph import org.apache.spark.graphx.{Edge, _} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Created by dongdong on 18/1/18. */ object Spark_Contact_Test { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("Spark_Contact_Test") .setMaster("local[4]") val sc = new SparkContext(conf) // 構造頂點 val userVertex = sc.textFile("/Users/dongdong/Desktop/spark_contact/contact/data_test/date_contact").map(line => { //分詞 val words = line.split("\\s+") // 本身的號碼 val self_mobile = words(0).split("[^0-9]").filter(_.length > 0).mkString("") // 朋友的號碼 val relation_mobile = words(1).split("[^0-9]").filter(_.length > 0).mkString("") (self_mobile, relation_mobile) } ).filter(t => // 號碼不是5位的過濾掉 樣例數據是5位,若是生產數據是11位,這邊需改動一下 t._1.length == 5 && t._2.length == 5 ).map(x => // 前面一個表明的是這個頂點,第二個爲這個頂點的屬性 例如(10010 95555 ) (x._1.toLong, x._2) ) // 構造邊 val edge = userVertex.map(vertex => { // 邊與邊之間必定要是long類型的 構想是這個號碼與號碼之間關聯,關聯關係爲1 Edge(vertex._1, vertex._2.toLong, 1) }) // 默認頂點 val defaultVertex = ("00000") // 頂點、邊 默認頂點能夠構成一個圖 val graphContact = Graph(userVertex, edge, defaultVertex) //Triplets(三元組),包含源點、源點屬性、目標點、目標點屬性、邊屬性 // 源點=10010源點屬性=95555邊屬性=1目標點=95555目標點屬性=00000 // graphContact.triplets.map(triplet => { // "源點=" + triplet.srcId + "源點屬性=" + triplet.srcAttr + "邊屬性=" + triplet.attr + "目標點=" + triplet.dstId + "目標點屬性=" + triplet.dstAttr // // }).collect().foreach(print(_)) /* 源碼: def aggregateMessages[A: ClassTag]( sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A] = { aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None) */ /* 這一步是爲了將relation_mobile 有多少個self_mobile而且是以","進行分割符 (95555,10010,10086,10000) (95556,10010) (95558,10000,10086) */ val aggregateMessages: VertexRDD[String] = graphContact.aggregateMessages(msgFun, reduceFun) /* 這步是將values 進行排序去重而且過濾掉key爲1的狀況 例如是 (95556,10010)這個tuple,由於沒有多個關聯 (95558,List(10000, 10086)) (95555,List(10000, 10010, 10086)) */ val sortAndFilter = aggregateMessages.mapValues( tuple => { val list = tuple.split(",").toList.sorted.distinct list } ).filter(_._2.size > 1) /* 主要是爲了 self_moble 與self_moble 在同一個key裏 Map(10000,10086 -> 95558) Map(10000,10086 -> 95555, 10000,10010 -> 95555, 10010,10086 -> 95555) */ val hmRDD = sortAndFilter.map(t => { var hm = new scala.collection.mutable.HashMap[String, String]() for (i <- 0 until t._2.size; j <- i + 1 until t._2.size) { if (i != j) { var key = t._2(i) + "," + t._2(j) var value = t._1.toString hm(key) = value } } hm }) /* 爲了將map flatmap一下 變成元組,再進行groupbykey (10000,10086,CompactBuffer(95558, 95555)) (10010,10086,CompactBuffer(95555)) (10000,10010,CompactBuffer(95555)) */ val hm2TupleRDD = hmRDD.map(t => { t.toList.map(x => { (x._1, x._2) }) }) .flatMap(t => { t }) .groupByKey() /* 第二個圖的頂點 (10000,2) (10086,2) (10010,3) 前面一個表明的是這個頂點,第二個爲這個頂點的屬性 */ val userVertexTwo = userVertex.groupByKey().mapValues(t => { t.size }) // 第二圖默認頂點屬性 val defaultVertexTwo = (0) // 構造第二個圖的邊 val edgeTwo = hm2TupleRDD.map(t => { val split = t._1.split(",") Edge(split(0).toLong, split(1).toLong, t._2.toList.size) }) val graphContactTwo = Graph(userVertexTwo, edgeTwo, defaultVertexTwo) /* 10000與10086相關連 10000的用戶朋友數量=2類似度=1.0共同朋友數量=2.0 10086與10000相關連 10086的用戶朋友數量=2類似度=1.0共同朋友數量=2.0 10000與10010相關連 10000的用戶朋友數量=2類似度=0.5共同朋友數量=1.0 10010與10000相關連 10010的用戶朋友數量=3類似度=0.3共同朋友數量=1.0 10010與10086相關連 10010的用戶朋友數量=3類似度=0.3共同朋友數量=1.0 10086與10010相關連 10086的用戶朋友數量=2類似度=0.5共同朋友數量=1.0 */ val result = graphContactTwo.triplets.map(t => { val usr1 = t.srcId val usr2 = t.dstId val common_friend = t.attr.toFloat val usr1_usr2 = common_friend / t.srcAttr.toFloat val usr2_usr1 = common_friend / t.dstAttr.toFloat usr1.toString + "與" + usr2.toString + "相關連" + "\t" + usr1.toString + "的用戶朋友數量=" + t.srcAttr + "類似度=" + usr1_usr2.toString + "共同朋友數量=" + common_friend + "\t" + usr2.toString + "與" + usr1.toString + "相關連" + "\t" + usr2.toString + "的用戶朋友數量=" + t.dstAttr + "類似度=" + usr2_usr1.toString + "共同朋友數量=" + common_friend }) .foreach(println(_)) sc.stop() } // map 函數 把 self_mobile 發送過去 def msgFun(triplet: EdgeContext[(String), Int, String]) { triplet.sendToDst(triplet.srcId.toString) } // reduce 函數 relation_mobile做爲key reducebykey(_+_) def reduceFun(a: (String), b: (String)): String = a + "," + b }
寫完map和reduce函數和一系列的rdd函數,我本身寫完感受都快別噁心死了,這種方式處理代碼太過於複雜app
下面這種方式是用dataframe進行處理函數
代碼以下:ui
package spark_graph import org.apache.spark.sql.SparkSession /** * Created by dongdong on 18/1/18. */ object ContactDataFrame { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName("ContactDataFrame") .master("local") .getOrCreate() import spark.implicits._ // 數據變成dataframe val userDataFrame = spark.sparkContext.textFile("/Users/dongdong/Desktop/spark_contact/contact/data_test/date_contact").map(line => { //分詞 val words = line.split("\\s+") // 本身的號碼 val self_mobile = words(0).split("[^0-9]").filter(_.length > 0).mkString("") // 朋友的號碼 val relation_mobile = words(1).split("[^0-9]").filter(_.length > 0).mkString("") (self_mobile, relation_mobile) } ).filter(t => // 號碼不是5位的過濾掉 樣例數據是5位,若是生產數據是11位,這邊需改動一下 t._1.length == 5 && t._2.length == 5 ).toDF("self_mobile","relation_mobile") // dataframe註冊成一張表 userDataFrame.createOrReplaceTempView("t_user_contact") // 把這張表現cache到內存裏 spark.catalog.cacheTable("t_user_contact") val resultDataframe= spark.sql( """ |select |user_mobile, |friend_mobile, |count(1) as common_mobile_cnt |from |(select |a.self_mobile as user_mobile, |b.self_mobile as friend_mobile, |a.relation_mobile as common_mobile |from |( |select |distinct |self_mobile, |relation_mobile |from |t_user_contact |)a |inner join |(select |distinct |self_mobile, |relation_mobile |from |t_user_contact |)b |on |a.relation_mobile=b.relation_mobile |)c |where user_mobile!=friend_mobile |group by user_mobile,friend_mobile | """.stripMargin) resultDataframe.show(false) // 清除內存 spark.catalog.clearCache() spark.stop() } }
結果以下:spa
+-----------+-------------+-----------------+ |user_mobile|friend_mobile|common_mobile_cnt| +-----------+-------------+-----------------+ |10010 |10086 |1 | |10086 |10010 |1 | |10000 |10010 |1 | |10086 |10000 |2 | |10000 |10086 |2 | |10010 |10000 |1 | +-----------+-------------+-----------------+
超級簡潔,明瞭。建議能sql搞定的,都不要用函數。scala