SparkGraph 與SparkDataFrame 兩種方式計算朋友的二度關係

例如如今有這些數據:sql

10010   95555   2016-11-11  15:55:54
10010   95556   2016-11-11  15:55:54
10010   95557   2016-11-11  15:55:54
10086   95555   2016-11-11  15:55:54
10086   95558   2016-11-11  15:55:54
10000   95555   2016-11-11  15:55:54
10000   95558   2016-11-11  15:55:54

第一列表明是用戶這個手機號,第二列表明是用戶的朋友的手機號,而後計算用戶與用戶之間有幾個共同好友號碼apache

用sparkgraph代碼以下json

package spark_graph

import org.apache.spark.graphx.{Edge, _}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by dongdong on 18/1/18.
  */
object Spark_Contact_Test {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Spark_Contact_Test")
      .setMaster("local[4]")

    val sc = new SparkContext(conf)

    //    構造頂點
    val userVertex = sc.textFile("/Users/dongdong/Desktop/spark_contact/contact/data_test/date_contact").map(line => {
      //分詞
      val words = line.split("\\s+")
      //      本身的號碼
      val self_mobile = words(0).split("[^0-9]").filter(_.length > 0).mkString("")
      //      朋友的號碼
      val relation_mobile = words(1).split("[^0-9]").filter(_.length > 0).mkString("")
      (self_mobile, relation_mobile)
    }
    ).filter(t =>
      //      號碼不是5位的過濾掉 樣例數據是5位,若是生產數據是11位,這邊需改動一下
      t._1.length == 5 && t._2.length == 5
    ).map(x =>
      //      前面一個表明的是這個頂點,第二個爲這個頂點的屬性  例如(10010   95555 )
      (x._1.toLong, x._2)
    )

    //    構造邊
    val edge = userVertex.map(vertex => {
      //      邊與邊之間必定要是long類型的 構想是這個號碼與號碼之間關聯,關聯關係爲1
      Edge(vertex._1, vertex._2.toLong, 1)
    })
    //    默認頂點
    val defaultVertex = ("00000")

    //     頂點、邊 默認頂點能夠構成一個圖
    val graphContact = Graph(userVertex, edge, defaultVertex)

    //Triplets(三元組),包含源點、源點屬性、目標點、目標點屬性、邊屬性
    //    源點=10010源點屬性=95555邊屬性=1目標點=95555目標點屬性=00000
    //    graphContact.triplets.map(triplet => {
    //      "源點=" + triplet.srcId + "源點屬性=" + triplet.srcAttr + "邊屬性=" + triplet.attr + "目標點=" + triplet.dstId + "目標點屬性=" + triplet.dstAttr
    //
    //    }).collect().foreach(print(_))

    /*
    源碼:
     def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[A] = {
    aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
     */
    /*
    這一步是爲了將relation_mobile 有多少個self_mobile而且是以","進行分割符
    (95555,10010,10086,10000)
    (95556,10010)
    (95558,10000,10086)
    */
    val aggregateMessages: VertexRDD[String] = graphContact.aggregateMessages(msgFun, reduceFun)

    /*
    這步是將values 進行排序去重而且過濾掉key爲1的狀況 例如是 (95556,10010)這個tuple,由於沒有多個關聯
    (95558,List(10000, 10086))
    (95555,List(10000, 10010, 10086))
     */
    val sortAndFilter = aggregateMessages.mapValues(
      tuple => {
        val list = tuple.split(",").toList.sorted.distinct
        list
      }
    ).filter(_._2.size > 1)

    /*
    主要是爲了 self_moble 與self_moble 在同一個key裏
    Map(10000,10086 -> 95558)
    Map(10000,10086 -> 95555, 10000,10010 -> 95555, 10010,10086 -> 95555)
     */
    val hmRDD = sortAndFilter.map(t => {
      var hm = new scala.collection.mutable.HashMap[String, String]()
      for (i <- 0 until t._2.size; j <- i + 1 until t._2.size) {
        if (i != j) {
          var key = t._2(i) + "," + t._2(j)
          var value = t._1.toString
          hm(key) = value
        }
      }
      hm
    })


    /*
    爲了將map flatmap一下 變成元組,再進行groupbykey
     (10000,10086,CompactBuffer(95558, 95555))
     (10010,10086,CompactBuffer(95555))
     (10000,10010,CompactBuffer(95555))
     */
    val hm2TupleRDD = hmRDD.map(t => {
      t.toList.map(x => {
        (x._1, x._2)
      })
    })
      .flatMap(t => {
        t
      })
      .groupByKey()


    /*
    第二個圖的頂點
     (10000,2)
     (10086,2)
     (10010,3)
      前面一個表明的是這個頂點,第二個爲這個頂點的屬性
     */
    val userVertexTwo = userVertex.groupByKey().mapValues(t => {
      t.size
    })
    //    第二圖默認頂點屬性
    val defaultVertexTwo = (0)

