Author: Syn良子  Source: http://www.cnblogs.com/cssdongl  Please credit the source when reposting.
As most people know, when writing to an HBase table that is known in advance from MapReduce or Spark, you simply declare the following in the MapReduce or Spark driver class:
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tablename);
After that, MapReduce writes through the context in the mapper or reducer, while Spark builds a PairRDDFunctions of Puts and calls saveAsHadoopDataset.
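For reference, here is a minimal sketch of that single-table case (assuming an RDD of (rowKey, count) string pairs and an already created table named "user_clicks" with an "info" family; note that saveAsHadoopDataset goes through the old-API org.apache.hadoop.hbase.mapred.TableOutputFormat):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD

// Single known table: the table name is fixed in the JobConf before writing.
def writeToSingleTable(rdd: RDD[(String, String)]): Unit = {
  val jobConf = new JobConf(HBaseConfiguration.create())
  jobConf.setOutputFormat(classOf[TableOutputFormat])
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user_clicks") // hypothetical table name

  rdd.map { case (rowKey, count) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(count))
    (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
  }.saveAsHadoopDataset(jobConf)
}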
However, you often run into requirements where the processed data has to be written to several HBase tables, or where the table name is not known up front and must be built from some field in the input data.
Because the table name is unknown, you cannot set TableOutputFormat.OUTPUT_TABLE. This requirement is still easy to meet; below I summarize the MapReduce and Spark approaches separately (you will see at the end that they amount to much the same thing).
1. Writing to multiple HBase tables from MapReduce
Just add the following line in the MR main method:
job.setOutputFormatClass(MultiTableOutputFormat.class);
Then, in the mapper or reducer, you can build the table name and the Put from the relevant fields and write to multiple HBase tables through the context, as in the sketch below.
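For illustration, here is a minimal sketch of such a mapper, written in Scala to match the Spark code later in the post; the "appId,uid,clickCount" input layout, the "_clickcounts" table-name suffix and the "info" family are all assumptions for the example:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Mapper

// Hypothetical input lines of the form "appId,uid,clickCount"; the appId field
// decides which table each Put goes to (table name = appId + "_clickcounts").
class MultiTableMapper extends Mapper[LongWritable, Text, ImmutableBytesWritable, Put] {

  override def map(key: LongWritable, value: Text,
                   context: Mapper[LongWritable, Text, ImmutableBytesWritable, Put]#Context): Unit = {
    val Array(appId, uid, clickCount) = value.toString.split(",")
    val tableName = appId + "_clickcounts"

    val put = new Put(Bytes.toBytes(appId + "_" + uid))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(clickCount))

    // With MultiTableOutputFormat the output key carries the target table name,
    // so each record can land in a different table.
    context.write(new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
  }
}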
2. Writing to multiple HBase tables from Spark
Here I will use a Spark Streaming program I have tested that writes to several HBase tables; the code follows:
import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// PropertiesUtil is the author's own configuration helper and is not shown here.
object SparkStreamingWriteToHbase {
  def main(args: Array[String]): Unit = {
    var masterUrl = "yarn-client"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    val conf = new SparkConf().setAppName("Write to several tables of Hbase").setMaster(masterUrl)
    val ssc = new StreamingContext(conf, Seconds(5))

    val topics = Set("app_events")
    val brokers = PropertiesUtil.getValue("BROKER_ADDRESS")
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder")

    val hbaseTableSuffix = "_clickcounts"

    val hConf = HBaseConfiguration.create()
    val zookeeper = PropertiesUtil.getValue("ZOOKEEPER_ADDRESS")
    hConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeper)
    val jobConf = new JobConf(hConf, this.getClass)

    val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val appUserClicks = kafkaDStreams.flatMap(rdd => {
      val data = JSONObject.fromObject(rdd._2)
      Some(data)
    }).map { jsonLine =>
      val key = jsonLine.getString("appId") + "_" + jsonLine.getString("uid")
      val value = jsonLine.getString("click_count")
      (key, value)
    }

    val result = appUserClicks.map { item =>
      val rowKey = item._1
      val value = item._2
      convertToHbasePut(rowKey, value, hbaseTableSuffix)
    }

    result.foreachRDD { rdd =>
      rdd.saveAsNewAPIHadoopFile("", classOf[ImmutableBytesWritable], classOf[Put],
        classOf[MultiTableOutputFormat], jobConf)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def convertToHbasePut(key: String, value: String, tableNameSuffix: String): (ImmutableBytesWritable, Put) = {
    val rowKey = key
    val tableName = rowKey.split("_")(0) + tableNameSuffix
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
  }
}
To describe it briefly: the Spark Streaming job processes JSON records read from Kafka, and the appId field is used to build the table name so that records end up in different HBase tables. Finally the RDD is written to HBase with saveAsNewAPIHadoopFile.
If you step into saveAsNewAPIHadoopFile, you will see the setup is really no different from the MapReduce configuration, as shown below:
def saveAsNewAPIHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
    conf: Configuration = self.context.hadoopConfiguration) {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val job = new NewAPIHadoopJob(hadoopConf)
  job.setOutputKeyClass(keyClass)
  job.setOutputValueClass(valueClass)
  job.setOutputFormatClass(outputFormatClass)
  job.getConfiguration.set("mapred.output.dir", path)
  saveAsNewAPIHadoopDataset(job.getConfiguration)
}
The parameters of this method are the output path (since we are writing to HBase, passing an empty string is fine), followed by outputKeyClass, outputValueClass, outputFormatClass, and the job conf.
Make sure the outputFormatClass here is MultiTableOutputFormat so that multiple tables can be written. One more note: make sure the HBase tables you are going to write to have already been created.
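For example, a minimal sketch of such a pre-check with the HBase Admin API might look like this (assuming the HBase 1.x client, plus the "_clickcounts" suffix and "info" family from the code above; the helper name is made up):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Create the per-appId table with an "info" family if it does not exist yet.
def ensureTableExists(appId: String): Unit = {
  val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val admin = connection.getAdmin
  try {
    val tableName = TableName.valueOf(appId + "_clickcounts")
    if (!admin.tableExists(tableName)) {
      val descriptor = new HTableDescriptor(tableName)
      descriptor.addFamily(new HColumnDescriptor("info"))
      admin.createTable(descriptor)
    }
  } finally {
    admin.close()
    connection.close()
  }
}

You would run something like this once for every distinct appId before starting the job; MultiTableOutputFormat does not create missing tables for you.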