Big Data Learning: Spark Operational Analytics Cases

iplocation requirement

On the Internet we often see report data presented as city heat maps. Baidu Analytics, for example, ranks this year's hottest tourist cities, most-applied-to schools, and so on, and displays that information on a heat map.

To build such a report, we need to combine log data (generated by the carrier or by the website itself) with city IP-range data: resolve each user's IP to its IP range, then aggregate hit counts per hotspot longitude/latitude.

Practice data

Link: https://pan.baidu.com/s/14IA1pzUWEnDK_VCH_LYRLw
Extraction code: pnwv

package org.apache.spark

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2019/6/12.
  */
object IPLocation {
  def ip2Long(ip: String): Long = {
    // Convert a dotted-decimal IP string to a Long: for each octet,
    // shift the accumulator left 8 bits and OR in the new value
    val split: Array[String] = ip.split("[.]")
    var ipNum = 0L
    for (i <- split) {
      ipNum = i.toLong | ipNum << 8L
    }
    ipNum
  }
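
  // Worked example (hypothetical input): ip2Long("1.2.3.4")
  //   = (((0L << 8 | 1) << 8 | 2) << 8 | 3) << 8 | 4 = 16909060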

  // Binary search over the sorted, non-overlapping IP ranges;
  // returns the index of the range containing ipNum, or -1 if none matches
  def binarySearch(ipNum: Long, value: Array[(String, String, String, String, String)]): Int = {
    var start = 0
    var end = value.length - 1
    while (start <= end) {
      val middle = (start + end) / 2
      if (ipNum >= value(middle)._1.toLong && ipNum <= value(middle)._2.toLong) {
        return middle
      }
      if (ipNum > value(middle)._2.toLong) {
        // search the upper half ("start = middle" could loop forever)
        start = middle + 1
      }
      if (ipNum < value(middle)._1.toLong) {
        // search the lower half
        end = middle - 1
      }
    }
    -1
  }

  // Write one partition's (location, count) pairs to MySQL
  def data2MySQL(iterator: Iterator[(String, Int)]): Unit = {
    var conn: Connection = null
    var ps: PreparedStatement = null
    val sql = "INSERT INTO location_count (location, total_count) VALUES (?, ?)"
    try {
      conn = DriverManager.getConnection("jdbc:mysql://192.168.74.100:3306/test", "root", "123456")
      // prepare the statement once and reuse it for every row
      ps = conn.prepareStatement(sql)
      iterator.foreach(line => {
        ps.setString(1, line._1)
        ps.setInt(2, line._2)
        ps.executeUpdate()
      })
    } catch {
      case e: Exception => println(e)
    } finally {
      if (ps != null)
        ps.close()
      if (conn != null)
        conn.close()
    }
  }
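
  // Note: the location_count table is assumed to already exist in MySQL,
  // e.g. (hypothetical DDL):
  //   CREATE TABLE location_count (location VARCHAR(100), total_count INT);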

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("iplocation").setMaster("local[5]")
    val sc = new SparkContext(conf)
    // Read the IP-range data: (ipStart, ipEnd, location info, longitude, latitude)
    val jizhanRDD = sc.textFile("E:\\ip.txt").map(_.split("\\|")).map(x => (x(2), x(3), x(4) + "-" + x(5) + "-" + x(6) + "-" + x(7) + "-" + x(8) + "-" + x(9), x(13), x(14)))
    //    jizhanRDD.foreach(println)
    // Collect the RDD to the driver as an array
    val jizhanPartRDDToArray: Array[(String, String, String, String, String)] = jizhanRDD.collect()
    // Broadcast variable: a read-only data area that every task can read,
    // comparable to MapReduce's distributed cache
    val jizhanRDDToArray: Broadcast[Array[(String, String, String, String, String)]] = sc.broadcast(jizhanPartRDDToArray)
    //    println(jizhanRDDToArray.value)

    val IPS = sc.textFile("E:\\20090121000132.394251.http.format").map(_.split("\\|")).map(x => x(1))

    // Convert each IP to a Long, binary-search the IP ranges for it,
    // then word-count the matching coordinates as ((longitude, latitude), 1)
    val result = IPS.mapPartitions(it => {

      val value: Array[(String, String, String, String, String)] = jizhanRDDToArray.value
      it.flatMap(ip => {
        // Convert the IP string to a number
        val ipNum: Long = ip2Long(ip)

        // Binary-search the ranges; index is the position in the array, or -1
        val index: Int = binarySearch(ipNum, value)
        // Skip IPs that match no range instead of crashing on value(-1)
        if (index < 0) None
        else Some(((value(index)._4, value(index)._5), 1))
      })
    })

    // Aggregate
    val resultFinal: RDD[((String, String), Int)] = result.reduceByKey(_ + _)

    //    resultFinal.foreach(println)
    // Store the results in the database
    resultFinal.map(x => (x._1._1 + "-" + x._1._2, x._2)).foreachPartition(data2MySQL _)
    sc.stop()

  }

}
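
ip2Long and binarySearch can be sanity-checked locally without starting Spark. A minimal sketch, using a fabricated two-row range table (the ranges, names, and coordinates below are made up for illustration):

object IPLocationCheck {
  import org.apache.spark.IPLocation.{ip2Long, binarySearch}

  def main(args: Array[String]): Unit = {
    // (ipStart, ipEnd, location, longitude, latitude) -- fabricated sample rows,
    // sorted ascending and non-overlapping as binarySearch requires
    val ranges = Array(
      ("16777216", "16777471", "sample-A", "116.30", "40.05"),
      ("16777472", "16778239", "sample-B", "121.47", "31.23"))

    val ipNum = ip2Long("1.0.1.0") // 16777472
    val index = binarySearch(ipNum, ranges)
    println(if (index >= 0) ranges(index) else "no matching range") // sample-B row
  }
}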


PV case

package org.apache.spark

import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2019/6/12.
  */
// PV (Page View): the number of page views / clicks
object PV {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("pv").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Read access.log
    val file: RDD[String] = sc.textFile("e:\\access.log")
    // Map every input line to the pair ("pv", 1)
    val pvAndOne: RDD[(String, Int)] = file.map(x => ("pv", 1))
    // Aggregate
    val result = pvAndOne.reduceByKey(_ + _)
    result.foreach(println)
  }
}
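
Since every line counts as one page view, the same number can also be read off directly with count(), which avoids the shuffle that reduceByKey triggers; a minimal equivalent:

val pv: Long = sc.textFile("e:\\access.log").count()
println(("pv", pv))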

UV case

package org.apache.spark

import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2019/6/12.
  */
// UV (Unique Visitor): the number of distinct users visiting a site within one day
// (typically keyed on a cookie); here one client IP counts as one visitor
object UV {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("uv").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Read access.log
    val file: RDD[String] = sc.textFile("e:\\access.log")

    // Split each line, take the IP (field 0), then deduplicate
    val uvAndOne = file.map(_.split(" ")).map(x => x(0)).distinct().map(x => ("uv", 1))

    // Aggregate
    val result = uvAndOne.reduceByKey(_ + _)
    result.foreach(println)
  }
}
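
Here too the pair-and-reduce step can be collapsed: distinct() followed by count() returns the same figure as a single number on the driver. A minimal equivalent, assuming the first space-separated field is the client IP as above:

val uv: Long = sc.textFile("e:\\access.log")
  .map(_.split(" ")(0)) // take the IP field
  .distinct()
  .count()
println(("uv", uv))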

PV / UV period-over-period analysis

package org.apache.spark

import scala.collection.mutable.ArrayBuffer

/**
  * Created by Administrator on 2019/6/12.
  */
object Pvbi {
//  LoggerLevels.setStreamingLogLevels()
  val conf = new SparkConf().setAppName("pv").setMaster("local[7]")
  val sc = new SparkContext(conf)
  val PVArr = ArrayBuffer[(String, Int)]()
  val UVArr = ArrayBuffer[(String, Int)]()

  def main(args: Array[String]) {
    // One log file per day, 2014-08-24 through 2014-08-30
    val days = Seq("20140824", "20140825", "20140826", "20140827",
      "20140828", "20140829", "20140830")
    days.foreach(day => computePVOneDay(s"e:\\access/tts7access$day.log"))
    println(PVArr)
    days.foreach(day => computeUVOneDay(s"e:\\access/tts7access$day.log"))
    println(UVArr)
  }

  def computePVOneDay(filePath: String): Unit = {
    val file = sc.textFile(filePath)
    val pvTupleOne = file.map(x => ("pv", 1)).reduceByKey(_ + _)
    val collect: Array[(String, Int)] = pvTupleOne.collect()
    PVArr.+=(collect(0))
  }

  def computeUVOneDay(filePath: String): Unit = {
    val rdd1 = sc.textFile(filePath)
    val rdd3 = rdd1.map(x => x.split(" ")(0)).distinct
    val rdd4 = rdd3.map(x => ("uv", 1))
    val rdd5 = rdd4.reduceByKey(_ + _)
    val collect: Array[(String, Int)] = rdd5.collect()
    UVArr.+=(collect(0))
  }
}
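
The object above only collects the per-day totals; the period-over-period ratio the case name refers to still has to be derived from them. A minimal sketch that could follow the println calls in main, assuming the arrays were filled in date order:

// (today - yesterday) / yesterday for each consecutive pair of days
val pvDayOverDay: List[Double] = PVArr.map(_._2).sliding(2).collect {
  case Seq(prev, cur) if prev != 0 => (cur - prev).toDouble / prev
}.toList
println(pvDayOverDay)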

TopK case

package org.apache.spark

import org.apache.spark.rdd.RDD

/**
  * Created by Administrator on 2019/6/12.
  */
object TopK {
  def main(args: Array[String]) {
    // Create the config and set the app name
    val conf = new SparkConf().setAppName("topk").setMaster("local[2]")
    // Create the SparkContext from the config
    val sc = new SparkContext(conf)
    // Read access.log
    val file: RDD[String] = sc.textFile("e:\\access.log")
    // Take field 10 (the referrer URL) of each line and map it to (url, 1)
    val refUrlAndOne: RDD[(String, Int)] = file.map(_.split(" ")).map(x => x(10)).map((_, 1))

    // Aggregate, sort by count descending, and take the top 3
    val result: Array[(String, Int)] = refUrlAndOne.reduceByKey(_ + _).sortBy(_._2, false).take(3)

    println(result.toList)
  }
}
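
When only the largest few values are needed, RDD.top does a bounded selection per partition instead of fully sorting the dataset; an equivalent formulation:

val top3: Array[(String, Int)] = refUrlAndOne
  .reduceByKey(_ + _)
  .top(3)(Ordering.by(_._2)) // order by count, keep the 3 largest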

Reference: Umeng traffic trend report: https://web.umeng.com/main.php?c=flow&a=frame&siteid=1254552353#!/1560332550346/flow/trend/1/1254552353/2019-06-12/2019-06-12


mobile_location case

Required data

Link: https://pan.baidu.com/s/1JbGxnrgxcy05LFUmVo8AUQ
Extraction code: h7io

package org.apache.spark

import org.apache.spark.rdd.RDD

import scala.collection.mutable.Map

object MobileLocation {

  def main(args: Array[String]) {
    // Run locally
    val conf = new SparkConf().setAppName("UserLocation").setMaster("local[5]")
    val sc = new SparkContext(conf)

    // todo: filter the office-hours records; a user/cell record looks like:
    // 18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1
    val officetime = sc.textFile("e:\\ce\\*.log")
      .map(_.split(",")).filter(x => (x(1).substring(8, 14) >= "080000" && (x(1).substring(8, 14) <= "180000")))

    // todo: filter the home-hours records (same format; note the filter keeps 18:00-24:00,
    // while the duration math in computeHomeTime assumes the 20:00-24:00 window)
    val hometime = sc.textFile("e:\\ce\\*.log")
      .map(_.split(",")).filter(x => (x(1).substring(8, 14) > "180000" && (x(1).substring(8, 14) <= "240000")))

    // todo: read the cell-tower info: 9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
    val rdd20 = sc.textFile("e:\\ce\\loc_info.txt")
      .map(_.split(",")).map(x => (x(0), (x(1), x(2))))

    // todo: count the duplicated same-day entry/exit pairs (the over-counted time)
    val map1Result = computeCount(officetime)
    val map2Result = computeCount(hometime)
    val mapBro1 = sc.broadcast(map1Result)
    val mapBro2 = sc.broadcast(map2Result)

    // todo: compute office time
    computeOfficeTime(officetime, rdd20, "c://out/officetime", mapBro1.value)

    // todo: compute home time
    computeHomeTime(hometime, rdd20, "c://out/hometime", mapBro2.value)
    sc.stop()
  }

  /**
    * Count the over-counted entry/exit pairs.
    *
    * 1. Group by "phone_towerId_date". If a group holds two records, the phone
    *    produced both an entry and an exit at that tower within the same window
    *    (8-18 or 20-24) on the same day; such pairs over-count the time, and the
    *    excess must be subtracted later.
    * 2. Using "phone_towerId" as the key, accumulate the number of such pairs
    *    into a map.
    * For example:
    * 13888888888_8_20160808100923_1 and 13888888888_8_20160808170923_0 mean that
    * phone 13888888888 entered and exited tower 8 within the 8-18 window on
    * 20160808; that is a duplicated pair for user 13888888888 at tower 8, so the
    * key 13888888888_8 gets 1 added.
    * Because we process several months of data, the same situation on other days
    * keeps accumulating into the 13888888888_8 key.
    */
  def computeCount(rdd1: RDD[Array[String]]): Map[String, Int] = {
    var map = Map(("init", 0))
    // todo: group by "phone_towerId_date" to gather each group's records together
    for ((k, v) <- rdd1.groupBy(x => x(0) + "_" + x(2) + "_" + x(1).substring(0, 8)).collect()) {
      // strip the trailing "_yyyyMMdd" (9 chars) to get the "phone_towerId" key
      val tmp = map.getOrElse(k.substring(0, k.length() - 9), 0)
      if (v.size % 2 == 0) {
        // todo: use "phone_towerId" as the key and the number of paired occurrences as the value
        map += (k.substring(0, k.length() - 9) -> (tmp + v.size / 2))
      }
    }
    map
  }
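
  // Why the callers subtract count * window-length: an entry at hour x_in
  // contributes (windowEnd - x_in) and an exit at x_out contributes
  // (x_out - windowStart), so a same-day pair sums to
  // (x_out - x_in) + (windowEnd - windowStart), i.e. the true duration plus one
  // extra window length. computeCount counts those pairs so that computeHomeTime
  // can subtract count * 4 (20-24 window) and computeOfficeTime count * 10 (8-18 window).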

  /**
    * Compute the time spent at home.
    */
  def computeHomeTime(rdd1: RDD[Array[String]], rdd2: RDD[(String, (String, String))], outDir: String, map: Map[String, Int]) {

    // todo: (phone_towerId, time); an entry (flag 1) contributes 24 - hour, an exit contributes hour - 20
    val rdd3 = rdd1.map(x => ((x(0) + "_" + x(2), if (x(3).toInt == 1) 24 - Integer.parseInt(x(1).substring(8, 14)) / 10000
    else Integer.parseInt(x(1).substring(8, 14)) / 10000 - 20)))

    // todo: (phone_towerId, total time), minus 4 hours per duplicated entry/exit pair
    val rdd4 = rdd3.reduceByKey(_ + _).map {
      case (telPhone_zhanId, totalTime) => {
        (telPhone_zhanId, totalTime - (Math.abs(map.getOrElse(telPhone_zhanId, 0)) * 4))
      }
    }

    // todo: sort by total time, descending: (phone_towerId, totalTime)
    val rdd5 = rdd4.sortBy(_._2, false)

    // todo: split into (phone, (towerId, totalTime))
    val rdd6 = rdd5.map {
      case (telphone_zhanId, totalTime) => (telphone_zhanId.split("_")(0), (telphone_zhanId.split("_")(1), totalTime))
    }

    // todo: find the max time per phone: (phone, CompactBuffer((towerId, time1), (towerId, time2), ...)),
    // taking the head of each group from the sorted data
    val rdd7 = rdd6.groupByKey.map {
      case (telphone, buffer) => (telphone, buffer.head)
    }.map {
      case (telphone, (zhanId, totalTime)) => (telphone, zhanId, totalTime)
    }

    // todo: join against the tower info to pick up each tower's longitude and latitude
    val rdd8 = rdd7.map {
      case (telphon, zhanId, time) => (zhanId, (telphon, time))
    }.join(rdd2).map {
      // todo: shape after join: (towerId, ((phone, time), (longitude, latitude)))
      case (zhanId, ((telphon, time), (jingdu, weidu))) => (telphon, zhanId, jingdu, weidu)
    }
    rdd8.foreach(println)
    //rdd8.saveAsTextFile(outDir)
  }

  /**
    * Compute the time spent at the office.
    */
  def computeOfficeTime(rdd1: RDD[Array[String]], rdd2: RDD[(String, (String, String))], outDir: String, map: Map[String, Int]) {

    // todo: (phone_towerId, time); an entry (flag 1) contributes 18 - hour, an exit contributes hour - 8
    val rdd3 = rdd1.map(x => ((x(0) + "_" + x(2), if (x(3).toInt == 1) 18 - Integer.parseInt(x(1).substring(8, 14)) / 10000
    else Integer.parseInt(x(1).substring(8, 14)) / 10000 - 8)))

    // todo: (phone_towerId, total time), minus 10 hours per duplicated entry/exit pair
    val rdd4 = rdd3.reduceByKey(_ + _).map {
      case (telPhone_zhanId, totalTime) => {
        (telPhone_zhanId, totalTime - (Math.abs(map.getOrElse(telPhone_zhanId, 0)) * 10))
      }
    }

    // todo: sort by total time, descending: (phone_towerId, totalTime)
    val rdd5 = rdd4.sortBy(_._2, false)

    // todo: split into (phone, (towerId, totalTime))
    val rdd6 = rdd5.map {
      case (telphone_zhanId, totalTime) => (telphone_zhanId.split("_")(0), (telphone_zhanId.split("_")(1), totalTime))
    }

    // todo: find the max time per phone: (phone, CompactBuffer((towerId, time1), (towerId, time2), ...)),
    // taking the head of each group from the sorted data
    val rdd7 = rdd6.groupByKey.map {
      case (telphone, buffer) => (telphone, buffer.head)
    }.map {
      case (telphone, (zhanId, totalTime)) => (telphone, zhanId, totalTime)
    }

    // todo: join against the tower info to pick up each tower's longitude and latitude
    val rdd8 = rdd7.map {
      case (telphon, zhanId, time) => (zhanId, (telphon, time))
    }.join(rdd2).map {
      case (zhanId, ((telphon, time), (jingdu, weidu))) => (telphon, zhanId, jingdu, weidu)
    }
    rdd8.foreach(println)
    //rdd8.saveAsTextFile(outDir)
  }

}