    //  構造第二個圖的邊
    val edgeTwo = hm2TupleRDD.map(t => {
      val split = t._1.split(",")
      Edge(split(0).toLong, split(1).toLong, t._2.toList.size)
    })

    val graphContactTwo = Graph(userVertexTwo, edgeTwo, defaultVertexTwo)


    /*
10000與10086相關連	10000的用戶朋友數量=2類似度=1.0共同朋友數量=2.0	10086與10000相關連	10086的用戶朋友數量=2類似度=1.0共同朋友數量=2.0
10000與10010相關連	10000的用戶朋友數量=2類似度=0.5共同朋友數量=1.0	10010與10000相關連	10010的用戶朋友數量=3類似度=0.3共同朋友數量=1.0
10010與10086相關連	10010的用戶朋友數量=3類似度=0.3共同朋友數量=1.0	10086與10010相關連	10086的用戶朋友數量=2類似度=0.5共同朋友數量=1.0
 */
    val result = graphContactTwo.triplets.map(t => {
      val usr1 = t.srcId
      val usr2 = t.dstId
      val common_friend = t.attr.toFloat
      val usr1_usr2 = common_friend / t.srcAttr.toFloat
      val usr2_usr1 = common_friend / t.dstAttr.toFloat
      usr1.toString + "與" + usr2.toString + "相關連" + "\t" + usr1.toString + "的用戶朋友數量=" + t.srcAttr + "類似度=" + usr1_usr2.toString + "共同朋友數量=" + common_friend + "\t" + usr2.toString + "與" + usr1.toString + "相關連" + "\t" + usr2.toString + "的用戶朋友數量=" + t.dstAttr + "類似度=" + usr2_usr1.toString + "共同朋友數量=" + common_friend
    })
      .foreach(println(_))

    sc.stop()
  }

  //  map 函數  把 self_mobile 發送過去
  def msgFun(triplet: EdgeContext[(String), Int, String]) {
    triplet.sendToDst(triplet.srcId.toString)
  }

  //  reduce 函數  relation_mobile做爲key  reducebykey(_+_)
  def reduceFun(a: (String), b: (String)): String = a + "," + b

}

寫完map和reduce函數和一系列的rdd函數,我本身寫完感受都快別噁心死了,這種方式處理代碼太過於複雜app

下面這種方式是用dataframe進行處理函數

代碼以下:ui

package spark_graph

import org.apache.spark.sql.SparkSession

/**
  * Created by dongdong on 18/1/18.
  */
object ContactDataFrame {


  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("ContactDataFrame")
      .master("local")
      .getOrCreate()

    import spark.implicits._
//    數據變成dataframe
    val userDataFrame = spark.sparkContext.textFile("/Users/dongdong/Desktop/spark_contact/contact/data_test/date_contact").map(line => {
      //分詞
      val words = line.split("\\s+")
      //      本身的號碼
      val self_mobile = words(0).split("[^0-9]").filter(_.length > 0).mkString("")
      //      朋友的號碼
      val relation_mobile = words(1).split("[^0-9]").filter(_.length > 0).mkString("")
      (self_mobile, relation_mobile)
    }
    ).filter(t =>
      //      號碼不是5位的過濾掉 樣例數據是5位,若是生產數據是11位,這邊需改動一下
      t._1.length == 5 && t._2.length == 5
    ).toDF("self_mobile","relation_mobile")

//    dataframe註冊成一張表
    userDataFrame.createOrReplaceTempView("t_user_contact")
//    把這張表現cache到內存裏
    spark.catalog.cacheTable("t_user_contact")

   val resultDataframe= spark.sql(
     """
       |select
       |user_mobile,
       |friend_mobile,
       |count(1) as common_mobile_cnt
       |from
       |(select
       |a.self_mobile as user_mobile,
       |b.self_mobile as friend_mobile,
       |a.relation_mobile as common_mobile
       |from
       |(
       |select
       |distinct
       |self_mobile,
       |relation_mobile
       |from
       |t_user_contact
       |)a
       |inner join
       |(select
       |distinct
       |self_mobile,
       |relation_mobile
       |from
       |t_user_contact
       |)b
       |on
       |a.relation_mobile=b.relation_mobile
       |)c
       |where user_mobile!=friend_mobile
       |group by user_mobile,friend_mobile
       |
     """.stripMargin)

    resultDataframe.show(false)

//    清除內存
    spark.catalog.clearCache()
    spark.stop()


  }

}

結果以下:spa

+-----------+-------------+-----------------+
|user_mobile|friend_mobile|common_mobile_cnt|
+-----------+-------------+-----------------+
|10010      |10086        |1                |
|10086      |10010        |1                |
|10000      |10010        |1                |
|10086      |10000        |2                |
|10000      |10086        |2                |
|10010      |10000        |1                |
+-----------+-------------+-----------------+

超級簡潔,明瞭。建議能sql搞定的,都不要用函數。scala

相關文章
相關標籤/搜索