需求:將HDFS上的文件中的數據導入到hbase中
實現上面的需求也有兩種辦法,一種是自定義mr,一種是使用hbase提供好的import工具
1、hdfs中的數據是這樣的
每一行的數據是這樣的:id name age gender birthday
(my_python_env)[root@hadoop26 ~]# hadoop fs -cat /t1/* 1 zhangsan 10 male NULL 2 lisi NULL NULL NULL 3 wangwu NULL NULL NULL 4 zhaoliu NULL NULL 1993
2、自定義mr工具
public class HdfsToHBase { public static void main(String[] args) throws Exception{ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "hadoop26:2181"); conf.set("hbase.rootdir", "hdfs://hadoop26:9000/hbase"); conf.set(TableOutputFormat.OUTPUT_TABLE, args[1]); Job job = Job.getInstance(conf, HdfsToHBase.class.getSimpleName()); TableMapReduceUtil.addDependencyJars(job); job.setJarByClass(HdfsToHBase.class); job.setMapperClass(HdfsToHBaseMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(HdfsToHBaseReducer.class); FileInputFormat.addInputPath(job, new Path(args[0])); job.setOutputFormatClass(TableOutputFormat.class); job.waitForCompletion(true); } public static class HdfsToHBaseMapper extends Mapper<LongWritable, Text, Text, Text>{ private Text outKey = new Text(); private Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] splits = value.toString().split("\t"); outKey.set(splits[0]); outValue.set(splits[1]+"\t"+splits[2]+"\t"+splits[3]+"\t"+splits[4]); context.write(outKey, outValue); } } public static class HdfsToHBaseReducer extends TableReducer<Text, Text, NullWritable>{ @Override protected void reduce(Text k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException { Put put = new Put(k2.getBytes()); for (Text v2 : v2s) { String[] splis = v2.toString().split("\t"); if(splis[0]!=null && !"NULL".equals(splis[0])){ put.add("f1".getBytes(), "name".getBytes(),splis[0].getBytes()); } if(splis[1]!=null && !"NULL".equals(splis[1])){ put.add("f1".getBytes(), "age".getBytes(),splis[1].getBytes()); } if(splis[2]!=null && !"NULL".equals(splis[2])){ put.add("f1".getBytes(), "gender".getBytes(),splis[2].getBytes()); } if(splis[3]!=null && !"NULL".equals(splis[3])){ put.add("f1".getBytes(), "birthday".getBytes(),splis[3].getBytes()); } } 
context.write(NullWritable.get(),put); } } }
2.1打包運行
首先在hbase中建立一個表
hbase(main):006:0> create 'table1','f1' 0 row(s) in 0.4240 seconds => Hbase::Table - table1
而後運行
hadoop jar HdfsToHBase.jar com.lanyun.hadoop2.HdfsToHBase /t1/part* table1
最後查看table1中的數據
hbase(main):014:0* scan 'table1' ROW COLUMN+CELL 1 column=f1:age, timestamp=1469069255119, value=10 1 column=f1:gender, timestamp=1469069255119, value=male 1 column=f1:name, timestamp=1469069255119, value=zhangsan 2 column=f1:name, timestamp=1469069255119, value=lisi 3 column=f1:name, timestamp=1469069255119, value=wangwu 4 column=f1:birthday, timestamp=1469069255119, value=1993 4 column=f1:name, timestamp=1469069255119, value=zhaoliu 4 row(s) in 0.0430 seconds
3、使用hbase提供的import工具
首先查看其用法
(my_python_env)[root@hadoop26 ~]# hbase org.apache.hadoop.hbase.mapreduce.Import ERROR: Wrong number of arguments: 0 Usage: Import [options] <tablename> <inputdir> By default Import will load data directly into HBase. To instead generate HFiles of data to prepare for a bulk data load, pass the option: -Dimport.bulk.output=/path/for/output
在hbase中建立表table2
hbase(main):016:0> create 'table2','f1' 0 row(s) in 0.4080 seconds => Hbase::Table - table2
在命令行中使用命令進行導入
hbase org.apache.hadoop.hbase.mapreduce.Import table2 /t2
查看table2中的數據
hbase(main):018:0> scan 'table2' ROW COLUMN+CELL 1 column=f1:age, timestamp=1468824267106, value=10 1 column=f1:gender, timestamp=1468824289990, value=male 1 column=f1:name, timestamp=1468824137463, value=zhangsan 2 column=f1:name, timestamp=1468824236014, value=lisi 3 column=f1:name, timestamp=1468824247109, value=wangwu 4 column=f1:birthday, timestamp=1468825870158, value=1993 4 column=f1:name, timestamp=1468825659207, value=zhaoliu 4 row(s) in 0.0440 seconds
4、注意
import工具很方便,可是只能導入Export導出的數據。