First, make sure the table you want to write into already exists in HBase.
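If it does not exist yet, it can be created up front. Below is a minimal sketch using the 0.98-era HBaseAdmin client API used elsewhere in this post (equivalent to create 'dianhuajilu', 'dianxin' in the HBase shell); the class name CreateTable is just illustrative, while the table name dianhuajilu and the column family dianxin are the ones used by the job and reducer below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // table name and column family match the MapReduce job below
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("dianhuajilu"));
        desc.addFamily(new HColumnDescriptor("dianxin"));
        if (!admin.tableExists("dianhuajilu")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}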
package hbasemapperreduce;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class HbaseMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] splited = line.split("\t");
        // splited[0] is the timestamp, splited[1] is the phone number
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
        String format = simpleDateFormat.format(new Date(Long.parseLong(splited[0].trim())));
        // row key: phoneNumber_yyyyMMddHHmmss
        String rowKey = splited[1] + "_" + format;
        context.write(new Text(rowKey), new Text(rowKey + "\t" + line));
    }
}
package hbasemapperreduce;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
public class HbaseReduce extends TableReducer<Text, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        for (Text text : values) {
            String value = text.toString();
            String row = key.toString();
            // the mapper prepended the row key, so the original line's fields start at index 1
            String[] fields = value.split("\t");
            String tel = fields[2];
            String apmac = fields[3];
            String acmac = fields[4];
            String host = fields[5];
            String sitetype = fields[6];
            String uppack = fields[7];
            String downpack = fields[8];
            String upload = fields[9];
            String download = fields[10];
            String httpstatus = fields[11];
            // every column goes into the "dianxin" column family
            Put put = new Put(row.getBytes());
            put.addColumn("dianxin".getBytes(), "tel".getBytes(), tel.getBytes());
            put.addColumn("dianxin".getBytes(), "apmac".getBytes(), apmac.getBytes());
            put.addColumn("dianxin".getBytes(), "acmac".getBytes(), acmac.getBytes());
            put.addColumn("dianxin".getBytes(), "host".getBytes(), host.getBytes());
            put.addColumn("dianxin".getBytes(), "sitetype".getBytes(), sitetype.getBytes());
            put.addColumn("dianxin".getBytes(), "uppack".getBytes(), uppack.getBytes());
            put.addColumn("dianxin".getBytes(), "downpack".getBytes(), downpack.getBytes());
            put.addColumn("dianxin".getBytes(), "upload".getBytes(), upload.getBytes());
            put.addColumn("dianxin".getBytes(), "download".getBytes(), download.getBytes());
            put.addColumn("dianxin".getBytes(), "httpstatus".getBytes(), httpstatus.getBytes());
            context.write(NullWritable.get(), put);
        }
    }
}
package hbasemapperreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class HbaseJob {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Submitted from the local machine but executed on the cluster (the usual enterprise setup);
        // put the Hadoop configuration files under src so they are on the classpath.
        conf.set("mapred.jar", "E:\\hbase.jar");
        conf.set("hbase.zookeeper.quorum", "node22:2181,node33:2181,node44:2181");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "dianhuajilu");
        Job job;
        try {
            job = Job.getInstance(conf);
            TableMapReduceUtil.addDependencyJars(job);
            job.setJarByClass(HbaseJob.class);
            job.setJobName("hbasemapper");
            job.setMapperClass(HbaseMapper.class);
            job.setReducerClass(HbaseReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path("/dianxinin/"));
            boolean b = job.waitForCompletion(true);
            if (b) {
                System.out.println("Job succeeded");
            } else {
                System.out.println("Job failed");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
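To confirm the import worked, the target table can be scanned with the client API. This is only a sketch, not part of the original job; the class name ScanCheck is illustrative, and the ZooKeeper quorum and table name are taken from the driver above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

public class ScanCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node22:2181,node33:2181,node44:2181");
        HTable table = new HTable(conf, "dianhuajilu");
        Scan scan = new Scan();
        scan.setCaching(100); // fetch 100 rows per RPC
        ResultScanner scanner = table.getScanner(scan);
        int count = 0;
        for (Result r : scanner) {
            if (count++ < 10) {
                System.out.println(r); // print the first few rows
            }
        }
        System.out.println("total rows: " + count);
        scanner.close();
        table.close();
    }
}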
MapReduce code to generate HFiles
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseMR extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new HBaseMR(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration configuration = this.getConf();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(HBaseMR.class);
        job.setJobName(this.getClass().getSimpleName());
        // set input and mapper
        FileInputFormat.setInputPaths(job, new Path("hdfs://bigdata01:8020/user/xiaozhou/csv"));
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(HBaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // set reducer
        job.setReducerClass(PutSortReducer.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);
        // output path for the generated HFiles
        FileOutputFormat.setOutputPath(job, new Path("/user/xiaozhou/hbase1/hfileoutput"));
        // configureIncrementalLoad sets up the total-order partitioner from the table's regions
        HTable table = new HTable(configuration, "person4".getBytes());
        HFileOutputFormat2.configureIncrementalLoad(job, table);
        job.setNumReduceTasks(1); // at least one, adjust as required
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }
}
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // CSV columns: rowkey,name,age,sex,address,phone
        String[] split = value.toString().split(",");
        String rowkey = split[0];
        String name = split[1];
        String age = split[2];
        String sex = split[3];
        String address = split[4];
        String phone = split[5];
        // every column goes into the "info" column family
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(phone));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put);
    }
}
Use completebulkload to load the generated HFiles into HBase:
./yarn jar /opt/moduels/hbase-0.98.6-hadoop2/lib/hbase-server-0.98.6-hadoop2-tests.jar completebulkload hdfs://bigdata01:8020/user/xiaozhou/hbase1/hfileoutput person4
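Alternatively, the same bulk load can be triggered from Java with LoadIncrementalHFiles, the class behind the completebulkload driver. A minimal sketch, assuming the same HFile output path and person4 table as above; the class name BulkLoad is illustrative.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoad {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "person4");
        // points at the HFile output directory produced by the job above
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("hdfs://bigdata01:8020/user/xiaozhou/hbase1/hfileoutput"), table);
        table.close();
    }
}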