Spark綜合使用及用戶行爲案例廣告點擊量實時統計分析實戰-Spark商業應用實戰

版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。java

1 廣告點擊數據模型

1.1 數據格式

timestamp 	  province 	  city        userid         adid
複製代碼

1.2 生成動態黑名單

數據格式
    (timestamp province city userid adid)
    
    統計單用戶的統計次數
    (date, userid,adid,clickCount)
    
    閾值統計統計黑名單
複製代碼

1.3 計算廣告點擊流量實時統計結果

輸入數據格式
    (userid, timestamp province city userid adid)
    
    計算後數據格式並持久化
    (date,province,city,adid,clickCount)
複製代碼

1.4 實時統計天天每一個省份top3熱門廣告

輸入數據格式
   (yyyyMMdd_province_city_adid,clickCount)
    
    計算後數據格式並持久化
    (date,province, adid,clickCount)
    
    註冊成表ROW_NUMBER()實現窗聚合
    tmp_daily_ad_click_count_by_prov
複製代碼

1.5 實時統計天天每一個廣告在最近1小時的滑動窗口內的點擊趨勢(每分鐘的點擊量)

輸入數據格式
    (timestamp province city userid adid)
    
    計算後數據格式並持久化
    (date,hour,minute,adid,clickCount)
複製代碼

2 具體技術實現

2.1 SparkStreaming 與Kafka對接

  • 構建Spark上下文mysql

    val sparkConf = new SparkConf().setAppName("streamingRecommendingSystem").setMaster("local[*]")
    
      // 建立Spark客戶端
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()
      val sc = spark.sparkContext
      val ssc = new StreamingContext(sc, Seconds(5))
      
      // 獲取Kafka配置
      val broker_list = ConfigurationManager.config.getString("kafka.broker.list")
      val topics = ConfigurationManager.config.getString("kafka.topics")
    複製代碼
  • kafka消費者配置算法

    val kafkaParam = Map(
       "bootstrap.servers" -> broker_list,//用於初始化連接到集羣的地址
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       //用於標識這個消費者屬於哪一個消費團體
       "group.id" -> "commerce-consumer-group",
       //若是沒有初始化偏移量或者當前的偏移量不存在任何服務器上,可使用這個配置屬性
       //可使用這個配置,latest自動重置偏移量爲最新的偏移量
       //earilist:提交過度區,從Offset處讀取,若是沒有提交過offset,從頭讀取
       //latest:提交過度區,從Offset處讀取,沒有從最新的數據開始讀取
       //None:若是沒有提交offset,就會報錯,提交過offset,就從offset處讀取
       "auto.offset.reset" -> "latest",
       //若是是true,則這個消費者的偏移量會在後臺自動提交
       "enable.auto.commit" -> (false: java.lang.Boolean)
     )
    複製代碼
  • 設置檢查點目錄sql

    ssc.checkpoint("./streaming_checkpoint")
    複製代碼
  • LocationStrategies 分配分區策略數據庫

    // 建立DStream,返回接收到的輸入數據
      // LocationStrategies:根據給定的主題和集羣地址建立consumer
      // LocationStrategies.PreferConsistent:持續的在全部Executor之間勻分配分區 (均勻分配,選中的每個Executor都會分配 partition)
      // LocationStrategies.PreferBrokers: 若是executor和kafka brokers 在同一臺機器上,選擇該executor。
      // LocationStrategies.PreferFixed: 若是機器不是均勻的狀況下,能夠指定特殊的hosts。固然若是不指定,採用 LocationStrategies.PreferConsistent模式
    複製代碼
  • ConsumerStrategies 消費策略bootstrap

    // ConsumerStrategies:選擇如何在Driver和Executor上建立和配置Kafka Consumer
      // ConsumerStrategies.Subscribe:訂閱一系列主題
      val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,
              LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
    複製代碼
  • SparkStreaming開始消費安全

    var adRealTimeValueDStream = adRealTimeLogDStream.map(consumerRecordRDD => consumerRecordRDD.value())
    複製代碼

