import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client._
import org.apache.spark.SparkContext
import scala.collection.JavaConversions._

/**
 * Basic CRUD code examples using the new HBase 1.0.0 client API
 **/
object HBaseNewAPI {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkHBase")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    // Creating a Connection is heavyweight; it is thread-safe and is the entry point for all HBase operations
    val conn = ConnectionFactory.createConnection(conf)
    // Obtain an Admin object from the Connection (equivalent to the old HBaseAdmin)
    val admin = conn.getAdmin

    // Name of the table this example operates on
    val userTable = TableName.valueOf("user")

    // Create the `user` table
    val tableDescr = new HTableDescriptor(userTable)
    tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
    println("Creating table `user`. ")
    if (admin.tableExists(userTable)) {
      admin.disableTable(userTable)
      admin.deleteTable(userTable)
    }
    admin.createTable(tableDescr)
    println("Done!")

    try {
      // Get the `user` table
      val table = conn.getTable(userTable)

      try {
        // Prepare to insert a row with key `id001`
        val p = new Put("id001".getBytes)
        // Specify column and value for the Put (the old put.add method is deprecated)
        p.addColumn("basic".getBytes, "name".getBytes, "wuchong".getBytes)
        // Commit
        table.put(p)

        // Get a single row
        val g = new Get("id001".getBytes)
        val result = table.get(g)
        val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
        println("GET id001 :" + value)

        // Scan the table
        val s = new Scan()
        s.addColumn("basic".getBytes, "name".getBytes)
        val scanner = table.getScanner(s)

        try {
          for (r <- scanner) {
            println("Found row: " + r)
            println("Found value: " + Bytes.toString(r.getValue("basic".getBytes, "name".getBytes)))
          }
        } finally {
          // Make sure the scanner is closed
          scanner.close()
        }

        // Delete a row; usage is similar to Put
        val d = new Delete("id001".getBytes)
        d.addColumn("basic".getBytes, "name".getBytes)
        table.delete(d)

      } finally {
        if (table != null) table.close()
      }

    } finally {
      conn.close()
    }
  }
}
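
The same `Table` handle also accepts batched operations, which avoids one RPC round trip per row. Below is a minimal sketch, not part of the original gist: the object name `HBaseBatchSketch`, the `id00x` row keys and the `user-x` values are made up for illustration, and it assumes the `user` table with family `basic` created above and the same ZooKeeper settings.

// Sketch: batch Put / batch Get with the HBase 1.0 client API.
// Assumes the `user` table and `basic` family already exist (e.g. created by HBaseNewAPI above).
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConversions._

object HBaseBatchSketch {
  def main(args: Array[String]) {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("user"))
    try {
      // One table.put(List[Put]) call instead of one call per row
      val puts = (1 to 3).map { i =>
        val p = new Put(Bytes.toBytes("id00" + i))
        p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes("user-" + i))
        p
      }
      table.put(seqAsJavaList(puts))

      // Batch read: table.get(List[Get]) returns one Result per Get, in the same order
      val gets = (1 to 3).map(i => new Get(Bytes.toBytes("id00" + i)))
      table.get(seqAsJavaList(gets)).foreach { r =>
        println(Bytes.toString(r.getRow) + " -> " +
          Bytes.toString(r.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name"))))
      }
    } finally {
      table.close()
      conn.close()
    }
  }
}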
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.client._

/**
 * Reading from and writing to HBase with Spark
 **/
object SparkOnHBase {

  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkOnHBase")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    // ======Save RDD to HBase========
    // step 1: JobConf setup
    val jobConf = new JobConf(conf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user")

    // step 2: rdd mapping to table
    // An HBase table schema typically looks like:
    // *row   cf:col_1   cf:col_2
    // In Spark we work with RDDs of tuples, e.g. (1,"lilei",14), (2,"hanmei",18)
    // We need to convert *RDD[(uid:Int, name:String, age:Int)]* into *RDD[(ImmutableBytesWritable, Put)]*
    // The `convert` function below does this conversion
    def convert(triple: (Int, String, Int)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, p)
    }

    // step 3: read RDD data from somewhere and convert
    val rawData = List((1,"lilei",14), (2,"hanmei",18), (3,"someone",38))
    val localData = sc.parallelize(rawData).map(convert)

    // step 4: use `saveAsHadoopDataset` to save RDD to HBase
    localData.saveAsHadoopDataset(jobConf)
    // =================================

    // ======Load RDD from HBase========
    // use `newAPIHadoopRDD` to load RDD from HBase
    // Read data directly from HBase and turn it into an RDD[K, V] that Spark can operate on
    // Set the table to query
    conf.set(TableInputFormat.INPUT_TABLE, "user")

    // Add a filter condition: age at least 18 (GREATER_OR_EQUAL)
    val scan = new Scan()
    scan.setFilter(new SingleColumnValueFilter("basic".getBytes, "age".getBytes,
      CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(18)))
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = usersRDD.count()
    println("Users RDD Count:" + count)
    usersRDD.cache()

    // Iterate over the results and print them
    usersRDD.foreach { case (_, result) =>
      val key = Bytes.toInt(result.getRow)
      val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }
    // =================================
  }
}
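
Note that the write path above goes through the old mapred API (JobConf + saveAsHadoopDataset), while the read path already uses the new mapreduce TableInputFormat. The same RDD[(ImmutableBytesWritable, Put)] can also be written via saveAsNewAPIHadoopDataset with org.apache.hadoop.hbase.mapreduce.TableOutputFormat. The following is a minimal sketch under that assumption, not the original author's code; the object name SparkWriteHBaseNewAPI and the sample rows are made up for illustration.

// Sketch: writing to the same `user` table with the new mapreduce API
// instead of the old mapred JobConf. Assumes the same cluster settings as above.
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  // note: mapreduce, not mapred
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // PairRDD functions on older Spark versions

object SparkWriteHBaseNewAPI {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkWriteHBaseNewAPI")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "user")

    // The Job only carries the output-format configuration; no MapReduce job is submitted
    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])

    // Same (uid, name, age) -> (ImmutableBytesWritable, Put) mapping as convert() above
    val data = sc.parallelize(List((4, "lily", 20), (5, "lucy", 25))).map { case (uid, name, age) =>
      val p = new Put(Bytes.toBytes(uid))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(name))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(age))
      (new ImmutableBytesWritable, p)
    }

    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}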
Source: https://gist.github.com/wuchong/95630f80966d07d7453b#file-hbasenewapi-scala
See also: http://wuchong.me/blog/2015/04/04/spark-on-yarn-cluster-deploy/