When using Spark we often need to land data in HBase, and writing it record by record through the plain HBase Java API is slow. Fortunately, Spark provides an interface for bulk writes. What does the bulk write gain over a normal write? With the plain API every Put costs its own round trip to a region server, while the bulk path hands whole partitions to Hadoop's TableOutputFormat, whose client-side write buffer batches the mutations, so the RPC overhead is amortized across many rows.
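For contrast, here is a minimal sketch of the per-record path (the object name HBaseSlowWrite, the sample data, and the connection settings are illustrative; the bulktest table and f column family match the bulk example below):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HBaseSlowWrite {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("SlowWrite"))
    val rdd = sc.makeRDD(Seq(("1", "james"), ("2", "lebron"), ("3", "harden")))
    rdd.foreachPartition { rows =>
      // One connection per partition, but still one Table.put per record.
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      val conn = ConnectionFactory.createConnection(conf)
      val table = conn.getTable(TableName.valueOf("bulktest"))
      try {
        rows.foreach { case (rowKey, name) =>
          val put = new Put(Bytes.toBytes(rowKey))
          put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(name))
          table.put(put) // a round trip per record: this is what makes the plain API slow
        }
      } finally {
        table.close()
        conn.close()
      }
    }
    sc.stop()
  }
}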
The complete bulk-write code is given below:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by shaonian
*/
object HBaseBulk {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Bulk")
    val sc = new SparkContext(sparkConf)

    // Point the HBase client at the ZooKeeper quorum and name the destination table.
    val conf = new Configuration()
    conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "bulktest")

    // Wrap the configuration in a Job so TableOutputFormat picks up the output settings.
    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // The values we emit are Puts, so Put is the output value class.
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Turn each "rowkey,name,age" record into a (row key, Put) pair.
    val init = sc.makeRDD(Array("1,james,32", "2,lebron,30", "3,harden,28"))
    val rdd = init.map(_.split(",")).map(arr => {
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable(Bytes.toBytes(arr(0))), put)
    })

    // A single call hands the whole RDD to TableOutputFormat, which batches the Puts.
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
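Note that TableOutputFormat does not create the destination table; bulktest with column family f must exist before the job runs (for example, create 'bulktest', 'f' in the HBase shell). To sanity-check the write from Spark, here is a sketch of a read-back using TableInputFormat, assuming the same quorum and table (the HBase shell's scan 'bulktest' works just as well):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HBaseReadBack {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("ReadBack"))
    val conf = new Configuration()
    conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, "bulktest")
    // Each element is a (row key, Result) pair; pull the name column back out.
    val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    hbaseRDD.foreach { case (key, result) =>
      val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
      println(s"${Bytes.toString(key.copyBytes())} -> $name")
    }
    sc.stop()
  }
}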