HBase原理--BulkLoad

在實際生產環境中,有這樣一種場景:用戶數據位於HDFS中,業務須要按期將這部分海量數據導入HBase系統,以執行隨機查詢更新操做。這種場景若是調用寫入API進行處理,極有可能會給RegionServer帶來較大的寫入壓力:sql

•引發RegionServer頻繁flush,進而不斷compact、split,影響集羣穩定性。shell

•引發RegionServer頻繁GC,影響集羣穩定性。apache

•消耗大量CPU資源、帶寬資源、內存資源以及IO資源,與其餘業務產生資源競爭。app

•在某些場景下,好比平均KV大小比較大的場景,會耗盡RegionServer的處理線程,致使集羣阻塞。工具

鑑於存在上述問題,HBase提供了另外一種將數據寫入HBase集羣的方法——BulkLoad。BulkLoad首先使用MapReduce將待寫入集羣數據轉換爲HFile文件,再直接將這些HFile文件加載到在線集羣中。顯然,BulkLoad方案沒有將寫請求發送給RegionServer處理,能夠有效避免上述一系列問題。oop

BulkLoad核心流程ui

從HBase的視角來看,BulkLoad主要由兩個階段組成:spa

1)HFile生成階段。這個階段會運行一個MapReduce任務,MapReduce的mapper須要本身實現,將HDFS文件中的數據讀出來組裝成一個複合KV,其中Key是rowkey,Value能夠是KeyValue對象、Put對象甚至Delete對象;MapReduce的reducer由HBase負責,經過方法HFileOutputFormat2.configureIncrementalLoad()進行配置,這個方法主要負責如下事項。線程

•根據表信息配置一個全局有序的partitioner。code

•將partitioner文件上傳到HDFS集羣並寫入DistributedCache。

•設置reduce task的個數爲目標表Region的個數。

•設置輸出key/value類知足HFileOutputFormat所規定的格式要求。

•根據類型設置reducer執行相應的排序(KeyValueSortReducer或者PutSortReducer)。

這個階段會爲每一個Region生成一個對應的HFile文件。

2)HFile導入階段。HFile準備就緒以後,就可使用工具completebulkload將HFile加載到在線HBase集羣。completebulkload工具主要負責如下工做。

•依次檢查第一步生成的全部HFile文件,將每一個文件映射到對應的Region。

•將HFile文件移動到對應Region所在的HDFS文件目錄下。

•告知Region對應的RegionServer,加載HFile文件對外提供服務。

若是在BulkLoad的中間過程當中Region發生了分裂,completebulkload工具會自動將對應的HFile文件按照新生成的Region邊界切分紅多個HFile文件,保證每一個HFile都能與目標表當前的Region相對應。但這個過程須要讀取HFile內容,於是並不高效。須要儘可能減小HFile生成階段和HFile導入階段的延遲,最好可以在HFile生成以後馬上執行HFile導入。

基於BulkLoad兩階段的工做原理,BulkLoad的核心流程如圖所示。
image.png

BulkLoad基礎案例

在hbase上建立一張表:
create 'test_log','ext'

執行BulkLoad代碼:

import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object BulkLoad1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HbaseBulkLoad")

    val spark = SparkSession.builder
      .config(sparkConf)
      .getOrCreate()
    val sc = spark.sparkContext

    val datas = List(
      ("abc", ("ext", "type", "login")),
      ("ccc", ("ext", "type", "logout"))
    )

    val dataRdd = sc.parallelize(datas)

    val output = dataRdd.map {
      x => {
        val rowKey = Bytes.toBytes(x._1)
        val immutableRowKey = new ImmutableBytesWritable(rowKey)

        val colFam = x._2._1
        val colName = x._2._2
        val colValue = x._2._3

        val kv = new KeyValue(
          rowKey,
          Bytes.toBytes(colFam),
          Bytes.toBytes(colName),
          Bytes.toBytes(colValue.toString)
        )
        (immutableRowKey, kv)
      }
    }


    val hConf = HBaseConfiguration.create()
    hConf.addResource("hbase_site.xml")
    val hTableName = "test_log"
    hConf.set("hbase.mapreduce.hfileoutputformat.table.name",hTableName)
    val tableName = TableName.valueOf(hTableName)
    val conn = ConnectionFactory.createConnection(hConf)
    val table = conn.getTable(tableName)
    val regionLocator = conn.getRegionLocator(tableName)

    val hFileOutput = "/tmp/h_file"

    output.saveAsNewAPIHadoopFile(hFileOutput,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      hConf)

    val bulkLoader = new LoadIncrementalHFiles(hConf)

    bulkLoader.doBulkLoad(new Path(hFileOutput),conn.getAdmin,table,regionLocator)
  }

}

提交spark執行:

spark-submit \
--master yarn \
--conf spark.yarn.tokens.hbase.enabled=true \
--deploy-mode client \
--class BulkLoad1
--executor-memory 512m
--driver-memory 512m
--total-executor-cores 2
/home/hadoop/hadoop-2.8.5/files/Spark_study.jar

在hbase shell上查看:

scan 'test_log'
image.png

相關文章
相關標籤/搜索