Requirement: import data from files on HDFS into HBase.
There are two ways to do this: write a custom MapReduce job, or use the Import tool that HBase ships with.
1. The data in HDFS looks like this:
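The original post showed the HDFS file as a screenshot, which is not reproduced here. Judging from the mapper and reducer below, each line is tab-separated: a row key followed by NodeCode, NodeType, NodeName, and IsWarehouse. A purely hypothetical sample (values invented for illustration) viewed with hdfs dfs -cat might look like:

hdfs dfs -cat /user/hadoop/gznt/gznt_bmda/* | head -n 2
# hypothetical rows, tab-separated: rowkey  NodeCode  NodeType  NodeName  IsWarehouse
0001    N0001   01      HeadOffice      0
0002    N0002   02      Warehouse-A     1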
Create the table in HBase first:
create 'NNTB','info'
2. Custom MapReduce job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * Reads data from HDFS and writes it into HBase.
 * The target table must be created in advance: create 'NNTB','info'
 */
public class HdfsToHBase {
    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.6"); // only needed when running locally from the IDE
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "202.168.27.196:2181"); // placeholder IP; 2181 is the default ZooKeeper port
        conf.set(TableOutputFormat.OUTPUT_TABLE, "NNTB");

        Job job = Job.getInstance(conf, HdfsToHBase.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(HdfsToHBase.class);

        job.setMapperClass(HdfsToHBaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(HdfsToHBaseReducer.class);

        FileInputFormat.addInputPath(job, new Path("hdfs://202.168.27.196:9000/user/hadoop/gznt/gznt_bmda/*"));
        job.setOutputFormatClass(TableOutputFormat.class);
        job.waitForCompletion(true);
    }

    public static class HdfsToHBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // input lines are tab-separated: rowkey, NodeCode, NodeType, NodeName, IsWarehouse
            String[] splits = value.toString().split("\t");
            outKey.set(splits[0]);
            outValue.set(splits[1] + "\t" + splits[2] + "\t" + splits[3] + "\t" + splits[4]);
            context.write(outKey, outValue);
        }
    }

    // target table: create 'NNTB','info'
    public static class HdfsToHBaseReducer extends TableReducer<Text, Text, NullWritable> {
        @Override
        protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException {
            // use Bytes.toBytes(k2.toString()) rather than k2.getBytes():
            // Text#getBytes() returns the backing array, which may carry stale trailing bytes
            Put put = new Put(Bytes.toBytes(k2.toString()));
            for (Text v2 : v2s) {
                String[] fields = v2.toString().split("\t");
                // "info" is the HBase column family name
                if (fields[0] != null && !"NULL".equals(fields[0])) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("NodeCode"), Bytes.toBytes(fields[0]));
                }
                if (fields[1] != null && !"NULL".equals(fields[1])) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("NodeType"), Bytes.toBytes(fields[1]));
                }
                if (fields[2] != null && !"NULL".equals(fields[2])) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("NodeName"), Bytes.toBytes(fields[2]));
                }
                if (fields[3] != null && !"NULL".equals(fields[3])) {
                    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("IsWarehouse"), Bytes.toBytes(fields[3]));
                }
            }
            context.write(NullWritable.get(), put);
        }
    }
}
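To run the driver above on a cluster rather than from the IDE, a typical submission sequence looks roughly like the following sketch; the jar file name is an assumption, and hbase classpath is used so the HBase client classes are visible when launching the job.

# package the class into a jar first (e.g. with Maven), then submit it;
# hdfs-to-hbase.jar is a hypothetical jar name
export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hdfs-to-hbase.jar HdfsToHBase

The call to TableMapReduceUtil.addDependencyJars(job) in the driver takes care of shipping the HBase jars to the cluster tasks themselves.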
Adapted from: HBase從hdfs導入數據 (importing data into HBase from HDFS).
The HBase Import tool, as introduced in that reference:
(my_python_env)[root@hadoop26 ~]# hbase org.apache.hadoop.hbase.mapreduce.Import
ERROR: Wrong number of arguments: 0
Usage: Import [options] <tablename> <inputdir>
By default Import will load data directly into HBase. To instead generate
HFiles of data to prepare for a bulk data load, pass the option:
  -Dimport.bulk.output=/path/for/output
Run the import from the command line:
hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2
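Note that the Import tool expects as its input directory the SequenceFile output of the companion Export tool. As the usage text above mentions, it can also generate HFiles for a bulk load instead of writing to the table directly; a hedged sketch of that variant, with an illustrative output path, might be:

# write HFiles instead of Puts, then hand them to the bulk loader
# (/tmp/t2_hfiles is an illustrative path)
hbase org.apache.hadoop.hbase.mapreduce.Import -Dimport.bulk.output=/tmp/t2_hfiles table2 /t2
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/t2_hfiles table2

On newer HBase releases the bulk loader class lives under a different package, so the hbase completebulkload command is generally the more portable entry point.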