在實際生產環境中,有這樣一種場景:用戶數據位於HDFS中,業務須要按期將這部分海量數據導入HBase系統,以執行隨機查詢更新操做。這種場景若是調用寫入API進行處理,極有可能會給RegionServer帶來較大的寫入壓力:sql
•引發RegionServer頻繁flush,進而不斷compact、split,影響集羣穩定性。shell
•引發RegionServer頻繁GC,影響集羣穩定性。apache
•消耗大量CPU資源、帶寬資源、內存資源以及IO資源,與其餘業務產生資源競爭。app
•在某些場景下,好比平均KV大小比較大的場景,會耗盡RegionServer的處理線程,致使集羣阻塞。工具
鑑於存在上述問題,HBase提供了另外一種將數據寫入HBase集羣的方法——BulkLoad。BulkLoad首先使用MapReduce將待寫入集羣數據轉換爲HFile文件,再直接將這些HFile文件加載到在線集羣中。顯然,BulkLoad方案沒有將寫請求發送給RegionServer處理,能夠有效避免上述一系列問題。oop
BulkLoad核心流程ui
從HBase的視角來看,BulkLoad主要由兩個階段組成:spa
1)HFile生成階段。這個階段會運行一個MapReduce任務,MapReduce的mapper須要本身實現,將HDFS文件中的數據讀出來組裝成一個複合KV,其中Key是rowkey,Value能夠是KeyValue對象、Put對象甚至Delete對象;MapReduce的reducer由HBase負責,經過方法HFileOutputFormat2.configureIncrementalLoad()進行配置,這個方法主要負責如下事項。線程
•根據表信息配置一個全局有序的partitioner。code
•將partitioner文件上傳到HDFS集羣並寫入DistributedCache。
•設置reduce task的個數爲目標表Region的個數。
•設置輸出key/value類知足HFileOutputFormat所規定的格式要求。
•根據類型設置reducer執行相應的排序(KeyValueSortReducer或者PutSortReducer)。
這個階段會爲每一個Region生成一個對應的HFile文件。
2)HFile導入階段。HFile準備就緒以後,就可使用工具completebulkload將HFile加載到在線HBase集羣。completebulkload工具主要負責如下工做。
•依次檢查第一步生成的全部HFile文件,將每一個文件映射到對應的Region。
•將HFile文件移動到對應Region所在的HDFS文件目錄下。
•告知Region對應的RegionServer,加載HFile文件對外提供服務。
若是在BulkLoad的中間過程當中Region發生了分裂,completebulkload工具會自動將對應的HFile文件按照新生成的Region邊界切分紅多個HFile文件,保證每一個HFile都能與目標表當前的Region相對應。但這個過程須要讀取HFile內容,於是並不高效。須要儘可能減小HFile生成階段和HFile導入階段的延遲,最好可以在HFile生成以後馬上執行HFile導入。
基於BulkLoad兩階段的工做原理,BulkLoad的核心流程如圖所示。
BulkLoad基礎案例
在hbase上建立一張表:
create 'test_log','ext'
執行BulkLoad代碼:
import org.apache.hadoop.hbase.client.ConnectionFactory import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles} import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object BulkLoad1 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("HbaseBulkLoad") val spark = SparkSession.builder .config(sparkConf) .getOrCreate() val sc = spark.sparkContext val datas = List( ("abc", ("ext", "type", "login")), ("ccc", ("ext", "type", "logout")) ) val dataRdd = sc.parallelize(datas) val output = dataRdd.map { x => { val rowKey = Bytes.toBytes(x._1) val immutableRowKey = new ImmutableBytesWritable(rowKey) val colFam = x._2._1 val colName = x._2._2 val colValue = x._2._3 val kv = new KeyValue( rowKey, Bytes.toBytes(colFam), Bytes.toBytes(colName), Bytes.toBytes(colValue.toString) ) (immutableRowKey, kv) } } val hConf = HBaseConfiguration.create() hConf.addResource("hbase_site.xml") val hTableName = "test_log" hConf.set("hbase.mapreduce.hfileoutputformat.table.name",hTableName) val tableName = TableName.valueOf(hTableName) val conn = ConnectionFactory.createConnection(hConf) val table = conn.getTable(tableName) val regionLocator = conn.getRegionLocator(tableName) val hFileOutput = "/tmp/h_file" output.saveAsNewAPIHadoopFile(hFileOutput, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], hConf) val bulkLoader = new LoadIncrementalHFiles(hConf) bulkLoader.doBulkLoad(new Path(hFileOutput),conn.getAdmin,table,regionLocator) } }
提交spark執行:
spark-submit \
--master yarn \
--conf spark.yarn.tokens.hbase.enabled=true \
--deploy-mode client \
--class BulkLoad1
--executor-memory 512m
--driver-memory 512m
--total-executor-cores 2
/home/hadoop/hadoop-2.8.5/files/Spark_study.jar
在hbase shell上查看:
scan 'test_log'