SparkRDD算子案例:統計出每個省份每一個廣告被點擊數量排行的Top3

1、數據準備apache

agent.log:時間戳,省份,城市,用戶,廣告,中間字段使用空格分隔。ui

 

2、需求分析spa

方法一:scala

(1)用空格分割每一行的數據,須要的數據是省份id和廣告id3d

(2)將省份id和廣告id和次數1組成鍵值對,經過算子map組合成((省份id,廣告id),1)對象

(3)計算相同key的總和,使用算子reduceByKey將相同key的值聚合到一塊兒,在shuffle前有combine操做blog

(4)用map將((省份id,廣告id),sum)改成(省份id,(廣告id,sum))排序

(5)將同一個省份的全部廣告進行分組聚合(省份id,List((廣告id1,sum1),(廣告id2,sum2)…))get

(6)對同一個省份全部廣告的集合進行排序並取前3條string

方法二:

(1)用空格分割每一行的數據,須要的數據是省份id和廣告id

(2)將同一個省份的全部廣告進行分組聚合(省份id,List(廣告id1,廣告id2,…))

(3)將廣告id和次數1組成鍵值對,經過算子map組合成(廣告id,1),並根據廣告id進行分組聚合,再經過算子map轉換成List以後取出廣告id和List大小

(4)根據List大小進行降序排序,並取出前3條

 

3、代碼實現

方法一:

package com.require

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo1 {
  def main(args: Array[String]): Unit = {

    //一、實例化conf對象以及建立sc對象
    val conf = new SparkConf().setMaster("local").setAppName(Demo1.getClass.getSimpleName)
    val sc = new SparkContext(conf)

    //二、讀取文件
    val fileRDD: RDD[String] = sc.textFile("F:\\數據\\agent.log")

    //三、切分、拼1
    val toOneRDD: RDD[((String, String), Int)] = fileRDD.map { x =>
      val strings: Array[String] = x.split(" ")
      ((strings(1), strings(4)), 1)
    }

    //四、聚合((province,add),sum)
    val sumRDD: RDD[((String, String), Int)] = toOneRDD.reduceByKey(_ + _)

    //五、將省份做爲key,廣告加點擊數爲value:(province,(add,sum))
    val mapRDD: RDD[(String, (String, Int))] = sumRDD.map(x => (x._1._1, (x._1._2, x._2)))

    //六、將同一個省份的全部廣告進行分組聚合(province,List((add1,sum1),(add2,sum2)...))
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = mapRDD.groupByKey()

    //七、對同一個省份全部廣告的集合進行排序並取前3條
    val sortRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues { x =>
      x.toList.sortWith((x, y) => x._2 > y._2).take(3)
    }

    //八、將數據拉取到Driver端並打印
    sortRDD.collect().foreach(println)

    sc.stop()
  }
}

  

方法二:

package com.require

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo2 {
  def main(args: Array[String]): Unit = {

    //一、實例化conf對象以及建立sc對象
    val conf = new SparkConf().setMaster("local").setAppName(Demo2.getClass.getSimpleName)
    val sc = new SparkContext(conf)
    
    //二、讀取文件
    val fileRDD: RDD[String] = sc.textFile("F:\\數據\\agent.log")

    //三、切分
    val mapRDD: RDD[(String, String)] = fileRDD.map(x => {
      val strings: Array[String] = x.split(" ")
      (strings(1), strings(4))
    })

    //四、根據省份進行分組(province,List(add1,add2,...))
    val groupRDD: RDD[(String, Iterable[String])] = mapRDD.groupByKey()

    //五、處理List
    val result: RDD[(String, List[(String, Int)])] = groupRDD.map(x => {
      //將廣告拼1,並分組取出大小
      val stringToInt: Map[String, Int] = x._2.map((_, 1)).groupBy(_._1).map(y => {
        val size: Int = y._2.toList.size
        (y._1, size)
      })

      //根據廣告數量降序排序並取出前三
      val tuples: List[(String, Int)] = stringToInt.toList.sortBy(-_._2).take(3)
      (x._1, tuples)
    })

    //六、打印
    result.foreach(println)

    sc.stop()

  }
}

 

4、運行結果

相關文章
相關標籤/搜索