前言:java
做爲Hadoop生態系統中重要的一員, HBase做爲分佈式列式存儲, 在線實時處理的特性, 備受矚目, 未來能在不少應用場景, 取代傳統關係型數據庫的江湖地位. 本篇博文重點講解HBase的數據導入, 描述三種方式, Client API, Bulkload, 以及Hive Over HBase. node
*). Client API實現
藉助HBase的Client API來導入, 是最簡易學的方式.shell
Configuration config = HBaseConfiguration.create(); // 配置hbase.zookeeper.quorum: 後接zookeeper集羣的機器列表 config.set("hbase.zookeeper.quorum", "tw-node109,tw-node110,tw-node111"); // 配置hbase.zookeeper.property.clientPort: zookeeper集羣的服務端口 config.set("hbase.zookeeper.property.clientPort", "2181"); HTable htable = null; try { // 配置hbase的具體表名 htable = new HTable(config, "hbase_table"); // 設置rowkey的值 Put put = new Put(Bytes.toBytes("rowkey:1001")); // 設置family:qualifier:value put.add(Bytes.toBytes("family"), Bytes.toBytes("qualifier"), Bytes.toBytes("value")); // 使用put類, 寫入hbase對應的表中 htable.put(put); } catch (Exception e) { e.printStackTrace(); } finally { if (htable != null) { try { htable.close(); } catch (IOException e) { e.printStackTrace(); } } }
評: HBase的client api編程, 相對仍是簡單的. 惟一須要注意的是, 若在本地編寫測試用列, 須要在本地配置hbase集羣相關的域名, 使得域名和ip地址能對應上, 切記.
至於hbase client的讀寫優化, 咱們放到下面的博文進行講解.數據庫
*). 批量導入Bulkload
HBase的bulkload數據導入, 分兩個階段:
#). 階段一: 藉助使用HFileOutputFormat的MapReduce, 直接生成HBase的內部數據存儲格式HFile.
其原理: HFileOutputFormat藉助configureIncrementalLoad函數, 基於當前table的各個region邊界自動匹配MapReduce的分區類TotalOrderPartitioner, 這樣生成的HFile都對應一個具體的region, 此時效率最高效.
#). 階段二: 藉助completebulkload工具, 將生成的HFile文件熱載入hbase集羣.apache
1. importtsv數據導入演示
hbase自帶了importtsv工具, 其對tsv格式的數據文件提供了默認的支持.
數據文件data.tsv(以'\t'分割數據文件)編程
1001 lilei 17 13800001111 1002 lily 16 13800001112 1003 lucy 16 13800001113 1004 meimei 16 13800001114
上傳至hdfs目錄 /test/hbase/tsv/input
sudo -u hdfs hdfs dfs -mkdir -p /test/hbase/tsv/input
sudo -u hdfs hdfs dfs -put data.tsv /test/hbase/tsv/input/json
嘗試構建的HBase表student
hbase shell
hbase> create 'student', {NAME => 'info'}api
執行importtsv
sudo -u hdfs hadoop jar /usr/lib/hbase/hbase-<version>.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:phone -Dimporttsv.bulk.output=/test/hbase/tsv/output/ student /test/hbase/tsv/input app
沒有指定-Dimporttsv.bulk.output, importtsv默認行爲是纔有client api的put來導入數據於hbase, 指定-Dimporttsv.bulk.output, 則須要下一步
sudo -u hdfs hadoop jar /usr/lib/hbase/hbase-<version>.jar completebulkload /test/hbase/tsv/output/ studentmaven
數據驗證:
scan 'student', {LIMIT => 10}
2. 自定義bulkload數據導入演示
數據文件準備, 以以前data.tsv文件爲準
構建HBase表student_new
hbase> create 'student_new', {NAME => 'info'}
編寫MapReduce代碼, 以下所示:
public class MyBulkload { public static class MyBulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> { @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 數據按\t切分組織, 也能夠自定義的方式來解析, 好比複雜的json/xml文本行 String line = value.toString(); String[] terms = line.split("\t"); if ( terms.length == 4 ) { byte[] rowkey = terms[0].getBytes(); ImmutableBytesWritable imrowkey = new ImmutableBytesWritable(rowkey); // 寫入context中, rowkey => keyvalue, 列族:列名 info:name, info:age, info:phone context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(terms[1]))); context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(terms[2]))); context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(terms[3]))); } } } public static void main(String[] args) throws Exception { if ( args.length != 3 ) { System.err.println("Usage: MyBulkload <table_name> <data_input_path> <hfile_output_path>"); System.exit(2); } String tableName = args[0]; String inputPath = args[1]; String outputPath= args[2]; // 建立的HTable實例用於, 用於獲取導入表的元信息, 包括region的key範圍劃分 Configuration conf = HBaseConfiguration.create(); HTable table = new HTable(conf, tableName); Job job = Job.getInstance(conf, "MyBulkload"); job.setMapperClass(MyBulkMapper.class); job.setJarByClass(MyBulkload.class); job.setInputFormatClass(TextInputFormat.class); // 最重要的配置代碼, 須要重點分析 HFileOutputFormat.configureIncrementalLoad(job, table); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
注: 藉助maven的assembly插件, 生成胖jar包(就是把依賴的zookeeper和hbase jar包都打到該MapReduce包中), 不然的話, 就須要用戶靜態配置, 在Hadoop的class中添加zookeeper和hbase的配置文件和相關jar包.
最終的jar包爲 mybulk.jar, 主類名爲com.m8zmyp.mmxf.MyBulkload, 生成HFile, 增量熱載入hbase
sudo -u hdfs hadoop jar <xxoo>.jar <MainClass> <table_name> <data_input_path> <hfile_output_path>
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_path> <table_name>
sudo -u hdfs hadoop jar mybulk.jar com.m8zmyp.mmxf.MyBulkload student_new /test/hbase/tsv/input /test/hbase/tsv/new_output
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /test/hbase/tsv/new_output student_new
數據驗證:
scan 'student_new', {LIMIT => 10}
*). 藉助Hive Over Hbase
構建Hbase表hbase_student
hbase> create 'hbase_student', 'info'
構建hive外表hive_student, 並對應hbase_student表
CREATE EXTERNAL TABLE hive_student (rowkey string, name string, age int, phone string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:name,info:age,info:phone")
TBLPROPERTIES("hbase.table.name" = "hbase_student");
數據導入驗證:
1. 建立數據外表
CREATE EXTERNAL TABLE data_student (rowkey string, name string, age int, phone string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/test/hbase/tsv/input/';
2. 數據經過hive_student導入到hbase_student表中SET hive.hbase.bulk=true;INSERT OVERWRITE TABLE hive_student SELECT rowkey, name, age, phone FROM data_student;備註: 若遇到java.lang.IllegalArgumentException: Property value must not be null異常, 須要hive-0.13.0及以上版本支持詳見: https://issues.apache.org/jira/browse/HIVE-5515