Spark綜合使用及用戶行爲案例區域內熱門商品統計分析實戰-Spark商業應用實戰

版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。sql

1 UDAF 電商業務的實現城市信息累加

  • MutableAggregationBuffer是一個數組,這裏咱們取 buffer.getString(0)。數據庫

  • 把傳進來的字符串進行追加到buffer.getString(0)中。express

    class GroupConcatDistinctUDAF extends UserDefinedAggregateFunction {
      
       輸入數據類型
       override def inputSchema: StructType = StructType(StructField("cityInfo", StringType) :: Nil)
      
       緩衝數據類型
       override def bufferSchema: StructType = StructType(StructField("bufferCityInfo", StringType) :: Nil)
      
       輸出數據類型
       override def dataType: DataType = StringType
      
       一致性校驗
       override def deterministic: Boolean = true
      
       override def initialize(buffer: MutableAggregationBuffer): Unit = {
          buffer(0)= ""
       }
      
        /**
          * 更新
          * 能夠認爲是,一個一個地將組內的字段值傳遞進來
          * 實現拼接的邏輯
          */
        override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
          // 緩衝中的已經拼接過的城市信息串
          var bufferCityInfo = buffer.getString(0)
          // 剛剛傳遞進來的某個城市信息
          val cityInfo = input.getString(0)
      
          // 在這裏要實現去重的邏輯
          // 判斷:以前沒有拼接過某個城市信息,那麼這裏才能夠接下去拼接新的城市信息
          if(!bufferCityInfo.contains(cityInfo)) {
            if("".equals(bufferCityInfo))
              bufferCityInfo += cityInfo
            else {
              // 好比1:北京
              // 1:北京,2:上海
              bufferCityInfo += "," + cityInfo
            }
      
            buffer.update(0, bufferCityInfo)
          }
        }
      
        override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        
          var bufferCityInfo1 = buffer1.getString(0);
          val bufferCityInfo2 = buffer2.getString(0);
      
          for(cityInfo <- bufferCityInfo2.split(",")) {
            if(!bufferCityInfo1.contains(cityInfo)) {
              if("".equals(bufferCityInfo1)) {
                bufferCityInfo1 += cityInfo;
              } else {
                bufferCityInfo1 += "," + cityInfo;
              }
            }
          }
      
          buffer1.update(0, bufferCityInfo1);
        }
      
        override def evaluate(buffer: Row): Any = {
          buffer.getString(0)
        }
      
      }
    複製代碼

2 UDAF 無類型的用戶自定於聚合函數求平均值

  • 分析數據apache

    第一列爲user_id,第二列爲item_id,第三列爲score
      0162381440670851711,4,7.0
      0162381440670851711,11,4.0
      0162381440670851711,32,1.0
      0162381440670851711,176,27.0
      0162381440670851711,183,11.0
      0162381440670851711,184,5.0
      0162381440670851711,207,9.0
      0162381440670851711,256,3.0
      0162381440670851711,258,4.0
      0162381440670851711,259,16.0
      0162381440670851711,260,8.0
      0162381440670851711,261,18.0
      0162381440670851711,301,1.0
    複製代碼
  • 一、inputSchemajson

    定義輸入數據的Schema,要求類型是StructType,它的參數是由StructField類型構成的數組。好比這裏要定義score列的Schema,首先使用StructField聲明score列的名字score_column,數據類型爲DoubleType。這裏輸入只有score這一列,因此StructField構成的數組只有一個元素。數組

    override def inputSchema: StructType = StructType(StructField("score_column",DoubleType)::Nil)
    複製代碼
  • 二、bufferSchema緩存

    計算score的平均值時,須要用到score的總和sum以及score的總個數count這樣的中間數據,那麼就使用bufferSchema來定義它們。安全

    override def bufferSchema: StructType = StructType(StructField("sum",DoubleType)::StructField("count",LongType)::Nil)
    複製代碼
  • 三、dataTypeapp

    咱們須要對自定義聚合函數的最終數據類型進行說明,使用dataType函數。好比計算出的平均score是Double類型。dom

    override def dataType: DataType = DoubleType
    複製代碼
  • 四、deterministic

    deterministic函數用於對輸入數據進行一致性檢驗,是一個布爾值,當爲true時,表示對於一樣的輸入會獲得一樣的輸出。由於對於一樣的score輸入,確定要獲得相同的score平均值,因此定義爲true。

    override def deterministic: Boolean = true
    複製代碼
  • 五、initialize

