Preface
If Hive data computed offline needs to be synced into HBase, what approach would you use?
For detail-level data running to tens of millions or even hundreds of millions of rows, import efficiency definitely has to be considered.
Inserting the data row by row through the HBase client API is very inefficient.
So we chose the bulk load approach (the idea: use a compute engine outside of HBase to turn the source data into HBase's on-disk file format, HFile, and then simply tell HBase to take those files in).
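For contrast, here is a minimal sketch of the row-by-row client-API path we are avoiding. The ZooKeeper quorum is a placeholder, and the table and column-family names follow the example further below; every Put goes through the RegionServer write path (WAL and MemStore), which is what makes it slow at this scale.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// Sketch only: one Put per record through the client API -- the slow path.
object NaivePutExample {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "10.11.3.43") // placeholder quorum

    val conn  = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("user_order_info"))

    // Each record becomes one RPC-bound Put handled by the RegionServer (WAL + MemStore).
    val rows = Seq(("1", "小紅", "001", "100.32"), ("2", "小明", "002", "34.76"))
    rows.foreach { case (userId, userName, orderId, orderAmt) =>
      val put = new Put(Bytes.toBytes(userId))
      put.addColumn(Bytes.toBytes("user_info"), Bytes.toBytes("user_name"), Bytes.toBytes(userName))
      put.addColumn(Bytes.toBytes("order_info"), Bytes.toBytes("order_id"), Bytes.toBytes(orderId))
      put.addColumn(Bytes.toBytes("order_info"), Bytes.toBytes("order_amt"), Bytes.toBytes(orderAmt))
      table.put(put)
    }

    table.close()
    conn.close()
  }
}
```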
Test data
```sql
CREATE TABLE wedw_dw.t_user_order_info(
  user_id   string,
  user_name string,
  order_id  string,
  order_amt decimal(16,2)
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'field.delim'=',',
  'serialization.format'=',');
```

```
+----------+------------+-----------+------------+--+
| user_id  | user_name  | order_id  | order_amt  |
+----------+------------+-----------+------------+--+
| 1        | 小紅       | 001       | 100.32     |
| 2        | 小明       | 002       | 34.76      |
| 3        | 小花       | 003       | 39.88      |
| 4        | 小牛       | 004       | 22.22      |
| 5        | 小劉       | 005       | 98765.34   |
+----------+------------+-----------+------------+--+
```

# /data/hive/warehouse/wedw/dw/t_user_order_info/
Importing with HBase's built-in tool
# Create the HBase table
```shell
hbase(main):009:0* create 'user_order_info','user_info','order_info'
```
# Run HBase's built-in importtsv program (a MapReduce job) to turn the source file into HFiles
```shell
/usr/local/hadoop-current/bin/yarn jar \
/usr/local/hbase-current/lib/hbase-server-1.2.0-cdh5.8.2.jar \
importtsv -Dimporttsv.columns=HBASE_ROW_KEY,user_info:user_name,order_info:order_id,order_info:order_amt \
'-Dimporttsv.separator=,' \
-Dmapreduce.job.queuename='root.test' \
-Dimporttsv.bulk.output=hdfs://cluster/data/hive/output1 user_order_info \
hdfs://cluster/data/hive/warehouse/wedw/dw/t_user_order_info
```
Full list of options:
```shell
-Dimporttsv.bulk.output=/path/for/output     # output directory for the generated HFiles
-Dimporttsv.skip.bad.lines=false             # whether to skip lines with bad data
-Dimporttsv.separator='|'                    # field separator
-Dimporttsv.timestamp=currentTimeAsLong      # timestamp to apply to the imported cells
-Dimporttsv.mapper.class=my.Mapper           # replace the default Mapper class
```
# Load the generated HFiles into the HBase table (completebulkload hands the files over to the matching regions)
```shell
hadoop jar hbase-server-1.2.0-cdh5.8.2.jar completebulkload hdfs://cluster/data/hive/output1 user_order_info
```
Importing with custom code
Create the HBase table
```shell
create 'user_info','info'
```
pom.xml:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.wedoctor.spark</groupId>
  <artifactId>spark-0708</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.8.1</hadoop.version>
    <encoding>UTF-8</encoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.41</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe</groupId>
      <artifactId>config</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.scalikejdbc</groupId>
      <artifactId>scalikejdbc_2.11</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.scalikejdbc</groupId>
      <artifactId>scalikejdbc-config_2.11</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>1.2.0-cdh5.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.0-cdh5.8.2</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpcore</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.5.1</version>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
Writing the Spark program
```scala
package com.hbase.bulkloader

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object BulkLoader {
  //Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "pgxl")

    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://10.11.3.44:9999")
      .appName("bulkloaderTest")
      .enableHiveSupport()
      .getOrCreate()

    val re: DataFrame = spark.sql("select * from wedw_dw.t_user_order_info")

    // Flatten each Hive row into (rowkey, (columnFamily, qualifier, value)) tuples
    val dataRdd: RDD[(String, (String, String, String))] = re.rdd.flatMap(row => {
      val rowkey: String = row.getAs[String]("user_id").toString
      Array(
        (rowkey, ("info", "user_id", row.getAs[String]("user_id"))),
        (rowkey, ("info", "user_name", row.getAs[String]("user_name"))),
        (rowkey, ("info", "order_id", row.getAs[String]("order_id"))),
        (rowkey, ("info", "order_amt", row.get(3).toString))
      )
    })

    // HFiles must be written in sorted order: row key, then column family, then qualifier
    val output = dataRdd
      .filter(x => x._1 != null)
      .sortBy(x => (x._1, x._2._1, x._2._2))
      .map { x =>
        val rowKey = Bytes.toBytes(x._1)
        val immutableRowKey = new ImmutableBytesWritable(rowKey)
        val colFam = x._2._1
        val colName = x._2._2
        val colValue = x._2._3
        val kv = new KeyValue(
          rowKey,
          Bytes.toBytes(colFam),
          Bytes.toBytes(colName),
          Bytes.toBytes(colValue.toString)
        )
        (immutableRowKey, kv)
      }

    val conf = HBaseConfiguration.create()
    conf.set("fs.defaultFS", "hdfs://cluster")
    conf.set("hbase.zookeeper.quorum", "10.11.3.43")

    val job = Job.getInstance(conf)
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("user_info"))
    val locator = conn.getRegionLocator(TableName.valueOf("user_info"))

    // Write our data out as HFiles
    HFileOutputFormat2.configureIncrementalLoad(job, table, locator)
    output.saveAsNewAPIHadoopFile(
      "/data/hive/test/",
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration
    )

    // Bulk-load the generated HFiles into the HBase table
    new LoadIncrementalHFiles(job.getConfiguration)
      .doBulkLoad(new Path("/data/hive/test/"), conn.getAdmin, table, locator)

    conn.close()
    spark.close()
  }
}
```
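Two details in this program are worth calling out: HFileOutputFormat2 expects the (rowkey, KeyValue) pairs in strict (row key, column family, column qualifier) order, which is what the sortBy ahead of the map guarantees; and configureIncrementalLoad reads the target table's column-family settings and region boundaries into the job configuration, so the HFiles written by saveAsNewAPIHadoopFile line up with what the RegionServers expect.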
Result in the HBase table:
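The load can be checked with a scan from the hbase shell or from code. Below is a minimal sketch (reusing the ZooKeeper quorum from the program above, which is an environment-specific value) that scans user_info and prints every cell:

```scala
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Sketch only: full scan of the bulk-loaded table to eyeball the result.
object VerifyBulkLoad {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "10.11.3.43") // same quorum as the loader above

    val conn    = ConnectionFactory.createConnection(conf)
    val table   = conn.getTable(TableName.valueOf("user_info"))
    val scanner = table.getScanner(new Scan())

    // Print the row key and every info:<qualifier> = <value> pair.
    scanner.asScala.foreach { result =>
      val rowKey = Bytes.toString(result.getRow)
      result.listCells().asScala.foreach { cell =>
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value     = Bytes.toString(CellUtil.cloneValue(cell))
        println(s"$rowKey  info:$qualifier = $value")
      }
    }

    scanner.close()
    table.close()
    conn.close()
  }
}
```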
This article was originally shared on the WeChat public account 大數據私房菜 (datagogogo).