HBase environment setup

Environment: JDK 1.8, Hadoop 2.6.5

Install HBase 1.2.6.

First, configure the environment variable:

export HBASE_HOME=/home/hbase

Configure hbase-site.xml:

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://127.0.0.1:8020/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>false</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/hadoop/zookeeper</value>
  </property>
</configuration>

Configure hbase-env.sh:

export HBASE_MANAGES_ZK=true

Start: bin/start-hbase.sh

Stop: bin/stop-hbase.sh

Interactive shell: hbase shell
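To confirm that the standalone instance is reachable from client code, a small smoke test can simply list the existing tables. This is only a sketch, assuming the HBase client jars and hbase-site.xml are on the classpath; the object name HBaseSmokeTest is made up for illustration.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin

object HBaseSmokeTest {
  def main(args: Array[String]): Unit = {
    // Picks up hbase-site.xml from the classpath
    val conf = HBaseConfiguration.create()
    val admin = new HBaseAdmin(conf)
    // Listing tables is a cheap way to verify ZooKeeper and the master are reachable
    println("Tables: " + admin.listTables().map(_.getNameAsString).mkString(", "))
    admin.close()
  }
}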

 

 

    1. Basic operations

Create a table:

create 'table1', 'family'

Insert data:

put 'table1', 'key1', 'family:field1', 'value1'

Drop a table:

disable 'table1'

drop 'table1'

Query data:

scan 'table1'

get 'table1', 'key1'

Delete an entire row:

deleteall 'table1', 'key1'

Delete a single column:

delete 'table1', 'key1', 'family:field1'

Delete a column family:

alter 'table1', {NAME => 'info', METHOD => 'delete'}

Add a column family:

alter 'table1', {NAME => 'info'}
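The same basic operations can also be issued through the client API used in the Scala examples below. This is only a sketch against the 0.98/1.x client API; the object name ShellEquivalent is illustrative.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Get, HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object ShellEquivalent {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()

    // create 'table1', 'family'
    val admin = new HBaseAdmin(conf)
    val descriptor = new HTableDescriptor(TableName.valueOf("table1"))
    descriptor.addFamily(new HColumnDescriptor("family"))
    admin.createTable(descriptor)
    admin.close()

    // put 'table1', 'key1', 'family:field1', 'value1'
    val table = new HTable(conf, "table1")
    val put = new Put(Bytes.toBytes("key1"))
    put.add(Bytes.toBytes("family"), Bytes.toBytes("field1"), Bytes.toBytes("value1"))
    table.put(put)

    // get 'table1', 'key1'
    val result = table.get(new Get(Bytes.toBytes("key1")))
    println(Bytes.toString(result.getValue(Bytes.toBytes("family"), Bytes.toBytes("field1"))))
    table.close()
  }
}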

 

    2. Maven dependencies

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>0.9.1</version>
</dependency>

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
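If the project is built with sbt rather than Maven, roughly equivalent build.sbt entries might look like the following (versions copied from the Maven block above; in practice the Spark and HBase versions should match the cluster actually in use):

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "0.9.1",
  "org.apache.hbase" %  "hbase"        % "0.98.2-hadoop2",
  "org.apache.hbase" %  "hbase-client" % "0.98.2-hadoop2",
  "org.apache.hbase" %  "hbase-common" % "0.98.2-hadoop2",
  "org.apache.hbase" %  "hbase-server" % "0.98.2-hadoop2"
)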

 

    3. Example code for reading from HBase
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    // SQLContext and its implicits are needed for the RDD-to-DataFrame conversion below
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    // Map the rows to a DataFrame, i.e. give the RDD a schema
    val shop = hBaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("customer_id"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("create_id")))
    )).toDF("customer_id", "create_id")

    shop.registerTempTable("shop")

    // Test query
    val df2 = sqlContext.sql("SELECT customer_id FROM shop")
    df2.foreach(println)

    System.exit(0)
  }
}
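Note that toDF, registerTempTable, and sqlContext.sql come from the spark-sql module (Spark 1.3+), which is not among the Maven dependencies listed above and would need to be added. If the row key is also wanted in the DataFrame, it can be read from the Result in the same map; the following is a sketch reusing hBaseRDD, Bytes, and sqlContext from the example above, with illustrative column names:

val shopWithKey = hBaseRDD.map { case (_, result) =>
  (Bytes.toString(result.getRow),
    Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("customer_id"))))
}.toDF("row_key", "customer_id")
shopWithKey.show()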

Second approach:

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Get, HConnectionManager}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import scala.util.Try

// ScmConfUtil and MEPUtils are project-specific helpers from the original code
object HbaseFactory {

  private val user = User.create(UserGroupInformation.createRemoteUser(
    ScmConfUtil.getInstance().getString("hadoopUser", "sospdm")))

  private val HConnectionFunc = () => {
    val configuration = HBaseConfiguration.create()
    // Connection settings
    // ZooKeeper quorum used by HBase
    configuration.set("hbase.zookeeper.quorum",
      ScmConfUtil.getInstance().getString("zookeeper1.remote.quorum", ""))
    // ZooKeeper client port
    configuration.set("hbase.zookeeper.property.clientPort",
      ScmConfUtil.getInstance().getString("zookeeper1.remote.clientPort", ""))
    configuration.set("hbase.client.retries.number",
      ScmConfUtil.getInstance().getString("hbase.client.retries.number", ""))
    configuration.set("ipc.socket.timeout",
      ScmConfUtil.getInstance().getString("ipc.socket.timeout", ""))
    configuration.set("hbase.rpc.timeout",
      ScmConfUtil.getInstance().getString("hbase.rpc.timeout", ""))
    configuration.set("hbase.client.operation.timeout",
      ScmConfUtil.getInstance().getString("hbase.client.operation.timeout", ""))
    configuration.set("hbase.client.scanner.timeout.period",
      ScmConfUtil.getInstance().getString("hbase.client.scanner.timeout.period", ""))
    configuration.set("hbase.client.write.buffer",
      ScmConfUtil.getInstance().getString("hbase.client.write.buffer", ""))
    configuration.set("hadoop.job.ugi", "sospdm,sospdm")
    Try(HConnectionManager.createConnection(configuration, user)).get
  }

  private val H_C = HConnectionFunc()

  // Close the connection on JVM shutdown
  sys.addShutdownHook({
    H_C.close()
  })

  def queryForObject(tableName: String, rowKey: String, familyName: String, columns: String*): Map[String, String] = {
    var resultObject: Map[String, String] = Map[String, String]()
    try {
      val table = H_C.getTable(tableName)
      val get = new Get(Bytes.toBytes(rowKey))
      if (null != familyName && !"".equals(familyName)) {
        get.addFamily(Bytes.toBytes(familyName))
      }
      for (c <- columns) {
        // Restrict the Get to the requested columns
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(c))
      }
      val result = table.get(get)
      MEPUtils.out(result)
      for (cell <- result.rawCells()) {
        resultObject += (new String(CellUtil.cloneQualifier(cell)) -> new String(CellUtil.cloneValue(cell), "UTF-8"))
      }
    } catch {
      case e: Exception => MEPUtils.out(e)
    }
    resultObject
  }
}
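A call site might then look like this (a sketch; the table, family, and column names are purely illustrative):

// Fetch two columns of one row and print them
val row: Map[String, String] = HbaseFactory.queryForObject("table1", "key1", "family", "field1", "field2")
row.foreach { case (qualifier, value) => println(s"$qualifier = $value") }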

 

    4. Example code for writing to HBase


package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)

    val input = sc.textFile(args(0))

    // Create the HBase configuration
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    // Create the JobConf and set the output format and target table
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    // Build (rowkey, Put) pairs from tab-separated "key<TAB>value" lines
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    // Save to the HBase table (old mapred API)
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}

Second approach (using the new MapReduce API):

package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")
    // Configure the job's output format (new MapReduce API)
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }
    // Save to the HBase table (new MapReduce API)
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
    5. Reading with newAPIHadoopRDD
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object OrderES {

  val logger = LoggerFactory.getLogger(OrderES.getClass)

  var sc: SparkContext = null

  System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
  System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("es.nodes", "10.37.154.82,10.37.154.83,10.37.154.84")
      .set("cluster.name", "elasticsearch")
      .set("es.port", "9200")
      .set("HADOOP_USER_NAME", "bmps")
      .set("HADOOP_GROUP_NAME", "bmps")

    sc = new SparkContext(conf)
    readHbase(sc)
  }

  def readHbase(sc: SparkContext): Unit = {

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
    System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "ns_bmps:bmps_group_member")
    conf.set("hbase.zookeeper.quorum", "namenode1-sit.cnsuning.com,namenode2-sit.cnsuning.com,slave01-sit.cnsuning.com")
    // ZooKeeper client port used by HBase
    conf.set("hbase.zookeeper.property.clientPort", "2015")
    conf.set("zookeeper.znode.parent", "/hbase")
    conf.set("hbase.client.retries.number", "1")
    conf.set("ipc.socket.timeout", "1000")
    conf.set("hbase.rpc.timeout", "1000")
    conf.set("HADOOP_USER_NAME", "bmps")
    conf.set("HADOOP_GROUP_NAME", "bmps")

    // Read the table into an RDD of (ImmutableBytesWritable, Result)
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    println("count:" + hBaseRDD.count())

    hBaseRDD.foreach { case (_, result) =>
      // Row key
      val key = Bytes.toString(result.getRow)
      // Columns looked up by family and qualifier
      val name = Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("cf".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }
  }
}


    6. Writing with saveAsNewAPIHadoopDataset
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object TestHBase3 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val tablename = "account"

    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Each input line is "rowkey,name,age"
    val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0)))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
      (new ImmutableBytesWritable, put)
    }

    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  }
}
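Note that age is written with Bytes.toBytes(arr(2).toInt), i.e. as a 4-byte integer, so it has to be read back with Bytes.toInt (as in the readHbase example above) rather than Bytes.toString. A minimal round-trip check, reusing the Bytes import:

val encodedAge = Bytes.toBytes("15".toInt)      // 4-byte big-endian int
println(Bytes.toInt(encodedAge))                // 15
println(Bytes.toString(Bytes.toBytes("jack")))  // strings round-trip via toString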

 

    7. Using an HBase connection directly
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Get, HConnectionManager}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkContext
import scala.util.Try

// Enclosing object added so the fragment compiles as-is; the name is illustrative
object HBaseConnectionExample {

  private val user = User.create(UserGroupInformation.createRemoteUser("bmps"))

  private val HConnectionFunc = () => {
    // System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
    // System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "ns_bmps:bmps_group_member")
    conf.set("hbase.zookeeper.quorum", "namenode1-sit.cnsuning.com,namenode2-sit.cnsuning.com,slave01-sit.cnsuning.com")
    // ZooKeeper client port used by HBase
    conf.set("hbase.zookeeper.property.clientPort", "2015")
    conf.set("zookeeper.znode.parent", "/hbase")
    Try(HConnectionManager.createConnection(conf, user)).get
  }

  private val H_C = HConnectionFunc()

  // Close the connection on JVM shutdown
  sys.addShutdownHook({
    H_C.close()
  })

  def readHbase(sc: SparkContext): Unit = {
    System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
    System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "ns_bmps:bmps_group_member")
    conf.set("hbase.zookeeper.quorum", "namenode1-sit.cnsuning.com,namenode2-sit.cnsuning.com,slave01-sit.cnsuning.com")
    // ZooKeeper client port used by HBase
    conf.set("hbase.zookeeper.property.clientPort", "2015")
    conf.set("zookeeper.znode.parent", "/hbase")

    try {
      val table = H_C.getTable("ns_bmps:bmps_group_member")
      val get = new Get(Bytes.toBytes("663409232".reverse))
      get.addFamily(Bytes.toBytes("info"))
      // for (c <- columns) {
      //   // Restrict the Get to specific columns
      //   get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(c))
      // }
      val result = table.get(get)
      var resultObject: Map[String, String] = Map[String, String]()
      for (cell <- result.rawCells()) {
        println(Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" + Bytes.toString(CellUtil.cloneValue(cell)))
        resultObject += (new String(CellUtil.cloneQualifier(cell)) -> new String(CellUtil.cloneValue(cell), "UTF-8"))
      }
    } catch {
      case e: Exception => println(e)
    }
  }
}
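A scan over the same table can be issued through a connection obtained in the same way. This is only a sketch under the same assumptions (0.98-era HConnection API; quorum, port, and table name copied from the example above, the object name is illustrative):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HConnectionManager, Scan}
import org.apache.hadoop.hbase.util.Bytes

object GroupMemberScan {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "namenode1-sit.cnsuning.com,namenode2-sit.cnsuning.com,slave01-sit.cnsuning.com")
    conf.set("hbase.zookeeper.property.clientPort", "2015")
    conf.set("zookeeper.znode.parent", "/hbase")

    val connection = HConnectionManager.createConnection(conf)
    val table = connection.getTable("ns_bmps:bmps_group_member")
    try {
      val scan = new Scan()
      scan.addFamily(Bytes.toBytes("info"))
      scan.setCaching(100) // rows fetched per RPC round trip
      val scanner = table.getScanner(scan)
      // Print every row key in the scanned range
      var result = scanner.next()
      while (result != null) {
        println(Bytes.toString(result.getRow))
        result = scanner.next()
      }
      scanner.close()
    } finally {
      table.close()
      connection.close()
    }
  }
}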