HBase性能優化方法總結(三):讀表操做

本文主要是從HBase應用程序設計與開發的角度,總結幾種經常使用的性能優化方法。有關HBase系統配置級別的優化,可參考:淘寶Ken Wu同窗的博客html

下面是本文總結的第三部份內容:讀表操做相關的優化方法。緩存

3. 讀表操做

3.1 多HTable併發讀

建立多個HTable客戶端用於讀操做,提升讀數據的吞吐量,一個例子:性能優化

複製代碼

  Configuration conf = HBaseConfiguration.create();
  String table_log_name = 「user_log」;
rTableLog =  HTable[tableN];
 ( i = 0; i < tableN; i++) {
    rTableLog[i] =  HTable(conf, table_log_name);
    rTableLog[i].setScannerCaching(50);
}

複製代碼

3.2 HTable參數設置

3.2.1 Scanner Caching

hbase.client.scanner.caching配置項能夠設置HBase scanner一次從服務端抓取的數據條數,默認狀況下一次一條。經過將其設置成一個合理的值,能夠減小scan過程當中next()的時間開銷,代價是 scanner須要經過客戶端的內存來維持這些被cache的行記錄。網絡

有三個地方能夠進行配置:1)在HBase的conf配置文件中進行配置;2)經過調用HTable.setScannerCaching(int scannerCaching)進行配置;3)經過調用Scan.setCaching(int caching)進行配置。三者的優先級愈來愈高。多線程

3.2.2 Scan Attribute Selection

scan時指定須要的Column Family,能夠減小網絡傳輸數據量,不然默認scan操做會返回整行全部Column Family的數據。

3.2.3 Close ResultScanner

經過scan取完數據後,記得要關閉ResultScanner,不然RegionServer可能會出現問題(對應的Server資源沒法釋放)。

3.3 批量讀

經過調用HTable.get(Get)方法能夠根據一個指定的row key獲取一行記錄,一樣HBase提供了另外一個方法:經過調用HTable.get(List<Get>)方法能夠根據一個指定的row key列表,批量獲取多行記錄,這樣作的好處是批量執行,只須要一次網絡I/O開銷,這對於對數據實時性要求高並且網絡傳輸RTT高的情景下可能帶來明顯 的性能提高。併發

3.4 多線程併發讀

在客戶端開啓多個HTable讀線程,每一個讀線程負責經過HTable對象進行get操做。下面是一個多線程併發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:性能

