// Extract the useful fields from the raw access log and re-emit them tab-separated.
object SparkStatFormatJob {

  def main(args: Array[String]) = {
    val spark = SparkSession.builder()
      .appName("SparkStatFormatJob")
      .master("local[2]")
      .getOrCreate()

    val access = spark.sparkContext.textFile("/Users/kingheyleung/Downloads/data/10000_access.log")
    //access.take(10).foreach(println)

    access.map { line =>
      val fields = line.split(" ")
      // Field positions were determined by inspecting the split array in a debugger:
      // 0 = ip, 3+4 = bracketed timestamp, 9 = traffic, 11 = quoted url.
      val ip      = fields(0)
      val time    = s"${fields(3)} ${fields(4)}"
      val url     = fields(11).replaceAll("\"", "")   // strip the surrounding quotes
      val traffic = fields(9)
      //(ip, DateUtils.parse(time), url, traffic)  // handy for eyeballing the output
      // Re-assemble the trimmed record, tab-delimited, with the time normalized by DateUtils.
      s"${DateUtils.parse(time)}\t$url\t$traffic\t$ip"
    }.saveAsTextFile("file:///usr/local/mycode/immooclog/")

    spark.stop()
  }
}
// Date parsing helper: converts the access-log timestamp into a common format.
object DateUtils {
  // Input format, e.g. "[10/Nov/2016:00:01:02 +0800]".
  // BUGFIX: pattern was "dd/MMM/yyyy:HH:mm:sss Z" — seconds are "ss", not "sss".
  val ORIGINAL_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)
  // Output format.
  val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  // Parse the raw "[...]" timestamp and render it in the target format.
  def parse(time: String) = {
    TARGET_TIME_FORMAT.format(new Date(getTime(time)))
  }

  // Extract the epoch milliseconds from the text between '[' and ']'; 0L on any failure
  // (a zero epoch marks unparseable records rather than aborting the job).
  def getTime(time: String) = {
    try {
      ORIGINAL_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e: Exception => 0L
    }
  }
}
通常日誌處理須要進行分區
// Clean the formatted log: turn every line into a Row and build a typed DataFrame.
object SparkStatCleanJob {

  def main(args: Array[String]) = {
    val spark = SparkSession.builder()
      .appName("SparkStatCleanJob")
      .master("local[2]")
      .getOrCreate()

    val accessRDD = spark.sparkContext.textFile("file:///Users/kingheyleung/Downloads/data/access_10000.log")

    // RDD -> DataFrame: each line is converted to a Row that matches LogConvertUtils.struct.
    val rowRDD = accessRDD.map(LogConvertUtils.convertToRow)
    val accessDF = spark.createDataFrame(rowRDD, LogConvertUtils.struct)

    //accessDF.printSchema()
    //accessDF.show(false)

    spark.stop()
  }
}
// Helper for converting a cleaned, tab-separated log line into a Spark SQL Row.
object LogConvertUtils {

  // Schema of the resulting DataFrame; Row fields below must match this order and type.
  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  // Parse one line of "time \t url \t traffic \t ip" into a Row matching `struct`.
  // Remember the type conversions: traffic/cmsId must be Long to satisfy the schema.
  def convertToRow(line: String) = {
    try {
      val splits = line.split("\t")
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length())
      val cmsSplits = cms.split("/")

      var cmsType = ""
      var cmsId = 0L
      // Paths like "video/4500" carry both a content type and an id.
      if (cmsSplits.length > 1) {
        cmsType = cmsSplits(0)
        cmsId = cmsSplits(1).toLong
      }

      val city = IpUtils.getCity(ip) // resolve city from the IP (project helper, see below)
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")

      // Field order mirrors `struct` exactly.
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      // BUGFIX: the fallback must keep the schema's arity — Row(0) (one Int field)
      // would make createDataFrame fail on the first malformed line.
      case e: Exception => Row(null, null, null, null, null, null, null, null)
    }
  }
}
注意:轉換時必定要記得類型轉換!
// TopN statistics job: most-viewed videos for a given day, via both the DF and SQL APIs.
object TopNStatJob {

  def main(args: Array[String]) {
    // Disable partition-column type inference so the "day" partition stays a string.
    val spark = SparkSession.builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("/Users/kingheyleung/Downloads/data/clean/")

    dfCountTopNVideo(spark, accessDF)
    sqlCountTopNVideo(spark, accessDF)
    //accessDF.printSchema()

    spark.stop()
  }

  /*
  * DataFrame API variant.
  * */
  def dfCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    // The implicits import enables $"col" syntax; without $ we could not call .desc on
    // "times". functions.count supplies the aggregate used inside agg().
    import spark.implicits._

