hdfs數據到hbase過程

需求:將HDFS上的文件中的數據導入到hbase中html

實現上面的需求也有兩種辦法,一種是自定義mr,一種是使用hbase提供好的import工具java

1、hdfs中的數據是這樣的python

hbase建立好表apache

create 'NNTB','info'

 

 2、自定義mrapp

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;
/**
 * 用於HDFS的數據讀取,寫入到hbase中,
 * hbase裏預先建立好表:create 'NNTB','info'
 * */
public class HdfsToHBase {
    public static void main(String[] args) throws Exception{
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.6");//這行我是本地運行所需指定的hadoop home
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "202.168.27.196:2181");//ip亂寫的,端口默認2181
        conf.set(TableOutputFormat.OUTPUT_TABLE, "NNTB");
        Job job = Job.getInstance(conf, HdfsToHBase.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HdfsToHBase.class);
        
        job.setMapperClass(HdfsToHBaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setReducerClass(HdfsToHBaseReducer.class);
        
        FileInputFormat.addInputPath(job, new Path("hdfs://202.168.27.196:9000/user/hadoop/gznt/gznt_bmda/*"));
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }
    
    public static class HdfsToHBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            outKey.set(splits[0]);
            outValue.set(splits[1]+"\t"+splits[2]+"\t"+splits[3]+"\t"+splits[4]);
            context.write(outKey, outValue);
        }
    }
    //:::   create 'NNTB','info'
    public static class HdfsToHBaseReducer extends TableReducer<Text, Text, NullWritable> {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
            Put put = new Put(k2.getBytes());
            for (Text v2 : v2s) {
                String[] splis = v2.toString().split("\t");
                //info,對應hbase列族名
                if(splis[0]!=null && !"NULL".equals(splis[0])){
                    put.addColumn("info".getBytes(), "NodeCode".getBytes(),splis[0].getBytes());
                }
                if(splis[1]!=null && !"NULL".equals(splis[1])){
                    put.addColumn("info".getBytes(), "NodeType".getBytes(),splis[1].getBytes());
                }
                if(splis[2]!=null && !"NULL".equals(splis[2])){
                    put.addColumn("info".getBytes(), "NodeName".getBytes(),splis[2].getBytes());
                }
                if(splis[3]!=null && !"NULL".equals(splis[3])){
                    put.addColumn("info".getBytes(), "IsWarehouse".getBytes(),splis[3].getBytes());
                }
            }
            context.write(NullWritable.get(),put);
        }
    }
}

參考自:HBase從hdfs導入數據ide

參考文獻中的hbase導入工具介紹工具

(my_python_env)[root@hadoop26 ~]# hbase org.apache.hadoop.hbase.mapreduce.Import 
ERROR: Wrong number of arguments: 0
Usage: Import [options] <tablename> <inputdir>
By default Import will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimport.bulk.output=/path/for/output

在命令中中使用命令進行導入:oop

hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2
相關文章
相關標籤/搜索