initialize用戶初始化緩存數據。好比score的緩存數據有兩個:sum和count,須要初始化爲sum=0.0和count=0L,第一個初始化爲Double類型,第二個初始化爲長整型。

override def initialize(buffer: MutableAggregationBuffer): Unit = {
          //sum=0.0
          buffer(0)=0.0
          //count=0
          buffer(1)=0L
        }
複製代碼
  • 六、update

當有新的輸入數據時,update用戶更新緩存變量。好比這裏當有新的score輸入時,須要將它的值更新變量sum中,並將count加1

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
          //輸入非空
          if(!input.isNullAt(0)){
            //sum=sum+輸入的score
            buffer(0)=buffer.getDouble(0)+input.getDouble(0)
            //count=count+1
            buffer(1)=buffer.getLong(1)+1
          }
        }
複製代碼
  • 七、merge

    merge將更新的緩存變量存入到緩存中

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
            buffer1(0)=buffer1.getDouble(0)+buffer2.getDouble(0)
            buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)
          }
    複製代碼
  • 八、evaluate

    evaluate是一個計算方法,用於計算咱們的最終結果。好比這裏用於計算平均得分average(score)=sum(score)/count(score)

    override def evaluate(buffer: Row): Double = buffer.getDouble(0)/buffer.getLong(1)
    複製代碼

3 類型安全的用戶自定義聚合函數(Type-Safe User-Defined Aggregate Functions)

  • Data用於存儲itemdata.data數據,Average用於存儲計算score平均值的中間數據,須要注意的是Average的參數sum和count都要聲明爲變量var。

    case class Data(user_id: String, item_id: String, score: Double)
      case class Average(var sum: Double,var count: Long)
    複製代碼
  • 具體源碼

  • 聚合函數 toColumn.name("average_score")

  • 使用聚合函數 dataDS.select(averageScore).show()

    import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
      import org.apache.spark.sql.expressions.Aggregator
       
       
      /**
        * 類型安全自定義聚合函數
        */
      object TypeSafeMyAverageTest {
      
        case class Data(user_id: String, item_id: String, score: Double)
        case class Average(var sum: Double,var count: Long)
       
        object SafeMyAverage extends Aggregator[Data, Average, Double] {
        
            zero至關於1中的initialize初始化函數,初始化存儲中間數據的Average
            override def zero: Average = Average(0.0D, 0L)
           
            reduce函數至關於1中的update函數,當有新的數據a時,更新中間數據b
            override def reduce(b: Average, a: Data): Average = {
                b.sum += a.score
                b.count += 1L
                b
              }
           
              override def merge(b1: Average, b2: Average): Average = {
                b1.sum+=b2.sum
                b1.count+= b2.count
                b1
              }
           
              override def finish(reduction: Average): Double = reduction.sum / reduction.count
           
              緩衝數據編碼方式
              override def bufferEncoder: Encoder[Average] = Encoders.product
           
              最終數據輸出編碼方式
              override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
            }
           
            def main(args: Array[String]): Unit = {
              //建立Spark SQL切入點
              val spark = SparkSession.builder().master("local").appName("My-Average").getOrCreate()
              //讀取HDFS文件系統數據itemdata.data生成RDD
              val rdd = spark.sparkContext.textFile("hdfs://192.168.189.21:8020/input/mahout-demo/itemdata.data")
              //RDD轉化成DataSet
              import spark.implicits._
              val dataDS =rdd.map(_.split(",")).map(d => Data(d(0), d(1), d(2).toDouble)).toDS()
              //自定義聚合函數
              val averageScore = SafeMyAverage.toColumn.name("average_score")
              dataDS.select(averageScore).show()
            }
      }
    複製代碼

4 區域內熱門商品項目實戰

  • 版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。

4.1 區域模型

  • 區域內熱門商品分析流程

4.2 建立Spark客戶端

// 任務的執行ID,用戶惟一標示運行後的結果,用在MySQL數據庫中
val taskUUID = UUID.randomUUID().toString

// 構建Spark上下文
val sparkConf = new SparkConf().setAppName("SessionAnalyzer").setMaster("local[*]")

// 建立Spark客戶端
val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
val sc = spark.sparkContext
複製代碼

4.3 註冊UDF函數

