The project needed some Hive tables to be stored in HBase. We read the Hive table with Spark, run it through an ETL step that generates HFiles, and then tell HBase to bulk load them. In practice this turned out to be the fastest way to import the data.
CDH : 5.7.0
Hadoop : 2.6.0-cdh5.7.0
Spark : 1.6.0-cdh5.7.0
Hive : 1.1.0-cdh5.7.0
HBase : 1.2.0-cdh5.7.0
The most painful part of a Hadoop project is the tangled dependency tree, and you keep running into conflicting jars... Below is the conflict-free POM I put together after much suffering.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>your.group.id</groupId>
    <artifactId>your.artifact.id</artifactId>
    <version>your.version</version>
    <packaging>jar</packaging>

    <properties>
        <slf4j.version>1.7.5</slf4j.version>
        <scala.version>2.10</scala.version>
        <mysql.version>5.1.38</mysql.version>
        <logback.version>1.0.13</logback.version>

        <!-- Hadoop eco base version -->
        <hadoop.version>2.6.0</hadoop.version>
        <hbase.version>1.2.0</hbase.version>
        <zookeeper.version>3.4.5</zookeeper.version>
        <spark.version>1.6.0</spark.version>
        <hive.version>1.1.0</hive.version>

        <!-- Hadoop eco cdh version -->
        <cdh.version>5.7.0</cdh.version>
        <hadoop-cdh.version>${hadoop.version}-cdh${cdh.version}</hadoop-cdh.version>
        <hbase-cdh.version>${hbase.version}-cdh${cdh.version}</hbase-cdh.version>
        <zookeeper-cdh.version>${zookeeper.version}-cdh${cdh.version}</zookeeper-cdh.version>
        <spark-cdh.version>${spark.version}-cdh${cdh.version}</spark-cdh.version>
        <hive-cdh.version>${hive.version}-cdh${cdh.version}</hive-cdh.version>

        <!-- Hadoop eco current version -->
        <hadoop.current.version>${hadoop-cdh.version}</hadoop.current.version>
        <hbase.current.version>${hbase-cdh.version}</hbase.current.version>
        <zookeeper.current.version>${zookeeper-cdh.version}</zookeeper.current.version>
        <spark.current.version>${spark-cdh.version}</spark.current.version>
        <hive.current.version>${hive-cdh.version}</hive.current.version>
    </properties>

    <dependencies>
        <!-- zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>${zookeeper.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.jboss.netty</groupId>
                    <artifactId>netty</artifactId>
                </exclusion>
            </exclusions>
            <!--<exclusions>-->
                <!--<exclusion>-->
                    <!--<groupId>log4j</groupId>-->
                    <!--<artifactId>log4j</artifactId>-->
                <!--</exclusion>-->
                <!--<exclusion>-->
                    <!--<groupId>org.slf4j</groupId>-->
                    <!--<artifactId>slf4j-log4j12</artifactId>-->
                <!--</exclusion>-->
            <!--</exclusions>-->
        </dependency>

        <!-- Hadoop & HBase -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty.orbit</groupId>
                    <artifactId>javax.servlet</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty.orbit</groupId>
                    <artifactId>javax.servlet</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.eclipse.jetty.orbit</groupId>
                    <artifactId>javax.servlet</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>servlet-api-2.5</artifactId>
                    <groupId>org.mortbay.jetty</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>apache-log4j-extras</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-assembly_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-launcher_${scala.version}</artifactId>
            <version>${spark.current.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- hive -->
        <!--<dependency>-->
            <!--<groupId>org.spark-project.hive</groupId>-->
            <!--<artifactId>hive-cli</artifactId>-->
            <!--<version>${hive.current.version}</version>-->
            <!--<exclusions>-->
                <!--<exclusion>-->
                    <!--<groupId>org.jboss.netty</groupId>-->
                    <!--<artifactId>netty</artifactId>-->
                <!--</exclusion>-->
            <!--</exclusions>-->
        <!--</dependency>-->

        <!-- log for java -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>${logback.version}</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>${logback.version}</version>
        </dependency>

        <!-- jdbc -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>aliyun-r</id>
            <name>aliyun maven repo remote</name>
            <url>http://maven.aliyun.com/nexus/content/repositories/central/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
</project>
This requires that your Spark environment can access Hive. CDH already sets that up for you.
If you don't need Hive access, just ignore this part.
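If you want to confirm Hive access before writing any ETL code, a minimal smoke test along these lines should do. This snippet is my own sketch, not part of the original job; the class and app names are arbitrary placeholders.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.hive.HiveContext;

public class HiveAccessCheck {
    public static void main(String[] args) {
        // Sketch: just verify that this Spark build can reach the Hive metastore.
        SparkConf conf = new SparkConf().setAppName("hive-access-check");
        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hctx = new HiveContext(sc);

        // If Hive access is wired up correctly, this prints your Hive databases.
        hctx.sql("show databases").show();

        sc.stop();
    }
}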
ImmutableBytesWritable is really just a wrapper around the row key, and a KeyValue is a single cell (Cell) in HBase.
So several RDD records together make up what is logically one HBase row.
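For example (an illustrative sketch of mine, not code taken from the job itself), one logical row "row-1" with two cells in family a becomes two RDD records that share the same ImmutableBytesWritable key:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

import scala.Tuple2;

public class RowToCells {
    public static void main(String[] args) {
        byte[] rowKey = Bytes.toBytes("row-1");
        ImmutableBytesWritable writableKey = new ImmutableBytesWritable(rowKey);

        // Two cells of the same row: family "a", qualifiers "a1" and "a2".
        KeyValue cellA1 = new KeyValue(rowKey, Bytes.toBytes("a"), Bytes.toBytes("a1"), Bytes.toBytes("v1"));
        KeyValue cellA2 = new KeyValue(rowKey, Bytes.toBytes("a"), Bytes.toBytes("a2"), Bytes.toBytes("v2"));

        // One logical HBase row -> several (rowKey, cell) records in the pair RDD.
        List<Tuple2<ImmutableBytesWritable, KeyValue>> records = new ArrayList<>();
        records.add(new Tuple2<>(writableKey, cellA1));
        records.add(new Tuple2<>(writableKey, cellA2));

        System.out.println(records.size() + " RDD records for one logical row");
    }
}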
Generating HFiles requires the keys to be sorted. At first I assumed that only the row keys had to be in order, i.e. a map followed by sortByKey would be enough, but HFileOutputFormat kept complaining that a later key was smaller than the previous one (i.e. not sorted). After digging through quite a few foreign sites I finally realized that "sorted" here means rowKey + column family + column qualifier must be ordered as a whole!!!
In other words, you have to manage the ordering of column family + column qualifier yourself...
(This is the pit that tortured me for an entire night...)
Demo: suppose a table has two column families, a with columns a1 and a2, and b with columns b1 and b2.
In the flatMap, the row key itself doesn't matter, but the output must come out as:
rowKey a a1 value
rowKey a a2 value
rowKey b b1 value
rowKey b b2 value
If instead it comes out as
rowKey a a2 value
rowKey a a1 value
rowKey b b2 value
rowKey b b1 value
then it blows up (a huge pit!).
The row keys themselves are then sorted with rdd.sortByKey() so that the overall ordering holds; see the sketch below for one way to keep the within-row ordering straight.
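If you don't want to hand-maintain the family + qualifier ordering inside the flatMap, one option (my own sketch, not what the original code does) is to collect a row's cells into a list and sort them with HBase's KeyValue.COMPARATOR, which orders by row key, then family, then qualifier, before emitting them:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

import scala.Tuple2;

public class SortCellsWithinRow {
    public static void main(String[] args) {
        byte[] rowKey = Bytes.toBytes("row-1");

        // Cells deliberately added out of order.
        List<KeyValue> cells = new ArrayList<>();
        cells.add(new KeyValue(rowKey, Bytes.toBytes("b"), Bytes.toBytes("b2"), Bytes.toBytes("v4")));
        cells.add(new KeyValue(rowKey, Bytes.toBytes("a"), Bytes.toBytes("a2"), Bytes.toBytes("v2")));
        cells.add(new KeyValue(rowKey, Bytes.toBytes("b"), Bytes.toBytes("b1"), Bytes.toBytes("v3")));
        cells.add(new KeyValue(rowKey, Bytes.toBytes("a"), Bytes.toBytes("a1"), Bytes.toBytes("v1")));

        // KeyValue.COMPARATOR sorts by rowKey, then family, then qualifier (then timestamp/type),
        // which is the within-row order the HFile writer expects.
        Collections.sort(cells, KeyValue.COMPARATOR);

        ImmutableBytesWritable writableKey = new ImmutableBytesWritable(rowKey);
        List<Tuple2<ImmutableBytesWritable, KeyValue>> out = new ArrayList<>();
        for (KeyValue kv : cells) {
            out.add(new Tuple2<>(writableKey, kv));
        }

        // Prints a:a1, a:a2, b:b1, b:b2 -- the required order.
        for (Tuple2<ImmutableBytesWritable, KeyValue> t : out) {
            System.out.println(Bytes.toString(t._2().getFamily()) + ":" + Bytes.toString(t._2().getQualifier()));
        }
    }
}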
Running the job as the hbase user can be done with
System.setProperty("user.name", "hbase");
System.setProperty("HADOOP_USER_NAME", "hbase");
or by doing the spark-submit as the hbase user. I was debugging with IDEA directly on Windows, so I went with the properties.
這樣能夠避免各類顯式的在代碼裏進行配置
The full code is below; adjust it to your own needs.
public static void main(String args[]) throws Exception {
    String ip = "your.ip";
    // saveAsNewAPIHadoopFile requires that the HDFS output directory does not exist; it creates it itself
    String hdfsOutputDir = "/tmp/bulkload/" + System.currentTimeMillis();

    System.setProperty("user.name", "hbase");
    System.setProperty("HADOOP_USER_NAME", "hbase");
    System.setProperty("SPARK_LOCAL_HOSTNAME", ip);

    Class<?> classess[] = {
            ImmutableBytesWritable.class,
            KeyValue.class,
            Put.class,
            ImmutableBytesWritable.Comparator.class
    };

    SparkConf sparkConf = new SparkConf().setAppName("")
            .set("spark.sql.hive.metastore.jars", "builtin")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.referenceTracking", "false")
            .registerKryoClasses(classess); // these classes don't implement Serializable, so they must be serialized with Kryo -- otherwise, well, you know

    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    HiveContext hctx = new HiveContext(ctx);

    hctx.sql("use YOUR_HIVE_DATABASE");
    String tableName = "YOUR_HIVE_TABLE";
    DataFrame table = hctx.table(tableName);

    JavaPairRDD<ImmutableBytesWritable, KeyValue> hbaseData =
            table.limit(1000).javaRDD().flatMapToPair(new PairFlatMapFunction<Row, ImmutableBytesWritable, KeyValue>() {
                private static final long serialVersionUID = 3750223032740257913L;

                @Override
                public Iterable<Tuple2<ImmutableBytesWritable, KeyValue>> call(Row row) throws Exception {
                    List<Tuple2<ImmutableBytesWritable, KeyValue>> kvs = new ArrayList<>();
                    for (xxxx) {
                        // your logic here
                        // control the column family + qualifier ordering here
                        kvs.add(new Tuple2<>(rk, new KeyValue(rkBytes, "a".getBytes(), "a1".getBytes(), value)));
                        kvs.add(new Tuple2<>(rk, new KeyValue(rkBytes, "a".getBytes(), "a2".getBytes(), value)));
                        kvs.add(new Tuple2<>(rk, new KeyValue(rkBytes, "b".getBytes(), "b1".getBytes(), value)));
                        kvs.add(new Tuple2<>(rk, new KeyValue(rkBytes, "b".getBytes(), "b2".getBytes(), value)));
                    }
                    return kvs;
                }
            }).sortByKey(); // let Spark handle the row-key ordering here

    Configuration conf = HBaseConfiguration.create();
    conf.set(TableOutputFormat.OUTPUT_TABLE, "your.hbase.table");

    Job job = Job.getInstance();
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    Connection conn = ConnectionFactory.createConnection(conf);
    TableName hbTableName = TableName.valueOf("your.hbase.namespace".getBytes(), "your.hbase.table".getBytes());
    HRegionLocator regionLocator = new HRegionLocator(hbTableName, (ClusterConnection) conn);
    Table realTable = conn.getTable(hbTableName);

    HFileOutputFormat2.configureIncrementalLoad(job, realTable, regionLocator);

    hbaseData.saveAsNewAPIHadoopFile(hdfsOutputDir, ImmutableBytesWritable.class, KeyValue.class,
            HFileOutputFormat2.class, job.getConfiguration());

    // bulk load start
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    Admin admin = conn.getAdmin();
    loader.doBulkLoad(new Path(hdfsOutputDir), admin, realTable, regionLocator);

    ctx.stop();
}
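After the load finishes you can sanity-check the target table with a quick scan. This bit is my own addition, not part of the original job; the namespace and table names are the same placeholders as above.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadSanityCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("your.hbase.namespace", "your.hbase.table"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Pull just the first few rows to confirm the cells landed.
            for (Result r : scanner.next(5)) {
                System.out.println(Bytes.toString(r.getRow()));
            }
        }
    }
}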