嫌Hive導數據到Hbase太慢?試試buckloader吧

1

前言javascript


   若是咱們離線計算好的hive數據須要同步到hbase中,你們會用什麼方法呢?java

     若是是明細數據,上千萬乃至上億行的數據,導入到hbase中確定是須要考慮效率問題的mysql


     若是是直接使用hbase客戶端的API進行數據插入,效率是很是低的
web


     因此咱們選擇了bulkloader工具進行操做(原理:利用hbase以外的計算引擎將源數據加工成hbase的底層文件格式:Hfile,而後通知hbase導入便可)面試


測試數據redis

CREATE TABLE wedw_dw.t_user_order_info(                                   user_id string                                     ,user_name string  ,order_id  string  ,order_amt decimal(16,2))ROW FORMAT SERDE                                        'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'WITH SERDEPROPERTIES (                                  'field.delim'=',',                                    'serialization.format'=',')         ;+----------+------------+-----------+------------+--+| user_id  | user_name  | order_id  | order_amt  |+----------+------------+-----------+------------+--+| 1        | 小紅         | 001       | 100.32     || 2        | 小明         | 002       | 34.76      || 3        | 小花         | 003       | 39.88      || 4        | 小牛         | 004       | 22.22      || 5        | 小劉         | 005       | 98765.34   |+----------+------------+-----------+------------+--+# /data/hive/warehouse/wedw/dw/t_user_order_info/



2

 利用hbasse自帶程序導入sql

# hbase建表typescript

hbase(main):009:0* create 'user_order_info','user_info','order_info'


# 執行hbase自帶的importtsv程序(mapreduce程序),將原始文件轉成hfileapache

/usr/local/hadoop-current/bin/yarn jar \/usr/local/hbase-current/lib/hbase-server-1.2.0-cdh5.8.2.jar  \importtsv -Dimporttsv.columns=HBASE_ROW_KEY,user_info:user_name,order_info:order_id,order_info:order_amt \'-Dimporttsv.separator=,' \-Dmapreduce.job.queuename='root.test' \-Dimporttsv.bulk.output=hdfs://cluster/data/hive/output1 user_order_info \hdfs://cluster/data/hive/warehouse/wedw/dw/t_user_order_info


完整參數:ruby

  •   -Dimporttsv.bulk.output=/path/for/output   輸出目錄

  •   -Dimporttsv.skip.bad.lines=false   是否跳過髒數據行

  •   -Dimporttsv.separator=|'   指定分隔符

  •   -Dimporttsv.timestamp=currentTimeAsLong 是否指定時間戳

  •   -Dimporttsv.mapper.class=my.Mapper  替換默認的Mapper類


# 移動數據到hbase表中

hadoop jar hbase-server-1.2.0-cdh5.8.2.jar completebulkload  hdfs://cluster/data/hive/output1 user_order_info



3


編寫代碼導入

hbase建表

hbase(main):018:0> create 'user_info','info'
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.wedoctor.spark</groupId>    <artifactId>spark-0708</artifactId>    <version>1.0-SNAPSHOT</version>    <properties>        <maven.compiler.source>1.8</maven.compiler.source>        <maven.compiler.target>1.8</maven.compiler.target>        <scala.version>2.11.8</scala.version>        <spark.version>2.2.0</spark.version>        <hadoop.version>2.8.1</hadoop.version>        <encoding>UTF-8</encoding>    </properties>    <dependencies>                <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-library</artifactId>            <version>${scala.version}</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-core_2.11</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-hive_2.11</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>log4j</groupId>            <artifactId>log4j</artifactId>            <version>1.2.17</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-sql_2.11</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>5.1.41</version>        </dependency>                <dependency>            <groupId>com.typesafe</groupId>            <artifactId>config</artifactId>            <version>1.3.0</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming_2.11</artifactId>            <version>${spark.version}</version>        </dependency>                <dependency>            <groupId>redis.clients</groupId>            <artifactId>jedis</artifactId>            <version>2.8.1</version>        </dependency>        <dependency>            <groupId>org.scalikejdbc</groupId>            <artifactId>scalikejdbc_2.11</artifactId>            <version>2.5.0</version>        </dependency>                <dependency>            <groupId>org.scalikejdbc</groupId>            <artifactId>scalikejdbc-config_2.11</artifactId>            <version>2.5.0</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>            <version>${spark.version}</version>        </dependency>                <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-clients</artifactId>            <version>0.10.2.1</version>        </dependency>        <dependency>            <groupId>org.apache.hbase</groupId>            <artifactId>hbase-server</artifactId>            <version>1.2.0-cdh5.8.2</version>        </dependency>        <dependency>            <groupId>org.apache.hbase</groupId>            <artifactId>hbase-client</artifactId>            <version>1.2.0-cdh5.8.2</version>            <exclusions>                <exclusion>                    <groupId>org.apache.httpcomponents</groupId>                    <artifactId>httpclient</artifactId>                </exclusion>                <exclusion>                    <groupId>org.apache.httpcomponents</groupId>                    <artifactId>httpcore</artifactId>                </exclusion>            </exclusions>        </dependency>    </dependencies>    <repositories>        <repository>            <id>cloudera</id>            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>        </repository>    </repositories>    <build>        <plugins>                      <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.5.1</version>            </plugin>                        <plugin>                <groupId>net.alchim31.maven</groupId>                <artifactId>scala-maven-plugin</artifactId>                <version>3.2.2</version>                <executions>                    <execution>                        <goals>                            <goal>compile</goal>                            <goal>testCompile</goal>                        </goals>                        <configuration>                            <args>                                <arg>-dependencyfile</arg>                                <arg>${project.build.directory}/.scala_dependencies</arg>                            </args>                        </configuration>                    </execution>                </executions>            </plugin>        </plugins>    </build></project>


