前言:以前給你們分享了Spark經過接口直接讀取HBase的一個小demo: HBase-Spark-Read-Demo,但若是在數據量很是大的狀況下,Spark直接掃描HBase表必然會對HBase集羣形成不小的壓力。基於此,今天再給你們分享一下Spark經過Snapshot直接讀取HBase HFile文件的方式。
首先咱們先建立一個HBase表:test,並插入幾條數據,以下:
hbase(main):003:0> scan 'test' ROW COLUMN+CELL r1 column=f:name, timestamp=1583318512414, value=zpb r2 column=f:name, timestamp=1583318517079, value=lisi r3 column=f:name, timestamp=1583318520839, value=wang
接着,咱們建立該HBase表的快照,其在HDFS上路徑以下:
hbase(main):005:0> snapshot 'test', 'test-snapshot' 0 row(s) in 0.3690 seconds $ hdfs dfs -ls /apps/hbase/data/.hbase-snapshot Found 1 items drwxr-xr-x - hbase hdfs 0 2020-03-21 21:24 /apps/hbase/data/.hbase-snapshot/test-snapshot
代碼以下:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demo: read an HBase table via a snapshot using TableSnapshotInputFormat,
 * scanning the snapshot's HFiles on HDFS directly instead of going through
 * the region servers (avoids load on the HBase cluster for large scans).
 *
 * Assumes core-site.xml / hdfs-site.xml / hbase-site.xml are on the classpath
 * (e.g. under src/main/resources).
 */
object SparkReadHBaseSnapshotDemo {

  def main(args: Array[String]): Unit = {
    // Spark entry point; local master is for debugging only.
    val conf = new SparkConf()
      .setAppName("SparkReadHBaseSnapshotDemo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Configure the job to read snapshot "test-snapshot". The restore dir
      // must be writable by the current user and must NOT be under hbase.rootdir.
      val job = Job.getInstance(getHbaseConf())
      TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))

      val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration,
        classOf[TableSnapshotInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      // Fused the original map(_._2).map(getRes(_)) into a single pass.
      hbaseRDD.map { case (_, result) => getRes(result) }.count()
    } finally {
      // BUG FIX: the original never stopped the SparkContext.
      sc.stop()
    }
  }

  /**
   * Extracts the rowkey and column f:name from a Result, logs both,
   * and returns the name (may be null if the cell is absent).
   */
  def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
    val rowkey = Bytes.toString(result.getRow())
    // Bytes.toBytes encodes as UTF-8 explicitly; String.getBytes would use
    // the platform default charset.
    val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
    println(rowkey + "---" + name)
    name
  }

  /** Builds the HBase configuration from *-site.xml files found on the classpath. */
  def getHbaseConf(): Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(TableInputFormat.SCAN, getScanStr())
    conf
  }

  /** Serializes a Scan (add filters as needed) into the Base64 string HBase expects. */
  def getScanStr(): String = {
    val scan = new Scan()
    // scan.set... add column/filter restrictions here as needed
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray())
  }
}
注:上述代碼需將core-site.xml&hdfs-site.xml&hbase-site.xml文件放在資源目錄resources下。不然,應在代碼中進行配置,代碼以下:
package com.xcar.etl

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Variant of SparkReadHBaseSnapshotDemo that sets all HBase/HDFS connection
 * properties in code, for environments where the *-site.xml files are NOT on
 * the classpath.
 */
object SparkReadHBaseSnapshotDemo2 {

  val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn"

  def main(args: Array[String]): Unit = {
    // Spark entry point; local master is for debugging only.
    val conf = new SparkConf()
      .setAppName("SparkReadHBaseSnapshotDemo2")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")
    val sc = new SparkContext(conf)
    try {
      // Configure the job to read snapshot "test-snapshot". The restore dir
      // must be writable by the current user and must NOT be under hbase.rootdir.
      val job = Job.getInstance(getHbaseConf())
      TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))

      val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration,
        classOf[TableSnapshotInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      // Fused the original map(_._2).map(getRes(_)) into a single pass.
      hbaseRDD.map { case (_, result) => getRes(result) }.count()
    } finally {
      // BUG FIX: the original never stopped the SparkContext.
      sc.stop()
    }
  }

  /**
   * Extracts the rowkey and column f:name from a Result, logs both,
   * and returns the name (may be null if the cell is absent).
   */
  def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
    val rowkey = Bytes.toString(result.getRow())
    // Bytes.toBytes encodes as UTF-8 explicitly; String.getBytes would use
    // the platform default charset.
    val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
    println(rowkey + "---" + name)
    name
  }

  /** Builds the HBase configuration entirely in code (no classpath *-site.xml needed). */
  def getHbaseConf(): Configuration = {
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
    conf.set("hbase.rootdir", "/apps/hbase")
    // Table name (used by TableInputFormat-style configuration).
    conf.set(TableInputFormat.INPUT_TABLE, "test")
    conf.set("fs.defaultFS", "hdfs://xxxxxx:8020")
    conf.set(TableInputFormat.SCAN, getScanStr())
    conf
  }

  /** Serializes a Scan (add filters as needed) into the Base64 string HBase expects. */
  def getScanStr(): String = {
    val scan = new Scan()
    // scan.set... add column/filter restrictions here as needed
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray())
  }
}
TableSnapshotInputFormat.setInput 方法參數解析:
public static void setInput(org.apache.hadoop.mapreduce.Job job, String snapshotName, org.apache.hadoop.fs.Path restoreDir) throws IOException 參數解析: job - the job to configure snapshotName - the name of the snapshot to read from restoreDir - a temporary directory to restore the snapshot into. Current user should have write permissions to this directory, and this should not be a subdirectory of rootdir. After the job is finished, restoreDir can be deleted.
項目用到的 pom.xml 文件:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zpb.test</groupId>
  <artifactId>spark-read-hbase-snapshot-demo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>spark-read-hbase-snapshot-demo</name>
  <url>http://maven.apache.org</url>

  <!-- CDH artifacts are not on Maven Central. -->
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <properties>
    <cdh.hbase.version>1.2.0-cdh5.7.0</cdh.hbase.version>
    <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- NOTE(review): fastjson 1.2.62 has publicly known deserialization
         vulnerabilities (e.g. CVE-2020-8840 affects <= 1.2.62); consider
         upgrading if untrusted JSON is ever parsed. Left unchanged here. -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.62</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${cdh.spark.version}</version>
      <!-- Re-enable "provided" when submitting to a cluster that ships Spark:
      <scope>provided</scope> -->
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${cdh.hbase.version}</version>
    </dependency>
  </dependencies>
</project>
轉載請註明出處!歡迎關注本人微信公衆號【HBase工做筆記】