1. Java code to generate the HFiles
package HBaseIA.TwitBase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * @author jason
 */
public class HFileGenerator {

    public static class HFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is: rowkey,family,qualifier,value
            String line = value.toString();
            String[] items = line.split(",", -1);
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                    Bytes.toBytes(items[0]));
            KeyValue kv = new KeyValue(Bytes.toBytes(items[0]),
                    Bytes.toBytes(items[1]), Bytes.toBytes(items[2]),
                    System.currentTimeMillis(), Bytes.toBytes(items[3]));
            context.write(rowkey, kv);
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] dfsArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        Job job = new Job(conf, "HFile bulk load test");
        job.setJarByClass(HFileGenerator.class);

        job.setMapperClass(HFileMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));

        // Configures the HFile output format, total-order partitioning and the
        // sorting reducer so the output matches the regions of the 'test' table.
        // HFileOutputFormat.configureIncrementalLoad(job, ConnectionUtil.getTable());
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, "test"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
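The mapper splits each input line on commas into four fields: row key, column family, column qualifier, and value. A hypothetical test.txt (the file name used in step 5; the rows below are made-up examples) would therefore look like this, with the family matching the one created in step 3:

row1,NAME,firstname,Alice
row2,NAME,firstname,Bob
row3,NAME,lastname,Smith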
2. Java code to load the HFiles into HBase
package HBaseIA.TwitBase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * @author jason
 */
public class HFileLoader {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] dfsArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(dfsArgs[0]), new HTable(conf, "test"));
    }
}
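As an alternative to a custom loader class, LoadIncrementalHFiles also ships with a main method, so the same bulk load can usually be triggered straight from the command line (a sketch using the paths and table name from the steps below; it assumes the HBase jars are already on the classpath):

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /example/buckload/out test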
3. Create the test HBase table (using the HBase shell)
hbase(main):013:0> create 'test',{NAME => 'NAME'}
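Because SimpleTotalOrderPartitioner assigns rows to reducers by key range, the job parallelizes better when the target table is pre-split into several regions; with a single-region table one reducer ends up writing all the HFiles. An optional pre-split variant of the create command (the split keys are arbitrary examples and should reflect your actual row key distribution):

create 'test', {NAME => 'NAME'}, {SPLITS => ['g', 'n', 't']}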
4. Edit hbase-env.sh and add the job JAR to HBASE_CLASSPATH
export HBASE_CLASSPATH=/home/jason/git/twitbase/target/twitbase-1.0.0.jar
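The hbase launcher script sources hbase-env.sh on every invocation, so the change takes effect for new commands immediately; a quick way to confirm the job JAR is visible is:

hbase classpath | grep twitbase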
5. Test run: generate the HFiles
hbase HBaseIA.TwitBase.mapreduce.HFileGenerator /example/buckload/in/test.txt /example/buckload/out
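Note that the input file must already be in HDFS before running the job, e.g. hadoop fs -put test.txt /example/buckload/in/. If the job succeeds, the output directory should contain one subdirectory per column family (here NAME) holding the generated HFiles, which can be checked with:

hadoop fs -ls /example/buckload/out/NAME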
6. After the HFiles are generated, load them into HBase
hbase HBaseIA.TwitBase.mapreduce.HFileLoader /example/buckload/out
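Once the load completes the data should be visible in the table; a quick check from the HBase shell:

scan 'test', {LIMIT => 10}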