The fastest way to import data: it can skip the WAL and generate the underlying HFiles directly.
(Environment: CentOS 6.5, Hadoop 2.6.0, HBase 0.98.9)
1. Shell approach
1.1 Direct import with ImportTsv
Command: bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>
Test:
1.1.1 Create the table in HBase
create 'testImport1','cf'
1.1.2 Prepare the data file sample1.csv and upload it to HDFS; its content:
1,"tom"
2,"sam"
3,"jerry"
4,"marry"
5,"john
1.1.3 Run the import command
bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.columns=HBASE_ROW_KEY,cf testImport1 /sample1.csv
1.1.4 Result
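The load can be verified from the HBase shell, for example:
scan 'testImport1'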
1.2 Generate HFiles with ImportTsv first, then load them into HBase with completebulkload
1.2.1 Use the same source data and create a new table
create 'testImport2','cf'
1.2.2 Generate the HFiles with the command
bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.separator="," -Dimporttsv.bulk.output=hfile_tmp -Dimporttsv.columns=HBASE_ROW_KEY,cf testImport2 /sample1.csv
1.2.3 Intermediate result on HDFS
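The intermediate HFiles can be inspected with, for example (hfile_tmp is a relative path, so it resolves under the submitting user's HDFS home directory):
hadoop fs -ls -R hfile_tmp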
1.2.4 Load the HFiles into HBase with the command
hadoop jar lib/hbase-server-0.98.9-hadoop2.jar completebulkload hfile_tmp testImport2
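Since LoadIncrementalHFiles is also runnable as a standalone tool, the same step can presumably be launched through the hbase script instead of hadoop jar:
bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hfile_tmp testImport2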
1.2.5 Result
Note: 1. If a missing-class error appears, add the HBase jars to Hadoop's classpath; 2. Running this command is essentially an HDFS mv operation and does not start a MapReduce job.
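For the missing-class case in note 1, one common fix (assuming HBASE_HOME points at the HBase install directory) is to export the HBase classpath before running the hadoop command:
export HADOOP_CLASSPATH=$(${HBASE_HOME}/bin/hbase classpath)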
2. API (code) approach
The code approach is a bit more flexible; many things can be customized.
Here is the code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class BulkLoadJob {
    static Logger logger = LoggerFactory.getLogger(BulkLoadJob.class);

    public static class BulkLoadMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line: rowkey \t family:qualifier \t value
            String[] valueStrSplit = value.toString().split("\t");
            String hkey = valueStrSplit[0];
            String family = valueStrSplit[1].split(":")[0];
            String column = valueStrSplit[1].split(":")[1];
            String hvalue = valueStrSplit[2];
            final byte[] rowKey = Bytes.toBytes(hkey);
            final ImmutableBytesWritable HKey = new ImmutableBytesWritable(rowKey);
            // Put HPut = new Put(rowKey);
            // byte[] cell = Bytes.toBytes(hvalue);
            // HPut.add(Bytes.toBytes(family), Bytes.toBytes(column), cell);
            KeyValue kv = new KeyValue(rowKey, Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(hvalue));
            context.write(HKey, kv);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2182");
        conf.set("hbase.zookeeper.quorum", "msg801,msg802,msg803");
        conf.set("hbase.master", "msg801:60000");
        String[] dfsArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String inputPath = dfsArgs[0];
        System.out.println("source: " + dfsArgs[0]);
        String outputPath = dfsArgs[1];
        System.out.println("dest: " + dfsArgs[1]);
        HTable hTable = null;
        try {
            Job job = Job.getInstance(conf, "Test Import HFile & Bulkload");
            job.setJarByClass(BulkLoadJob.class);
            job.setMapperClass(BulkLoadJob.BulkLoadMap.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
            // speculation
            job.setSpeculativeExecution(false);
            job.setReduceSpeculativeExecution(false);
            // in/out format
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(HFileOutputFormat2.class);

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            hTable = new HTable(conf, dfsArgs[2]);
            HFileOutputFormat2.configureIncrementalLoad(job, hTable);

            if (job.waitForCompletion(true)) {
                FsShell shell = new FsShell(conf);
                try {
                    shell.run(new String[] { "-chmod", "-R", "777", dfsArgs[1] });
                } catch (Exception e) {
                    logger.error("Couldn't change the file permissions ", e);
                    throw new IOException(e);
                }
                // load the generated HFiles into the HBase table
                LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
                // either of the two ways below works
                // option 1
                String[] loadArgs = { outputPath, dfsArgs[2] };
                loader.run(loadArgs);
                // option 2
                // loader.doBulkLoad(new Path(outputPath), hTable);
            } else {
                logger.error("loading failed.");
                System.exit(1);
            }
        } catch (IllegalArgumentException e) {
            e.printStackTrace();
        } finally {
            if (hTable != null) {
                hTable.close();
            }
        }
    }
}
2.1 Create a new table
create 'testImport3','fm1','fm2'
2.2 Create sample2.csv and upload it to HDFS; its content:
key1 fm1:col1 value1
key1 fm1:col2 value2
key1 fm2:col1 value3
key4 fm1:col1 value4
Run the command:
hadoop jar BulkLoadJob.jar hdfs://msg/sample2.csv hdfs://msg/HFileOut testImport3
Notes: 1. Either KeyValue or Put can be used in the mapper; 2. Mind the classpath for the jar; 3. If Hadoop runs in HA mode, use the HA name: for example, our active NameNode is msg801 but the HA nameservice is msg, so HDFS paths must use hdfs://msg and must not use hdfs://msg801:9000 (why?).
The exact error is:
IllegalArgumentException: Wrong FS: hdfs://msg801:9000/HFileOut/fm2/bbab9d883a574d518cdcb304d1e681e9, expected: hdfs://msg
This presumably comes from FileSystem.checkPath: under HA, the load resolves its FileSystem from the cluster configuration, whose default FS is the nameservice hdfs://msg, so a path qualified with a specific NameNode (hdfs://msg801:9000) is treated as belonging to a different filesystem and rejected.