HBase data import
Following http://blog.csdn.net/hua840812/article/details/7414875, I copied the code over, only to find that running it kept failing with:
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved org.apache.hadoop.io.LongWritable
The cause is that with the current API the Mapper's output types must be spelled out, i.e. extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.
It has to be written that way; simply writing extends Mapper as a raw type is not enough.
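For illustration only, a minimal sketch of the raw versus parameterized declarations (the class names here are mine, not from the original post):

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperSignatures {

    // Raw type: the inherited identity map() passes the LongWritable file
    // offset through as the key, which clashes with the
    // ImmutableBytesWritable the job expects as its map output key,
    // producing the type-mismatch error above.
    public static class BrokenMapper extends Mapper {
    }

    // Parameterized: the declared map output types line up with
    // job.setOutputKeyClass(ImmutableBytesWritable.class) and
    // job.setOutputValueClass(KeyValue.class).
    public static class WorkingMapper
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    }
}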
Here is the code anyway:
The code that generates the HFiles:
package com.bonc.db;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

    public static class TestHFileToHBaseMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines look like "rowkey|type"; emit one KeyValue per line.
            String[] values = value.toString().split("\\|");
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                    values[0].trim().getBytes());
            KeyValue kvProtocol;
            if (values.length > 1) {
                kvProtocol = new KeyValue(values[0].trim().getBytes(),
                        "url_type".getBytes(), "type".getBytes(),
                        System.currentTimeMillis(), values[1].trim().getBytes());
            } else {
                // No type column on this line; store a literal "NULL" marker.
                kvProtocol = new KeyValue(values[0].trim().getBytes(),
                        "url_type".getBytes(), "type".getBytes(),
                        System.currentTimeMillis(), "NULL".getBytes());
            }
            context.write(rowkey, kvProtocol);
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "TestHFileToHBase");
        job.setJarByClass(TestHFileToHBase.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        job.setMapperClass(TestHFileToHBaseMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);
        job.setOutputFormatClass(HFileOutputFormat.class);

        // Let HBase configure the partitioner and reducer count so the
        // generated HFiles line up with the regions of the target table.
        HTable table = new HTable(conf, "url_rule");
        HFileOutputFormat.configureIncrementalLoad(job, table);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
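One assumption this job makes is that the target table already exists: HFileOutputFormat.configureIncrementalLoad looks up the region boundaries of url_rule, so the table and its url_type column family must be created beforehand. A minimal sketch using the old HBaseAdmin API (the class name is mine, and default column-family settings are assumed):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateUrlRuleTable {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Create url_rule with the single column family the mapper writes to.
        if (!admin.tableExists("url_rule")) {
            HTableDescriptor desc = new HTableDescriptor("url_rule");
            desc.addFamily(new HColumnDescriptor("url_type"));
            admin.createTable(desc);
        }
    }
}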
The code that bulk loads the HFiles into the table:
package com.bonc.db;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class TestLoadIncrementalHFileToHBase {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // args[0] = table name, args[1] = directory containing the generated HFiles
        byte[] TABLE = Bytes.toBytes(args[0]);
        HTable table = new HTable(conf, TABLE);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(args[1]), table);
    }
}
Unfortunately, the bulk load from HFiles into the table then failed with:
java.io.IOException: java.io.IOException: Failed rename of maprfs://133.0.76.41:7222/user/hbasetest/url_type/4447706551787973235 to maprfs://133.0.76.41:7222/hbase/url_rule/732e6d3d150caa8bd3d8d228e3d9c9a0/url_type/914168143008836217
at org.apache.hadoop.hbase.regionserver.StoreFile.rename(StoreFile.java:512)
A fix is described here:
http://answers.mapr.com/questions/685/hbase-and-completebulkload
but I honestly could not make sense of it, so I took the most primitive route instead:
split the file into smaller pieces (with the split command) and then write the rows through the normal client API:
package com.bonc.db;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

import com.bonc.URLMatch.HBaseMain;

public class URL_HBase {

    public static void main(String[] args) throws IOException {
        // args[0]: absolute path of the (already split) input file; change to your own path.
        FileReader fr = new FileReader(args[0]);
        HTablePool pool = new HTablePool(HBaseMain.conf, 1000);
        HTable table = (HTable) pool.getTable("url_rule");
        BufferedReader br = new BufferedReader(fr);

        // Read each line exactly once and insert it as a Put.
        String line;
        while ((line = br.readLine()) != null) {
            String[] s = line.split("\\|");
            if (s.length > 1) {
                Put put = new Put(s[0].trim().getBytes());
                put.add("url_type".getBytes(), "type".getBytes(), s[1].trim().getBytes());
                table.put(put);
            } else {
                // Lines without a "|" separator are just reported.
                System.out.println(line);
            }
        }
        br.close();
    }
}
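The loop above issues one RPC per row. If the split files are still fairly large, one variant worth trying (a sketch only, class name and buffer sizes are mine) is to turn off auto-flush and push the Puts through the client write buffer in batches:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

public class URL_HBaseBatched {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "url_rule");
        // Buffer puts client-side instead of sending one RPC per row.
        table.setAutoFlush(false);
        table.setWriteBufferSize(8 * 1024 * 1024);

        BufferedReader br = new BufferedReader(new FileReader(args[0]));
        List<Put> batch = new ArrayList<Put>();
        String line;
        while ((line = br.readLine()) != null) {
            String[] s = line.split("\\|");
            if (s.length > 1) {
                Put put = new Put(s[0].trim().getBytes());
                put.add("url_type".getBytes(), "type".getBytes(), s[1].trim().getBytes());
                batch.add(put);
                if (batch.size() >= 1000) {
                    table.put(batch);   // one batched round trip
                    batch.clear();
                }
            }
        }
        if (!batch.isEmpty()) {
            table.put(batch);
        }
        table.flushCommits();           // push anything left in the write buffer
        br.close();
        table.close();
    }
}

The 1000-row batch size and 8 MB write buffer are arbitrary illustrative values, not tuned numbers.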
That finally worked. I still hope someone can translate that answer for me and explain what the proposed fix actually means... sigh.