輸入輸出轉換工具類
package com.rz.mobile_tag.log

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

import scala.util.control.NonFatal

/**
 * Access-log conversion (input ==> output) utility.
 *
 * Converts one raw tab-separated log line into a [[Row]] matching
 * [[structType]], for use with `SparkSession.createDataFrame`.
 */
object AccessConvertUtil {

  // Output schema — field order and types must match the Row built in parseLog.
  val structType = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )

  /**
   * Parses one raw log record into the output Row layout.
   *
   * Expected tab-separated input: time, url, traffic, ip, ...
   * (only the first four fields are read here).
   *
   * @param log one raw log line
   * @return an 8-field Row matching [[structType]]; on any parse failure,
   *         a Row of 8 nulls. (The previous sentinel `Row(0)` had only one
   *         field and crashed `createDataFrame` on malformed input.)
   */
  def parseLog(log: String) = {
    try {
      // -1 keeps trailing empty fields instead of dropping them.
      val splits = log.split("\t", -1)
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      // Strip the site prefix, leaving "cmsType/cmsId" when present.
      val domain = "http://www.rz.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")

      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      // city is not derivable from this record; left blank by design.
      val city = ""
      val time = splits(0)
      // "yyyy-MM-dd ..." -> "yyyyMMdd", used as the partition column.
      val day = time.substring(0, 10).replaceAll("-", "")

      // Field order here must correspond to structType above.
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      // NonFatal: let OOM/interrupts propagate instead of swallowing them.
      // Emit a schema-compatible all-null row so createDataFrame still works.
      case NonFatal(_) =>
        Row(null, null, null, null, null, null, null, null)
    }
  }
}
讀取數據,清洗輸出目標數據
package com.rz.mobile_tag.log

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
 * Data-cleaning job: reads raw access logs from `args(0)`, converts each
 * line via [[AccessConvertUtil.parseLog]], and writes the result as
 * day-partitioned Parquet to `args(1)`.
 */
object SparkStatCleanJob {

  def main(args: Array[String]): Unit = {
    // Fail fast with a usage hint instead of an opaque
    // ArrayIndexOutOfBoundsException when paths are missing.
    if (args.length < 2) {
      System.err.println("Usage: SparkStatCleanJob <inputPath> <outputPath>")
      System.exit(1)
    }

    val spark = SparkSession.builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[2]")
      .getOrCreate()

    try {
      val accessRDD = spark.sparkContext.textFile(args(0))
      // debug: accessRDD.take(10).foreach(println)

      // Pair each parsed Row with the schema declared alongside the parser.
      val accessDF = spark.createDataFrame(
        accessRDD.map(log => AccessConvertUtil.parseLog(log)),
        AccessConvertUtil.structType
      )
      // accessDF.printSchema()
      // accessDF.show(false)

      // coalesce(1): one output file per day-partition; Overwrite replaces
      // any previous run's output at the same path.
      accessDF.coalesce(1)
        .write
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .partitionBy("day")
        .save(args(1))
    } finally {
      // Release the session even when the job fails mid-way.
      spark.stop()
    }
  }
}