複製代碼

  DataReaderServer {
            ConcurrentHashMap<String, String> getUnitMinutePV( uid,  startStamp,  endStamp){
          min = startStamp;
          count = ()((endStamp - startStamp) / (60*1000));
         List<String> lst =  ArrayList<String>();
          ( i = 0; i <= count; i++) {
            min = startStamp + i * 60 * 1000;
            lst.add(uid + "_" + min);
         }
          parallelBatchMinutePV(lst);
     }
        ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
        ConcurrentHashMap<String, String> hashRet =  ConcurrentHashMap<String, String>();
         parallel = 3;
        List<List<String>> lstBatchKeys  = ;
         (lstKeys.size() < parallel ){
            lstBatchKeys  =  ArrayList<List<String>>(1);
            lstBatchKeys.add(lstKeys);
        }
        {
            lstBatchKeys  =  ArrayList<List<String>>(parallel);
            ( i = 0; i < parallel; i++  ){
                List<String> lst =  ArrayList<String>();
                lstBatchKeys.add(lst);
            }

            ( i = 0 ; i < lstKeys.size() ; i ++ ){
                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
            }
        }
        
        List<Future< ConcurrentHashMap<String, String> >> futures =  ArrayList<Future< ConcurrentHashMap<String, String> >>(5);
        
        ThreadFactoryBuilder builder =  ThreadFactoryBuilder();
        builder.setNameFormat("ParallelBatchQuery");
        ThreadFactory factory = builder.build();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
        
        (List<String> keys : lstBatchKeys){
            Callable< ConcurrentHashMap<String, String> > callable =  BatchMinutePVCallable(keys);
            FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
            futures.add(future);
        }
        executor.shutdown();
        
                 {
           stillRunning = !executor.awaitTermination(
              5000000, TimeUnit.MILLISECONDS);
           (stillRunning) {
             {
                executor.shutdownNow();
            }  (Exception e) {
                                e.printStackTrace();
            }
          }
        }  (InterruptedException e) {
           {
              Thread.currentThread().interrupt();
          }  (Exception e1) {
                        e1.printStackTrace();
          }
        }
        
                 (Future f : futures) {
           {
              (f.get() != )
              {
                  hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
              }
          }  (InterruptedException e) {
             {
                 Thread.currentThread().interrupt();
            }  (Exception e1) {
                                e1.printStackTrace();
            }
          }  (ExecutionException e) {
            e.printStackTrace();
          }
        }
        
         hashRet;
    }
           ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
        ConcurrentHashMap<String, String> hashRet = ;
        List<Get> lstGet =  ArrayList<Get>();
        String[] splitValue = ;
         (String s : lstKeys) {
            splitValue = s.split("_");
             uid = Long.parseLong(splitValue[0]);
             min = Long.parseLong(splitValue[1]);
            [] key =  [16];
            Bytes.putLong(key, 0, uid);
            Bytes.putLong(key, 8, min);
            Get g =  Get(key);
            g.addFamily(fp);
            lstGet.add(g);
        }
        Result[] res = ;
         {
            res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
        }  (IOException e1) {
            logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
        }

         (res !=  && res.length > 0) {
            hashRet =  ConcurrentHashMap<String, String>(res.length);
             (Result re : res) {
                 (re !=  && !re.isEmpty()) {
                     {
                        [] key = re.getRow();
                        [] value = re.getValue(fp, cp);
                         (key !=  && value != ) {
                            hashRet.put(String.valueOf(Bytes.toLong(key,
                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes
                                    .toLong(value)));
                        }
                    }  (Exception e2) {
                        logger.error(e2.getStackTrace());
                    }
                }
            }
        }

         hashRet;
    }
}
 BatchMinutePVCallable  Callable<ConcurrentHashMap<String, String>>{
      List<String> keys;

      BatchMinutePVCallable(List<String> lstKeys ) {
         .keys = lstKeys;
     }

      ConcurrentHashMap<String, String> call()  Exception {
          DataReadServer.getBatchMinutePV(keys);
     }
}

複製代碼

3.5 緩存查詢結果

對於頻繁查詢HBase的應用場景,能夠考慮在應用程序中作緩存,當有新的查詢請求時,首先在緩存中查找,若是存在則直接返回,再也不查詢HBase;不然對HBase發起讀請求查詢,而後在應用程序中將查詢結果緩存起來。至於緩存的替換策略,能夠考慮LRU等經常使用的策略。優化

3.6 Blockcache

HBase上Regionserver的內存分爲兩個部分,一部分做爲Memstore,主要用來寫;另一部分做爲BlockCache,主要用於讀。ui

寫請求會先寫入Memstore,Regionserver會給每一個region提供一個Memstore,當Memstore滿64MB之後,會 啓動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啓動flush進程,從最大的Memstore開始flush直到低於限制。spa

讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,並把讀的結果放入BlockCache。因爲 BlockCache採用的是LRU策略,所以BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)後,會啓動淘汰機制,淘汰掉最老的一批數據。

一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大於等於heapsize * 0.8,不然HBase不能啓動。默認BlockCache爲0.2,而Memstore爲0.4。對於注重讀響應時間的系統,能夠將 BlockCache設大些,好比設置BlockCache=0.4Memstore=0.39,以加大緩存的命中率。

有關BlockCache機制,請參考這裏:HBase的Block cacheHBase的blockcache機制hbase中的緩存的計算與使用

相關文章
相關標籤/搜索