Spark SQL實現日誌離線批處理

1、 基本的離線數據處理架構:
 
  1. 數據採集   Flume:Web日誌寫入到HDFS
  2. 數據清洗   髒數據 Spark、Hive、MR等計算框架來完成。 清洗完以後再放回HDFS
  3. 數據處理   按照須要,進行業務的統計和分析。 也經過計算框架完成
  4. 處理結果入庫   存放到RDBMS、NoSQL中
  5. 數據可視化    經過圖形化展現出來。  ECharts、HUE、Zeppelin
 
處理框圖:
 
1 2 3 4 5 6 7爲離線處理,其中5不必定是Hive(還有Spark SQL等) 6不必定是RDBMS(NoSQL)
執行時,可用調度框架Oozie、Azkaban,指定任務執行的時間
 
另一條線是實時處理
 
 
擬定項目需求:
  1. 統計某時間段最受歡迎的某項的TopN和對應的訪問次數
  2. 按地市統計最受歡迎  從IP提取城市信息
  3. 按訪問流量統計
 
 
互聯網日誌通常包括有:
訪問時間  訪問URL  耗費流量   訪問IP地址
從日誌裏提取以上咱們須要的數據
 
假設咱們如今僅有一臺電腦供學習做爲集羣使用,爲了防止內存溢出,有必要進行剪切日誌:
用head -10000命令截取前10000條
數據量太大的話,在IDE中可能會報錯
 
 
 
 2、日誌處理過程
 
數據清洗:
 
第一步: 從原始日誌提取有用信息,本例中就是拿到時間、URL、流量、IP
  1. 讀取日誌文件,獲得RDD,經過map方法,split成一個數組,而後選擇數組中有用的幾項(用斷點的方法分析哪幾項有用,並匹配相應的變量)
  2. 獲取到的信息有可能由於某些問題,如線程問題而致使生成了帶有錯誤的信息,第一步中一開始用了SimpleDateFormat(線程不安全)來轉變時間格式,會致使某些時間轉換錯誤。通常要改爲FastDateFormat來作
 
實現代碼:
//提取有用信息,轉換格式
object SparkStatFormatJob {
  def main(args: Array[String]) = {
    val spark = SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("/Users/kingheyleung/Downloads/data/10000_access.log")
    //access.take(10).foreach(println)
    access.map(line => {
      val splits = line.split(" ")
      val ip = splits(0)
      //用斷點的方法,觀察splits數組,找出時間、url、流量對應哪個字段
      //建立時間類DateUtils,轉換成經常使用的時間表達方式
      //把url多餘的""引號清除掉
      val time = splits(3) + " " + splits(4)
      val url = splits(11).replaceAll("\"", "")
      val traffic = splits(9)
      //(ip, DateUtils.parse(time), url, traffic)  用來測試輸出是否正常
      //把裁剪好的數據從新組合,用Tab分割
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).saveAsTextFile("file:///usr/local/mycode/immooclog/")
    spark.stop()
  }
}

 

//日期解析
object DateUtils {
  //輸入格式
  val ORIGINAL_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:sss Z", Locale.ENGLISH)
  //輸出格式
  val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
 
  def parse(time:String) = {
    TARGET_TIME_FORMAT.format(new Date(getTime(time)))
  }
  def getTime(time:String) = {
    try {
      ORIGINAL_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch { 
      case e : Exception => {
        0l
      }
    }
  }

 

通常日誌處理須要進行分區java

本例中按照日誌中的訪問時間進行分區
 
 
第二步:解析上一步獲得的有用信息,我把它稱爲解析日誌
 
其實就是把較爲整潔的數據日誌,解析出每一個字段的含義,並把RDD轉成DF
在此案例中,完成的是:
輸入:訪問時間  訪問URL  耗費流量   訪問IP地址  =>轉變爲輸出:url、類型(本例中url的後綴有article仍是video)、對應ID號、流量、ip、城市、時間、天(用於分組)
而且建立DataFrame(也就是定義Row和StructType,其中Row要和原日誌的每一個字段對應,而StructType是根據所須要的輸出來定義就行)
 
實現代碼:
//解析日誌
object SparkStatCleanJob {
  def main(args: Array[String]) = {
    val spark = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("file:///Users/kingheyleung/Downloads/data/access_10000.log")
    //RDD convert to DF, define Row and StructType
    val accessDF = spark.createDataFrame(accessRDD.map(line => LogConvertUtils.convertToRow(line)), LogConvertUtils.struct)
    //accessDF.printSchema()
    //accessDF.show(false)
    spark.stop()
  }
}

 

//RDD轉換成DF的工具類
object LogConvertUtils {
  //構建Struct
  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )
  //提取信息,構建Row
  def convertToRow(line:String) = {
 
    try {
      val splits = line.split("\t")
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)
      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length())
      val cmsSplits = cms.split("/")
 
      var cmsType = ""
      var cmsId = 0l
      //判斷是否存在
      if (cmsSplits.length > 1) {
        cmsType = cmsSplits(0)
        cmsId = cmsSplits(1).toLong
      }
      val city = IpUtils.getCity(ip)     //經過Ip解析工具傳進,具體看下面
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")
 
      //定義Row,與Struct同樣
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0)
    }
  }
}

