Custom HBase-MapReduce 1

Goal: use MapReduce to migrate part of the data in the fruit table (the cells in the name column) into the fruit_mr table.
Step-by-step implementation:

1. Build the FruitMapper class, which reads the data from the fruit table.

package com.atlxl.mr1;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FruitMapper extends TableMapper<ImmutableBytesWritable, Put>{

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        // Build a Put keyed by the current row key
        Put put = new Put(key.get());

        // Iterate over the cells of this row, keeping only the name column
        Cell[] cells = value.rawCells();
        for (Cell cell : cells) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                put.add(cell);
            }
        }

        // Emit the row key and the Put holding the filtered cells
        context.write(key, put);

    }
}
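
Note: instead of filtering cells inside the mapper, the filtering can be pushed down into the scan itself, so that only info:name cells are ever shipped to the map tasks. A minimal driver-side sketch (a hypothetical variant, assuming the column family is info, as the scan output further below confirms):

//Hypothetical driver-side variant: restrict the Scan server-side,
//so FruitMapper would need no qualifier check at all.
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
TableMapReduceUtil.initTableMapperJob("fruit", scan,
        FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);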

2. Build the FruitReducer class, which writes the rows read from the fruit table into the fruit_mr table. Note that TableReducer declares only three type parameters (KEYIN, VALUEIN, KEYOUT): the output value type is fixed to Mutation by the framework, so only the output key type (NullWritable here) needs to be given.

package com.atlxl.mr1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FruitReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {

        // Pass each Put through unchanged; the framework writes it to fruit_mr
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }

    }
}

3. Build FruitDriver (extends Configured implements Tool), which assembles and runs the Job.

package com.atlxl.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FruitDriver extends Configured implements Tool{

    private Configuration configuration = null;

    public int run(String[] strings) throws Exception {

        // Get the Job instance
        Job job = Job.getInstance(configuration);

        // Set the driver (jar) class
        job.setJarByClass(FruitDriver.class);

        // Wire up the Mapper: scan the source table fruit
        TableMapReduceUtil.initTableMapperJob("fruit", new Scan(), FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);

        // Wire up the Reducer: write into the target table fruit_mr
        TableMapReduceUtil.initTableReducerJob("fruit_mr", FruitReducer.class, job);

        // Submit the job and wait for completion
        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        this.configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

//4. Invoke and run the Job from the main method


    public static void main(String[] args) throws Exception {

        Configuration configuration = HBaseConfiguration.create();

        int i = ToolRunner.run(configuration, new FruitDriver(), args);

        System.exit(i);
    }
}
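
The table names above are hard-coded. A small, hypothetical variation of run() takes the source and target tables from the command line instead, so the same jar can migrate any pair of tables (e.g. ... FruitDriver fruit fruit_mr):

    //Hypothetical variant: read table names from the CLI arguments.
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(configuration);
        job.setJarByClass(FruitDriver.class);
        TableMapReduceUtil.initTableMapperJob(strings[0], new Scan(),
                FruitMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob(strings[1], FruitReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }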


5. Package and run the job

Tip: before running the job, if the table that will receive the data does not exist, create it in advance.
Tip: Maven package command: -P local clean package, or -P dev clean package install (bundling the third-party jars into the artifact requires the maven-shade-plugin).

1) Copy the packaged jar into the hbase directory.

2) Create the table that will receive the data.

hbase(main):005:0>  create 'fruit_mr','info'
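
If you prefer creating the table from code rather than the shell, here is a minimal sketch using the HBase 1.x client API (the class name TableCreator is hypothetical):

package com.atlxl.mr1;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class TableCreator {
    public static void main(String[] args) throws Exception {
        // Equivalent of the shell command: create 'fruit_mr','info'
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("fruit_mr");
            if (!admin.tableExists(name)) {
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}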

3) Run the jar.

[lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr1.FruitDriver

4) Inspect the imported data.

hbase(main):006:0> scan "fruit_mr"
ROW                        COLUMN+CELL                                                                
 1001                      column=info:name, timestamp=1560441335521, value=Apple                     
 1002                      column=info:name, timestamp=1560441335521, value=Pear                      
 1003                      column=info:name, timestamp=1560441335521, value=Pineapple                 
3 row(s) in 0.1330 seconds


Custom HBase-MapReduce 2

Goal: write data from HDFS into an HBase table.
Step-by-step implementation:
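
The steps below assume a tab-separated input file with three fields per line: row key, name, and color. Something like the following (the names match the fruit data from Part 1; the color values are illustrative):

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow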


1. Build HDFSMapper, which reads the file data from HDFS.

package com.atlxl.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class HDFSMapper extends Mapper<LongWritable, Text, NullWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line of input
        String line = value.toString();


        // Split the line on tabs: row key, name, color
        String[] split = line.split("\t");


        // Build the Put: field 0 is the row key; fields 1 and 2 go to info:name and info:color
        Put put = new Put(Bytes.toBytes(split[0]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(split[2]));


        // Emit the Put; the key is not used downstream
        context.write(NullWritable.get(), put);


    }
}
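
Note that split[1] or split[2] will throw an ArrayIndexOutOfBoundsException on a blank or malformed line and fail the task. A hypothetical guard that could go right after the split:

        //Hypothetical guard: skip blank or malformed lines instead of
        //failing the whole map task.
        if (split.length < 3) {
            return;
        }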


2. Build the HDFSReducer class.

package com.atlxl.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class HDFSReducer extends TableReducer<NullWritable, Put, NullWritable>{

    @Override
    protected void reduce(NullWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {

        // Pass each Put through; the framework writes it to the target table
        for (Put value : values) {
            context.write(NullWritable.get(), value);
        }

    }
}
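
Since this reducer is a pure pass-through, it can in principle be dropped: a common pattern is to hand initTableReducerJob a null reducer class and set zero reduce tasks, so the mapper's Puts go straight to TableOutputFormat. A hypothetical fragment for the driver below:

        //Hypothetical map-only variant: no reducer, zero reduce tasks;
        //TableOutputFormat then writes the mapper's Puts directly to fruit2.
        TableMapReduceUtil.initTableReducerJob("fruit2", null, job);
        job.setNumReduceTasks(0);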


3. Build HDFSDriver to assemble the Job.

package com.atlxl.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HDFSDriver extends Configured implements Tool{

    private Configuration configuration = null;

    public int run(String[] args) throws Exception {

        // Get the Job instance
        Job job = Job.getInstance(configuration);

        // Set the driver (jar) class
        job.setJarByClass(HDFSDriver.class);

        // Set the Mapper and its output key/value types
        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Wire up the Reducer: write into the fruit2 table
        TableMapReduceUtil.initTableReducerJob("fruit2", HDFSReducer.class, job);

        // Set the HDFS input path (first CLI argument)
        FileInputFormat.setInputPaths(job, args[0]);

        // Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);

        return result ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        configuration = conf;
    }

    public Configuration getConf() {
        return configuration;
    }

//4. Invoke and run the Job from the main method


    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int i = ToolRunner.run(configuration, new HDFSDriver(), args);

        System.exit(i);

    }

}


5. Package and run


Input path on HDFS:

[lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver /fruit.tsv


Input path on the local filesystem (under the hbase directory):

[lxl@hadoop102 hbase]$ /opt/module/hadoop-2.7.2/bin/yarn jar Hbase01-1.0-SNAPSHOT.jar com.atlxl.mr2.HDFSDriver file:///opt/module/hbase/fruit.tsv 
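
To verify the import, scan the target table from the HBase shell: scan 'fruit2'.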