Environment: JDK 1.8, Hadoop 2.6.5
Install HBase 1.2.6.
First, configure the environment variable:
export HBASE_HOME=/home/hbase
Configure hbase-site.xml:
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>hdfs://127.0.0.1:8020/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>false</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/hadoop/zookeeper</value>
  </property>
</configuration>
Configure hbase-env.sh:
export HBASE_MANAGES_ZK=true
Start: bin/start-hbase.sh
Stop: bin/stop-hbase.sh
Interactive shell: hbase shell
Create a table:
create 'table1', 'family'
Insert data:
put 'table1', 'key1', 'family:field1', 'value1'
Drop a table:
disable 'table1'
drop 'table1'
Query data:
scan 'table1'
get 'table1', 'key1'
Delete an entire row:
deleteall 'table1', 'key1'
Delete a column:
delete 'table1', 'key1', 'family:field1'
Delete a column family:
alter 'table1', {NAME => 'info', METHOD => 'delete'}
Add a column family:
alter 'table1', {NAME => 'info'}
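For reference, here is a minimal sketch of the same basic operations done from Scala with the HBase client API (the older HBaseAdmin/HTable classes, which match the hbase-client versions below); the table, family, and column names follow the shell examples above:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Get, HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

object ShellOpsFromScala {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()

    // create 'table1', 'family'
    val admin = new HBaseAdmin(conf)
    if (!admin.tableExists("table1")) {
      val desc = new HTableDescriptor(TableName.valueOf("table1"))
      desc.addFamily(new HColumnDescriptor("family"))
      admin.createTable(desc)
    }

    // put 'table1', 'key1', 'family:field1', 'value1'
    val table = new HTable(conf, "table1")
    val put = new Put(Bytes.toBytes("key1"))
    put.add(Bytes.toBytes("family"), Bytes.toBytes("field1"), Bytes.toBytes("value1"))
    table.put(put)

    // get 'table1', 'key1'
    val get = new Get(Bytes.toBytes("key1"))
    val result = table.get(get)
    println(Bytes.toString(result.getValue(Bytes.toBytes("family"), Bytes.toBytes("field1"))))

    table.close()
    admin.close()
  }
}

The Maven dependencies below pull in Spark and the HBase client modules used by the Spark examples that follow: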
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.98.2-hadoop2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.98.2-hadoop2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>0.98.2-hadoop2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>0.98.2-hadoop2</version>
</dependency>
Reading an HBase table from Spark through TableInputFormat and querying it with Spark SQL (args(0) is the Spark master URL, args(1) the HBase table name):

import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hBaseRDD.count()

    // Map each row to a tuple and convert the RDD into a DataFrame with a schema
    val shop = hBaseRDD.map(r => (
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("customer_id"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("create_id")))
    )).toDF("customer_id", "create_id")
    shop.registerTempTable("shop")

    // Test query
    val df2 = sqlContext.sql("SELECT customer_id FROM shop")
    df2.foreach(println)
    System.exit(0)
  }
}
Second method: wrap the HBase connection in a factory object and query a row by key.
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Get, HConnectionManager}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import scala.util.Try

// ScmConfUtil and MEPUtils are project-internal helpers (configuration lookup and logging).
object HbaseFactory {
  private val user = User.create(UserGroupInformation.createRemoteUser(
    ScmConfUtil.getInstance().getString("hadoopUser", "sospdm")))

  private val HConnectionFunc = () => {
    // Initialize the HBase configuration
    val configuration = HBaseConfiguration.create()
    // ZooKeeper quorum used by HBase
    configuration.set("hbase.zookeeper.quorum",
      ScmConfUtil.getInstance().getString("zookeeper1.remote.quorum", ""))
    // ZooKeeper client port
    configuration.set("hbase.zookeeper.property.clientPort",
      ScmConfUtil.getInstance().getString("zookeeper1.remote.clientPort", ""))
    configuration.set("hbase.client.retries.number",
      ScmConfUtil.getInstance().getString("hbase.client.retries.number", ""))
    configuration.set("ipc.socket.timeout",
      ScmConfUtil.getInstance().getString("ipc.socket.timeout", ""))
    configuration.set("hbase.rpc.timeout",
      ScmConfUtil.getInstance().getString("hbase.rpc.timeout", ""))
    configuration.set("hbase.client.operation.timeout",
      ScmConfUtil.getInstance().getString("hbase.client.operation.timeout", ""))
    configuration.set("hbase.client.scanner.timeout.period",
      ScmConfUtil.getInstance().getString("hbase.client.scanner.timeout.period", ""))
    configuration.set("hbase.client.write.buffer",
      ScmConfUtil.getInstance().getString("hbase.client.write.buffer", ""))
    configuration.set("hadoop.job.ugi", "sospdm,sospdm")
    Try(HConnectionManager.createConnection(configuration, user)).get
  }

  private val H_C = HConnectionFunc()

  // Close the connection on JVM shutdown
  sys.addShutdownHook({
    H_C.close()
  })

  def queryForObject(tableName: String, rowKey: String, familyName: String,
                     columns: String*): Map[String, String] = {
    var resultObject: Map[String, String] = Map[String, String]()
    try {
      val table = H_C.getTable(tableName)
      val get = new Get(Bytes.toBytes(rowKey))
      if (null != familyName && !"".equals(familyName)) {
        get.addFamily(Bytes.toBytes(familyName))
      }
      // Add the requested columns to the Get
      for (c <- columns) {
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(c))
      }
      val result = table.get(get)
      MEPUtils.out(result)
      for (cell <- result.rawCells()) {
        resultObject += (new String(CellUtil.cloneQualifier(cell)) ->
          new String(CellUtil.cloneValue(cell), "UTF-8"))
      }
    } catch {
      case e: Exception => MEPUtils.out(e)
    }
    resultObject
  }
}
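A possible call site for the factory (the table, family, and column names here are hypothetical):

val row: Map[String, String] =
  HbaseFactory.queryForObject("ns_demo:orders", "rowkey001", "info", "customer_id", "create_id")
row.foreach { case (qualifier, value) => println(s"$qualifier = $value") }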
Writing to HBase from Spark:
package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBase {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBase <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBase")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    // Create the HBase configuration
    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    // Create a JobConf and set the output format and target table
    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }

    // Save to the HBase table
    data.saveAsHadoopDataset(jobConf)
    sc.stop()
  }
}
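The job expects a tab-separated input file: each line is split into a key and a value, the key is reversed (a common trick to avoid region hotspotting) and used as the row key, and the value is written to column f1:info of table iteblog.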
Second way (using the new MapReduce API):
package com.iteblog.bigdata.hbase

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object SparkToHBaseNew {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: SparkToHBaseNew <input file>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SparkToHBaseNew")
    val sc = new SparkContext(conf)
    val input = sc.textFile(args(0))

    val hConf = HBaseConfiguration.create()
    hConf.set(HConstants.ZOOKEEPER_QUORUM, "www.iteblog.com:2181")

    val jobConf = new JobConf(hConf, this.getClass)
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "iteblog")

    // Configure the job's output format (new MapReduce API)
    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val data = input.map { item =>
      val Array(key, value) = item.split("\t")
      val rowKey = key.reverse
      val put = new Put(Bytes.toBytes(rowKey))
      put.add(Bytes.toBytes("f1"), Bytes.toBytes("info"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, put)
    }

    // Save to the HBase table
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
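The two variants differ only in the output API: the first uses the old org.apache.hadoop.hbase.mapred.TableOutputFormat with a JobConf and saveAsHadoopDataset, while the second configures the new org.apache.hadoop.hbase.mapreduce.TableOutputFormat through Job.getInstance and writes with saveAsNewAPIHadoopDataset.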
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object OrderES {
  val logger = LoggerFactory.getLogger(OrderES.getClass)
  var sc: SparkContext = null

  System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
  System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("es.nodes", "10.37.154.82,10.37.154.83,10.37.154.84")
      .set("cluster.name", "elasticsearch")
      .set("es.port", "9200")
      .set("HADOOP_USER_NAME", "bmps")
      .set("HADOOP_GROUP_NAME", "bmps")
    sc = new SparkContext(conf)
    readHbase(sc)
  }

  def readHbase(sc: SparkContext): Unit = {
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes

    System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
    System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "ns_bmps:bmps_group_member")
    conf.set("hbase.zookeeper.quorum",
      "namenode1-sit.cnsuning.com,namenode2-sit.cnsuning.com,slave01-sit.cnsuning.com")
    // ZooKeeper client port used by HBase
    conf.set("hbase.zookeeper.property.clientPort", "2015")
    conf.set("zookeeper.znode.parent", "/hbase")
    conf.set("hbase.client.retries.number", "1")
    conf.set("ipc.socket.timeout", "1000")
    conf.set("hbase.rpc.timeout", "1000")
    conf.set("HADOOP_USER_NAME", "bmps")
    conf.set("HADOOP_GROUP_NAME", "bmps")

    // Read the table into an RDD
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    println("count:" + hBaseRDD.count())

    hBaseRDD.foreach { case (_, result) =>
      // Row key
      val key = Bytes.toString(result.getRow)
      // Fetch columns by family and qualifier
      val name = Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("cf".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }
  }
}
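The SparkConf above points at an Elasticsearch cluster but the snippet itself never writes to it. Below is a minimal sketch of how the rows read from HBase could be indexed, assuming the elasticsearch-spark connector (which provides saveToEs) is on the classpath; the index/type name "order/member" is hypothetical:

import org.elasticsearch.spark._

// Inside readHbase, after hBaseRDD has been created:
val docs = hBaseRDD.map { case (_, result) =>
  Map(
    "rowkey" -> Bytes.toString(result.getRow),
    "name"   -> Bytes.toString(result.getValue("cf".getBytes, "name".getBytes))
  )
}
docs.saveToEs("order/member") // hypothetical index/type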
Environment setup for connecting as an explicit user:
import org.apache.hadoop.hbase.client.{Get, HConnectionManager}
import org.apache.hadoop.hbase.security.User
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkContext
import scala.util.Try

private val user = User.create(UserGroupInformation.createRemoteUser("bmps"))

private val HConnectionFunc = () => {
  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  // System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
  // System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, "ns_bmps:bmps_group_member")
  conf.set("hbase.zookeeper.quorum",
    "namenode1-sit.cnsuning.com,namenode2-sit.cnsuning.com,slave01-sit.cnsuning.com")
  // ZooKeeper client port used by HBase
  conf.set("hbase.zookeeper.property.clientPort", "2015")
  conf.set("zookeeper.znode.parent", "/hbase")
  Try(HConnectionManager.createConnection(conf, user)).get
}

private val H_C = HConnectionFunc()

// Close the connection on JVM shutdown
sys.addShutdownHook({
  H_C.close()
})

def readHbase(sc: SparkContext): Unit = {
  import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
  import org.apache.hadoop.hbase.mapreduce.TableInputFormat

  System.getProperties().setProperty("HADOOP_USER_NAME", "bmps")
  System.getProperties().setProperty("HADOOP_GROUP_NAME", "bmps")

  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, "ns_bmps:bmps_group_member")
  conf.set("hbase.zookeeper.quorum",
    "namenode1-sit.cnsuning.com,namenode2-sit.cnsuning.com,slave01-sit.cnsuning.com")
  // ZooKeeper client port used by HBase
  conf.set("hbase.zookeeper.property.clientPort", "2015")
  conf.set("zookeeper.znode.parent", "/hbase")

  try {
    val table = H_C.getTable("ns_bmps:bmps_group_member")
    val get = new Get(Bytes.toBytes("663409232".reverse))
    get.addFamily(Bytes.toBytes("info"))
    // for (c <- columns) {
    //   // Add the requested columns
    //   get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(c))
    // }
    val result = table.get(get)
    var resultObject: Map[String, String] = Map[String, String]()
    for (cell <- result.rawCells()) {
      println(Bytes.toString(CellUtil.cloneQualifier(cell)) + ":" +
        Bytes.toString(CellUtil.cloneValue(cell)))
      resultObject += (new String(CellUtil.cloneQualifier(cell)) ->
        new String(CellUtil.cloneValue(cell), "UTF-8"))
    }
  } catch {
    case e: Exception => println(e)
  }
}
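Note that readHbase takes a SparkContext but does not use it here: the single-row lookup goes entirely through the shared H_C connection. The row key is reversed ("663409232".reverse) before the Get, matching the key-reversal convention used when writing rows in the earlier examples.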