Version :hadoop1.2.1; hbaes0.94.16;java
HBase寫入數據方式(參考:《HBase The Definitive Guide》),能夠簡單分爲下面幾種:apache
1. 直接使用HTable進行導入,代碼以下:app
package hbase.curd; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Random; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; public class PutExample { /** * @param args * @throws IOException */ private HTable table = HTableUtil.getHTable("testtable"); public static void main(String[] args) throws IOException { // TODO Auto-generated method stub PutExample pe = new PutExample(); pe.putRows(); } public void putRows(){ List<Put> puts = new ArrayList<Put>(); for(int i=0;i<10;i++){ Put put = new Put(Bytes.toBytes("row_"+i)); Random random = new Random(); if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual3"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual4"), Bytes.toBytes("colfam1_qual1_value_"+i)); } if(random.nextBoolean()){ put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual5"), Bytes.toBytes("colfam1_qual1_value_"+i)); } puts.add(put); } try{ table.put(puts); table.close(); }catch(Exception e){ e.printStackTrace(); return ; } System.out.println("done put rows"); } }
其中HTableUtil以下:dom
package hbase.curd; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.util.Bytes; public class HTableUtil { private static HTable table; private static Configuration conf; static{ conf =HBaseConfiguration.create(); conf.set("mapred.job.tracker", "hbase:9001"); conf.set("fs.default.name", "hbase:9000"); conf.set("hbase.zookeeper.quorum", "hbase"); try { table = new HTable(conf,"testtable"); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static Configuration getConf(){ return conf; } public static HTable getHTable(String tablename){ if(table==null){ try { table= new HTable(conf,tablename); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } return table; } public static byte[] gB(String name){ return Bytes.toBytes(name); } }
這一種是沒有使用MR的,下面介紹的幾種方式都是使用MR的。ide
2.1 從HDFS文件導入HBase,繼承自Mapper,代碼以下:oop
package hbase.mr; import java.io.IOException; import hbase.curd.HTableUtil; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class ImportFromFile { /** * 從文件導入到HBase * @param args */ public static final String NAME="ImportFromFile"; public enum Counters{LINES} static class ImportMapper extends Mapper<LongWritable,Text, ImmutableBytesWritable,Writable>{ private byte[] family =null; private byte[] qualifier = null; @Override protected void setup(Context cxt){ String column = cxt.getConfiguration().get("conf.column"); byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); family = colkey[0]; if(colkey.length>1){ qualifier = colkey[1]; } } @Override public void map(LongWritable offset,Text line,Context cxt){ try{ String lineString= line.toString(); byte[] rowkey= DigestUtils.md5(lineString); Put put = new Put(rowkey); put.add(family,qualifier,Bytes.toBytes(lineString)); cxt.write(new ImmutableBytesWritable(rowkey), put); cxt.getCounter(Counters.LINES).increment(1); }catch(Exception e){ e.printStackTrace(); } } } private static CommandLine parseArgs(String[] args){ Options options = new Options(); Option o = new Option("t" ,"table",true,"table to import into (must exist)"); o.setArgName("table-name"); o.setRequired(true); options.addOption(o); o= new Option("c","column",true,"column to store row data into"); o.setArgName("family:qualifier"); o.setRequired(true); options.addOption(o); o = new Option("i", "input", true, "the directory or file to read from"); o.setArgName("path-in-HDFS"); o.setRequired(true); options.addOption(o); options.addOption("d", "debug", false, "switch on DEBUG log level"); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (Exception e) { System.err.println("ERROR: " + e.getMessage() + "\n"); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(NAME + " ", options, true); System.exit(-1); } return cmd; } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = HTableUtil.getConf(); String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); String table = cmd.getOptionValue("t"); String input = cmd.getOptionValue("i"); String column = cmd.getOptionValue("c"); conf.set("conf.column", column); Job job = new Job(conf, "Import from file " + input + " into table " + table); job.setJarByClass(ImportFromFile.class); job.setMapperClass(ImportMapper.class); job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(input)); System.exit(job.waitForCompletion(true) ? 0 : 1); } private static String[] initialArg(){ String []args = new String[6]; args[0]="-c"; args[1]="fam:data"; args[2]="-i"; args[3]="/user/hadoop/input/picdata"; args[4]="-t"; args[5]="testtable"; return args; } }
2.2 讀取HBase表寫入HBase表中字段,代碼以下:測試
package hbase.mr; import hadoop.util.HadoopUtils; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParseDriver { /** * 把hbase表中數據拷貝到其餘表(或本表)相同字段 * @param args */ enum Counters{ VALID, ROWS, COLS, ERROR } private static Logger log = LoggerFactory.getLogger(ParseDriver.class); static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{ private byte[] columnFamily =null ; private byte[] columnQualifier =null; @Override protected void setup(Context cxt){ columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily")); columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier")); } @Override public void map(ImmutableBytesWritable row,Result columns,Context cxt){ cxt.getCounter(Counters.ROWS).increment(1); String value =null; try{ Put put = new Put(row.get()); for(KeyValue kv : columns.list()){ cxt.getCounter(Counters.COLS).increment(1); value= Bytes.toStringBinary(kv.getValue()); if(equals(columnQualifier,kv.getQualifier())){ // 過濾column put.add(columnFamily,columnQualifier,kv.getValue()); cxt.write(row, put); cxt.getCounter(Counters.VALID).increment(1); } } }catch(Exception e){ log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+ ",Value:"+value); cxt.getCounter(Counters.ERROR).increment(1); } } private boolean equals(byte[] a,byte[] b){ String aStr= Bytes.toString(a); String bStr= Bytes.toString(b); if(aStr.equals(bStr)){ return true; } return false; } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { byte[] columnFamily = Bytes.toBytes("fam"); byte[] columnQualifier = Bytes.toBytes("data"); Scan scan = new Scan (); scan.addColumn(columnFamily, columnQualifier); HadoopUtils.initialConf("hbase"); Configuration conf = HadoopUtils.getConf(); conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily)); conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier)); String input ="testtable" ;// String output="testtable1"; // Job job = new Job(conf,"Parse data in "+input+",write to"+output); job.setJarByClass(ParseDriver.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class,job); TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job); System.exit(job.waitForCompletion(true)?0:1); } }
其中HadoopUtils代碼以下:ui
package hadoop.util; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.LineReader; public class HadoopUtils { private static Configuration conf; public static void initialConf(){ conf = new Configuration(); conf.set("mapred.job.tracker", "hbase:9001"); conf.set("fs.default.name", "hbase:9000"); conf.set("hbase.zookeeper.quorum", "hbase"); } public static void initialConf(String host){ conf = new Configuration(); conf.set("mapred.job.tracker", host+":9001"); conf.set("fs.default.name", host+":9000"); conf.set("hbase.zookeeper.quorum", host); } public static Configuration getConf(){ if(conf==null){ initialConf(); } return conf; } public static List<String> readFromHDFS(String fileName) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(URI.create(fileName), conf); FSDataInputStream hdfsInStream = fs.open(new Path(fileName)); // 按行讀取(新版本的方法) LineReader inLine = new LineReader(hdfsInStream, conf); Text txtLine = new Text(); int iResult = inLine.readLine(txtLine); //讀取第一行 List<String> list = new ArrayList<String>(); while (iResult > 0 ) { list.add(txtLine.toString()); iResult = inLine.readLine(txtLine); } hdfsInStream.close(); fs.close(); return list; } }
2.3 MR和HTable結合,代碼以下:.net
package hbase.mr; import hadoop.util.HadoopUtils; import hbase.mr.AnalyzeDriver.Counters; import java.io.IOException; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParseSinglePutDriver { /** * 使用HTable進行寫入 * 把infoTable 表中的 qualifier字段複製到qualifier1字段 * 單個Put * @param args */ private static Logger log = LoggerFactory.getLogger(ParseMapper.class); static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{ private HTable infoTable =null ; private byte[] columnFamily =null ; private byte[] columnQualifier =null; private byte[] columnQualifier1 =null; @Override protected void setup(Context cxt){ log.info("ParseSinglePutDriver setup,current time: "+new Date()); try { infoTable = new HTable(cxt.getConfiguration(), cxt.getConfiguration().get("conf.infotable")); infoTable.setAutoFlush(false); } catch (IOException e) { log.error("Initial infoTable error:\n"+e.getMessage()); } columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily")); columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier")); columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1")); } @Override protected void cleanup(Context cxt){ try { infoTable.flushCommits(); log.info("ParseSinglePutDriver cleanup ,current time :"+new Date()); } catch (IOException e) { log.error("infoTable flush commits error:\n"+e.getMessage()); } } @Override public void map(ImmutableBytesWritable row,Result columns,Context cxt){ cxt.getCounter(Counters.ROWS).increment(1); String value =null ; try{ Put put = new Put(row.get()); for(KeyValue kv : columns.list()){ cxt.getCounter(Counters.COLS).increment(1); value= Bytes.toStringBinary(kv.getValue()); if(equals(columnQualifier,kv.getQualifier())){ // 過濾column put.add(columnFamily,columnQualifier1,kv.getValue()); infoTable.put(put); } } }catch(Exception e){ log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+ ",Value:"+value); cxt.getCounter(Counters.ERROR).increment(1); } } private boolean equals(byte[] a,byte[] b){ String aStr= Bytes.toString(a); String bStr= Bytes.toString(b); if(aStr.equals(bStr)){ return true; } return false; } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { String input ="testtable"; byte[] columnFamily = Bytes.toBytes("fam"); byte[] columnQualifier = Bytes.toBytes("data"); byte[] columnQualifier1 = Bytes.toBytes("data1"); Scan scan = new Scan (); scan.addColumn(columnFamily, columnQualifier); HadoopUtils.initialConf("hbase"); Configuration conf = HadoopUtils.getConf(); conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily)); conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier)); conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1)); conf.set("conf.infotable", input); Job job = new Job(conf,"Parse data in "+input+",into tables"); job.setJarByClass(ParseSinglePutDriver.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class,job); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true)?0:1); } }
2.4 上面2.3中的HTable其實也是能夠put一個List的,下面的方式就是put一個list的方式,這樣效率會高。debug
package hbase.mr; import hadoop.util.HadoopUtils; import hbase.mr.AnalyzeDriver.Counters; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ParseListPutDriver { /** * 使用HTable進行寫入 * List <Put> 進行測試,查看效率 * 把infoTable 表中的 qualifier字段複製到qualifier1字段 * @param args */ private static Logger log = LoggerFactory.getLogger(ParseMapper.class); static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{ private HTable infoTable =null ; private byte[] columnFamily =null ; private byte[] columnQualifier =null; private byte[] columnQualifier1 =null; private List<Put> list = new ArrayList<Put>(); @Override protected void setup(Context cxt){ log.info("ParseListPutDriver setup,current time: "+new Date()); try { infoTable = new HTable(cxt.getConfiguration(), cxt.getConfiguration().get("conf.infotable")); infoTable.setAutoFlush(false); } catch (IOException e) { log.error("Initial infoTable error:\n"+e.getMessage()); } columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily")); columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier")); columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1")); } @Override protected void cleanup(Context cxt){ try { infoTable.put(list); infoTable.flushCommits(); log.info("ParseListPutDriver cleanup ,current time :"+new Date()); } catch (IOException e) { log.error("infoTable flush commits error:\n"+e.getMessage()); } } @Override public void map(ImmutableBytesWritable row,Result columns,Context cxt){ cxt.getCounter(Counters.ROWS).increment(1); String value =null ; try{ Put put = new Put(row.get()); for(KeyValue kv : columns.list()){ cxt.getCounter(Counters.COLS).increment(1); value= Bytes.toStringBinary(kv.getValue()); if(equals(columnQualifier,kv.getQualifier())){ // 過濾column put.add(columnFamily,columnQualifier1,kv.getValue()); list.add(put); } } }catch(Exception e){ log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+ ",Value:"+value); cxt.getCounter(Counters.ERROR).increment(1); } } private boolean equals(byte[] a,byte[] b){ String aStr= Bytes.toString(a); String bStr= Bytes.toString(b); if(aStr.equals(bStr)){ return true; } return false; } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { String input ="testtable"; byte[] columnFamily = Bytes.toBytes("fam"); byte[] columnQualifier = Bytes.toBytes("data"); byte[] columnQualifier1 = Bytes.toBytes("data2"); Scan scan = new Scan (); scan.addColumn(columnFamily, columnQualifier); HadoopUtils.initialConf("hbase"); Configuration conf = HadoopUtils.getConf(); conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily)); conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier)); conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1)); conf.set("conf.infotable", input); Job job = new Job(conf,"Parse data in "+input+",into tables"); job.setJarByClass(ParseListPutDriver.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class,job); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true)?0:1); } }
數據記錄條數爲:26632,能夠看到下面圖片中的時間記錄對比:
因爲結合了hbase,因此須要在hadoop_home/lib目錄下面加些額外的包,其整個包以下:
分享,成長,快樂