Spark獲取某個手機號在某個基站下停留的時間和當前手機所在的位置的案例

一、業務需求
在擁有手機號在每一個基站處停留時間日誌 和 基站信息的 算出某個手機號的(所在基站,停留時間),(當前所在經度,當前所在緯度)apache

其中手機鏈接基站產生的日誌信息相似以下:網絡

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0

上面的含義表示的是:手機號,時間,基站ID,接入網絡的類型(0:unknow,1:3G,2:2G,6:4G)spa

基站信息:日誌

9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

上面的含義表示的是:基站ID,經度,緯度,接入網絡的類型(0:unknow,1:3G,2:2G,6:4G)code

編寫Scale代碼:it

package com.Hive
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object FD {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("FD").setMaster("local[2]")
        val sc  = new SparkContext(conf)
    
        //1.讀取數據文件
    
        val user =sc.textFile("src/main/data/log/")//用戶數據
        val base  = sc.textFile("src/main/data/base_info.txt")//基站數據
    
    
        //2.數據清洗工做,數據維度提取
    //    用戶數據清洗
        val splited = user.map(line =>{
    
        val fields = line.split(",")
        val phone = fields(0)
    
        val base = fields(2)
        val envet = fields(3).toInt
    
        val time  = {
          if (envet == 1){
           -fields(1).toLong//賦值-
          }else{
            fields(1).toLong//正值+
          }
        }
    
      ((phone,base),time)
    })
    
    //   splited.collect().foreach(println(_))
    
    //    基站數據清洗
        val alcsplited = base.map(line =>{
          val fields = line.split(",")
          val id = fields(0)
          val x = fields(1)
          val y = fields(2)
          (id,(x,y))
        })
    
       // splited.collect().foreach(println(_))
    
        //3.統計每一個用戶在每一個基站中停留的時間
    
        val reducted = splited.reduceByKey(_+_)
    
       // reducted.collect().foreach(println(_))
    
        //((phone,base),time)
        val pmt = reducted.map(x=>{
    
          //(基站ID,(手機號,時間))
          //x._1對應的是元組((mobile,lac),time)中的(mobile,lac)
          //x._2對應的是元組((mobile,lac),time)中的time
          ((x._1._2),(x._1._1,x._2))
    
        })
    
    
    
        //鏈接join 以後的結果[(基站ID,((手機號,時間),(經度,緯度)))]
    
        val joined:RDD[(String, ((String, Long), (String, String)))] = pmt.join(alcsplited)
    
    
        //按照手機號進行分組
        //_.        :表明的是基站 手機號,時間,經度,緯度
        //_._2      :表明的是 手機號,時間 經度,緯度
        //_._2_1    :表明的是 手機號,時間
        //_._2._1._ :表明的是 手機號
        val MobileGroupBykey  = joined.groupBy(_._2._1._1)
    
        val result = MobileGroupBykey.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2))
    
        println(result.collect().toBuffer)
    
        sc.stop()
    
    
      }
    
    }
相關文章
相關標籤/搜索