Spark+Hbase 億級流量分析實戰(小巧高性能的ETL)

在上一篇文章 大豬 已經介紹了日誌存儲設計方案 ,咱們數據已經落地到數據中心上了,那接下來如何ETL呢?畢竟但是生產環境級別的,可不能亂來。其實只要解決幾個問題便可,沒必要要引入很大級別的組件來作,固然了各有各的千秋,本文主要從 易懂小巧簡潔高性能 這三個方面去設計出發點,順便還實現了一個精巧的 Filebeatmysql

設計

loghub功能 要實現的功能就是掃描天天的增量日誌並寫入Hbase中 git

須要攻克以下幾個小難題 sql

  1. 須要把文件中的每一行數據都取出來
  2. 能處理超過10G以上的大日誌文件,而且只能佔用機器必定的內存,越小越好
  3. 從上圖能夠看到 標黃 的是已經寫入Hbase的數據,不能重複讀取
  4. 非活躍文件不能掃,由於文件過多會影響總體讀取IO性能
  5. 讀取中的過程要保證增量數據不能錄入,由於要保證offset的時候寫入mysql穩定不跳躍

實現

大豬 根據線上的生產環境一一把上面的功能從新分析給實現一下。json

從第一點看仍是比較簡單的嘛?可是咱們要結合上面的 5 個問題來看才行。bash

總結一句話就是:要實現一個高性能並且能隨時重啓繼續工做的 loghub ETL 程序工具

實際也必需這樣作,由於生產環境容不得馬虎,否則就等着被BOSS性能

實現過程

須要有一個讀取全部日誌文件方法 ui

還要實現一個保存並讀取文件進度的方法 spa

因爲不能把一個日誌文件所有讀入內存進行處理 因此還須要一個能根據索引一行一行接着讀取數據的方法 設計

最後剩下一個Hbase的鏈接池小工具

幾個核心方法已經寫完了,接着是咱們的主程序

def run(logPath: File, defaultOffsetDay: String): Unit = {
    val sdfstr = Source.fromFile(seekDayFile).getLines().mkString
    val offsetDay = Option(if (sdfstr == "") null else sdfstr)
    
    //讀取設置讀取日期的倒數一天以後的日期文件夾
    val noneOffsetFold = logPath
      .listFiles()
      .filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString)
      .sortBy(f => LocalDate.parse(f.getName).toEpochDay)

    //讀取文件夾中的全部日誌文件,並取出索引進行匹配
    val filesPar = noneOffsetFold
      .flatMap(files(_, file => file.getName.endsWith(".log")))
      .map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length()))
      .filter(tp2 => {
        //過濾出新文件,與有增量的日誌文件
        val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())
        val result = offsets.asScala.filter(m => fileMd5.equals(m._1))
        result.isEmpty || tp2._3 > result.head._2
      })
      .par

    filesPar.tasksupport = pool

    val willUpdateOffset = new util.HashMap[String, Long]()
    val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
    var logTime:String = null
    filesPar
      .foreach(tp3 => {
        val hbaseClient = HbasePool.getTable
        //由於不能全量讀取數據,全部只能一條一條讀取,批量提出交給HbaseClient的客戶端的mutate方式優雅處理
        //foreach 裏面的部分就是咱們的業務處理部分
        lines(tp3._1, tp3._2, tp3._3, () => {
          willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)
          offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)
        })
          .foreach(line => {
            val jsonObject = parse(line)
            val time = (jsonObject \ "time").extract[Long]
            val data = jsonObject \ "data"
            val dataMap = data.values.asInstanceOf[Map[String, Any]]
              .filter(_._2 != null)
              .map(x => x._1 -> x._2.toString)

            val uid = dataMap("uid")
            logTime = time.getLocalDateTime.toString
            val rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)

            val row = new Put(Bytes.toBytes(rowkey))
            dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))
            hbaseClient.mutate(row)
          })
        hbaseClient.flush()
      })
    //更新索引到文件上
    writeSeek(willUpdateOffset)
    //更新索引日期到文件上
    writeSeekDay(noneOffsetFold.last.getName)
    //把 logTime offset 寫到mysql中,方便Spark+Hbase程序讀取並計算
  }
複製代碼

程序很精簡,沒有任何沒用的功能在裏面,線上的生產環境就應該是這子的了。 你們還能夠根據需求加入程序退出發郵件通知功能之類的。 真正去算了一下也就100行功能代碼,並且佔用極小的內存,都不到100M,很精很精。

傳送門 完整ETL程序源碼


相關文章
相關標籤/搜索