注意:轉換時必定要記得類型轉換!!!!mysql

 
進一步解析:對IP地址解析來得到城市信息
 
在這裏,爲了讓IP地址轉換成直觀的城市信息,我使用了GitHub上的開源項目來實現:
用Maven編譯下載的項目
mvn clean package -DskipTests
 
安裝jar包到本身的Maven倉庫中: 
mvn install:install-file -Dfile=路徑.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
 
在IDE裏面的pom.xml添加dependency,參照GitHub主頁上的pom.xml中的dependency
可是出現報錯了:
java.io.FileNotFoundException:
file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)
根據提示,咱們須要在項目源碼中找到相應的文件拷進去IDE中的main/resources中!
 
 存儲清洗後的數據:
按day分區來進行存儲  partitionBy
存儲模式:mode(SaveMode.Overwrite)  覆蓋存儲
coalesce:聽說生產中常常用,是項目的調優勢,控制文件的輸出大小,個數
 
 
3、統計功能實現
 
功能實現一:統計TopN視頻  
 
第一步:讀取數據,read.format().load
第二步:
  1. 使用DataFrame API統計分析
  2. SQL API
最後把統計結果保存在MySQL數據庫中
 
調優勢:
讀取parquet文件時,系統會默認解析各字段相應的數據類型,但有時候咱們就只須要它是String類型,須要在SparkSession定義時添加:
config("spark.sql.sources.partitionColumnTypeInference.enabled, "false"")
變成只會按照原類型讀入
 
兩種方法:
若使用DataFrame API來作:
用$號時候須要導入 隱式轉換(這裏是列名轉換成列)!spark.implicits._
用到dataframe的count()函數要導入包:org.apache.spark.sql.functions._
 
若使用SQL API來作:
建立臨時表createTempView
當心寫SQL語句換行時不注意而忽略空格
 
實現代碼:
 
//完成統計操做
object TopNStatJob {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()
    val accessDF = spark.read.format("parquet").load("/Users/kingheyleung/Downloads/data/clean/")
    dfCountTopNVideo(spark, accessDF)
    sqlCountTopNVideo(spark, accessDF)
    //accessDF.printSchema()
 
 
    spark.stop()
  }
  def dfCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    /*
    * DF API
    * */
 
    //導入隱式轉換, 留意$號的使用, 而且導入functions包,使agg聚合函數count可以使用,此處若不用$的話,就沒法讓times進行desc排序了
    import spark.implicits._
    val topNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
    topNDF.show(false)
  }
  def sqlCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    /*
    * SQL API
    * */
 
    //建立臨時表access_view,注意換行時,很容易忽略掉空格
    accessDF.createTempView("access_view")
    val topNDF = spark.sql("select day, cmsId, count(1) as times from access_view " +
      "where day == '20170511' and cmsType == 'video' " +
      "group by day, cmsId " +
      "order by times desc")
    topNDF.show(false)
  }
}
 
 
在保存數據以前,須要寫鏈接MySQL數據庫的工具類,用到java.sql包
  1. 使用DriverManager,鏈接到mysql 3306
  2. 釋放資源,connection和preparedstatement都要,注意處理異常
 
注意:若測試時拿不到鏈接,出現如下報錯,那就是沒有在dependency中添加或者選對mysql-connetor包
java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/imooc_project?user=root&password=666
Error:scalac: error while loading <root>, Error accessing /Users/kingheyleung/.m2/repository/mysql/mysql-connector-java/5.0.8/mysql-connector-java-5.0.8.jar
 
我最終選的是5.1.40版本纔對了
 
 
實現代碼:
/*
* 鏈接MySQL數據庫
* 操做工具類
* */
object MySQLUtils {
  //得到鏈接
  def getConnection(): Unit = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=666")
  }
  //釋放資源
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      connection.close()
    }
  }
}

 

 
把統計數據保存到MySQL
  1. 在mysql中建立一張表,包含day,cms_Id,times三個字段(注意各自的數據類型,以及定義不容許爲NULL,並把day和cms_Id做爲PRI KEY)
  2. 建立模型類case class,三個輸入參數,day、cms_Id,times
  3. 建立操做數據庫DAO類,輸入的參數是一個list,list裝的是上面的模型類,目的是插入insert記錄到數據庫中,DAO中分如下幾步:
  4. 首先,作jdbc鏈接的準備,建立connection和prepareStatement,把關閉鏈接也寫好,用try catch finally拋出異常;
  5. 而後寫sql語句,preparestatement須要賦值的地方用佔位符放着;
  6. 進行對list遍歷,把每一個對象都放進pstmt中
  7. 調優勢!!!遍歷前把自動提交關掉,遍歷中把pstmt加入批處理中,遍歷完後執行批處理操做!最後手工提交鏈接
 
