Two ways to read and write HBase from Spark (RDD and DataFrame)

Writing data with saveAsHadoopDataset

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
//import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * Created by blockchain on 18-9-9 at 3:45 PM in Beijing.
  */

object SparkHBaseRDD {
  def main(args: Array[String]) {
    // Suppress unnecessary log output in the terminal
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder().appName("SparkHBaseRDD").getOrCreate()
    val sc = spark.sparkContext

    val tablename = "SparkHBase"

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum","localhost")  //設置zooKeeper集羣地址,也能夠經過將hbase-site.xml導入classpath,可是建議在程序裏這樣設置
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")       //設置zookeeper鏈接端口,默認2181
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    // Initialize the job; this TableOutputFormat is the one from the org.apache.hadoop.hbase.mapred package
    val jobConf = new JobConf(hbaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))

    val rdd = indataRDD.map(_.split(',')).map{ arr=>
      /* One Put object is one row; the row key is passed to the constructor.
       * Every value written must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
       * Put.addColumn takes three arguments: column family, column qualifier, value */
      val put = new Put(Bytes.toBytes(arr(0)))
      put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsHadoopDataset(jobConf)

    spark.stop()
  }
}
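
saveAsHadoopDataset does not create the target table, so the SparkHBase table with column family cf1 must already exist before the job runs. Below is a minimal sketch that pre-creates it with the HBase 1.x client API (run once outside the Spark job; the object name CreateSparkHBaseTable is just illustrative):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateSparkHBaseTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val table = TableName.valueOf("SparkHBase")
    if (!admin.tableExists(table)) {
      val desc = new HTableDescriptor(table)
      desc.addFamily(new HColumnDescriptor("cf1"))  // the column family used by the Put calls above
      admin.createTable(desc)
    }
    admin.close()
    connection.close()
  }
}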

View the written data in the HBase shell:

hbase(main):005:0* scan 'SparkHBase'
ROW                        COLUMN+CELL                                                                
 1                         column=cf1:age, timestamp=1536494344379, value=15                          
 1                         column=cf1:name, timestamp=1536494344379, value=Lucy                       
 2                         column=cf1:age, timestamp=1536494344380, value=16                          
 2                         column=cf1:name, timestamp=1536494344380, value=jack                       
 3                         column=cf1:age, timestamp=1536494344379, value=14                          
 3                         column=cf1:name, timestamp=1536494344379, value=Lily                       
 5                         column=cf1:age, timestamp=1536494344380, value=17                          
 5                         column=cf1:name, timestamp=1536494344380, value=mike                       
4 row(s) in 0.0940 seconds

hbase(main):006:0>

As shown above, the write succeeded.
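
The commented-out imports hint at the alternative path through the newer MapReduce API: the same RDD can also be written with saveAsNewAPIHadoopDataset and the TableOutputFormat from the org.apache.hadoop.hbase.mapreduce package. A minimal sketch, reusing hbaseConf, rdd and tablename from the listing above:

import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => NewTableOutputFormat}
import org.apache.hadoop.mapreduce.Job

// Configure the new-API output format and target table, then write the (rowkey, Put) pairs
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[NewTableOutputFormat[ImmutableBytesWritable]])
job.getConfiguration.set(NewTableOutputFormat.OUTPUT_TABLE, tablename)

rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)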

Reading data with newAPIHadoopRDD

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
//import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * Created by blockchain on 18-9-9 at 3:45 PM in Beijing.
  */

object SparkHBaseRDD {
  def main(args: Array[String]) {
    // Suppress unnecessary log output in the terminal
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder().appName("SparkHBaseRDD").getOrCreate()
    val sc = spark.sparkContext

    val tablename = "SparkHBase"

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum","localhost")  //設置zooKeeper集羣地址,也能夠經過將hbase-site.xml導入classpath,可是建議在程序裏這樣設置
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")       //設置zookeeper鏈接端口,默認2181
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)
    
    // Create the table if it does not exist (an HBase table needs at least one column family)
    val admin = new HBaseAdmin(hbaseConf)
    if (!admin.isTableAvailable(tablename)) {
      val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
      tableDesc.addFamily(new HColumnDescriptor("cf1"))
      admin.createTable(tableDesc)
    }

    // Read the data and turn it into an RDD; this TableInputFormat is the one from the org.apache.hadoop.hbase.mapreduce package
    val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Note: the println below runs inside foreach on the executors; in local mode its output appears in this console
    hBaseRDD.foreach{ case (_ ,result) =>
      // Get the row key
      val key = Bytes.toString(result.getRow)
      // Get a cell value by column family and column qualifier
      val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes))
      val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes))
      println("Row key:"+key+"\tcf1.Name:"+name+"\tcf1.Age:"+age)
    }
    admin.close()

    spark.stop()
  }
}

The output is as follows:

Row key:1	cf1.Name:Lucy	cf1.Age:15
Row key:2	cf1.Name:jack	cf1.Age:16
Row key:3	cf1.Name:Lily	cf1.Age:14
Row key:5	cf1.Name:mike	cf1.Age:17
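
If the scanned rows are more convenient to handle as a DataFrame than as a raw RDD, they can be mapped to tuples and converted with toDF. A minimal sketch, reusing spark, hBaseRDD and the cf1 columns from the listing above (the column names rowkey, name and age are just illustrative):

import spark.implicits._

// Turn each HBase Result into a (rowkey, name, age) tuple, then into a DataFrame
val personDF = hBaseRDD.map { case (_, result) =>
  (Bytes.toString(result.getRow),
   Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name"))),
   Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age"))))
}.toDF("rowkey", "name", "age")

personDF.show()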

Reading and writing HBase with Spark DataFrames via Phoenix

Friendly reminder: Phoenix can also be accessed directly over JDBC, and Phoenix provides an Apache Spark Plugin for DataFrame integration.

For Maven deployment, see https://blog.csdn.net/yitengtongweishi/article/details/81946562. The dependencies to add are as follows:

<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-core</artifactId>
  <version>${phoenix.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-spark</artifactId>
  <version>${phoenix.version}</version>
</dependency>

As usual, straight to the code.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * Created by blockchain on 18-9-9 at 8:33 PM in Beijing.
  */

object SparkHBaseDataFrame {
  def main(args: Array[String]) {
    // Suppress unnecessary log output in the terminal
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()

    val url = "jdbc:phoenix:localhost:2181"
    val dbtable = "PHOENIXTEST"

    // First way to read Phoenix into a Spark DataFrame: the generic JDBC data source
    val rdf = spark.read
      .format("jdbc")
      .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
      .option("url", url)
      .option("dbtable", dbtable)
      .load()
    rdf.printSchema()

    // Second way: the phoenix-spark data source
    val df = spark.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> dbtable, "zkUrl" -> url))
      .load()
    df.printSchema()
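
    // Optional sketch: register the Phoenix-backed DataFrame as a temporary view so it can be
    // queried with plain Spark SQL; PK and COL1 are the PHOENIXTEST columns shown further below,
    // and "phoenix_test" is an arbitrary view name
    df.createOrReplaceTempView("phoenix_test")
    spark.sql("SELECT PK, COL1 FROM phoenix_test").show()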

    // Write the Spark DataFrame to Phoenix; the target table must be created beforehand
    df.write
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)
      .options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))
      .save()

    spark.stop()
  }
}

View the written data in Phoenix:

0: jdbc:phoenix:localhost:2181> SELECT * FROM PHOENIXTEST ;
+-----+----------+
| PK  |   COL1   |
+-----+----------+
| 1   | Hello    |
| 2   | World    |
| 3   | HBase    |
| 4   | Phoenix  |
+-----+----------+
4 rows selected (0.049 seconds)
0: jdbc:phoenix:localhost:2181> 
0: jdbc:phoenix:localhost:2181> SELECT * FROM PHOENIXTESTCOPY ;
+-----+----------+
| PK  |   COL1   |
+-----+----------+
| 1   | Hello    |
| 2   | World    |
| 3   | HBase    |
| 4   | Phoenix  |
+-----+----------+
4 rows selected (0.03 seconds)
0: jdbc:phoenix:localhost:2181>

As shown above, the write succeeded.

Original link: please credit the source when reposting. Thank you!


