The latest HBase 2.x practice code can no longer be found on the big-name blogs, and code pasted from a certain book does not run on the new version, so here is a hands-on write-up. It covers moving up from older releases such as HBase 1.4.2; if you want to read and write HBase 2.0 from Spark, this post should be a useful reference. PS: even the example posted on the official site throws errors!
# hbase shell
> list //list the existing tables
> create 'spark_hbase_src', 'info' //create the source table
> create 'spark_hbase_res', 'info' //create a result table for writing computation results back
Data model: simulated drive-past records of vehicles on the road, stored as a CSV-formatted text file (.txt).
Plate number | Plate color | Device ID | Direction | Record time |
---|---|---|---|---|
豫A12345 | 藍色 | D12C01 | 南北 | 2019/10/16 12:00:00 |
豫B12121 | 黃色 | D13C06 | 南北 | 2019/10/10 12:11:00 |
豫C66666 | 藍色 | D15C08 | 西東 | 2019/10/29 12:09:00 |
豫D11111 | 藍色 | D18C07 | 北南 | 2019/10/18 12:15:00 |
Generate some text data of this shape yourself and upload it to HDFS; a local file also works.
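If you have no data at hand, a rough sketch like the one below can produce such a file (the value ranges, record count, and output file name are made up for illustration; upload the result to HDFS afterwards, e.g. with `hdfs dfs -put`):

```scala
import java.io.PrintWriter
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Random

object GenTrafficData {
  def main(args: Array[String]): Unit = {
    val colors = Array("藍色", "黃色")
    val directions = Array("南北", "北南", "西東", "東西")
    val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss")
    val out = new PrintWriter("traffic.txt")
    (1 to 1000).foreach { i =>
      val number = f"豫A$i%05d"                                             // plate number
      val color = colors(Random.nextInt(colors.length))                     // plate color
      val device = f"D${Random.nextInt(20)}%02dC${Random.nextInt(10)}%02d"  // device ID
      val direction = directions(Random.nextInt(directions.length))         // direction
      val time = LocalDateTime.now().minusMinutes(Random.nextInt(10000)).format(fmt) // record time
      out.println(Seq(number, color, device, direction, time).mkString(","))
    }
    out.close()
  }
}
```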
<!-- HBase server library: provides the HBase read/write API -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
With HBase 1.x this single hbase-server jar used to be enough, but in practice it now fails, so import this package as well:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
Even with that in place, running the job can still fail with:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/htrace/SamplerBuilder
Caused by: java.lang.ClassNotFoundException: org.apache.htrace.SamplerBuilder
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 17 more
Importing this package fixes it:
<!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core -->
<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.1.0-incubating</version>
</dependency>
Add the other dependencies such as spark-core yourself:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<!-- Spark core library -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL library: provides the DataFrame API -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
The write example below is ready for Ctrl+C:
import java.util.UUID
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import scala.util.Try

object SparkWriteHBase {

  val hbaseConfig = HBaseConfiguration.create()
  hbaseConfig.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
  hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
  // Set this to true when running in an IDE to avoid the runtime exception about an "hbase-default.xml" version mismatch
  hbaseConfig.set("hbase.defaults.for.version.skip", "true")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark-HBase")
      .master("local[2]")
      .getOrCreate()

    // Read the sample data
    val data = spark.read.csv("hdfs://your-hdfs-host:8020/traffic.txt")
      .toDF("number", "color", "device", "direction", "photo_time")
    println("Number of input rows: " + data.count())

    val SRC_FAMILYCOLUMN = "info"

    data.foreachPartition(p => {
      // Open an HBase connection (one per partition)
      val hbaseConn = ConnectionFactory.createConnection(hbaseConfig)
      val srcTable = TableName.valueOf("spark_hbase_src")
      // Get the table handle
      val table = hbaseConn.getTable(srcTable)
      p.foreach(r => {
        val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("number"), Bytes.toBytes(r.getString(0)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("color"), Bytes.toBytes(r.getString(1)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("device"), Bytes.toBytes(r.getString(2)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("direction"), Bytes.toBytes(r.getString(3)))
        put.addColumn(Bytes.toBytes(SRC_FAMILYCOLUMN), Bytes.toBytes("photo_time"), Bytes.toBytes(r.getString(4)))
        Try(table.put(put)).getOrElse(table.close()) // write the row to HBase; close the table if the put fails
      })
      table.close()
      hbaseConn.close()
    })
  }
}
Row count in spark_hbase_src before and after the write: 0 -> 1199.
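As a quick sanity check on that count without relying on a screenshot, a plain HBase client scan works; this is a rough sketch that reuses the `hbaseConfig` defined in the SparkWriteHBase object above (or an equivalent one):

```scala
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import scala.collection.JavaConverters._

// Count the rows in spark_hbase_src by scanning it (fine for small tables)
val conn = ConnectionFactory.createConnection(hbaseConfig)
val table = conn.getTable(TableName.valueOf("spark_hbase_src"))
val scanner = table.getScanner(new Scan())
println("rows in spark_hbase_src: " + scanner.iterator().asScala.size)
scanner.close()
table.close()
conn.close()
```

Next, reading the table back through Spark: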
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

object SparkReadHbase {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark-HBase")
      .master("local")
      .getOrCreate()

    val hbaseConfig = HBaseConfiguration.create()
    hbaseConfig.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
    hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
    // Set this to true when running in an IDE to avoid the runtime exception about an "hbase-default.xml" version mismatch
    hbaseConfig.set("hbase.defaults.for.version.skip", "true")
    hbaseConfig.set(TableInputFormat.INPUT_TABLE, "spark_hbase_src")

    val SRC_FAMILYCOLUMN = "info"

    // Read an RDD from HBase through TableInputFormat
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConfig,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    import spark.implicits._
    hbaseRDD.map({ case (_, result) =>
      // val key = Bytes.toString(result.getRow)
      val number = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "number".getBytes))
      val color = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "color".getBytes))
      val device = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "device".getBytes))
      val direction = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "direction".getBytes))
      val photo_time = Bytes.toString(result.getValue(SRC_FAMILYCOLUMN.getBytes, "photo_time".getBytes))
      (number, color, device, direction, photo_time)
    }).toDF("number", "color", "device", "direction", "photo_time").show(false)
  }
}
Screenshot of the show() output: the data in HBase is read back successfully.
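The result table spark_hbase_res created at the beginning has not been used yet. As a rough sketch of that last step (the per-device aggregation and the info:count column are my own illustration, and it assumes the DataFrame built by toDF(...) above is first assigned to a val named df), you could compute a small result and write it back:

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Illustrative aggregation: how many records each device captured
val resultDF = df.groupBy("device").count()

resultDF.rdd.foreachPartition { rows =>
  // Build the configuration on the executor side to avoid serializing it in the closure
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("hbase.defaults.for.version.skip", "true")
  val conn = ConnectionFactory.createConnection(conf)
  val table = conn.getTable(TableName.valueOf("spark_hbase_res"))
  rows.foreach { r =>
    val put = new Put(Bytes.toBytes(r.getString(0))) // row key: device ID
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(r.getLong(1).toString))
    table.put(put)
  }
  table.close()
  conn.close()
}
```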
Example 36 (HBaseContext Usage Example) from the official documentation is as follows:
val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
...
val hbaseContext = new HBaseContext(sc, config)
On 2019-10-10 I compiled that source and got the jar; Maven Central also offers a 1.0 version of it. It provides an API for reading and writing HBase from Spark and can serve as another option besides going through the hbase-server library.
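For reference, a rough sketch of what writing through that connector looks like (it assumes the hbase-spark artifact mentioned above is on the classpath; the bulkPut call follows the HBaseContext API, and the table and column names reuse the ones from this post):

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

val sc = new SparkContext("local[2]", "hbase-context-demo")
val config = HBaseConfiguration.create()
config.set("hbase.zookeeper.quorum", "zk地址1,zk地址2,zk地址3")
config.set("hbase.zookeeper.property.clientPort", "2181")
val hbaseContext = new HBaseContext(sc, config)

// bulkPut turns every RDD element into one Put; HBaseContext manages the connections
val rdd = sc.parallelize(Seq(("r1", "豫A12345"), ("r2", "豫B12121")))
hbaseContext.bulkPut[(String, String)](rdd, TableName.valueOf("spark_hbase_src"), {
  case (rowKey, number) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("number"), Bytes.toBytes(number))
    put
})
```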
Writing can be done in the following two ways; the main difference is which configuration object carries the job settings:

- saveAsHadoopDataset uses a Hadoop JobConf; the TableOutputFormat class used to initialize the JobConf comes from the org.apache.hadoop.hbase.mapred package.
- saveAsNewAPIHadoopDataset uses a Hadoop Configuration; the TableOutputFormat class it uses comes from the org.apache.hadoop.hbase.mapreduce package.

The two APIs are used in much the same way. The example below shows both, writing into a test_student table (assumed to already exist with an info column family):
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object SparkWriteHBaseByHadoopDataset {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("SparkWriteHBase2").master("local").getOrCreate()
    val sc = spark.sparkContext

    val tableName = "test_student"
    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "manager.bigdata,master.bigdata,worker.bigdata")
    config.set("hbase.zookeeper.property.clientPort", "2181")
    config.set("hbase.defaults.for.version.skip", "true")
    // Both the mapred and the mapreduce TableOutputFormat read the output table from the same "hbase.mapred.outputtable" key,
    // so set it once here, before the JobConf below is created from this config
    config.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    val inputDataRDD = sc.parallelize(Array("1,Jack,M,26", "2,Rose,M,17")) // build a mock RDD with two records
    val rdd = inputDataRDD.map(_.split(',')).map { arr => {
      val put = new Put(Bytes.toBytes(arr(0))) // row key
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))      // info:name
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))    // info:gender
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt)) // info:age
      (new ImmutableBytesWritable, put)
    }
    }

    // Approach 1: initialize a JobConf; the TableOutputFormat used here is the one in the org.apache.hadoop.hbase.mapred package
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[org.apache.hadoop.hbase.mapred.TableOutputFormat])
    rdd.saveAsHadoopDataset(jobConf)

    // Approach 2: initialize a Job; the TableOutputFormat used here is the one in the org.apache.hadoop.hbase.mapreduce package
    val job = Job.getInstance(config)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
Both write paths ultimately call the write method of the SparkHadoopWriter object under the hood, so there is no performance difference between them.