將以下手機流量日誌放置到 HDFS 文件系統的 input 文件夾下,用於模擬批量導入:
java
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200apache
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200安全
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200app
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200socket
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200oop
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200網站
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200搜索引擎
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200spa
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200.net
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200
03 |
import java.io.IOException; |
04 |
import java.text.SimpleDateFormat; |
05 |
import java.util.Date; |
07 |
import org.apache.hadoop.conf.Configuration; |
08 |
import org.apache.hadoop.hbase.client.Put; |
09 |
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; |
10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
11 |
import org.apache.hadoop.hbase.util.Bytes; |
12 |
import org.apache.hadoop.io.LongWritable; |
13 |
import org.apache.hadoop.io.NullWritable; |
14 |
import org.apache.hadoop.io.Text; |
15 |
import org.apache.hadoop.mapreduce.Counter; |
16 |
import org.apache.hadoop.mapreduce.Job; |
17 |
import org.apache.hadoop.mapreduce.Mapper; |
18 |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
19 |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; |
21 |
public class BatchImport { |
22 |
static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ |
23 |
SimpleDateFormat dateformat1= new SimpleDateFormat( "yyyyMMddHHmmss" ); |
26 |
protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { |
27 |
final String[] splited = value.toString().split( "\t" ); |
29 |
final Date date = new Date(Long.parseLong(splited[ 0 ].trim())); |
30 |
final String dateFormat = dateformat1.format(date); |
31 |
String rowKey = splited[ 1 ]+ ":" +dateFormat; |
32 |
v2.set(rowKey+ "\t" +value.toString()); |
33 |
context.write(key, v2); |
34 |
} catch (NumberFormatException e) { |
35 |
final Counter counter = context.getCounter( "BatchImport" , "ErrorFormat" ); |
36 |
counter.increment(1L); |
37 |
System.out.println( "出錯了" +splited[ 0 ]+ " " +e.getMessage()); |
42 |
static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{ |
43 |
protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException { |
44 |
for (Text text : values) { |
45 |
final String[] splited = text.toString().split( "\t" ); |
47 |
final Put put = new Put(Bytes.toBytes(splited[ 0 ])); |
48 |
put.add(Bytes.toBytes( "cf" ), Bytes.toBytes( "date" ), Bytes.toBytes(splited[ 1 ])); |
49 |
put.add(Bytes.toBytes( "cf" ), Bytes.toBytes( "msisdn" ), Bytes.toBytes(splited[ 2 ])); |
50 |
put.add(Bytes.toBytes( "cf" ),Bytes.toBytes( "apmac" ), Bytes.toBytes(splited[ 3 ])); |
51 |
//省略其餘字段,調用put.add(....)便可 |
52 |
context.write(NullWritable.get(), put); |
57 |
public static void main(String[] args) throws Exception { |
60 |
final Configuration configuration = new Configuration(); |
62 |
configuration.set( "hbase.zookeeper.quorum" , "hadoop" ); |
64 |
configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log" ); |
66 |
configuration.set( "dfs.socket.timeout" , "180000" ); |
68 |
final Job job = new Job(configuration, "HBaseBatchImport" ); |
70 |
job.setMapperClass(BatchImportMapper. class ); |
71 |
job.setReducerClass(BatchImportReducer. class ); |
72 |
//設置map的輸出,不設置reduce的輸出類型 |
73 |
job.setMapOutputKeyClass(LongWritable. class ); |
74 |
job.setMapOutputValueClass(Text. class ); |
76 |
job.setInputFormatClass(TextInputFormat. class ); |
77 |
//再也不設置輸出路徑,而是設置輸出格式類型 |
78 |
job.setOutputFormatClass(TableOutputFormat. class ); |
80 |
FileInputFormat.setInputPaths(job, "hdfs://hadoop:9000/input" ); |
82 |
job.waitForCompletion( true ); |
依次啓動 Hadoop --> ZooKeeper --> HBase,如下所示:

建立與之對應的表結構
RowKey設計:msisdn:日期時間串(yyyyMMddHHmmss)

執行java程序後

01 |
查詢手機 13450456688 的全部上網記錄:
02 |
public static void scan(String tableName) throws IOException{ |
03 |
HTable table = new HTable(getConfiguration(),tableName); |
04 |
Scan scan = new Scan(); |
05 |
scan.setStartRow(Bytes.toBytes( "13450456688:/" )); |
06 |
scan.setStopRow(Bytes.toBytes( "13450456688::" )); |
07 |
ResultScanner scanner =table.getScanner(scan); |
09 |
for (Result result : scanner) { |
10 |
System.out.println( "Scan:" +i+++ " " +result); |
02 |
public static void scanPeriod(String tableName) throws IOException{ |
03 |
HTable table = new HTable(getConfiguration(),tableName); |
04 |
Scan scan = new Scan(); |
05 |
scan.setStartRow(Bytes.toBytes( "134/" )); |
07 |
Bytes.toBytes( "134:" )); |
08 |
scan.setMaxVersions( 1 ); |
09 |
ResultScanner scanner =table.getScanner(scan); |
11 |
for (Result result : scanner) { |
12 |
System.out.println( "Scan:" +i+++ " " +result); |