HBase learning (12): generating HFiles without MapReduce, then importing them into HBase

  Recently a friend from our chat group said his boss had asked him to look into HBase and get the write throughput up to 50,000+ rows per second. That was a real headache: on a cluster made of four ordinary PCs, even after a lot of tuning, multithreaded inserts only reached around 10,000 rows per second, nowhere near the target. That is when I thought of this approach. The examples online, however, all generate the HFiles with MapReduce, while the requirement here is real-time ingestion rather than an offline file-generating job, so the only option was to implement it in code myself. I searched a lot and found nothing, until another netizen pointed me at the source code, where I finally found how to write an HFile directly. After implementing it, single-threaded ingestion only reached about 14,000 rows per second, roughly the same as the earlier multithreaded approach at full speed. While I was still puzzling over that, I adjusted the code so that the Bytes.toBytes(col) conversion for the column qualifiers is done only once, outside the loop, and the speed immediately jumped to about 30,000 rows per second, a very noticeable improvement. That is the speed on my own machine; on his cluster it should be a bit faster still. The code is shared below.
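
  Just to make that one optimization concrete before the full program: below is a minimal, self-contained sketch (the class name QualifierHoistingSketch and the toy loop are mine, not part of the original program) of the pattern of calling Bytes.toBytes() on the column qualifier exactly once, outside the loop, and reusing the resulting byte[] for every KeyValue.

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class QualifierHoistingSketch {
        public static void main(String[] args) {
            byte[] family = Bytes.toBytes("logs");
            byte[] value = Bytes.toBytes("1");

            // The slow version called Bytes.toBytes("cn") inside the loop,
            // re-encoding the same qualifier string on every single row.
            // Hoisting the conversion out of the loop does it exactly once:
            byte[] cn = Bytes.toBytes("cn");

            long totalBytes = 0;
            for (int i = 0; i < 1000000; i++) {
                byte[] row = Bytes.toBytes("row" + i);
                long ts = System.currentTimeMillis();
                KeyValue kv = new KeyValue(row, family, cn, ts, KeyValue.Type.Put, value);
                totalBytes += kv.getLength();
            }
            System.out.println("built 1000000 KeyValues, " + totalBytes + " bytes");
        }
    }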

        String tableName = "taglog";
        byte[] family = Bytes.toBytes("logs");
        // Configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.master", "192.168.1.133:60000");
        conf.set("hbase.zookeeper.quorum", "192.168.1.135");
        //conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.metrics.showTableName", "false");
        //conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.SnappyCodec");

        String outputdir = "hdfs://hadoop.Master:8020/user/SEA/hfiles/";
        Path dir = new Path(outputdir);
        Path familydir = new Path(outputdir, Bytes.toString(family));
        FileSystem fs = familydir.getFileSystem(conf);
        BloomType bloomType = BloomType.NONE;
        final HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
        int blockSize = 64000;
        Configuration tempConf = new Configuration(conf);
        tempConf.set("hbase.metrics.showTableName", "false");
        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 1.0f);
        // Instantiate the HFile writer; a StoreFile is really just a thin wrapper around an HFile
        StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf),
                fs, blockSize)
                .withOutputDir(familydir)
                .withCompression(Compression.Algorithm.NONE)
                .withBloomType(bloomType).withComparator(KeyValue.COMPARATOR)
                .withDataBlockEncoder(encoder).build();
        long start = System.currentTimeMillis();

        DecimalFormat df = new DecimalFormat("0000000");

        KeyValue kv1 = null;
        KeyValue kv2 = null;
        KeyValue kv3 = null;
        KeyValue kv4 = null;
        KeyValue kv5 = null;
        KeyValue kv6 = null;
        KeyValue kv7 = null;
        KeyValue kv8 = null;

        // Converting the qualifiers is the expensive part -- do it only once, outside the loop
        byte[] cn = Bytes.toBytes("cn");
        byte[] dt = Bytes.toBytes("dt");
        byte[] ic = Bytes.toBytes("ic");
        byte[] ifs = Bytes.toBytes("if");
        byte[] ip = Bytes.toBytes("ip");
        byte[] le = Bytes.toBytes("le");
        byte[] mn = Bytes.toBytes("mn");
        byte[] pi = Bytes.toBytes("pi");

        int maxLength = 3000000;
        for (int i = 0; i < maxLength; i++) {
            String currentTime = "" + System.currentTimeMillis() + df.format(i);
            long current = System.currentTimeMillis();
            // Rowkeys and qualifiers must be appended in lexicographic order, otherwise the writer throws an error
            kv1 = new KeyValue(Bytes.toBytes(currentTime),
                    family, cn, current, KeyValue.Type.Put, Bytes.toBytes("3"));

            kv2 = new KeyValue(Bytes.toBytes(currentTime),
                    family, dt, current, KeyValue.Type.Put, Bytes.toBytes("6"));

            kv3 = new KeyValue(Bytes.toBytes(currentTime),
                    family, ic, current, KeyValue.Type.Put, Bytes.toBytes("8"));

            kv4 = new KeyValue(Bytes.toBytes(currentTime),
                    family, ifs, current, KeyValue.Type.Put, Bytes.toBytes("7"));

            kv5 = new KeyValue(Bytes.toBytes(currentTime),
                    family, ip, current, KeyValue.Type.Put, Bytes.toBytes("4"));

            kv6 = new KeyValue(Bytes.toBytes(currentTime),
                    family, le, current, KeyValue.Type.Put, Bytes.toBytes("2"));

            kv7 = new KeyValue(Bytes.toBytes(currentTime),
                    family, mn, current, KeyValue.Type.Put, Bytes.toBytes("5"));

            kv8 = new KeyValue(Bytes.toBytes(currentTime),
                    family, pi, current, KeyValue.Type.Put, Bytes.toBytes("1"));

            writer.append(kv1);
            writer.append(kv2);
            writer.append(kv3);
            writer.append(kv4);
            writer.append(kv5);
            writer.append(kv6);
            writer.append(kv7);
            writer.append(kv8);
        }

        writer.close();

        // Bulk load the generated HFiles into HBase
        HTable table = new HTable(conf, tableName);
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(dir, table);
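
  For reference, the snippet above is a method body rather than a complete class. These are roughly the imports it assumes, based on the HBase 0.94-era API that StoreFile.WriterBuilder, KeyValue and LoadIncrementalHFiles come from; a few of these classes (for example BloomType and Compression) moved to different packages in later HBase versions, so adjust them to your client version:

        import java.text.DecimalFormat;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.hbase.HBaseConfiguration;
        import org.apache.hadoop.hbase.HConstants;
        import org.apache.hadoop.hbase.KeyValue;
        import org.apache.hadoop.hbase.client.HTable;
        import org.apache.hadoop.hbase.io.hfile.CacheConfig;
        import org.apache.hadoop.hbase.io.hfile.Compression;
        import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
        import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
        import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
        import org.apache.hadoop.hbase.regionserver.StoreFile;
        import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
        import org.apache.hadoop.hbase.util.Bytes;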

  

  Finally, here is how to inspect an HFile; comparing a known-good HFile with one you generated yourself makes it much easier to track down problems.
  hbase org.apache.hadoop.hbase.io.hfile.HFile -p -f hdfs://hadoop.Master:8020/user/SEA/hfiles/logs/51aa97b2a25446f89d5c870af92c9fc1
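
  If you only need a summary rather than a dump of every KeyValue, the same tool should also accept -m to print just the HFile metadata (first/last key, entry count and so on); the exact set of flags can differ slightly between HBase versions, and running the tool without arguments should list the options available in yours. For example:
  hbase org.apache.hadoop.hbase.io.hfile.HFile -m -f hdfs://hadoop.Master:8020/user/SEA/hfiles/logs/51aa97b2a25446f89d5c870af92c9fc1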