Spark寫入HBase(Bulk方式)

在使用Spark時常常須要把數據落入HBase中,若是使用普通的Java API,寫入會速度很慢。還好Spark提供了Bulk寫入方式的接口。那麼Bulk寫入與普通寫入相比有什麼優點呢?apache

  • BulkLoad不會寫WAL,也不會產生flush以及split。
  • 若是咱們大量調用PUT接口插入數據,可能會致使大量的GC操做。除了影響性能以外,嚴重時甚至可能會對HBase節點的穩定性形成影響。可是採用Bulk就不會有這個顧慮。
  • 過程當中沒有大量的接口調用消耗性能

下面給出完整代碼:oop

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.conf.Configuration

/**
* Created by shaonian
*/
object HBaseBulk {

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Bulk")
  val sc = new SparkContext(sparkConf)

  val conf = new Configuration()
  conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set(TableOutputFormat.OUTPUT_TABLE, "bulktest")
  val job = Job.getInstance(conf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Result])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  val init = sc.makeRDD(Array("1,james,32", "2,lebron,30", "3,harden,28"))
  val rdd = init.map(_.split(",")).map(arr => {
   val put = new Put(Bytes.toBytes(arr(0)))
   put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
   put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
   (new ImmutableBytesWritable, put)
  })
  rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  sc.stop()
  }
相關文章
相關標籤/搜索