用MapReduce 向Hbase 中插入數據

首先要保證hbase中有要插入的表java

package hbasemapperreduce;node

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;apache

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;服務器

public class HbaseMapper extends Mapper<LongWritable, Text, Text, Text>{
    @Override
    protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
        String line=value.toString();
        String[] splited=line.split("\t");
SimpleDateFormat simpleDateFormatimpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
        String format = simpleDateFormatimpleDateFormat.format(new Date(Long.parseLong(splited[0].trim())));
        String rowKey=splited[1]+"_"+format;
        context.write(new Text(rowKey), new Text(rowKey+"\t"+line));
    }
}app

package hbasemapperreduce;
import java.io.IOException;ide

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;;oop

public class HbaseReduce extends TableReducer<Text, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> arg1,Context arg2)throws IOException, InterruptedException {
        for (Text text : arg1) {
            String value=text.toString();
            String row=key.toString();
            String tel=value.split("\t")[2];
            String apmac=value.split("\t")[3];
            String acmac=value.split("\t")[4];
            String host=value.split("\t")[5];
            String sitetype=value.split("\t")[6];
            String uppack=value.split("\t")[7];
            String downpack=value.split("\t")[8];
            String upload=value.split("\t")[9];
            String download=value.split("\t")[10];
            String httpstatus=value.split("\t")[11];
            Put put=new Put(row.getBytes());
            put.addColumn("dianxin".getBytes(), "tel".getBytes(), tel.getBytes());
            put.addColumn("dianxin".getBytes(), "apmac".getBytes(), apmac.getBytes());
            put.addColumn("dianxin".getBytes(), "acmac".getBytes(), acmac.getBytes());
            put.addColumn("dianxin".getBytes(), "host".getBytes(), host.getBytes());
            put.addColumn("dianxin".getBytes(), "sitetype".getBytes(), sitetype.getBytes());
            put.addColumn("dianxin".getBytes(), "uppack".getBytes(),uppack.getBytes());
            put.addColumn("dianxin".getBytes(), "downpack".getBytes(), downpack.getBytes());
            put.addColumn("dianxin".getBytes(), "upload".getBytes(), upload.getBytes());
            put.addColumn("dianxin".getBytes(), "download".getBytes(), download.getBytes());
            put.addColumn("dianxin".getBytes(), "httpstatus".getBytes(), httpstatus.getBytes());
            arg2.write(NullWritable.get(), put);
        }
    }
}ui

package hbasemapperreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class HbaseJob {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        //在本地直接調用,執行過程在服務器上(真正企業運行環境)  在src下加hadoop的配置文件
        conf.set("mapred.jar", "E:\\hbase.jar");
        conf.set("hbase.zookeeper.quorum", "node22:2181,node33:2181,node44:2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "dianhuajilu");
        Job job;
        try {
            job = Job.getInstance(conf);
            TableMapReduceUtil.addDependencyJars(job);
            job.setJarByClass(HbaseJob.class);
            job.setJobName("hbasemapper");
            job.setMapperClass(HbaseMapper.class);
            job.setReducerClass(HbaseReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path("/dianxinin/"));
            boolean b = job.waitForCompletion(true);
            if(b){
                System.out.println("執行成功");
            }else{
                System.out.println("執行失敗");
            }
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
        
        
    }this

}.net

 

mapreduce 生成hfile 代碼

public class HBaseMR extends Configured implements Tool{

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new HBaseMR(), args);
        System.exit(status);

    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = this.getConf();
        Job job=Job.getInstance(configuration);
        job.setJarByClass(HBaseMR.class);
        job.setJobName(this.getClass().getSimpleName());
        // set input and set mapper
        FileInputFormat.setInputPaths(job, new Path("hdfs://bigdata01:8020/user/xiaozhou/csv"));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(HBaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        //set reduce
        job.setReducerClass(PutSortReducer.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        //生成hfile路徑
        FileOutputFormat.setOutputPath(job, new Path("/user/xiaozhou/hbase1/hfileoutput"));
        HTable table=new HTable(configuration, "person4".getBytes());
        
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        job.setNumReduceTasks(1);   // at least one, adjust as required
                
        boolean b = job.waitForCompletion(true);
        
        return b?0:1;
    }

}

public class HBaseMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context)throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        String rowkey = split[0];
        String name = split[1];
        String age = split[2];
        String sex = split[3];
        String address = split[4];
        String phone = split[5];
        Put put=new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(phone));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
    }
}

用completebulkload 把hfile加載到hbase中

./yarn jar /opt/moduels/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2-tests.jar completebulkload hdfs://bigdata01:8020/user/xiaozhou/hbase1/hfileoutput person4  

相關文章
相關標籤/搜索