This post covers reading from and writing to HBase through Spark RDDs; going through the HBase client API directly is a different approach and is not covered here.
First, configure the pom and add the HBase dependencies. A Spark platform generally does not ship the HBase jars, so do not mark these dependencies with <scope>provided</scope>.
The relevant Maven section is as follows:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <hbase.version>0.98.8-hadoop2</hbase.version>
    <spark.artifactId.version>2.11</spark.artifactId.version>
</properties>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.2</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <artifactId>jdk.tools</artifactId>
            <groupId>jdk.tools</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-framework</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-recipes</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
        <exclusion>
            <artifactId>curator-client</artifactId>
            <groupId>org.apache.curator</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.2</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>jdk.tools</artifactId>
            <groupId>jdk.tools</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
Then the code that does the reading and writing is as follows:
package mytest

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

object hbaseSpark4 {

  val logger = LoggerFactory.getLogger(hbaseSpark4.getClass)

  def main(args: Array[String]): Unit = {
    val tableName: String = "table1"       // table to read from
    val writeTableName: String = "table2"  // table to write to
    // SevsConfig is a project-specific configuration helper (not shown here)
    val pixFix: String = SevsConfig.getProp("hbase.table.family")

    val sparkconf = new SparkConf().setAppName("hbaseSpark4")
      .set("HADOOP_USER_NAME", SevsConfig.getProp("hbase.hadoop.username"))
      .set("HADOOP_GROUP_NAME", SevsConfig.getProp("hbase.hadoop.groupname"))
      //.setMaster("local")
    val sc = new SparkContext(sparkconf)

    read_hbase(sc, tableName)
    logger.error("write to hbase #################")
    save_hbase(sc, writeTableName)
    logger.error("ending ################")
  }

  // Read an HBase table into an RDD via TableInputFormat.
  def read_hbase(sc: SparkContext, tableName: String): Unit = {
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.property.clientPort", "2015")
    configuration.set("zookeeper.znode.parent", "/hbase")
    configuration.set("hbase.zookeeper.quorum",
      "namenode1-sit.com,namenode2-sit.com,slave01-sit.com")
    configuration.set(TableInputFormat.INPUT_TABLE, tableName)

    // Restrict the scan to a row-key range; the Scan object is serialized
    // and handed to TableInputFormat through the configuration.
    val startRowkey = "0600000003"
    val endRowkey = "8800000008"
    val scan = new Scan(Bytes.toBytes(startRowkey), Bytes.toBytes(endRowkey))
    // scan.setCacheBlocks(false)
    // scan.addFamily(Bytes.toBytes("ks"))
    // scan.addColumn(Bytes.toBytes("ks"), Bytes.toBytes("data"))
    val proto = ProtobufUtil.toScan(scan)
    val scan_str = Base64.encodeBytes(proto.toByteArray())
    configuration.set(TableInputFormat.SCAN, scan_str)

    val rdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    val resultRDD = rdd.map(tup => resultToList(tup))
    resultRDD.take(2).foreach(println)
  }

  // Extract the (reversed) row key from one scanned row.
  def resultToList(tableValues: (ImmutableBytesWritable, Result)): String = {
    val result = tableValues._2
    val rowKey = result.getRow()
    val rowId = Bytes.toString(rowKey).reverse
    rowId
  }

  // Write an RDD of Puts to HBase via TableOutputFormat.
  def save_hbase(sc: SparkContext, writeTableName: String): Unit = {
    val configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.property.clientPort", "2015")
    configuration.set("zookeeper.znode.parent", "/hbase")
    configuration.set("hbase.zookeeper.quorum",
      "namenode1-sit.com,namenode2-sit.com,slave01-sit.com")
    configuration.set(TableOutputFormat.OUTPUT_TABLE, writeTableName)

    val job = Job.getInstance(configuration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      // Row key is the first field; "info" is the column family.
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      // TableOutputFormat ignores the key, so an empty ImmutableBytesWritable is enough.
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
}
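In read_hbase above, resultToList only pulls the row key out of each scanned Result. If column values are needed as well, Result.getValue(family, qualifier) can be used. Below is a minimal sketch; resultToRow is a hypothetical helper, not part of the code above, and the "info" family with the "name"/"age" qualifiers is an assumption taken from the write example. Since age was written with Bytes.toBytes(arr(2).toInt), it has to be decoded with Bytes.toInt rather than Bytes.toString:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

object ResultDecoding {
  // Hypothetical helper: turn one scanned Result into (rowKey, name, age).
  // The "info" family and "name"/"age" qualifiers are assumptions matching the
  // write example above; adjust them to the columns your table actually has.
  def resultToRow(result: Result): (String, String, Option[Int]) = {
    val rowKey = Bytes.toString(result.getRow)
    val name = Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
      .map(b => Bytes.toString(b))
      .getOrElse("")
    // age was stored as a 4-byte int, so it is decoded with Bytes.toInt
    val age = Option(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
      .map(b => Bytes.toInt(b))
    (rowKey, name, age)
  }
}

// Possible usage inside read_hbase, instead of the resultToList mapping:
// val rowsRDD = rdd.map { case (_, result) => ResultDecoding.resultToRow(result) }
// rowsRDD.take(2).foreach(println)

Wrapping getValue in Option guards against rows where one of the columns is missing, since getValue returns null in that case.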