2.2 SparkStreaming 開始根據黑名單進行過濾

  • 算法過程以下 (timestamp province city userid adid) -> (userid, timestamp province city userid adid)服務器

  • 根據userId進行過濾app

    用於Kafka Stream的線程非安全問題,從新分區切斷血統
      adRealTimeValueDStream = adRealTimeValueDStream.repartition(400)
     
      val filteredAdRealTimeLogDStream = filterByBlacklist(spark,adRealTimeValueDStream)
      
      def filterByBlacklist(spark: SparkSession, adRealTimeValueDStream:DStream[String]):DStream[(Long, String)] = {
         
          // 剛剛接受到原始的用戶點擊行爲日誌以後
          // 根據mysql中的動態黑名單,進行實時的黑名單過濾(黑名單用戶的點擊行爲,直接過濾掉,不要了)
          // 使用transform算子(將dstream中的每一個batch RDD進行處理,轉換爲任意的其餘RDD,功能很強大)
      
          val filteredAdRealTimeLogDStream = adRealTimeValueDStream.transform{ consumerRecordRDD =>
      
            //格式 :timestamp province city userid adid
            //某個時間點 某個省份 某個城市 某個用戶 某個廣告
      
            // 首先,從mysql中查詢全部黑名單用戶,將其轉換爲一個rdd
            val adBlacklists = AdBlacklistDAO.findAll()
            // (userid, timestamp province city userid adid)
            val blacklistRDD = spark.sparkContext.makeRDD(adBlacklists.map(item => (item.userid, true)))
      
      
            //格式 :timestamp province city userid adid
            val mappedRDD = consumerRecordRDD.map(consumerRecord => {
              val userid = consumerRecord.split(" ")(3).toLong
              (userid,consumerRecord)
            })
      
            // 將原始日誌數據rdd,與黑名單rdd,進行左外鏈接
            // 若是說原始日誌的userid,沒有在對應的黑名單中,join不到,左外鏈接
            // 用inner join,內鏈接,會致使數據丟失
            val joinedRDD = mappedRDD.leftOuterJoin(blacklistRDD)
      
            val filteredRDD = joinedRDD.filter{ case (userid,(log, black)) =>
              // 若是這個值存在,那麼說明原始日誌中的userid,join到了某個黑名單用戶
              if(black.isDefined && black.get) false else true
            }
      
            filteredRDD.map{ case (userid,(log, black)) => (userid, log)}
          }
      
          filteredAdRealTimeLogDStream
    }
    複製代碼

