一、業務需求
在擁有手機號在每一個基站處停留時間日誌 和 基站信息的 算出某個手機號的(所在基站,停留時間),(當前所在經度,當前所在緯度)apache
其中手機鏈接基站產生的日誌信息相似以下:網絡
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1 18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1 18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0 18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
上面的含義表示的是:手機號,時間,基站ID,接入網絡的類型(0:unknow,1:3G,2:2G,6:4G)spa
基站信息:日誌
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6 CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6 16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
上面的含義表示的是:基站ID,經度,緯度,接入網絡的類型(0:unknow,1:3G,2:2G,6:4G)code
編寫Scale代碼:it
package com.Hive import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FD { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FD").setMaster("local[2]") val sc = new SparkContext(conf) //1.讀取數據文件 val user =sc.textFile("src/main/data/log/")//用戶數據 val base = sc.textFile("src/main/data/base_info.txt")//基站數據 //2.數據清洗工做,數據維度提取 // 用戶數據清洗 val splited = user.map(line =>{ val fields = line.split(",") val phone = fields(0) val base = fields(2) val envet = fields(3).toInt val time = { if (envet == 1){ -fields(1).toLong//賦值- }else{ fields(1).toLong//正值+ } } ((phone,base),time) }) // splited.collect().foreach(println(_)) // 基站數據清洗 val alcsplited = base.map(line =>{ val fields = line.split(",") val id = fields(0) val x = fields(1) val y = fields(2) (id,(x,y)) }) // splited.collect().foreach(println(_)) //3.統計每一個用戶在每一個基站中停留的時間 val reducted = splited.reduceByKey(_+_) // reducted.collect().foreach(println(_)) //((phone,base),time) val pmt = reducted.map(x=>{ //(基站ID,(手機號,時間)) //x._1對應的是元組((mobile,lac),time)中的(mobile,lac) //x._2對應的是元組((mobile,lac),time)中的time ((x._1._2),(x._1._1,x._2)) }) //鏈接join 以後的結果[(基站ID,((手機號,時間),(經度,緯度)))] val joined:RDD[(String, ((String, Long), (String, String)))] = pmt.join(alcsplited) //按照手機號進行分組 //_. :表明的是基站 手機號,時間,經度,緯度 //_._2 :表明的是 手機號,時間 經度,緯度 //_._2_1 :表明的是 手機號,時間 //_._2._1._ :表明的是 手機號 val MobileGroupBykey = joined.groupBy(_._2._1._1) val result = MobileGroupBykey.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2)) println(result.collect().toBuffer) sc.stop() } }