IPLocation requirement
On the internet we often see report data presented as city heat maps. Baidu Analytics, for example, tracks this year's popular tourist cities, popular schools for applications, and so on, and displays that information on a heat map.
So we need to combine log data (produced by the carrier or by the website itself) with a table of city IP ranges, determine which range each user's IP falls into, and aggregate hit counts per latitude/longitude hotspot.
Practice data
Link: https://pan.baidu.com/s/14IA1pzUWEnDK_VCH_LYRLw
Extraction code: pnwv
package org.apache.spark

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import java.sql.{Connection, DriverManager, PreparedStatement}

/**
  * Created by Administrator on 2019/6/12.
  */
object IPLocation {

  // Convert a dotted-quad IP string to a Long:
  // shift the accumulator left 8 bits and OR in each octet.
  def ip2Long(ip: String): Long = {
    val split: Array[String] = ip.split("[.]")
    var ipNum = 0L
    for (i <- split) {
      ipNum = i.toLong | ipNum << 8L
    }
    ipNum
  }

  // Binary search: return the index of the IP range containing ipNum, or -1 if none matches.
  def binarySearch(ipNum: Long, value: Array[(String, String, String, String, String)]): Int = {
    var start = 0
    var end = value.length - 1
    while (start <= end) {
      val middle = (start + end) / 2
      if (ipNum >= value(middle)._1.toLong && ipNum <= value(middle)._2.toLong) {
        return middle
      }
      if (ipNum > value(middle)._2.toLong) {
        start = middle + 1 // was `start = middle`, which could loop forever
      }
      if (ipNum < value(middle)._1.toLong) {
        end = middle - 1 // was `end = middle`, same infinite-loop risk
      }
    }
    -1
  }

  // Write one partition's (location, count) pairs to MySQL.
  def data2MySQL(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "INSERT INTO location_count (location, total_count) VALUES (?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.74.100:3306/test", "root", "123456")
      ps = conn.prepareStatement(sql) // prepare once per partition, not once per row
      iterator.foreach(line => {
        ps.setString(1, line._1)
        ps.setInt(2, line._2)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => println(e)
    } finally {
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("iplocation").setMaster("local[5]")
    val sc = new SparkContext(conf)
    // Read the IP-range data: (ipStart, ipEnd, location name, longitude, latitude)
    val jizhanRDD = sc.textFile("E:\\ip.txt").map(_.split("\\|"))
      .map(x => (x(2), x(3), x(4) + "-" + x(5) + "-" + x(6) + "-" + x(7) + "-" + x(8) + "-" + x(9), x(13), x(14)))
    // jizhanRDD.foreach(println)
    // Collect the RDD into an array on the driver
    val jizhanPartRDDToArray: Array[(String, String, String, String, String)] = jizhanRDD.collect()
    // Broadcast variable: a read-only data area that every task can read,
    // comparable to MapReduce's distributed cache
    val jizhanRDDToArray: Broadcast[Array[(String, String, String, String, String)]] = sc.broadcast(jizhanPartRDDToArray)
    // println(jizhanRDDToArray.value)
    val IPS = sc.textFile("E:\\20090121000132.394251.http.format").map(_.split("\\|")).map(x => x(1))
    // Convert each IP to a Long, binary-search the IP ranges,
    // then word-count the matched coordinates: ((longitude, latitude), 1)
    val result = IPS.mapPartitions(it => {
      val value: Array[(String, String, String, String, String)] = jizhanRDDToArray.value
      it.map(ip => {
        // Convert the IP string to a number
        val ipNum: Long = ip2Long(ip)
        // Binary-search the IP ranges; returns the index in the array, or -1
        binarySearch(ipNum, value)
      }).filter(_ != -1) // drop IPs in no known range (was unguarded and could throw)
        .map(index => ((value(index)._4, value(index)._5), 1))
    })
    // Aggregate
    val resultFinnal: RDD[((String, String), Int)] = result.reduceByKey(_ + _)
    // resultFinnal.foreach(println)
    // Store the results in the database
    resultFinnal.map(x => (x._1._1 + "-" + x._1._2, x._2)).foreachPartition(data2MySQL _)
    sc.stop()
  }
}
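For intuition, a quick sanity check of the ip2Long conversion (a minimal sketch; the sample IP is hypothetical, not from the dataset):

// "1.2.3.4" packs each octet into 8 bits:
// ((1 * 256 + 2) * 256 + 3) * 256 + 4 = 16909060, i.e. (1 << 24) | (2 << 16) | (3 << 8) | 4
assert(IPLocation.ip2Long("1.2.3.4") == 16909060L)

Because the packed values preserve numeric order over the sorted ranges in ip.txt, binarySearch can locate the enclosing (ipStart, ipEnd) interval in O(log n) comparisons.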
PV case
package org.apache.spark

import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2019/6/12.
  */
// PV (Page View): page views, i.e. the number of pages viewed or clicked
object PV {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("pv").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Read access.log
    val file: RDD[String] = sc.textFile("e:\\access.log")
    // Map every line to the same key: ("pv", 1)
    val pvAndOne: RDD[(String, Int)] = file.map(x => ("pv", 1))
    // Aggregate
    val result = pvAndOne.reduceByKey(_ + _)
    result.foreach(println)
  }
}
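Since every line maps to the same key, the final figure is simply the number of lines; a minimal equivalent without the shuffle (same path assumed):

// Equivalent PV without reduceByKey: just count the lines.
val pv: Long = sc.textFile("e:\\access.log").count()
println(("pv", pv))

The map/reduceByKey form is still worth keeping in mind because it generalizes directly to keyed counts, e.g. PV per URL or per day.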
UV case
package org.apache.spark

import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2019/6/12.
  */
// UV (Unique Visitor): the number of distinct users visiting a site within one day
// (based on cookies); one client machine counts as one visitor
object UV {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("uv").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Read access.log
    val file: RDD[String] = sc.textFile("e:\\access.log")
    // Split each line, take the IP (field 0), then deduplicate
    val uvAndOne = file.map(_.split(" ")).map(x => x(0)).distinct().map(x => ("uv", 1))
    // Aggregate
    val result = uvAndOne.reduceByKey(_ + _)
    result.foreach(println)
  }
}
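When an exact figure is not required, the full distinct() shuffle can be avoided with Spark's built-in HyperLogLog-based estimator; a sketch under the same log format (the 0.01 relative error is my choice):

// Approximate UV without a global deduplication shuffle:
val approxUv: Long = sc.textFile("e:\\access.log")
  .map(_.split(" ")(0))
  .countApproxDistinct(0.01)
println(("uv(approx)", approxUv))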
PV/UV period-over-period analysis
package org.apache.spark

import scala.collection.mutable.ArrayBuffer

/**
  * Created by Administrator on 2019/6/12.
  */
object Pvbi {
  // LoggerLevels.setStreamingLogLevels()
  val conf = new SparkConf().setAppName("pv").setMaster("local[7]")
  val sc = new SparkContext(conf)
  val PVArr = ArrayBuffer[(String, Int)]()
  val UVArr = ArrayBuffer[(String, Int)]()

  def main(args: Array[String]) {
    // One log file per day, 2014-08-24 through 2014-08-30
    for (day <- 24 to 30) {
      computePVOneDay(s"e:\\access/tts7access201408$day.log")
    }
    println(PVArr)
    for (day <- 24 to 30) {
      computeUVOneDay(s"e:\\access/tts7access201408$day.log")
    }
    println(UVArr)
  }

  def computePVOneDay(filePath: String): Unit = {
    val file = sc.textFile(filePath)
    val pvTupleOne = file.map(x => ("pv", 1)).reduceByKey(_ + _)
    val collect: Array[(String, Int)] = pvTupleOne.collect()
    PVArr += collect(0)
  }

  def computeUVOneDay(filePath: String): Unit = {
    val rdd1 = sc.textFile(filePath)
    val rdd3 = rdd1.map(x => x.split(" ")(0)).distinct
    val rdd4 = rdd3.map(x => ("uv", 1))
    val rdd5 = rdd4.reduceByKey(_ + _)
    val collect: Array[(String, Int)] = rdd5.collect()
    UVArr += collect(0)
  }
}
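The code above only collects the seven daily totals; the period-over-period ratios still have to be derived from them. A minimal sketch, assuming PVArr has been filled as in main:

// Day-over-day PV ratio: count(day n) / count(day n - 1)
val ratios = PVArr.map(_._2).sliding(2).map {
  case Seq(prev, cur) => cur.toDouble / prev
}.toList
println(ratios) // one ratio per consecutive day pair; values depend on the data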
TopK
package org.apache.spark

import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2019/6/12.
  */
object TopK {
  def main(args: Array[String]) {
    // Create the config and set the app name
    val conf = new SparkConf().setAppName("topk").setMaster("local[2]")
    // Create the SparkContext with the config
    val sc = new SparkContext(conf)
    // Read access.log
    val file: RDD[String] = sc.textFile("e:\\access.log")
    // Split each line, take the referer URL (field 10), and map it to (url, 1)
    val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).map(x => x(10)).map((_, 1))
    // Aggregate, sort by count descending, and take the top 3
    val result: Array[(String, Int)] = refUrlAndOne.reduceByKey(_ + _).sortBy(_._2, false).take(3)
    println(result.toList)
  }
}
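A note on this pipeline: sortBy performs a full sort of every key before take(3). RDD.top fetches the k largest elements without the global sort, keeping only k candidates per partition and merging them on the driver; a sketch with the same data:

val top3: Array[(String, Int)] = refUrlAndOne
  .reduceByKey(_ + _)
  .top(3)(Ordering.by(_._2))
println(top3.toList)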
mobile_location case
Required data
Link: https://pan.baidu.com/s/1JbGxnrgxcy05LFUmVo8AUQ
Extraction code: h7io
package org.apache.spark

import org.apache.spark.rdd.RDD
import scala.collection.mutable.Map

object MobileLocation {
  def main(args: Array[String]) {
    // Run locally
    val conf = new SparkConf().setAppName("UserLocation").setMaster("local[5]")
    val sc = new SparkContext(conf)
    // todo: filter records in work hours (station/user record: 18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1)
    val officetime = sc.textFile("e:\\ce\\*.log")
      .map(_.split(",")).filter(x => (x(1).substring(8, 14) >= "080000" && (x(1).substring(8, 14) <= "180000")))
    // todo: filter records in home hours (same record format)
    val hometime = sc.textFile("e:\\ce\\*.log")
      .map(_.split(",")).filter(x => (x(1).substring(8, 14) > "180000" && (x(1).substring(8, 14) <= "240000")))
    // todo: read base-station info: 9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
    val rdd20 = sc.textFile("e:\\ce\\loc_info.txt")
      .map(_.split(",")).map(x => (x(0), (x(1), x(2))))
    // todo: count the surplus (duplicate) time slots
    val map1Result = computeCount(officetime)
    val map2Result = computeCount(hometime)
    val mapBro1 = sc.broadcast(map1Result)
    val mapBro2 = sc.broadcast(map2Result)
    // todo: compute office time
    computeOfficeTime(officetime, rdd20, "c://out/officetime", mapBro1.value)
    // todo: compute home time
    computeHomeTime(hometime, rdd20, "c://out/hometime", mapBro2.value)
    sc.stop()
  }

  /**
    * Count the surplus (duplicate) time slots.
    *
    * 1. Group by "phone_stationId_date"; if a group's value has size 2, the user appeared
    *    twice in the same slot (8-18 or 20-24) on the same day, so the surplus time must be
    *    recorded and subtracted later.
    * 2. Use "phone_stationId" as the key and accumulate the number of such duplicate pairs
    *    as the value in the map.
    *    Example: 13888888888_8_20160808100923_1 and 13888888888_8_20160808170923_0 mean that
    *    13888888888 entered and left station 8 within the 8-18 slot on 20160808. That is a
    *    duplicate pair for user 13888888888 at station 8, so key 13888888888_8 gets 1 added.
    *    Since we process several months of data, the same situation on other days accumulates
    *    onto the same key.
    */
  def computeCount(rdd1: RDD[Array[String]]): Map[String, Int] = {
    var map = Map(("init", 0))
    // todo: groupBy "phone_stationId_date" so records of the same day and slot land together
    for ((k, v) <- rdd1.groupBy(x => x(0) + "_" + x(2) + "_" + x(1).substring(0, 8)).collect()) {
      val tmp = map.getOrElse(k.substring(0, k.length() - 9), 0)
      if (v.size % 2 == 0) {
        // todo: key "phone_stationId", value = accumulated number of duplicate pairs
        map += (k.substring(0, k.length() - 9) -> (tmp + v.size / 2))
      }
    }
    map
  }

  /**
    * Compute time spent at home.
    */
  def computeHomeTime(rdd1: RDD[Array[String]], rdd2: RDD[(String, (String, String))], outDir: String, map: Map[String, Int]) {
    // todo: (phone_stationId, time); formula: 24 - x on entry, x - 20 on exit
    val rdd3 = rdd1.map(x => ((x(0) + "_" + x(2), if (x(3).toInt == 1) 24 - Integer.parseInt(x(1).substring(8, 14)) / 10000 else Integer.parseInt(x(1).substring(8, 14)) / 10000 - 20)))
    // todo: (phone_stationId, total time)
    val rdd4 = rdd3.reduceByKey(_ + _).map {
      case (telPhone_zhanId, totalTime) =>
        (telPhone_zhanId, totalTime - (Math.abs(map.getOrElse(telPhone_zhanId, 0)) * 4))
    }
    // todo: sort by total time descending: (phone_stationId, totalTime)
    val rdd5 = rdd4.sortBy(_._2, false)
    // todo: split into: (phone, (stationId, totalTime))
    val rdd6 = rdd5.map { case (telphone_zhanId, totalTime) => (telphone_zhanId.split("_")(0), (telphone_zhanId.split("_")(1), totalTime)) }
    // todo: take the station with the maximum time per phone:
    // (phone, CompactBuffer((stationId, totalTime1), (stationId, totalTime2))) -> head after the descending sort
    val rdd7 = rdd6.groupByKey.map { case (telphone, buffer) => (telphone, buffer.head) }
      .map { case (telphone, (zhanId, totalTime)) => (telphone, zhanId, totalTime) }
    // todo: join to pick up each station's longitude and latitude
    val rdd8 = rdd7.map { case (telphon, zhanId, time) => (zhanId, (telphon, time)) }.join(rdd2).map {
      // todo: (a, (1, 2))
      case (zhanId, ((telphon, time), (jingdu, weidu))) => (telphon, zhanId, jingdu, weidu)
    }
    rdd8.foreach(println)
    // rdd8.saveAsTextFile(outDir)
  }

  /**
    * Compute time spent at the office.
    */
  def computeOfficeTime(rdd1: RDD[Array[String]], rdd2: RDD[(String, (String, String))], outDir: String, map: Map[String, Int]) {
    // todo: (phone_stationId, time); formula: 18 - x on entry, x - 8 on exit
    val rdd3 = rdd1.map(x => ((x(0) + "_" + x(2), if (x(3).toInt == 1) 18 - Integer.parseInt(x(1).substring(8, 14)) / 10000 else Integer.parseInt(x(1).substring(8, 14)) / 10000 - 8)))
    // todo: (phone_stationId, total time)
    val rdd4 = rdd3.reduceByKey(_ + _).map {
      case (telPhone_zhanId, totalTime) =>
        (telPhone_zhanId, totalTime - (Math.abs(map.getOrElse(telPhone_zhanId, 0)) * 10))
    }
    // todo: sort by total time descending
    val rdd5 = rdd4.sortBy(_._2, false)
    // todo: split into: (phone, (stationId, totalTime))
    val rdd6 = rdd5.map { case (telphone_zhanId, totalTime) => (telphone_zhanId.split("_")(0), (telphone_zhanId.split("_")(1), totalTime)) }
    // todo: take the station with the maximum time per phone
    val rdd7 = rdd6.groupByKey.map { case (telphone, buffer) => (telphone, buffer.head) }
      .map { case (telphone, (zhanId, totalTime)) => (telphone, zhanId, totalTime) }
    // todo: join to pick up each station's longitude and latitude
    val rdd8 = rdd7.map { case (telphon, zhanId, time) => (zhanId, (telphon, time)) }.join(rdd2).map {
      case (zhanId, ((telphon, time), (jingdu, weidu))) => (telphon, zhanId, jingdu, weidu)
    }
    rdd8.foreach(println)
    // rdd8.saveAsTextFile(outDir)
  }
}
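To make the time formula concrete, here is my reading of the arithmetic on one hypothetical record pair in the home slot (the formula assumes the 20-24 window): user 13888888888 enters station 8 at 21:09 (flag 1) and leaves at 23:41 (flag 0).

// Entry (flag 1): 24 - hour = 24 - 21 = 3
val enter = 24 - "20160808210923".substring(8, 14).toInt / 10000
// Exit (flag 0): hour - 20 = 23 - 20 = 3
val exit = "20160808234156".substring(8, 14).toInt / 10000 - 20
println(enter + exit) // raw total: 6

Because the pair falls in the same day and slot, computeCount records one duplicate for key 13888888888_8, so computeHomeTime subtracts 1 * 4 (one full 4-hour window), leaving 2, which approximates the actual stay of about 2.5 hours at hour granularity.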