實現代碼:
//課程訪問次數實體類
case class VideoAccessStat(day: String, cmsId:Long, times: Long)
 
 
/*
* 各個維度統計的DAO操做
* */
object StatDAO {
  /*
  * 批量保存VideoAccessStat到數據庫
  * */
  def insertDayAccessTopN(list: ListBuffer[VideoAccessStat]): Unit = {
 
    var connection: Connection = null  //jdbc的準備工做, 定義鏈接
    var pstmt: PreparedStatement = null
 
    try {
      connection = MySQLUtils.getConnection() //真正獲取鏈接
 
      connection.setAutoCommit(false)   //爲了實現批處理,要關掉默認的自動提交
 
      val sql = "insert into day_topn_video(day, cms_id, times) values (?, ?, ?)"  //佔位符
      pstmt = connection.prepareStatement(sql)  //把SQL語句生成pstmt對象,後面才能夠填充佔位符中的數據
 
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
 
        pstmt.addBatch()   //加入批處理
      }
 
      pstmt.execute()    //執行批量處理
      connection.commit()    //手工提交
 
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}

 

 
爲了對應以上的第3步,要把統計記錄的DF生成一個個對象,放進list中:
  1. 建立模型類對應的list
  2. 對記錄進行遍歷,把記錄的每一個字段當作參數,建立模型類對象
  3. 把每一個對象添加到list中
  4. 把list傳進DAO類中 
 
如下代碼添加到上面的TopNJob類裏面中就能夠把以前生成到的topDF的結果記錄保存到MySQL當中了:
try {
  topNDF.foreachPartition(partitionOfRecords => { //
    val list = new ListBuffer[VideoAccessStat]  //建立list來裝統計記錄
 
    //遍歷每一條記錄,取出來上面對應的三個字段day,cmsId,times
    partitionOfRecords.foreach(info => {
      val day = info.getAs[String]("day")   //後面的就是取出來的記錄的每一個字段
      val cmsId = info.getAs[Long]("cmsId")
      val times = info.getAs[Long]("times")
 
      //每一次循環建立一個VideoAccessStat對象,添加一次進入list中
      list.append(VideoAccessStat(day, cmsId, times))
    })
    //把list傳進DAO類
    StatDAO.insertDayAccessTopN(list)
  })
} catch {
  case e: Exception => e.printStackTrace()
}

 

 到此爲止已經把項目需求一完成。
 
 
 功能實現二:按照城市來找出topN視頻
 
在功能一的基礎上,運用row_number函數來實現
 
具體的實現代碼:
 
  //先計算訪問次數,並按照day,cmsId,city分組
  val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === "20170511" && accessDF.col("cmsType") === "video")
    .groupBy("day", "cmsId", "city").agg(count("cmsId").as("times"))
 
  //進行分地市排序,使用到row_number函數,生成一個排名,定義爲time_rank, 而且取排名前3
  cityAccessTopNDF.select(
    cityAccessTopNDF.col("day"),
    cityAccessTopNDF.col("cmsId"),
    cityAccessTopNDF.col("times"),
    cityAccessTopNDF.col("city"),
    row_number().over(Window.partitionBy(cityAccessTopNDF.col("city"))
      .orderBy(cityAccessTopNDF.col("times").desc)
    ).as("times_rank")
  ).filter("times_rank <= 3").show(false)
}

 

 其餘步驟和功能一同樣,可是 插入Mysql的時候報錯,緣由是MySQL不支持插入中文!!!!
首先能夠在mysql命令行中用SET character來改:
SET character_set_client = utf8
 
可經過
show variables like 'character_set_%’;
查看當前的character編碼設置
 
而後在jdbc鏈接時,加上:
useUnicode=true&characterEncoding=utf8
 
改了以後,雖然可以導入MySQL了,並且不出現亂碼,但只有一部分數據,而且在控制檯報錯:
 
com.mysql.jdbc.PreparedStatement.fillSendPacket
com.mysql.jdbc.PreparedStatement.execute
 
後來把批處理刪掉居然就能夠把全部數據導入了:
 
pstmt.executeUpdate  //不使用批處理的pstmt插入
 
 
功能三:按流量來排序topN視頻
和功能一幾乎徹底同樣,只不過計算流量總和時用的不是count函數而是要用sum函數
 
爲了代碼的複用性,防止生成重複的數據,在StatDAO定義刪除的函數:
 
def deleteDayData(day: String) = {
 
  var connection: Connection = null
  var pstmt: PreparedStatement = null
  var tables = Array("day_topn_video",
    "day_city_topn_video",
    "traffic_topn_video"
  )
 
  try {
    connection = MySQLUtils.getConnection()
 
    for (table <- tables) {
      val deleteSql = s"delete from $table where day = ?」  //Scala特殊處理
      pstmt = connection.prepareStatement(deleteSql)
      pstmt.setString(1, table)
      pstmt.setString(2, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}

 

 
須要注意的是,table在pstmt中的特殊用法!!
 
 
後續會對以上內容進行可視化處理、跑在YARN上的修改、性能調優
相關文章
相關標籤/搜索