// 註冊自定義函數
    spark.udf.register("concat_long_string", (v1: Long, v2: String, split: String) => v1.toString + split + v2)
    
    spark.udf.register("get_json_object", (json: String, field: String) => {
      val jsonObject = JSONObject.fromObject(json);
      jsonObject.getString(field)
    })
    spark.udf.register("group_concat_distinct", new GroupConcatDistinctUDAF())

    // 獲取任務日期參數
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)
複製代碼

4.4 查詢用戶指定日期範圍內的城市粒度點擊行爲數據

val cityid2clickActionRDD = getcityid2ClickActionRDDByDate(spark, startDate, endDate)

def getcityid2ClickActionRDDByDate(spark: SparkSession, startDate: String, endDate: String): RDD[(Long, Row)] = {
    // 從user_visit_action中,查詢用戶訪問行爲數據
    // 第一個限定:click_product_id,限定爲不爲空的訪問行爲,那麼就表明着點擊行爲
    // 第二個限定:在用戶指定的日期範圍內的數據

    val sql =
      "SELECT " +
          "city_id," +
          "click_product_id " +
        "FROM user_visit_action " +
        "WHERE click_product_id IS NOT NULL and click_product_id != -1L " +
          "AND date>='" + startDate + "' " +
          "AND date<='" + endDate + "'"

    val clickActionDF = spark.sql(sql)

    //(cityid, row)
    clickActionDF.rdd.map(item => (item.getAs[Long]("city_id"), item))
  }
複製代碼

4.5 查詢城市信息

def getcityid2CityInfoRDD(spark: SparkSession): RDD[(Long, Row)] = {

    val cityInfo = Array((0L, "北京", "華北"), (1L, "上海", "華東"), (2L, "南京", "華東"), (3L, "廣州", "華南"), (4L, "三亞", "華南"), (5L, "武漢", "華中"), (6L, "長沙", "華中"), (7L, "西安", "西北"), (8L, "成都", "西南"), (9L, "哈爾濱", "東北"))
    import spark.implicits._
    val cityInfoDF = spark.sparkContext.makeRDD(cityInfo).toDF("city_id", "city_name", "area")
    cityInfoDF.rdd.map(item => (item.getAs[Long]("city_id"), item))
  }

// 使用(city_id , 城市信息)
val cityid2cityInfoRDD = getcityid2CityInfoRDD(spark)
複製代碼

4.6 生成點擊商品基礎信息臨時表

// 將點擊行爲cityid2clickActionRDD和城市信息cityid2cityInfoRDD進行Join關聯
// tmp_click_product_basic
generateTempClickProductBasicTable(spark, cityid2clickActionRDD, cityid2cityInfoRDD)

