package com.atlxl.mr1;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Table-to-table mapper: reads each row of the source HBase table and
 * emits a {@link Put} containing only the cells whose column qualifier
 * is "name". All other cells of the row are dropped.
 */
public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // One Put per source row, keyed by the original row key.
        Put filtered = new Put(key.get());

        // Copy across only the "name" cells of this row.
        for (Cell cell : value.rawCells()) {
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            if ("name".equals(qualifier)) {
                filtered.add(cell);
            }
        }

        context.write(key, filtered);
    }
}
package com.atlxl.mr1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Pass-through reducer: forwards every {@link Put} produced by the mapper
 * to the output HBase table unchanged. The row key is irrelevant here
 * because each Put already carries its own row key, hence NullWritable.
 */
public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Emit each Put as-is; the TableOutputFormat applies it to the table.
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
package com.atlxl.mr1; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class FruitDriver extends Configuration implements Tool{ private Configuration configuration = null; public int run(String[] strings) throws Exception { //獲取任務對象 Job job = Job.getInstance(configuration); //指定Driver類 job.setJarByClass(FruitDriver.class); //指定Mapper TableMapReduceUtil.initTableMapperJob("fruit",new Scan(),FruitMapper.class, ImmutableBytesWritable.class,Put.class,job); //指定Reducer TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job); //提交 boolean b = job.waitForCompletion(true); return b?0:1; } public void setConf(Configuration conf) { this.configuration = conf; } public Configuration getConf() { return configuration; }
public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); int i = ToolRunner.run(configuration, new FruitDriver(), args); } }
1)將打好的jar包丟到hbase目錄下
2)建立接收數據的表
hbase(main):005:0> create 'fruit_mr','info'
3)運行jar包
[lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr1.FruitDriver
4)查看導入的數據
hbase(main):006:0> scan "fruit_mr" ROW COLUMN+CELL 1001 column=info:name, timestamp=1560441335521, value=Apple 1002 column=info:name, timestamp=1560441335521, value=Pear 1003 column=info:name, timestamp=1560441335521, value=Pineapple 3 row(s) in 0.1330 seconds
package com.atlxl.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Reads tab-separated lines from HDFS ("rowkey\tname\tcolor") and turns
 * each one into an HBase {@link Put} with columns info:name and info:color.
 */
public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {

    /**
     * @param key     byte offset of the line within the input split (unused)
     * @param value   one line of the input TSV file
     * @param context used to emit the resulting Put
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();

        // Split into rowkey / name / color.
        String[] split = line.split("\t");

        // Fix: guard against blank or malformed lines. Previously split[1] /
        // split[2] threw ArrayIndexOutOfBoundsException and killed the whole
        // job on the first bad record; now such lines are counted and skipped.
        if (split.length < 3) {
            context.getCounter("HDFSMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        // Row key is the first field; both columns go to family "info".
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));

        context.write(NullWritable.get(), put);
    }
}
package com.atlxl.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Identity reducer for the HDFS-to-HBase job: every {@link Put} built by
 * the mapper is forwarded unchanged to the output table.
 */
public class HDFSReducer extends TableReducer<NullWritable, Put, NullWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Forward each Put untouched; TableOutputFormat performs the writes.
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
package com.atlxl.mr2; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class HDFSDriver extends Configuration implements Tool{ private Configuration configuration = null; public int run(String[] args) throws Exception { //獲取Job對象 Job job = Job.getInstance(configuration); //設置主類 job.setJarByClass(HDFSDriver.class); //設置Mapper job.setMapperClass(HDFSMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Put.class); //設置Reducer TableMapReduceUtil.initTableReducerJob("fruit2", HDFSReducer.class, job); //設置輸入路徑 FileInputFormat.setInputPaths(job, args[0]); //提交 boolean result = job.waitForCompletion(true); return result?0:1; } public void setConf(Configuration conf) { configuration = conf; } public Configuration getConf() { return configuration; }
public static void main(String[] args) throws Exception { Configuration configuration = HBaseConfiguration.create(); int i = ToolRunner.run(configuration, new HDFSDriver(), args); System.exit(i); } }
輸入路徑爲:HDFS
[lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver /fruit.tsv
輸入路徑爲:本地hbase目錄下
[lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver file:///opt/module/hbase/fruit.tsv