def main(args: Array[String]): Unit = {
  // HBase connection configuration: ZooKeeper quorum and the table to scan.
  val hConf = HBaseConfiguration.create()
  hConf.set("hbase.zookeeper.quorum", "m6,m7,m8")
  val tableName = "t_person"
  hConf.set(TableInputFormat.INPUT_TABLE, tableName)

  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  val sc = new SparkContext(conf)
  try {
    // Scan the HBase table as an RDD of (rowKey, Result) pairs.
    val rs = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
    rs.foreach { case (_, result) =>
      // NOTE: println executes on the executors; it is visible on the
      // console here only because the master is "local".
      println(Bytes.toString(result.getRow))
      // Fetch the cell by column family and qualifier. Use Bytes.toBytes
      // (not String.getBytes, which depends on the platform charset).
      // Guard against a missing cell: getValue returns null in that case,
      // and Bytes.toInt(null) would throw.
      val age = result.getValue(Bytes.toBytes("base_info"), Bytes.toBytes("age"))
      if (age != null) println(Bytes.toInt(age))
    }
  } finally {
    // Always release the SparkContext, even if the job fails.
    sc.stop()
  }
}
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.set("spark.master", "local")
  conf.set("spark.app.name", "spark demo")
  // Create the SparkSession and obtain its SparkContext.
  val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate()
  val sc = spark.sparkContext
  try {
    // HBase connection configuration and output-table wiring.
    val hbaseConf = HBaseConfiguration.create()
    val tableName = "t_person"
    hbaseConf.set("hbase.zookeeper.quorum", "m6,m7,m8")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    val jobConf = new JobConf(hbaseConf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // One (rowKey, age) pair to upsert into the table.
    val pairs = sc.parallelize(List(("p_0000010", "12")))

    // Turn a (rowKey, age) pair into the (key, Put) shape that
    // TableOutputFormat expects.
    def convert(data: (String, String)): (ImmutableBytesWritable, Put) = {
      val p = new Put(Bytes.toBytes(data._1))
      // NOTE(review): Put.add is deprecated in HBase 1.x+ in favour of
      // addColumn — kept as-is since the HBase version in use is not
      // visible here; confirm against the project's HBase dependency.
      p.add(Bytes.toBytes("base_info"), Bytes.toBytes("age"), Bytes.toBytes(data._2))
      (new ImmutableBytesWritable, p)
    }

    // Persist to HBase. saveAsHadoopDataset is provided on pair RDDs via
    // the PairRDDFunctions implicit conversion; no explicit wrapper needed.
    pairs.map(convert).saveAsHadoopDataset(jobConf)
  } finally {
    // Shut down the session (and its SparkContext) even on failure.
    spark.stop()
  }
}