    val ranked = accessDF
      .filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId")
      .agg(count("cmsId").as("times"))
      .orderBy($"times".desc)

    ranked.show(false)
  }

  /*
  * SQL API variant.
  * */
  def sqlCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    // Register a temp view; note the trailing spaces when the SQL string is split across lines.
    accessDF.createTempView("access_view")

    val ranked = spark.sql("select day, cmsId, count(1) as times from access_view " +
      "where day == '20170511' and cmsType == 'video' " +
      "group by day, cmsId " +
      "order by times desc")

    ranked.show(false)
  }
}
/*
* MySQL connection / resource management helper.
* */
object MySQLUtils {

  // Obtain a connection to the imooc_project database.
  // BUGFIX: the return type was Unit, so callers (StatDAO assigns the result to a
  // Connection) received () instead of the connection — declare Connection explicitly.
  def getConnection(): Connection = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=666")
  }

  // Close statement then connection, tolerating nulls and close failures.
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // BUGFIX: guard against NPE when the connection was never obtained.
      if (connection != null) {
        connection.close()
      }
    }
  }
}
// Entity for per-day video access counts.
case class VideoAccessStat(day: String, cmsId: Long, times: Long)

/*
* DAO operations for the various statistics dimensions.
* */
object StatDAO {

  /*
  * Batch-save a list of VideoAccessStat rows to the database.
  * */
  def insertDayAccessTopN(list: ListBuffer[VideoAccessStat]): Unit = {
    var connection: Connection = null // JDBC handles, released in finally
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit so the whole batch is atomic

      val sql = "insert into day_topn_video(day, cms_id, times) values (?, ?, ?)"
      pstmt = connection.prepareStatement(sql)

      // Fill the placeholders for every record and queue it in the batch.
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }

      // BUGFIX: execute() runs only the current statement; executeBatch() runs the queue.
      pstmt.executeBatch()
      connection.commit() // manual commit of the batch
    } catch {
      case e: Exception =>
        // Undo the partial batch before reporting; ignore rollback failures.
        if (connection != null) {
          try connection.rollback() catch { case _: Exception => () }
        }
        e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
// Persist the TopN result: buffer each partition's rows, then bulk-insert via StatDAO.
try {
  topNDF.foreachPartition(partitionOfRecords => {
    // BUGFIX: this declaration was commented out while `list` is still used below.
    val list = new ListBuffer[VideoAccessStat] // buffer for this partition's records

    // Pull the three fields (day, cmsId, times) out of every Row.
    partitionOfRecords.foreach(info => {
      val day = info.getAs[String]("day")
      val cmsId = info.getAs[Long]("cmsId")
      val times = info.getAs[Long]("times")
      // One VideoAccessStat per row, appended to the partition buffer.
      list.append(VideoAccessStat(day, cmsId, times))
    })

    // Hand the buffered rows to the DAO for a single batched insert.
    StatDAO.insertDayAccessTopN(list)
  })
} catch {
  case e: Exception => e.printStackTrace()
}
// Count accesses per (day, cmsId, city) for videos on 2017-05-11.
val cityAccessTopNDF = accessDF
  .filter(accessDF.col("day") === "20170511" && accessDF.col("cmsType") === "video")
  .groupBy("day", "cmsId", "city")
  .agg(count("cmsId").as("times"))

// Rank within each city by descending times using row_number over a
// city-partitioned window, exposed as "times_rank".
val rankedByCity = cityAccessTopNDF.select(
  cityAccessTopNDF.col("day"),
  cityAccessTopNDF.col("cmsId"),
  cityAccessTopNDF.col("times"),
  cityAccessTopNDF.col("city"),
  row_number()
    .over(Window.partitionBy(cityAccessTopNDF.col("city"))
      .orderBy(cityAccessTopNDF.col("times").desc))
    .as("times_rank"))

// Keep only the top 3 per city.
rankedByCity.filter("times_rank <= 3").show(false)
}
// Delete the given day's rows from every statistics table (run before re-inserting).
def deleteDayData(day: String) = {
  var connection: Connection = null
  var pstmt: PreparedStatement = null
  val tables = Array("day_topn_video",
    "day_city_topn_video",
    "traffic_topn_video")

  try {
    connection = MySQLUtils.getConnection()
    for (table <- tables) {
      // BUGFIX: the literal was closed with a full-width 」 character, breaking the string.
      // Scala string interpolation inserts the table name; `day` is bound as a parameter.
      val deleteSql = s"delete from $table where day = ?"
      pstmt = connection.prepareStatement(deleteSql)
      // BUGFIX: there is only one placeholder — the old code bound the table name to
      // index 1 and `day` to a non-existent index 2, which fails at runtime.
      pstmt.setString(1, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}