Spark+Hbase 億級流量分析實戰( PV/UV )

做爲一個百億級的流量實時分析統計系統怎麼能沒有 PV / UV 這兩經典的超級瑪麗亞指標呢,話說五百年前它倆但是鼻祖,咳咳...,很差意思沒忍住,迴歸正文,大豬 在上一篇已經介紹了 小巧高性能ETL程序設計與實現 了,到如今,咱們的數據已經落地到 Hbase 上了,並且日誌的時間也已經寫到 Mysql 了,萬事都已經具有了,接下來咱們就要擼指標了,先從兩個經典的指標開始擼。mysql

程序流程

咱們先理一下整個程序的計算流程,請看大圖: git

  1. 開始計算是咱們的 Driver 程序入口github

  2. 開始計算以前檢查監聽 Redis 有沒有收到程序退出通知,若是有程序結束,不然往下執行redis

  3. 首先去查詢咱們上篇文章的 ETL loghub 日誌的進度的平均時間點算法

  4. Switch 處是判斷 loghub 的時間距離咱們上次計算的指標時間是否相差足夠時間,通常定義爲3分鐘時間以後,由於 loghub 的時間會有少許的波動狀況sql

  5. 不知足則 Sleep 30秒,能夠本身控制Sleep範圍。apache

  6. 知足則計算 上次指標計算結束時間 ~ (loghub時間 - 3分鐘日誌波動)bash

  7. 計算完成更新指標結果而且更新指標計算時間,而後回到第 2 點。多線程

程序實現

先從 DriverMain 入口開始擼起app

//監聽redis退出消息
while (appRunning) {
      val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
      //日誌offset
      val loghubTime = dbClient.query("loghub").toLocalDateTime.minusMinutes(3)
      //指標計算offset
      val indicatorTime =dbClient.query("indicator").toLocalDateTime
      //兩個時間相差(分)
      val betweenTimeMinutes = Duration.between(indicatorTime, loghubTime).toMinutes

      val format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
      //相差足夠時間則進行指標運行,不然睡眠
      if (betweenTimeMinutes >= 1) {
        app.run(spark, indicatorTime, loghubTime)
        //計算完成更新指標時間
        dbClient.upsert(Map("offsetName" -> "indicator"), Update(sets = Map("time" -> loghubTime.toString)), "offset")
      } else {
        //讓咱們的老大哥睡會,別太累了
        TimeUnit.SECONDS.sleep(30)
      }
    }
複製代碼

從註釋上看,總體思路仍是比較清晰的。

接下來咱們跟着往下看run裏面的方法作了什麼有意思的操做

conf.set(TableInputFormat.INPUT_TABLE, Tables.LOG_TABLE)
conf.set("TableInputFormat.SCAN_ROW_START", start)
conf.set("TableInputFormat.SCAN_ROW_START", end)
val logDS = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat2],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )
      .map(tp2 => HbaseUtil.resultToMap(tp2._2))
      .map(map => {
        LogCase(
          //子case類,存放多種格式的時間
          dt = DT(
            map.get("time").toLocalDateTimeStr(),
            map.get("time").toLocalDate().toString
          ),
          `type` = map.get("type"),
          aid = map.get("aid"),
          uid = map.get("uid"),
          tid = map.get("tid"),
          ip = map.get("ip")
        )
      }).toDS()

    logDS.cache()
    logDS.createTempView("log")
    //各種指標
    new PV().run()
    new UV().run()
複製代碼

startend 就是上面傳下來須要查詢的日誌時間範圍

簡要說明:就是把Hbase的時間範圍數據轉成SparkSQL中的一張log

UVPV 指標計算裏面就可使用這張 log 表了

咱們看看這兩個經典的指標裏面到底有什麼乾坤:

spark.sql(
      """ |SELECT | aid, | dt.date, | COUNT(1) as pv |FROM | log |GROUP BY | aid, | dt.date """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("pv" -> row.getAs[Long]("pv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })
複製代碼

哇然一看,大哥你這也寫得太簡單了吧

不就是一個普通的 PV 算法,再加上分區foreachPartition操做把更到的每一行聚合的結果數據upsert到咱們的common_report指標表

group by後面跟上要聚合的維度,以上是想統計每篇文章天天的PV

從這個方法咱們就能推算出common_report長什麼樣了,至少有time+aid這兩個惟一索引字段,還有pv這個字段,默認值確定是 0

百聞不如一見,看看錶的 DDL 是否是這樣子:

create table common_report
(
	id bigint auto_increment primary key,
	aid bigint not null,
	pv int default 0 null,
	uv int default 0 null,
	time date not null,
	constraint common_report_aid_time_uindex unique (aid, time)
);
複製代碼

果真一點都沒錯。

再看 dbClient.upsert 裏面大概也能猜到是實現了mysql的upsert功能,大概的sql就會生成下面格式:

INSERT INTO common_report (time, aid, pv)
VALUES ('2019-03-26', '10000', 1) ON DUPLICATE KEY UPDATE pv = pv + 1;
複製代碼

大豬 那 UV 是怎麼實現咧?一個用戶在今天來過第一次以後再來就不能重複計算了噢。

大豬答:這個簡單簡單,可使用Redis去重嘛,可是咱們使用的都是Hbase了,還使用它作啥子咧,具體咱們看一下 UV 裏面究竟是如何實現的:

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
    import spark.implicits._
    logDS
      .mapPartitions(partitionT => {
        val hbaseClient = DBHbaseHelper.getDBHbase(Tables.CACHE_TABLE)
        val md5 = (log: LogCase) => MD5Hash.getMD5AsHex(s"${log.dt.date}|${log.aid}|${log.uid}|uv".getBytes)
        partitionT
          .grouped(Consts.BATCH_MAPPARTITIONS)
          .flatMap { tList =>
            tList
              .zip(hbaseClient.incrments(tList.map(md5)))
              .map(tp2 => {
                val log = tp2._1
                log.copy(ext = EXT(tp2._2))
              })
          }
      }).createTempView("uvTable")

    spark.sql(
      """ |SELECT | aid, | dt.date, | COUNT(1) as uv |FROM | uvTable |WHERE | ext.render = 1 |GROUP BY | aid, | dt.date """.stripMargin)
      .rdd
      .foreachPartition(rows => {
        val props = PropsUtils.properties("db")
        val dbClient = new DBJdbc(props.getProperty("jdbcUrl"))
        rows.foreach(row => {
          dbClient.upsert(
            Map(
              "time" -> row.getAs[String]("date"),
              "aid" -> row.getAs[String]("aid")
            ),
            Update(incs = Map("uv" -> row.getAs[Long]("uv").toString)),
            "common_report"
          )
        })
        dbClient.close()
      })
複製代碼

spark.sql 這裏跟PV同樣嘛,就是多了一句條件ext.render = 1,可是上面那一大堆是啥子咧?

大豬 CACHE_TABLE 是什麼來的,是Hbase一張中間表,用戶存用戶UV標記的,建表語句以下,由於維度都是按天,因此咱們TTL設計3天就能夠了,兩天也能夠。

create 'CACHE_FOR_TEST',{NAME => 'info',TTL => '3 DAYS',CONFIGURATION => {'SPLIT_POLICY' => 'org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy','KeyPrefixRegionSplitPolicy.prefix_length'=>'2'},COMPRESSION=>'SNAPPY'},SPLITS => ['20', '40', '60', '80', 'a0', 'c0', 'e0']
複製代碼

那還有其它的呢?

莫慌莫慌,大豬 這就慢慢解釋道:

val logDS = spark.table("log").as(ExpressionEncoder[LogCase])
複製代碼

上面這句的意思就是就是把log表給取出來,固然也能夠經過參數傳遞。

下面的mapPartitions挺有意思的:

partitionT
    .grouped(1000)
        .flatMap { tList =>
          tList
            .zip(hbaseClient.incrments(tList.map(md5)))
            .map(tp2 => {
              val log = tp2._1
              log.copy(ext = EXT(tp2._2))
            })
        }
複製代碼

實際 上面是處理每一個分區的數據,也就是轉換數據,咱們每來一條數據就要去Hbase那incrment一次,返回來的結果就是 render ,用戶今天來多少次就incrment 相應的次數。

那有 什麼用?我直接從Hbase GET取出數據,再判斷有沒有,若是沒有這個用戶就是今天第一次來,再把這個用戶PUT進Hbase打一個標記,so easy。

其實 當初咱們也是這麼作的,後面發現業務的東西仍是放在SQL裏面一塊兒寫比較好,容易維護,並且incrment好處多多,由於它是帶事務的,能夠多線程進行修改。

並且 大家也發現了GETPUT是兩次請求操做,保證不了事務的,指標幾千萬的數據少了那麼幾條,大家都不知道我當初找它們有辛苦。

大家 有沒有發現render = 1的時候是表明UV(恰好等於1的時候爲何是UV?這裏你們要慢慢地品嚐一下了,其實就是實現了GETPUT操做),若是render = 2的時候又能夠表明今天來過兩次以上的用戶指標,隨時擴展,就問你擼這樣的代碼結構爽不爽?

看看 incrments 方法實現了啥子

def incrments(incs: Seq[String], family: String = "info", amount: Int = 1): Seq[Long] = {
    if (incs.isEmpty) {
      Seq[Long]()
    } else {
      require(incs.head.length == 32, "pk require 32 length")
      val convertIncs = incs map { pk => new Increment(Bytes.toBytes(pk.take(8))).addColumn(Bytes.toBytes(family), Bytes.toBytes(pk.takeRight(24)), amount) }
      val results = new Array[Object](convertIncs.length)
      table.batch(convertIncs.asJava, results)
      results.array.indices.map(
        ind =>
          Bytes.toLong(
            results(ind)
              .asInstanceOf[Result]
              .getValue(
                Bytes.toBytes(family),
                Bytes.toBytes(incs(ind).takeRight(24))
              )
          )
      )
    }
  }
複製代碼

這個方法就是實現了 incrment 的批量處理,由於咱們在線上生產環境的時候測試過,批量處理比單條處理性能高了上百倍,因此這也就是爲何要寫在mapPartitions裏面的緣由了,由於只有在這個方法裏面纔有批量數據轉換操做,foreachPartition是批量處理操做,foreach,與map是一條一條操做不能使用,咱們在輸出報表到Mysql的地方已經用到過了。

大豬 不知不覺已經寫了那麼長的文章了

關閉計算程序只須要給redis發一條stop消息就能夠啦

RedisUtil().getResource.publish("computeListenerMessage", "stop")
複製代碼

不能再複製代碼了,不能顯得文章是靠代碼撐起來的。

福利 完整項目源碼


相關文章
相關標籤/搜索