hbase bulkload導入數據

生車filehtml

/**
 * 
 */
package HBaseIA.TwitBase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * @author jason
 *
 */

public class HFileGenerator {

    public static class HFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] items = line.split(",", -1);
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                    items[0].getBytes());

            KeyValue kv = new KeyValue(Bytes.toBytes(items[0]),
                    Bytes.toBytes(items[1]), Bytes.toBytes(items[2]),
                    System.currentTimeMillis(), Bytes.toBytes(items[3]));
            if (null != kv) {
                context.write(rowkey, kv);
            }
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] dfsArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();

        Job job = new Job(conf, "HFile bulk load test");
        job.setJarByClass(HFileGenerator.class);

        job.setMapperClass(HFileMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class); 

        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

        FileInputFormat.addInputPath(job, new Path(dfsArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));

//        HFileOutputFormat.configureIncrementalLoad(job,ConnectionUtil.getTable());
        HFileOutputFormat.configureIncrementalLoad(job,new HTable(conf, "test"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

2.hfle如庫到hbasejava

/**
 * 
 */
package HBaseIA.TwitBase.mapreduce;

/**
 * @author jason
 *
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.GenericOptionsParser;

public class HFileLoader {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] dfsArgs = new GenericOptionsParser(
                conf, args).getRemainingArgs();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
                conf);
        loader.doBulkLoad(new Path(dfsArgs[0]), new HTable(conf, "test"));
    }
    
}

3.建立測試hbase表(使用hbase shell)git

hbase(main):013:0> create 'test',{NAME => 'NAME'}

4.修改hbase-env.sh,將job jar包加入到HBASE_CLASSPATH中sql

export HBASE_CLASSPATH=/home/jason/git/twitbase/target/twitbase-1.0.0.jar

5.測試生成hfileshell

hbase HBaseIA.TwitBase.mapreduce.HFileGenerator /example/buckload/in/test.txt /example/buckload/out

6.hfile生成後在把hfle導入到hbaseapache

hbase HBaseIA.TwitBase.mapreduce.HFileLoader /example/buckload/out

參考連接app

MapReduce生成HFile文件,再使用BulkLoad導入HBase中(徹底分佈式運行)
分佈式

MapReduce生成HFile入庫到HBase
ide

HBase快速導入數據--BulkLoad
oop

如何執行hbase 的mapreduce job

相關文章
相關標籤/搜索