Reading HBase into a Spark RDD, then storing it in Hive or analyzing it with Spark SQL

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object SparkReadHbase {

  var total: Int = 0

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Read Hbase")
      .enableHiveSupport() // required if Hive tables are to be read or written
      .getOrCreate()
    val sc = spark.sparkContext

    // ZooKeeper settings: ZooKeeper stores HBase's metadata (region locations, etc.)
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, "event_logs_20190218")

    // Read the table and turn it into an RDD
    val hBaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], // input key class
      classOf[Result])                 // input value class

    val count = hBaseRDD.count()
    println("\n\n\n:" + count)

    import spark.implicits._
    val logRDD: RDD[EventLog] = hBaseRDD.map { case (_, result) =>
      // get the row key
      val rowKey   = Bytes.toString(result.getRow)
      val api_v    = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("api_v")))
      val app_id   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("app_id")))
      val c_time   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("c_time")))
      val ch_id    = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("ch_id")))
      val city     = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("city")))
      val province = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("province")))
      val country  = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("country")))
      val en       = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("en")))
      val ip       = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("ip")))
      val net_t    = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("net_t")))
      val pl       = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("pl")))
      val s_time   = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("s_time")))
      val user_id  = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("user_id")))
      val uuid     = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("uuid")))
      val ver      = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("ver")))

      // Build the schema with a case class. Tuples and case classes reportedly could not exceed
      // 22 fields (before Scala 2.11); for wider rows the schema is usually built with StructType (Row, schema).
      EventLog(rowKey, api_v, app_id, c_time, ch_id, city, province, country, en, ip,
        net_t, pl, s_time, user_id, uuid, ver)
    }

    // Can be converted to a DataFrame/Dataset and stored in Hive as a wide table,
    // or analyzed directly with Spark Core.
    val logds = logRDD.toDS()
    logds.createTempView("event_logs")
    val sq = spark.sql("select * from event_logs limit 1")
    sq.explain()
    sq.show()

    sc.stop()
    spark.stop()
  }
}
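The EventLog case class referenced above is not defined in the post. A minimal sketch, assuming every column is read back as a plain String (matching the Bytes.toString calls), could be:

// Hypothetical EventLog case class backing SparkReadHbase; all fields assumed to be Strings
case class EventLog(
    rowKey: String,
    api_v: String,
    app_id: String,
    c_time: String,
    ch_id: String,
    city: String,
    province: String,
    country: String,
    en: String,
    ip: String,
    net_t: String,
    pl: String,
    s_time: String,
    user_id: String,
    uuid: String,
    ver: String)

To actually persist the Dataset as a Hive wide table rather than only a temporary view, a call such as logds.write.mode("overwrite").saveAsTable("event_logs_wide") should work once enableHiveSupport() is on; the target table name here is only a placeholder.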

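As the comment in the read example notes, case classes and tuples were limited to 22 fields before Scala 2.11, so very wide HBase rows are usually mapped through StructType and Row instead of a case class. The helper below is an illustrative sketch (the object name WideRowSchema and the column list are assumptions based on the columns read above):

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object WideRowSchema {
  // Qualifiers of the "info" column family, in the order they should appear in the DataFrame
  val columns = Seq("api_v", "app_id", "c_time", "ch_id", "city", "province", "country",
    "en", "ip", "net_t", "pl", "s_time", "user_id", "uuid", "ver")

  // row_key plus one nullable String column per qualifier
  val schema: StructType = StructType(
    StructField("row_key", StringType, nullable = false) +:
      columns.map(c => StructField(c, StringType, nullable = true)))

  def toDataFrame(spark: SparkSession,
                  hBaseRDD: RDD[(ImmutableBytesWritable, Result)]): DataFrame = {
    val rowRDD = hBaseRDD.map { case (_, result) =>
      val values = columns.map { c =>
        Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes(c)))
      }
      Row.fromSeq(Bytes.toString(result.getRow) +: values)
    }
    spark.createDataFrame(rowRDD, schema)
  }
}

With this approach the number of columns is no longer tied to a case class, which is what the "(Row, schema)" remark in the comment refers to.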

// Writing to HBase from Spark
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
 * created by imp on 2018/2/19
 */
object SparkWriteHbase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "test")

    // The Job only carries the output format configuration; no MapReduce job is submitted
    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val arrResult: Array[String] = Array("1,3000000000")
    // val arrResult = Array("1,100,11")

    val resultRDD = sc.makeRDD(arrResult)
    val saveRDD = resultRDD.map(_.split(',')).map { arr =>
      // first field is the row key, second goes into info:total
      val put = new Put(Bytes.toBytes(arr(0)))
      // addColumn replaces the deprecated Put.add of older HBase versions
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(arr(1)))
      (new ImmutableBytesWritable, put)
    }

    println("getConfiguration")
    val c = job.getConfiguration
    println("save")
    saveRDD.saveAsNewAPIHadoopDataset(c)

    sc.stop()
  }
}
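TableOutputFormat writes into an existing table, so the test table with the info column family has to be created in HBase beforehand. A simple way to confirm the write landed is to read the row back with the plain HBase client API; the object below is only an illustrative sketch:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

// Illustrative check: read back the row that SparkWriteHbase wrote to "test"
object VerifyWrite {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    try {
      val table = connection.getTable(TableName.valueOf("test"))
      val result = table.get(new Get(Bytes.toBytes("1")))
      val total = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("total")))
      println("row 1, info:total = " + total)
      table.close()
    } finally {
      connection.close()
    }
  }
}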