Spark鏈接HBase

(一)、Spark讀取HBase中的數據node

hbase中的數據apache

 

 1 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
 2 import org.apache.hadoop.hbase.client.HBaseAdmin
 3 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 4 import org.apache.spark._
 5 import org.apache.hadoop.hbase.util.Bytes
 6 
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 從hbase讀取數據轉化成RDD
11   */
12 object SparkReadHBase {
13 
14   def main(args: Array[String]): Unit = {
15     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
16     val sc = new SparkContext(sparkConf)
17 
18     val tablename = "account"
19     val conf = HBaseConfiguration.create()
20     //設置zooKeeper集羣地址,也能夠經過將hbase-site.xml導入classpath,可是建議在程序裏這樣設置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //設置zookeeper鏈接端口,默認2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24     conf.set(TableInputFormat.INPUT_TABLE, tablename)
25 
26     // 若是表不存在則建立表
27     val admin = new HBaseAdmin(conf)
28     if (!admin.isTableAvailable(tablename)) {
29       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
30       admin.createTable(tableDesc)
31     }
32 
33     //讀取數據並轉化成rdd
34     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
35       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
36       classOf[org.apache.hadoop.hbase.client.Result])
37 
38     val count = hBaseRDD.count()
39     println(count)
40     hBaseRDD.foreach{case (_,result) =>{
41       //獲取行鍵
42       val key = Bytes.toString(result.getRow)
43       //經過列族和列名獲取列
44       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
45       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
46       println("Row key:"+key+" Name:"+name+" Age:"+age)
47     }}
48 
49     sc.stop()
50     admin.close()
51   }
52 
53 }

(二)、Spark寫HBaseoop

  1.第一種方式:spa

 1 import org.apache.hadoop.hbase.HBaseConfiguration
 2 import org.apache.hadoop.hbase.client.Put
 3 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 4 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 5 import org.apache.hadoop.hbase.util.Bytes
 6 import org.apache.hadoop.mapred.JobConf
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 9 /**
10   * Created by *** on 2018/2/12.
11   *
12   * 使用saveAsHadoopDataset寫入數據
13   */
14 object SparkWriteHBaseOne {
15   def main(args: Array[String]): Unit = {
16     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
17     val sc = new SparkContext(sparkConf)
18 
19     val conf = HBaseConfiguration.create()
20     //設置zooKeeper集羣地址,也能夠經過將hbase-site.xml導入classpath,可是建議在程序裏這樣設置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //設置zookeeper鏈接端口,默認2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24 
25     val tablename = "account"
26 
27     //初始化jobconf,TableOutputFormat必須是org.apache.hadoop.hbase.mapred包下的!
28     val jobConf = new JobConf(conf)
29     jobConf.setOutputFormat(classOf[TableOutputFormat])
30     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
31 
32     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
33 
34 
35     val rdd = indataRDD.map(_.split(',')).map{arr=>{
36       /*一個Put對象就是一行記錄,在構造方法中指定主鍵
37        * 全部插入的數據必須用org.apache.hadoop.hbase.util.Bytes.toBytes方法轉換
38        * Put.add方法接收三個參數:列族,列名,數據
39        */
40       val put = new Put(Bytes.toBytes(arr(0).toInt))
41       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
42       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
43       //轉化成RDD[(ImmutableBytesWritable,Put)]類型才能調用saveAsHadoopDataset
44       (new ImmutableBytesWritable, put)
45     }}
46 
47     rdd.saveAsHadoopDataset(jobConf)
48 
49     sc.stop()
50   }
51 }

  2.第二種方式:3d

 1 import org.apache.hadoop.hbase.client.{Put, Result}
 2 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 3 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 4 import org.apache.hadoop.hbase.util.Bytes
 5 import org.apache.hadoop.mapreduce.Job
 6 import org.apache.spark._
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 使用saveAsNewAPIHadoopDataset寫入數據
11   */
12 object SparkWriteHBaseTwo {
13   def main(args: Array[String]): Unit = {
14     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
15     val sc = new SparkContext(sparkConf)
16 
17     val tablename = "account"
18 
19     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","node02,node03,node04")
20     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
21     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
22 
23     val job = new Job(sc.hadoopConfiguration)
24     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
25     job.setOutputValueClass(classOf[Result])
26     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
27 
28     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
29     val rdd = indataRDD.map(_.split(',')).map{arr=>{
30       val put = new Put(Bytes.toBytes(arr(0)))
31       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
32       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
33       (new ImmutableBytesWritable, put)
34     }}
35 
36     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
37   }
38 }
相關文章
相關標籤/搜索