在實時應用之中,不免會遇到往NoSql數據如HBase中寫入數據的情景。題主在工做中遇到以下情景,須要實時查詢某個設備ID對應的帳號ID數量。踩過的坑也挺多,舉其中之一,如一開始選擇使用NEO4J圖數據庫存儲設備和帳號的關係,固然也有其餘的數據,最終構成一個複雜的圖關係,可是這個圖數據庫免費版是單機安裝(集羣要收費),在實時寫入和查詢關係的時候,致使咱們一臺服務器內存和cpu損耗嚴重,爲了保證Hadoop集羣的穩定性,只好替換掉這個數據庫,採用流行的HBase。本文就HBase的使用心得作以下記錄。數據庫
1.rowkey設計:設備id是32位字母、數字組成的串,考慮到HBase長表掃描的查詢最快,因此rowkey的設計方式爲,設備ID+帳號ID拼接而成,這樣在掃描某個設備ID時會很快計算出條數。apache
2.HBase表設計:在建立表的時候採用預分區建表,由於這樣的,若是知道hbase數據表的rowkey的分佈狀況,就能夠在建表的時候對hbase進行region的預分區,這樣作的好處是防止大數據量插入的熱點問題,提升數據插入的效率。rowkey是字母或者數字開頭,因此建表語句以下(數據量再大的時候還能夠在細分分區):服務器
create 'T_TEST', 'data', SPLITS => ['0', '1','2', '3','4', '5','6','7','8','9','a', 'b', 'c', 'd', 'e', 'f', 'g']
此處入坑:建立表的時候將HBase表映射到Hive外部表,語句以下。這樣作是爲了方便導入歷史數據,可是Hive跑批將歷史數據導入以後,從HBase查詢已經導入的某一數據的時候,沒法查詢導數據,也沒法經過API寫入到HBase,這個問題很詭異,後來想了下Hive導入的數據編碼和HBase的不一樣,因而從新將表刪除,不採用映射表,直接使用Spark將歷史數據導入,問題解決。app
CREATE external TABLE tmp.H_T_TEST(key string ,num string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,data:num") TBLPROPERTIES ("hbase.table.name" = "T_TEST");
3.設計好rowkey和表以後,咱們就開始寫Spark代碼了。oop
此處入坑,我把HBase的鏈接池寫在了和Spark的同一位置,這樣會遇到一個問題,Spark程序運行的時候報HBaseConnection沒有序列化,按照網上的作法,將對象加上 @transient註解,雖然不報錯誤,仍是沒法將數據寫入到Hba之中。後來通過查找,找到了解決辦法,將HBase的鏈接放到消息的循環以內,即一個分區創建一個HBase鏈接,代碼以下。大數據
def main(args: Array[String]): Unit = { val sc: SparkContext = SparkUtil.createSparkContext(this.getClass.getSimpleName) val ssc: StreamingContext = new StreamingContext(sc, Seconds(10)) val messages = SparkUtil.createDStreamFromKafka( "T_TEST", topicSet, ssc)//建立消息接收器 messages.foreachRDD(rdd => { rdd.foreachPartition(partitionRecords => {//循環分區 try { val connection = HBaseUtil.getHbaseConn //獲取HBase鏈接,分區建立一個鏈接,分區不跨節點,不須要序列化 partitionRecords.foreach(s => { val data = JSON.parseObject(s._2)//將數據轉化成JSON格式 val tableName = TableName.valueOf("T_TEST") val table = connection.getTable(tableName)//獲取錶鏈接 val put = new Put(Bytes.toBytes(data.getString("id1") + "_" + data.getString("id2"))) put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("num"), Bytes.toBytes("1")) Try(table.put(put)).getOrElse(table.close())//將數據寫入HBase,若出錯關閉table table.close()//分區數據寫入HBase後關閉鏈接 }) } catch { case e: Exception => logger.error("寫入HBase失敗,{}", e.getMessage) } }) }) ssc.start() ssc.awaitTermination() }
至此問題解決,數據正常,還沒出現過問題,等待時間驗證吧。this
4.歷史數據導入,在導入歷史數據的時候,因爲數據放在了Hive的兩個不一樣表之中,一開始想要一次性讀入,使用Spark SQL的dataframe,建立一個hivecontext,寫HiveSQL將兩個表結果執行union all操做,可是Spark程序報rpc錯誤。將兩個表的結果分別查出,使用dataframe 的union all操做,也是不行,也是rpc錯誤,查了不少資料,仍是沒解決,莫名其妙的錯誤,後來兩個表分開執行導入歷史數據,問題再也不出現,可能Spark仍是不夠成熟,老是遇到莫名其妙的問題。編碼
在使用Hbase的時候要預分區。不要爲了方便使用Hive外部映射表。HBase的鏈接池要放在分區循環開始的地方,否則建立不少的鏈接,會致使HBase垮掉。spa