spark程序編寫

package com.hbase.bulkloaderimport org.apache.hadoop.fs.{Path}import org.apache.hadoop.hbase.client.ConnectionFactoryimport org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}import org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.mapreduce.Jobimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, SparkSession}object BulkLoader {  //Logger.getLogger("org").setLevel(Level.ERROR)  def main(args: Array[String]): Unit = {    System.setProperty("HADOOP_USER_NAME", "pgxl")    val spark: SparkSession = SparkSession.builder()      .master("local[*]")      .config("hive.metastore.uris""thrift://10.11.3.44:9999")      .appName("bulkloaderTest")      .enableHiveSupport()      .getOrCreate()    val re: DataFrame = spark.sql("select * from wedw_dw.t_user_order_info")    val dataRdd: RDD[(String, (String, String, String))] = re.rdd.flatMap(row => {      val rowkey: String = row.getAs[String]("user_id").toString      Array(        (rowkey, ("info", "user_id", row.getAs[String]("user_id"))),        (rowkey, ("info", "user_name", row.getAs[String]("user_name"))),        (rowkey, ("info", "order_id", row.getAs[String]("order_id"))),        (rowkey, ("info", "order_amt", row.get(3).toString))      )    })    val output = dataRdd.filter(x=>x._1 != null).sortBy(x=>(x._1,x._2._1,x._2._2)).map {      x => {        val rowKey = Bytes.toBytes(x._1)        val immutableRowKey = new ImmutableBytesWritable(rowKey)        val colFam = x._2._1        val colName = x._2._2        val colValue = x._2._3        val kv = new KeyValue(          rowKey,          Bytes.toBytes(colFam),          Bytes.toBytes(colName),          Bytes.toBytes(colValue.toString)        )        (immutableRowKey, kv)      }    }    val conf = HBaseConfiguration.create()    conf.set("fs.defaultFS", "hdfs://cluster")    conf.set("hbase.zookeeper.quorum""10.11.3.43")    val job = Job.getInstance(conf)    val conn = ConnectionFactory.createConnection(conf)    val table = conn.getTable(TableName.valueOf("user_info"))    val locator = conn.getRegionLocator(TableName.valueOf("user_info"))    // 將咱們本身的數據保存爲HFile    HFileOutputFormat2.configureIncrementalLoad(job, table, locator)    output.saveAsNewAPIHadoopFile("/data/hive/test/", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)    // 構造一個導入hfile的工具類    new LoadIncrementalHFiles(job.getConfiguration).doBulkLoad(new Path("/data/hive/test/"),conn.getAdmin,table,locator)    conn.close()    spark.close()  }}


hbase表結果:


2020大數據面試題真題總結(附答案)

微信交流羣

多值維度及交叉維度最佳解決方案

深刻探究order by,sort by,distribute by,cluster by

Hive調優,數據工程師成神之路

數據質量那點事

簡述元數據管理

你真的瞭解全量表,增量表及拉鍊表嗎?

緩慢變化維(SCD)常看法決方案

全方位解讀星型模型,雪花模型及星座模型

Sqoop or Datax

left join(on&where)

ID-Mapping

大家公司還在用SparkOnYan嗎?

大廠高頻面試題-連續登陸問題

朋友面試數據研發崗遇到的面試題

數據倉庫分層架構

clickhouse實踐篇-SQL語法

clickhouse實踐篇-表引擎

簡單聊一聊大數據學習之路

朋友面試數據專家崗遇到的面試題

HADOOP快速入門

數倉工程師的利器-HIVE詳解

OLAP引擎—Kylin介紹

Hbase從入門到入坑

Kafka

Datax-數據抽取同步利器

Spark數據傾斜解決方案

Spark統一內存管理機制



本文分享自微信公衆號 - 大數據私房菜(datagogogo)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索