Recently a friend in one of my chat groups was told by his boss to look into HBase and push the write speed past 50k rows/s. That had him stumped: on a cluster built from four of my PCs, he spent a long time tuning multithreaded inserts and only got to around 10k rows/s, nowhere near the target. That's when I thought of this approach. The catch is that the examples online all drive the bulk load through MapReduce, while the requirement here is real-time ingestion with no intermediate files, so it had to be done directly in code. I searched a lot and found nothing, until a tip from another user online pointed me at the HBase source, where I finally found how to generate HFiles directly. Once implemented, though, the single-threaded load speed was only about 14k rows/s, roughly the same as the earlier multithreaded approach at full tilt. While puzzling over that, I adjusted the code so that the Bytes.toBytes(cols) call for each column is done only once instead of on every iteration, and the speed immediately jumped to 30k rows/s, a very noticeable improvement. That's on my own machines; it should be a bit faster on his cluster. The code is shared below.
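To make that hot-path fix concrete, here is a minimal before/after sketch. It is not from the original post: rowkeyFor(i), ts, and value are hypothetical placeholders standing in for the corresponding variables in the full listing below, and the throughput numbers are just what I saw on my boxes.

// Before: re-encodes the qualifier string on every iteration (~14k rows/s for me)
for (int i = 0; i < n; i++) {
    // rowkeyFor(i), ts, value are placeholders, not real helpers
    writer.append(new KeyValue(rowkeyFor(i), family, Bytes.toBytes("cn"), ts, KeyValue.Type.Put, value));
}

// After: encode the qualifier once and reuse the byte[] (~30k rows/s for me)
byte[] cn = Bytes.toBytes("cn");
for (int i = 0; i < n; i++) {
    writer.append(new KeyValue(rowkeyFor(i), family, cn, ts, KeyValue.Type.Put, value));
}

The only difference is where Bytes.toBytes runs; everything else is identical, which is why the gain was so surprising at first.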
// Imports assume the HBase 0.94-era API this code was written against
import java.text.DecimalFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;

String tableName = "taglog";
byte[] family = Bytes.toBytes("logs");

// Client configuration
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.master", "192.168.1.133:60000");
conf.set("hbase.zookeeper.quorum", "192.168.1.135");
//conf.set("zookeeper.znode.parent", "/hbase");
conf.set("hbase.metrics.showTableName", "false");
//conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.SnappyCodec");

// HFiles go into a subdirectory named after the column family
String outputdir = "hdfs://hadoop.Master:8020/user/SEA/hfiles/";
Path dir = new Path(outputdir);
Path familydir = new Path(outputdir, Bytes.toString(family));
FileSystem fs = familydir.getFileSystem(conf);

BloomType bloomType = BloomType.NONE;
final HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE;
int blockSize = 64000;
Configuration tempConf = new Configuration(conf);
tempConf.set("hbase.metrics.showTableName", "false");
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 1.0f);

// Instantiate the HFile writer; StoreFile is really just a thin wrapper around HFile
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blockSize)
        .withOutputDir(familydir)
        .withCompression(Compression.Algorithm.NONE)
        .withBloomType(bloomType)
        .withComparator(KeyValue.COMPARATOR)
        .withDataBlockEncoder(encoder)
        .build();

long start = System.currentTimeMillis();
DecimalFormat df = new DecimalFormat("0000000");

KeyValue kv1, kv2, kv3, kv4, kv5, kv6, kv7, kv8;

// Bytes.toBytes() is the expensive call: encode each column qualifier once, outside the loop
byte[] cn = Bytes.toBytes("cn");
byte[] dt = Bytes.toBytes("dt");
byte[] ic = Bytes.toBytes("ic");
byte[] ifs = Bytes.toBytes("if");
byte[] ip = Bytes.toBytes("ip");
byte[] le = Bytes.toBytes("le");
byte[] mn = Bytes.toBytes("mn");
byte[] pi = Bytes.toBytes("pi");

int maxLength = 3000000;
for (int i = 0; i < maxLength; i++) {
    String currentTime = "" + System.currentTimeMillis() + df.format(i);
    long current = System.currentTimeMillis();
    // Rowkeys and columns must be appended in lexicographic order, otherwise the writer throws
    kv1 = new KeyValue(Bytes.toBytes(currentTime), family, cn, current, KeyValue.Type.Put, Bytes.toBytes("3"));
    kv2 = new KeyValue(Bytes.toBytes(currentTime), family, dt, current, KeyValue.Type.Put, Bytes.toBytes("6"));
    kv3 = new KeyValue(Bytes.toBytes(currentTime), family, ic, current, KeyValue.Type.Put, Bytes.toBytes("8"));
    kv4 = new KeyValue(Bytes.toBytes(currentTime), family, ifs, current, KeyValue.Type.Put, Bytes.toBytes("7"));
    kv5 = new KeyValue(Bytes.toBytes(currentTime), family, ip, current, KeyValue.Type.Put, Bytes.toBytes("4"));
    kv6 = new KeyValue(Bytes.toBytes(currentTime), family, le, current, KeyValue.Type.Put, Bytes.toBytes("2"));
    kv7 = new KeyValue(Bytes.toBytes(currentTime), family, mn, current, KeyValue.Type.Put, Bytes.toBytes("5"));
    kv8 = new KeyValue(Bytes.toBytes(currentTime), family, pi, current, KeyValue.Type.Put, Bytes.toBytes("1"));

    writer.append(kv1);
    writer.append(kv2);
    writer.append(kv3);
    writer.append(kv4);
    writer.append(kv5);
    writer.append(kv6);
    writer.append(kv7);
    writer.append(kv8);
}

writer.close();
// The original listing started the timer but never printed it; report throughput here
System.out.println(maxLength + " rows written in " + (System.currentTimeMillis() - start) + " ms");

// Bulk-load the generated HFiles into HBase
HTable table = new HTable(conf, tableName);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(dir, table);
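A side note on the last step: the same bulk load can also be kicked off from the shell with the LoadIncrementalHFiles tool instead of calling doBulkLoad from code. As far as I know the invocation looks like the line below, using the output directory and table name from the listing above; I have only used the programmatic path myself, so treat this as a sketch.

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://hadoop.Master:8020/user/SEA/hfiles taglog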
Finally, here is how to dump an HFile, so you can compare a known-good HFile against one you generated yourself when tracking down problems.
hbase org.apache.hadoop.hbase.io.hfile.HFile -p -f hdfs://hadoop.Master:8020/user/SEA/hfiles/logs/51aa97b2a25446f89d5c870af92c9fc1
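Here -f names the HFile to inspect and -p prints its key/values. If I remember the tool's flags correctly, -m dumps the file's metadata and -s its statistics, which is handy when the key/value dumps of the good and bad files look the same:

hbase org.apache.hadoop.hbase.io.hfile.HFile -m -s -f hdfs://hadoop.Master:8020/user/SEA/hfiles/logs/51aa97b2a25446f89d5c870af92c9fc1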