Spark Streaming實時寫入數據到HBase

1、概述

  在實時應用之中,不免會遇到往NoSql數據如HBase中寫入數據的情景。題主在工做中遇到以下情景,須要實時查詢某個設備ID對應的帳號ID數量。踩過的坑也挺多,舉其中之一,如一開始選擇使用NEO4J圖數據庫存儲設備和帳號的關係,固然也有其餘的數據,最終構成一個複雜的圖關係,可是這個圖數據庫免費版是單機安裝(集羣要收費),在實時寫入和查詢關係的時候,致使咱們一臺服務器內存和cpu損耗嚴重,爲了保證Hadoop集羣的穩定性,只好替換掉這個數據庫,採用流行的HBase。本文就HBase的使用心得作以下記錄。數據庫

2、解決方案

  1.rowkey設計:設備id是32位字母、數字組成的串,考慮到HBase長表掃描的查詢最快,因此rowkey的設計方式爲,設備ID+帳號ID拼接而成,這樣在掃描某個設備ID時會很快計算出條數。apache

2.HBase表設計:在建立表的時候採用預分區建表,由於這樣的,若是知道hbase數據表的rowkey的分佈狀況,就能夠在建表的時候對hbase進行region的預分區,這樣作的好處是防止大數據量插入的熱點問題,提升數據插入的效率。rowkey是字母或者數字開頭,因此建表語句以下(數據量再大的時候還能夠在細分分區):服務器

create 'T_TEST', 'data', SPLITS => ['0', '1','2', '3','4', '5','6','7','8','9','a', 'b', 'c', 'd', 'e', 'f', 'g']

此處入坑:建立表的時候將HBase表映射到Hive外部表,語句以下。這樣作是爲了方便導入歷史數據,可是Hive跑批將歷史數據導入以後,從HBase查詢已經導入的某一數據的時候,沒法查詢導數據,也沒法經過API寫入到HBase,這個問題很詭異,後來想了下Hive導入的數據編碼和HBase的不一樣,因而從新將表刪除,不採用映射表,直接使用Spark將歷史數據導入,問題解決。app

CREATE external TABLE tmp.H_T_TEST(key string ,num string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,data:num")
TBLPROPERTIES ("hbase.table.name" = "T_TEST");

3.設計好rowkey和表以後,咱們就開始寫Spark代碼了。oop

此處入坑,我把HBase的鏈接池寫在了和Spark的同一位置,這樣會遇到一個問題,Spark程序運行的時候報HBaseConnection沒有序列化,按照網上的作法,將對象加上 @transient註解,雖然不報錯誤,仍是沒法將數據寫入到Hba之中。後來通過查找,找到了解決辦法,將HBase的鏈接放到消息的循環以內,即一個分區創建一個HBase鏈接,代碼以下。大數據

def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkUtil.createSparkContext(this.getClass.getSimpleName)
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
    val messages = SparkUtil.createDStreamFromKafka(
      "T_TEST",
      topicSet,
      ssc)//建立消息接收器

    messages.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {//循環分區
        try {
          val connection = HBaseUtil.getHbaseConn //獲取HBase鏈接,分區建立一個鏈接,分區不跨節點,不須要序列化
          partitionRecords.foreach(s => {
            val data = JSON.parseObject(s._2)//將數據轉化成JSON格式
            val tableName = TableName.valueOf("T_TEST")
            val table = connection.getTable(tableName)//獲取錶鏈接

            val put = new Put(Bytes.toBytes(data.getString("id1") + "_" + data.getString("id2")))
            put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("num"), Bytes.toBytes("1"))

            Try(table.put(put)).getOrElse(table.close())//將數據寫入HBase,若出錯關閉table
            table.close()//分區數據寫入HBase後關閉鏈接
          })
        } catch {
          case e: Exception => logger.error("寫入HBase失敗,{}", e.getMessage)
        }
      })
    })
    ssc.start()
    ssc.awaitTermination()

  }

至此問題解決,數據正常,還沒出現過問題,等待時間驗證吧。this

4.歷史數據導入,在導入歷史數據的時候,因爲數據放在了Hive的兩個不一樣表之中,一開始想要一次性讀入,使用Spark SQL的dataframe,建立一個hivecontext,寫HiveSQL將兩個表結果執行union all操做,可是Spark程序報rpc錯誤。將兩個表的結果分別查出,使用dataframe 的union all操做,也是不行,也是rpc錯誤,查了不少資料,仍是沒解決,莫名其妙的錯誤,後來兩個表分開執行導入歷史數據,問題再也不出現,可能Spark仍是不夠成熟,老是遇到莫名其妙的問題。編碼

3、總結

  在使用Hbase的時候要預分區。不要爲了方便使用Hive外部映射表。HBase的鏈接池要放在分區循環開始的地方,否則建立不少的鏈接,會致使HBase垮掉。spa

相關文章
相關標籤/搜索