在上一篇文章 大豬 已經介紹了日誌存儲設計方案 ,咱們數據已經落地到數據中心上了,那接下來如何ETL呢?畢竟但是生產環境級別的,可不能亂來。其實只要解決幾個問題便可,沒必要要引入很大級別的組件來作,固然了各有各的千秋,本文主要從 易懂、小巧 、簡潔、 高性能 這三個方面去設計出發點,順便還實現了一個精巧的 Filebeat。mysql
loghub功能 要實現的功能就是掃描天天的增量日誌並寫入Hbase中 git
須要攻克以下幾個小難題 sql
大豬 根據線上的生產環境一一把上面的功能從新分析給實現一下。json
從第一點看仍是比較簡單的嘛?可是咱們要結合上面的 5 個問題來看才行。bash
總結一句話就是:要實現一個高性能並且能隨時重啓繼續工做的 loghub ETL 程序
。工具
實際也必需這樣作,由於生產環境容不得馬虎,否則就等着被BOSS性能
須要有一個讀取全部日誌文件方法 ui
還要實現一個保存並讀取文件進度的方法 spa
因爲不能把一個日誌文件所有讀入內存進行處理 因此還須要一個能根據索引一行一行接着讀取數據的方法 設計
最後剩下一個Hbase的鏈接池小工具
幾個核心方法已經寫完了,接着是咱們的主程序
def run(logPath: File, defaultOffsetDay: String): Unit = {
val sdfstr = Source.fromFile(seekDayFile).getLines().mkString
val offsetDay = Option(if (sdfstr == "") null else sdfstr)
//讀取設置讀取日期的倒數一天以後的日期文件夾
val noneOffsetFold = logPath
.listFiles()
.filter(_.getName >= LocalDate.parse(offsetDay.getOrElse(defaultOffsetDay)).minusDays(1).toString)
.sortBy(f => LocalDate.parse(f.getName).toEpochDay)
//讀取文件夾中的全部日誌文件,並取出索引進行匹配
val filesPar = noneOffsetFold
.flatMap(files(_, file => file.getName.endsWith(".log")))
.map(file => (file, seeks().getOrDefault(MD5Hash.getMD5AsHex(file.getAbsolutePath.getBytes()), 0), file.length()))
.filter(tp2 => {
//過濾出新文件,與有增量的日誌文件
val fileMd5 = MD5Hash.getMD5AsHex(tp2._1.getAbsolutePath.getBytes())
val result = offsets.asScala.filter(m => fileMd5.equals(m._1))
result.isEmpty || tp2._3 > result.head._2
})
.par
filesPar.tasksupport = pool
val willUpdateOffset = new util.HashMap[String, Long]()
val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")
var logTime:String = null
filesPar
.foreach(tp3 => {
val hbaseClient = HbasePool.getTable
//由於不能全量讀取數據,全部只能一條一條讀取,批量提出交給HbaseClient的客戶端的mutate方式優雅處理
//foreach 裏面的部分就是咱們的業務處理部分
lines(tp3._1, tp3._2, tp3._3, () => {
willUpdateOffset.put(tp3._1.getAbsolutePath, tp3._3)
offsets.put(MD5Hash.getMD5AsHex(tp3._1.getAbsolutePath.getBytes), tp3._3)
})
.foreach(line => {
val jsonObject = parse(line)
val time = (jsonObject \ "time").extract[Long]
val data = jsonObject \ "data"
val dataMap = data.values.asInstanceOf[Map[String, Any]]
.filter(_._2 != null)
.map(x => x._1 -> x._2.toString)
val uid = dataMap("uid")
logTime = time.getLocalDateTime.toString
val rowkey = uid.take(2) + "|" + time.getLocalDateTime.format(formatter) + "|" + uid.substring(2, 8)
val row = new Put(Bytes.toBytes(rowkey))
dataMap.foreach(tp2 => row.addColumn(Bytes.toBytes("info"), Bytes.toBytes(tp2._1), Bytes.toBytes(tp2._2)))
hbaseClient.mutate(row)
})
hbaseClient.flush()
})
//更新索引到文件上
writeSeek(willUpdateOffset)
//更新索引日期到文件上
writeSeekDay(noneOffsetFold.last.getName)
//把 logTime offset 寫到mysql中,方便Spark+Hbase程序讀取並計算
}
複製代碼
程序很精簡,沒有任何沒用的功能在裏面,線上的生產環境就應該是這子的了。 你們還能夠根據需求加入程序退出發郵件通知功能之類的。 真正去算了一下也就100行功能代碼,並且佔用極小的內存,都不到100M,很精很精。
傳送門 完整ETL程序源碼