HBase結合MapReduce的批量導入

對如下手機流量信息進行模擬導入,放置到HDFS文件系統input文件夾下
java

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200

1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200

1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200

1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200

1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200

1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200

1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200

1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200

1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200

1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200

1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200

1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200

1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200

1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200

1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200

1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200

1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200

1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200

1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200

1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200

1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200


01 package Hbase;
02
03 import java.io.IOException;
04 import java.text.SimpleDateFormat;
05 import java.util.Date;
06
07 import org.apache.hadoop.conf.Configuration;
08 import org.apache.hadoop.hbase.client.Put;
09 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
10 import org.apache.hadoop.hbase.mapreduce.TableReducer;
11 import org.apache.hadoop.hbase.util.Bytes;
12 import org.apache.hadoop.io.LongWritable;
13 import org.apache.hadoop.io.NullWritable;
14 import org.apache.hadoop.io.Text;
15 import org.apache.hadoop.mapreduce.Counter;
16 import org.apache.hadoop.mapreduce.Job;
17 import org.apache.hadoop.mapreduce.Mapper;
18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
19 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
20
21 public class BatchImport {
22     static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
23         SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
24         Text v2 = new Text();
25
26         protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
27             final String[] splited = value.toString().split("\t");
28             try {
29                 final Date date = new Date(Long.parseLong(splited[0].trim()));
30                 final String dateFormat = dateformat1.format(date);
31                 String rowKey = splited[1]+":"+dateFormat;
32                 v2.set(rowKey+"\t"+value.toString());
33                 context.write(key, v2);
34             catch (NumberFormatException e) {
35                 final Counter counter = context.getCounter("BatchImport""ErrorFormat");
36                 counter.increment(1L);
37                 System.out.println("出錯了"+splited[0]+" "+e.getMessage());
38             }
39         };
40     }
41
42     static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
43         protected void reduce(LongWritable key, java.lang.Iterable<Text> values,    Context context) throws java.io.IOException ,InterruptedException {
44             for (Text text : values) {
45                 final String[] splited = text.toString().split("\t");
46
47                 final Put put = new Put(Bytes.toBytes(splited[0]));
48                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
49                 put.add(Bytes.toBytes("cf"), Bytes.toBytes("msisdn"), Bytes.toBytes(splited[2]));
50                 put.add(Bytes.toBytes("cf"),Bytes.toBytes("apmac"), Bytes.toBytes(splited[3]));
51                 //省略其餘字段,調用put.add(....)便可
52                 context.write(NullWritable.get(), put);
53             }
54         };
55     }
56
57     public static void main(String[] args) throws Exception {
58
59
60         final Configuration configuration = new Configuration();
61         //設置zookeeper
62         configuration.set("hbase.zookeeper.quorum""hadoop");
63         //設置hbase表名稱
64         configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
65         //將該值改大,防止hbase超時退出
66         configuration.set("dfs.socket.timeout""180000");
67
68         final Job job = new Job(configuration, "HBaseBatchImport");
69
70         job.setMapperClass(BatchImportMapper.class);
71         job.setReducerClass(BatchImportReducer.class);
72         //設置map的輸出,不設置reduce的輸出類型
73         job.setMapOutputKeyClass(LongWritable.class);
74         job.setMapOutputValueClass(Text.class);
75
76         job.setInputFormatClass(TextInputFormat.class);
77         //再也不設置輸出路徑,而是設置輸出格式類型
78         job.setOutputFormatClass(TableOutputFormat.class);
79
80         FileInputFormat.setInputPaths(job, "hdfs://hadoop:9000/input");
81
82         job.waitForCompletion(true);
83     }
84 }





先後啓動Hadoop-->ZooKeeper-->HBase,如下

建立與之對應的表結構

RowKey設計:msisdn:日期時間串(yyyyMMddHHmmss)

執行java程序後

01 查詢手機13450456688的全部上網記錄
02   public static void scan(String tableName)throws IOException{
03       HTable table = new HTable(getConfiguration(),tableName);
04       Scan scan = new Scan();
05       scan.setStartRow(Bytes.toBytes("13450456688:/"));
06       scan.setStopRow(Bytes.toBytes("13450456688::"));
07       ResultScanner scanner =table.getScanner(scan);
08       int i=0;
09       for (Result result : scanner) {
10          System.out.println("Scan:"+i+++" "+result);
11       }
12   }
01 查詢134號段的全部上網記錄
02 public static void scanPeriod(String tableName) throws IOException{
03   HTable table = new HTable(getConfiguration(),tableName);
04   Scan scan = new Scan();
05   scan.setStartRow(Bytes.toBytes("134/"));
06   scan.setStopRow(
07   Bytes.toBytes("134:"));
08   scan.setMaxVersions(1);
09   ResultScanner scanner =table.getScanner(scan);
10   int i=0;
11   for (Result result : scanner) {
12        System.out.println("Scan:"+i+++" "+result);
13     }
14   }
相關文章
相關標籤/搜索