2.3 SparkStreaming 生成動態黑名單

  • 轉化爲用戶粒度進行過濾,拋棄 province city ,格式爲:(date, userid,adid,clickCount)函數

  • 過濾次數大於閾值的userId,持久化到磁盤。

    generateDynamicBlacklist(filteredAdRealTimeLogDStream)
      
      def generateDynamicBlacklist(filteredAdRealTimeLogDStream: DStream[(Long, String)]) {
          
          // (timestamp province city userid adid)
          // 計算出每5個秒內的數據中,天天每一個用戶每一個廣告的點擊量
          // 經過對原始實時日誌的處理
          // 將日誌的格式處理成<yyyyMMdd_userid_adid, 1L>格式
          val dailyUserAdClickDStream = filteredAdRealTimeLogDStream.map{ case (userid,log) =>
      
            // 從tuple中獲取到每一條原始的實時日誌
            val logSplited = log.split(" ")
      
            // 提取出日期(yyyyMMdd)、userid、adid
            val timestamp = logSplited(0)
            val date = new Date(timestamp.toLong)
            val datekey = DateUtils.formatDateKey(date)
      
            val userid = logSplited(3).toLong
            val adid = logSplited(4)
      
            // 拼接key
            val key = datekey + "_" + userid + "_" + adid
            (key, 1L)
          }
      
          // 針對處理後的日誌格式,執行reduceByKey算子便可,(每一個batch中)天天每一個用戶對每一個廣告的點擊量
          val dailyUserAdClickCountDStream = dailyUserAdClickDStream.reduceByKey(_ + _)
      
          // 源源不斷的,每一個5s的batch中,當天每一個用戶對每支廣告的點擊次數
          // <yyyyMMdd_userid_adid, clickCount>
          dailyUserAdClickCountDStream.foreachRDD{ rdd =>
            rdd.foreachPartition{ items =>
              // 對每一個分區的數據就去獲取一次鏈接對象
              // 每次都是從鏈接池中獲取,而不是每次都建立
              // 寫數據庫操做,性能已經提到最高了
      
              val adUserClickCounts = ArrayBuffer[AdUserClickCount]()
              for(item <- items){
                val keySplited = item._1.split("_")
                val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
                // yyyy-MM-dd
                val userid = keySplited(1).toLong
                val adid = keySplited(2).toLong
                val clickCount = item._2
      
                //批量插入
                adUserClickCounts += AdUserClickCount(date, userid,adid,clickCount)
              }
              AdUserClickCountDAO.updateBatch(adUserClickCounts.toArray)
            }
          }
      
          // 如今咱們在mysql裏面,已經有了累計的天天各用戶對各廣告的點擊量
          // 遍歷每一個batch中的全部記錄,對每條記錄都要去查詢一下,這一天這個用戶對這個廣告的累計點擊量是多少
          // 從mysql中查詢
          // 查詢出來的結果,若是是100,若是你發現某個用戶某天對某個廣告的點擊量已經大於等於100了
          // 那麼就斷定這個用戶就是黑名單用戶,就寫入mysql的表中,持久化
          val blacklistDStream = dailyUserAdClickCountDStream.filter{ case (key, count) =>
            val keySplited = key.split("_")
      
            // yyyyMMdd -> yyyy-MM-dd
            val date = DateUtils.formatDate(DateUtils.parseDateKey(keySplited(0)))
            val userid = keySplited(1).toLong
            val adid = keySplited(2).toLong
      
            // 從mysql中查詢指定日期指定用戶對指定廣告的點擊量
            val clickCount = AdUserClickCountDAO.findClickCountByMultiKey(date, userid, adid)
      
            // 判斷,若是點擊量大於等於100,ok,那麼很差意思,你就是黑名單用戶
            // 那麼就拉入黑名單,返回true
            if(clickCount >= 100) {
              true
            }else{
              // 反之,若是點擊量小於100的,那麼就暫時不要管它了
              false
            }
      } 
    複製代碼

2.4 計算廣告點擊流量實時統計結果(yyyyMMdd_province_city_adid,clickCount)

  • 轉化爲省城市粒度進行過濾,拋棄userId,格式爲:(yyyyMMdd_province_city_adid,clickCount)

    val adRealTimeStatDStream = calculateRealTimeStat(filteredAdRealTimeLogDStream)
      
      def calculateRealTimeStat(filteredAdRealTimeLogDStream:DStream[(Long, String)]):DStream[(String, Long)] = {
      
          // 計算天天各省各城市各廣告的點擊量
          // 設計出來幾個維度:日期、省份、城市、廣告
          // 2015-12-01,當天,能夠看到當天全部的實時數據(動態改變),好比江蘇省南京市
          // 廣告能夠進行選擇(廣告主、廣告名稱、廣告類型來篩選一個出來)
          // 拿着date、province、city、adid,去mysql中查詢最新的數據
          // 等等,基於這幾個維度,以及這份動態改變的數據,是能夠實現比較靈活的廣告點擊流量查看的功能的
      
          // date province city userid adid
          // date_province_city_adid,做爲key;1做爲value
          // 經過spark,直接統計出來全局的點擊次數,在spark集羣中保留一份;在mysql中,也保留一份
          // 咱們要對原始數據進行map,映射成<date_province_city_adid,1>格式
          // 而後呢,對上述格式的數據,執行updateStateByKey算子
          // spark streaming特有的一種算子,在spark集羣內存中,維護一份key的全局狀態
          //(userid, timestamp province city userid adid)
          val mappedDStream = filteredAdRealTimeLogDStream.map{ case (userid, log) =>
            val logSplited = log.split(" ")
      
            val timestamp = logSplited(0)
            val date = new Date(timestamp.toLong)
            val datekey = DateUtils.formatDateKey(date)
      
            val province = logSplited(1)
            val city = logSplited(2)
            val adid = logSplited(4).toLong
      
            val key = datekey + "_" + province + "_" + city + "_" + adid
      
            (key, 1L)
          }
      
          // 在這個dstream中,就至關於,有每一個batch rdd累加的各個key(各天各省份各城市各廣告的點擊次數)
          // 每次計算出最新的值,就在aggregatedDStream中的每一個batch rdd中反應出來
          val aggregatedDStream = mappedDStream.updateStateByKey[Long]{ (values:Seq[Long], old:Option[Long]) =>
            // 舉例來講
            // 對於每一個key,都會調用一次這個方法
            // 好比key是<20151201_Jiangsu_Nanjing_10001,1>,就會來調用一次這個方法7
            // 10個
      
            // values,(1,1,1,1,1,1,1,1,1,1)
      
            // 首先根據optional判斷,以前這個key,是否有對應的狀態
            var clickCount = 0L
      
            // 若是說,以前是存在這個狀態的,那麼就以以前的狀態做爲起點,進行值的累加
            if(old.isDefined) {
              clickCount = old.get
            }
      
            // values,表明了,batch rdd中,每一個key對應的全部的值
            for(value <- values) {
              clickCount += value
            }
      
            Some(clickCount)
          }
      
          // 將計算出來的最新結果,同步一份到mysql中,以便於j2ee系統使用
          aggregatedDStream.foreachRDD{ rdd =>
      
            rdd.foreachPartition{ items =>
      
              //批量保存到數據庫
              val adStats = ArrayBuffer[AdStat]()
      
              for(item <- items){
                val keySplited = item._1.split("_")
                val date = keySplited(0)
                val province = keySplited(1)
                val city = keySplited(2)
                val adid = keySplited(3).toLong
      
                val clickCount = item._2
                adStats += AdStat(date,province,city,adid,clickCount)
              }
              AdStatDAO.updateBatch(adStats.toArray)
      
            }
      
          }
      
          aggregatedDStream
    }
    複製代碼

2.5 實時統計天天每一個省份top3熱門廣告

  • 轉化爲省粒度進行過濾,拋棄userId ,cityid,格式爲:(yyyyMMdd_province_adid,clickCount)

  • 註冊成表,基於ROW_NUMBER()實現窗聚合,按照province分區,實現top3排序,

    tmp_daily_ad_click_count_by_prov
    
     calculateProvinceTop3Ad(spark,adRealTimeStatDStream)
     
     def calculateProvinceTop3Ad(spark:SparkSession, adRealTimeStatDStream:DStream[(String, Long)]) {
    
     // 每個batch rdd,都表明了最新的全量的天天各省份各城市各廣告的點擊量
     //(yyyyMMdd_province_city_adid,clickCount)
     val rowsDStream = adRealTimeStatDStream.transform{ rdd =>
    
       // <yyyyMMdd_province_city_adid, clickCount>
       // <yyyyMMdd_province_adid, clickCount>
    
       // 計算出天天各省份各廣告的點擊量
       val mappedRDD = rdd.map{ case (keyString, count) =>
    
         val keySplited = keyString.split("_")
         val date = keySplited(0)
         val province = keySplited(1)
         val adid = keySplited(3).toLong
         val clickCount = count
    
         val key = date + "_" + province + "_" + adid
         (key, clickCount)
       }
    
       val dailyAdClickCountByProvinceRDD = mappedRDD.reduceByKey( _ + _ )
    
       // 將dailyAdClickCountByProvinceRDD轉換爲DataFrame
       // 註冊爲一張臨時表
       // 使用Spark SQL,經過開窗函數,獲取到各省份的top3熱門廣告
       val rowsRDD = dailyAdClickCountByProvinceRDD.map{ case (keyString, count) =>
    
         val keySplited = keyString.split("_")
         val datekey = keySplited(0)
         val province = keySplited(1)
         val adid = keySplited(2).toLong
         val clickCount = count
    
         val date = DateUtils.formatDate(DateUtils.parseDateKey(datekey))
    
         (date, province, adid, clickCount)
    
       }
    
       import spark.implicits._
       val dailyAdClickCountByProvinceDF = rowsRDD.toDF("date","province","ad_id","click_count")
    
       // 將dailyAdClickCountByProvinceDF,註冊成一張臨時表
       dailyAdClickCountByProvinceDF.createOrReplaceTempView("tmp_daily_ad_click_count_by_prov")
    
       // 使用Spark SQL執行SQL語句,配合開窗函數,統計出各身份top3熱門的廣告
       val provinceTop3AdDF = spark.sql(
         "SELECT "
             + "date,"
             + "province,"
             + "ad_id,"
             + "click_count "
           + "FROM ( "
             + "SELECT "
               + "date,"
               + "province,"
               + "ad_id,"
               + "click_count,"
               + "ROW_NUMBER() OVER(PARTITION BY province ORDER BY click_count DESC) rank "
             + "FROM tmp_daily_ad_click_count_by_prov "
             + ") t "
           + "WHERE rank<=3"
       )
    
       provinceTop3AdDF.rdd
     }
    
     // 每次都是刷新出來各個省份最熱門的top3廣告,將其中的數據批量更新到MySQL中
     rowsDStream.foreachRDD{ rdd =>
       rdd.foreachPartition{ items =>
    
         // 插入數據庫
         val adProvinceTop3s = ArrayBuffer[AdProvinceTop3]()
    
         for (item <- items){
           val date = item.getString(0)
           val province = item.getString(1)
           val adid = item.getLong(2)
           val clickCount = item.getLong(3)
           adProvinceTop3s += AdProvinceTop3(date,province,adid,clickCount)
         }
         AdProvinceTop3DAO.updateBatch(adProvinceTop3s.toArray)
    
       }
     }
    複製代碼

    }

2.6 實時統計天天每一個廣告在最近1小時的滑動窗口內的點擊趨勢(每分鐘的點擊量)

  • 版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。

  • 轉化爲時間粒度進行過濾,拋棄province,userId ,cityid,格式爲: <yyyyMMddHHMM_adid,1L>,基於reduceByKeyAndWindow進行聚合

  • 最終結果展開 (date,hour,minute,adid,clickCount)

    calculateAdClickCountByWindow(adRealTimeValueDStream)
      def calculateAdClickCountByWindow(adRealTimeValueDStream:DStream[String]) {
    
      // 映射成<yyyyMMddHHMM_adid,1L>格式
      //(timestamp province city userid adid)
      val pairDStream = adRealTimeValueDStream.map{ case consumerRecord  =>
        val logSplited = consumerRecord.split(" ")
        val timeMinute = DateUtils.formatTimeMinute(new Date(logSplited(0).toLong))
        val adid = logSplited(4).toLong
    
        (timeMinute + "_" + adid, 1L)
      }
    
      // 計算窗口函數,1小時滑動窗口內的廣告點擊趨勢
      val aggrRDD = pairDStream.reduceByKeyAndWindow((a:Long,b:Long) => (a + b),Minutes(60L), Seconds(10L))
    
      // 最近1小時內,各分鐘的點擊量,並保存到數據庫
      aggrRDD.foreachRDD{ rdd =>
        rdd.foreachPartition{ items =>
          //保存到數據庫
          val adClickTrends = ArrayBuffer[AdClickTrend]()
          for (item <- items){
            val keySplited = item._1.split("_")
            // yyyyMMddHHmm
            val dateMinute = keySplited(0)
            val adid = keySplited(1).toLong
            val clickCount = item._2
    
            val date = DateUtils.formatDate(DateUtils.parseDateKey(dateMinute.substring(0, 8)))
            val hour = dateMinute.substring(8, 10)
            val minute = dateMinute.substring(10)
    
            adClickTrends += AdClickTrend(date,hour,minute,adid,clickCount)
          }
          AdClickTrendDAO.updateBatch(adClickTrends.toArray)
        }
      }
    }
    複製代碼

3 總結

溫故而知新,本文爲了綜合複習,進行代碼總結,內容粗鄙,勿怪

版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。QQ郵箱地址:1120746959@qq.com,若有任何技術交流,可隨時聯繫。

秦凱新 於深圳

來源:掘金

著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。

相關文章
相關標籤/搜索