def generateTempClickProductBasicTable(spark: SparkSession, cityid2clickActionRDD: RDD[(Long, Row)], cityid2cityInfoRDD: RDD[(Long, Row)]) {
    // 執行join操做,進行點擊行爲數據和城市數據的關聯
    val joinedRDD = cityid2clickActionRDD.join(cityid2cityInfoRDD)

    // 將上面的JavaPairRDD,轉換成一個JavaRDD<Row>(才能將RDD轉換爲DataFrame)
    val mappedRDD = joinedRDD.map { case (cityid, (action, cityinfo)) =>
      val productid = action.getLong(1)
      //action.getAs[String]("aera")
      val cityName = cityinfo.getString(1)
      val area = cityinfo.getString(2)
      (cityid, cityName, area, productid)
    }
    // 1 北京
    // 2 上海
    // 1 北京
    // group by area,product_id
    // 1:北京,2:上海

    // 兩個函數
    // UDF:concat2(),將兩個字段拼接起來,用指定的分隔符
    // UDAF:group_concat_distinct(),將一個分組中的多個字段值,用逗號拼接起來,同時進行去重
    import spark.implicits._
    val df = mappedRDD.toDF("city_id", "city_name", "area", "product_id")
    // 爲df建立臨時表
    df.createOrReplaceTempView("tmp_click_product_basic")
複製代碼

4.7 生成各區域各商品點擊次數的臨時表

generateTempAreaPrdocutClickCountTable(spark)
    
    def generateTempAreaPrdocutClickCountTable(spark: SparkSession) {

        // 按照area和product_id兩個字段進行分組
        // 計算出各區域各商品的點擊次數
        // 能夠獲取到每一個area下的每一個product_id的城市信息拼接起來的串
        val sql = "SELECT " +
            "area," +
            "product_id," +
            "count(*) click_count, " +
            "group_concat_distinct(concat_long_string(city_id,city_name,':')) city_infos " +
          "FROM tmp_click_product_basic " +
          "GROUP BY area,product_id "
    
        val df = spark.sql(sql)
    
        // 各區域各商品的點擊次數(以及額外的城市列表),再次將查詢出來的數據註冊爲一個臨時表
        df.createOrReplaceTempView("tmp_area_product_click_count")
  }
複製代碼

4.8 生成包含完整商品信息的各區域各商品點擊次數的臨時表

generateTempAreaFullProductClickCountTable(spark)
關聯tmp_area_product_click_count表與product_info表,在tmp_area_product_click_count基礎上引入商品的詳細信息

def generateTempAreaFullProductClickCountTable(spark: SparkSession) {

// 將以前獲得的各區域各商品點擊次數表,product_id
// 去關聯商品信息表,product_id,product_name和product_status
// product_status要特殊處理,0,1,分別表明了自營和第三方的商品,放在了一個json串裏面
// get_json_object()函數,能夠從json串中獲取指定的字段的值
// if()函數,判斷,若是product_status是0,那麼就是自營商品;若是是1,那麼就是第三方商品
// area, product_id, click_count, city_infos, product_name, product_status

// 你拿到到了某個區域top3熱門的商品,那麼其實這個商品是自營的,仍是第三方的
// 實際上是很重要的一件事

// 技術點:內置if函數的使用

val sql = "SELECT " +
    "tapcc.area," +
    "tapcc.product_id," +
    "tapcc.click_count," +
    "tapcc.city_infos," +
    "pi.product_name," +
    "if(get_json_object(pi.extend_info,'product_status')='0','Self','Third Party') product_status " +
  "FROM tmp_area_product_click_count tapcc " +
    "JOIN product_info pi ON tapcc.product_id=pi.product_id "

val df = spark.sql(sql)

df.createOrReplaceTempView("tmp_area_fullprod_click_count")
複製代碼

}

4.9 使用開窗函數獲取各個區域內點擊次數排名前3的熱門商品

val areaTop3ProductRDD = getAreaTop3ProductRDD(taskUUID, spark)
    def getAreaTop3ProductRDD(taskid: String, spark: SparkSession): DataFrame = {

    // 華北、華東、華南、華中、西北、西南、東北
    // A級:華北、華東
    // B級:華南、華中
    // C級:西北、西南
    // D級:東北

    // case when
    // 根據多個條件,不一樣的條件對應不一樣的值
    // case when then ... when then ... else ... end

    val sql = "SELECT " +
        "area," +
        "CASE " +
          "WHEN area='China North' OR area='China East' THEN 'A Level' " +
          "WHEN area='China South' OR area='China Middle' THEN 'B Level' " +
          "WHEN area='West North' OR area='West South' THEN 'C Level' " +
          "ELSE 'D Level' " +
        "END area_level," +
        "product_id," +
        "city_infos," +
        "click_count," +
        "product_name," +
        "product_status " +
      "FROM (" +
        "SELECT " +
          "area," +
          "product_id," +
          "click_count," +
          "city_infos," +
          "product_name," +
          "product_status," +
          "row_number() OVER (PARTITION BY area ORDER BY click_count DESC) rank " +
        "FROM tmp_area_fullprod_click_count " +
        ") t " +
      "WHERE rank<=3"

    spark.sql(sql)
  }
複製代碼

4.10 保存到數據庫

import spark.implicits._
val areaTop3ProductDF = areaTop3ProductRDD.rdd.map(row =>
  AreaTop3Product(taskUUID, row.getAs[String]("area"), row.getAs[String]("area_level"), row.getAs[Long]("product_id"), row.getAs[String]("city_infos"), row.getAs[Long]("click_count"), row.getAs[String]("product_name"), row.getAs[String]("product_status"))
).toDS

areaTop3ProductDF.write
  .format("jdbc")
  .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))
  .option("dbtable", "area_top3_product")
  .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))
  .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))
  .mode(SaveMode.Append)
  .save()
複製代碼

5 總結

溫故而知新,本文爲了綜合複習,進行代碼總結,內容粗鄙,勿怪

版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。

秦凱新 於深圳

相關文章
相關標籤/搜索