這2周主要將項目中寫hbase的模塊中原來的異步hbaseclient改寫成了使用hbase原生的HTable對象。大概總結下改寫過程當中和hj,xingchao發現的問題和解決方法。java
1.HTablePool的基本使用方式:數據庫
因爲HTable對象不是線程安全的,所以HBase提供HTablePool來支持多線程寫入hbase,多線程同時從HTablePool 中取出HTable並寫入是安全的。HTablePool的使用方法相似數據庫鏈接,使用時從HTablePool中取出一個HTable,使用完後再 close放回HTablePool中。安全
Put put = new Put(rowkey); put.add(LOG_COLUMN_FAMILY,HOST_NAME_QUALIFIER,values[0]); HTableInterface table = HBaseClientFactory.getHTableByName(RAW_LOG_TABLE); try { table.put(put); } catch (IOException e) { throw new RuntimeException("Put Log meet exception",e); }finally {
HBaseClientUtil.closeHTable(table); }
2.HTablePool的maxsize。多線程
HTablePool有一個maxsize,HTablePool針對每一個表都有一個Pool,maxsize表示這個Pool的最大大小,在使用HTablePool的過程當中咱們發現這個值仍是有須要注意的地方。異步
在多線程使用HTablePool拿到同一個表的HTable時,若是線程個數大於maxsize會致使寫入始終是autoflush!ide
public HTableInterface getTable(String tableName) { // call the old getTable implementation renamed to findOrCreateTable HTableInterface table = findOrCreateTable(tableName); // return a proxy table so when user closes the proxy, the actual table // will be returned to the pool return new PooledHTable(table); }
當拿到HTable時會建立一個HTable對象幷包裝成一個PooledHTable對象。Pooled作了什麼納,其餘方法都沒變,只是在close時有所不一樣:this
public void close() throws IOException { returnTable(table); }
private void returnTable(HTableInterface table) throws IOException { // this is the old putTable method renamed and made private String tableName = Bytes.toString(table.getTableName()); if (tables.size(tableName) >= maxSize) { // release table instance since we're not reusing it this.tables.remove(tableName, table); this.tableFactory.releaseHTableInterface(table); return; } tables.put(tableName, table); }
能夠看到若是tables.size大於maxsize,此時會去掉一個保存的HTable對象,而releaseHTableInterface實際調 用的就是HTable的close方法,close方法又會強制flushHTable的buffer,所以,若是咱們想不使用autoflush提高寫 入速度失效。
線程
3.HTablePool type。code
HTablePool提供了幾種方式:ReusablePool,RoundRobinPool,ThreadLocalPool。默認的是 reusable,因爲2的緣由,咱們也能夠考慮使用ThreadLocal的Pool,這樣多線程寫入時分別取本身線程的Pool,這樣互不影響,寫入 的效率也會比較高。對象
static class ThreadLocalPool<R> extends ThreadLocal<R> implements Pool<R> { private static final Map<ThreadLocalPool<?>, AtomicInteger> poolSizes = new HashMap<ThreadLocalPool<?>, AtomicInteger>(); public ThreadLocalPool() { } @Override public R put(R resource) { R previousResource = get(); if (previousResource == null) { AtomicInteger poolSize = poolSizes.get(this); if (poolSize == null) { poolSizes.put(this, poolSize = new AtomicInteger(0)); } poolSize.incrementAndGet(); } this.set(resource); return previousResource; }
4.HTable的WriteBufferSize和autoflush
若是想追求寫入的速度咱們能夠設置setWriteBufferSize爲一個比較大的大小好比1M並autoflush爲false,這樣寫 入的速度會有幾十倍的提高,但若是BufferSize比較大也會帶來寫入不夠實時的問題,尤爲有些表的數據很小會好久都不flush。所以,咱們能夠添 加按時間間隔的flush方式。
@Override public void put(final List<Put> puts) throws IOException { super.put(puts); needFlush(); } private void needFlush() throws IOException { long currentTime = System.currentTimeMillis(); if ((currentTime - lastFlushTime.longValue()) > flushInterval) { super.flushCommits(); lastFlushTime.set(currentTime); } }
HTablePool能夠設置自定義的HTableFactory來建立咱們自定義的HTable。
pool = new HTablePool(conf, maxSize, tableFactory, PoolMap.PoolType.ThreadLocal);