HBase data import

Following http://blog.csdn.net/hua840812/article/details/7414875, I copied the code down, but running it kept failing with:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved org.apache.hadoop.io.LongWritable

The cause: under the current API version, the mapper's output types must be spelled out in the class declaration, i.e.

extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>

It has to be written this way; you cannot simply write extends Mapper with the raw type.
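To see why, here is a minimal contrast (class names are hypothetical, for illustration only). With the raw type, Mapper's type parameters erase to Object, so a map(LongWritable, Text, Context) method is an overload rather than an override; the inherited identity map() runs instead and emits (LongWritable, Text) pairs, producing exactly the type mismatch above.

// BROKEN: raw Mapper -- this map() does NOT override Mapper.map(),
// so the default identity mapper runs and writes LongWritable keys.
public static class BrokenMapper extends Mapper {
    protected void map(LongWritable key, Text value, Context context) {
        // never invoked by the framework
    }
}

// CORRECT: the parameterized declaration makes map() a true override.
public static class WorkingMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // emit rowkey / KeyValue pairs here
    }
}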

I'll paste the code anyway.

Code that generates the HFiles:

package com.bonc.db;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TestHFileToHBase {

    public static class TestHFileToHBaseMapper  extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines look like "rowkey|type"; split on the literal pipe.
            String[] values = value.toString().split("\\|");
            byte[] row = values[0].trim().getBytes();
            ImmutableBytesWritable rowkey = new ImmutableBytesWritable(row);
            // One KeyValue per line into family "url_type", qualifier "type";
            // fall back to the literal string "NULL" when the type column is missing.
            KeyValue kvProtocol;
            if (values.length > 1) {
                kvProtocol = new KeyValue(row, "url_type".getBytes(), "type".getBytes(),
                        System.currentTimeMillis(), values[1].trim().getBytes());
            } else {
                kvProtocol = new KeyValue(row, "url_type".getBytes(), "type".getBytes(),
                        System.currentTimeMillis(), "NULL".getBytes());
            }
            context.write(rowkey, kvProtocol);
            // KeyValue kvSrcip = new KeyValue(row, "SRCIP".getBytes(),
            // "SRCIP".getBytes(), values[1].getBytes());
            // context.write(k, kvSrcip);
        }

    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "TestHFileToHBase");
        job.setJarByClass(TestHFileToHBase.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(KeyValue.class);

        job.setMapperClass(TestHFileToHBaseMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);
//        job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat.class);
        // job.setNumReduceTasks(4);
        // job.setPartitionerClass(org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner.class);

        // HBaseAdmin admin = new HBaseAdmin(conf);
        // configureIncrementalLoad reads the table's current region boundaries
        // and wires in the TotalOrderPartitioner plus a matching sort reducer,
        // so the generated HFiles line up with existing regions.
        HTable table = new HTable(conf, "url_rule");

        HFileOutputFormat.configureIncrementalLoad(job, table);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
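One prerequisite the post glosses over: configureIncrementalLoad reads the target table's region boundaries, so url_rule (with its url_type family) must already exist before the job runs. A minimal sketch with the same-era client API, assuming default table settings:

package com.bonc.db;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateUrlRuleTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("url_rule")) {
            // One column family, matching the KeyValues the mapper emits.
            HTableDescriptor desc = new HTableDescriptor("url_rule");
            desc.addFamily(new HColumnDescriptor("url_type"));
            admin.createTable(desc);
        }
    }
}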

Code that loads the HFiles into the table:

package com.bonc.db;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class TestLoadIncrementalHFileToHBase {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // args[0]: table name, args[1]: directory containing the generated HFiles
        byte[] TABLE = Bytes.toBytes(args[0]);
        HTable table = new HTable(conf, TABLE);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(args[1]), table);
    }
}
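For the record, the HBase jar also ships a command-line equivalent, the completebulkload tool (hadoop jar hbase-<version>.jar completebulkload <hfile-dir> <table-name>), which does the same thing as the class above.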

Tragically, the load from HFiles into the table then failed with:

java.io.IOException: java.io.IOException: Failed rename of maprfs://133.0.76.41:7222/user/hbasetest/url_type/4447706551787973235 to maprfs://133.0.76.41:7222/hbase/url_rule/732e6d3d150caa8bd3d8d228e3d9c9a0/url_type/914168143008836217
        at org.apache.hadoop.hbase.regionserver.StoreFile.rename(StoreFile.java:512)

A fix is described here:

http://answers.mapr.com/questions/685/hbase-and-completebulkload

but I honestly couldn't follow it. (In general, a "Failed rename" during doBulkLoad means the RegionServer could not move the generated HFiles from the job's output directory into the table's directory, typically an ownership/permissions mismatch between the submitting user and the user the HBase processes run as.) So I fell back on the most primitive approach:

break the input into small files with split, then:

package com.bonc.db;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

import com.bonc.URLMatch.HBaseMain;

public class URL_HBase {

    public static void main(String[] args) throws IOException {
        // args[0]: absolute path of the (split) input file;
        // any text file with "rowkey|type" lines works.
        FileReader fr = new FileReader(args[0]);
        HTablePool pool = new HTablePool(HBaseMain.conf, 1000);
        HTable table = (HTable) pool.getTable("url_rule");
        BufferedReader br = new BufferedReader(fr);
        // Read each line exactly once; calling readLine() twice per
        // iteration would silently skip every other record.
        String line;
        while ((line = br.readLine()) != null) {
            String[] s = line.split("\\|");
            if (s.length > 1) {
                Put put = new Put(s[0].trim().getBytes());
                put.add("url_type".getBytes(), "type".getBytes(), s[1].trim().getBytes());
                table.put(put);
            } else {
                // Report malformed lines rather than printing an array reference.
                System.out.println(line);
            }
        }
        br.close();
    }
}
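One easy improvement on this loop: with the defaults, each table.put() is a separate RPC, which is slow for large files. A sketch of the same loader with client-side write buffering, assuming the same (hypothetical) HBaseMain.conf and url_rule table as above:

package com.bonc.db;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;

import com.bonc.URLMatch.HBaseMain;

public class URL_HBase_Buffered {

    public static void main(String[] args) throws IOException {
        HTablePool pool = new HTablePool(HBaseMain.conf, 1000);
        HTable table = (HTable) pool.getTable("url_rule");
        table.setAutoFlush(false);                 // queue Puts client-side
        table.setWriteBufferSize(8 * 1024 * 1024); // flush roughly every 8 MB
        BufferedReader br = new BufferedReader(new FileReader(args[0]));
        String line;
        while ((line = br.readLine()) != null) {
            String[] s = line.split("\\|");
            if (s.length > 1) {
                Put put = new Put(s[0].trim().getBytes());
                put.add("url_type".getBytes(), "type".getBytes(), s[1].trim().getBytes());
                table.put(put); // buffered, not an immediate RPC
            }
        }
        br.close();
        table.flushCommits(); // push any remaining buffered Puts
    }
}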

It finally worked. I still hope someone can translate that MapR answer for me and explain what the fix actually means... sigh.
