Hbase設計以及優化

一、表的設計

1.一、Column Familyhtml

因爲Hbase是一個面向列族的存儲器,調優和存儲都是在列族這個層次上進行的,最好使列族成員都有相同的"訪問模式(access pattern)"和大小特徵
在一張表裏不要定義太多的column family。目前Hbase並不能很好的處理超過2~3個column family的表。由於某個column family在flush的時候,它鄰近的column family也會因關聯效應被觸發flush,最終致使系統產生更多的I/O。

java

1.二、Row Keyapache

Row Key 設計原則:
1)Rowkey長度原則,Rowkey是一個二進制碼流,能夠是任意字符串,最大長度64KB,實際應用中通常爲10~100bytes,存爲byte[]字節數組,通常設計成定長的建議是越短越好,不要超過16個字節。緣由一數據的持久化文件HFile中是按照KeyValue存儲的,若是Rowkey過長好比100個字節,1000萬列數據光Rowkey就要佔用100*1000萬=10億個字節,將近1G數據,這會極大影響HFile的存儲效率;緣由二MemStore將緩存部分數據到內存,若是Rowkey字段過長內存的有效利用率會下降,系統將沒法緩存更多的數據,這會下降檢索效率。所以Rowkey的字節長度越短越好。緣由三目前操做系統是都是64位系統,內存8字節對齊。控制在16個字節,8字節的整數倍利用操做系統的最佳特性。
2)是Rowkey散列原則,若是Rowkey是按時間戳的方式遞增,不要將時間放在二進制碼的前面,建議將Rowkey的高位做爲散列字段,由程序循環生成,低位放時間字段,這樣將提升數據均衡分佈在每一個Regionserver實現負載均衡的概率。若是沒有散列字段,首字段直接是時間信息將產生全部新數據都在一個RegionServer上堆積的熱點現象,這樣在作數據檢索的時候負載將會集中在個別RegionServer,下降查詢效率。
3)Rowkey惟一原則,必須在設計上保證其惟一性
row key是按照字典序存儲,所以,設計row key時,要充分利用這個排序特色,將常常一塊兒讀取的數據存儲到一塊,將最近可能會被訪問的數據放在一塊。
舉個例子:若是最近寫入HBase表中的數據是最可能被訪問的,能夠考慮將時間戳做爲row key的一部分,因爲是字典序排序,因此可使用Long.MAX_VALUE – timestamp做爲row key,這樣能保證新寫入的數據在讀取時能夠被快速命中。

數組

1.三、 In Memory緩存

建立表的時候,能夠經過HColumnDescriptor.setInMemory(true)將表放到RegionServer的緩存中,保證在讀取的時候被cache命中。網絡

1.4 、Max Version多線程

建立表的時候,能夠經過HColumnDescriptor.setMaxVersions(intmaxVersions)設置表中數據的最大版本,若是隻須要保存最新版本的數據,那麼能夠設置setMaxVersions(1)。併發

1.五、 Time to Live(設置數據存儲的生命週期)負載均衡

建立表的時候,能夠經過HColumnDescriptor.setTimeToLive(inttimeToLive)設置表中數據的存儲生命期,過時數據將自動被刪除,例如若是隻須要存儲最近兩天的數據,那麼能夠設置setTimeToLive(2 * 24 * 60 * 60)。ide

1.六、 Compact & Split

在HBase中,數據在更新時首先寫入WAL 日誌(HLog)和內存(MemStore)中,MemStore中的數據是排序的,當MemStore累計到必定閾值時,就會建立一個新的MemStore,而且將老的MemStore添加到flush隊列,由單獨的線程flush到磁盤上,成爲一個StoreFile。於此同時, 系統會在zookeeper中記錄一個redo point,表示這個時刻以前的變動已經持久化了(minor compact)。
StoreFile是隻讀的,一旦建立後就不能夠再修改。所以Hbase的更新實際上是不斷追加的操做。當一個Store中的StoreFile達到必定的閾值後,就會進行一次合併(major compact),將對同一個key的修改合併到一塊兒,造成一個大的StoreFile,當StoreFile的大小達到必定閾值後,又會對 StoreFile進行分割(split),等分爲兩個StoreFile。
因爲對錶的更新是不斷追加的,處理讀請求時,須要訪問Store中所有的StoreFile和MemStore,將它們按照row key進行合併,因爲StoreFile和MemStore都是通過排序的,而且StoreFile帶有內存中索引,一般合併過程仍是比較快的。
實際應用中,能夠考慮必要時手動進行major compact,將同一個row key的修改進行合併造成一個大的StoreFile。同時,能夠將StoreFile設置大些,減小split的發生

1.七、 Pre-Creating Regions

默認狀況下,在建立HBase表的時候會自動建立一個region分區,當導入數據的時候,全部的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種能夠加快批量寫入速度的方法是經過預先建立一些空的regions,這樣當數據寫入HBase時,會按照region分區狀況,在集羣內作數據的負載均衡。                              有關預分區,詳情參見:TableCreation: Pre-Creating Regions,下面是一個例子:

  1. public static booleancreateTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)  
  2. throws IOException {  
  3.   try {  
  4.     admin.createTable(table, splits);  
  5.     return true;  
  6.   } catch (TableExistsException e) {  
  7.     logger.info("table " +table.getNameAsString() + " already exists");  
  8.     // the table already exists...  
  9.     return false;  
  10.   }  
  11. }  
  12.    
  13. public static byte[][]getHexSplits(String startKey, String endKey, int numRegions) {  
  14.   byte[][] splits = new byte[numRegions-1][];  
  15.   BigInteger lowestKey = newBigInteger(startKey, 16);  
  16.   BigInteger highestKey = newBigInteger(endKey, 16);  
  17.   BigInteger range =highestKey.subtract(lowestKey);  
  18.   BigInteger regionIncrement =range.divide(BigInteger.valueOf(numRegions));  
  19.   lowestKey = lowestKey.add(regionIncrement);  
  20.   for(int i=0; i < numRegions-1;i++) {  
  21.     BigInteger key =lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));  
  22.     byte[] b = String.format("%016x",key).getBytes();  
  23.     splits[i] = b;  
  24.   }  
  25.   return splits;  
  26. }  
public static booleancreateTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
throws IOException {
  try {
    admin.createTable(table, splits);
    return true;
  } catch (TableExistsException e) {
    logger.info("table " +table.getNameAsString() + " already exists");
    // the table already exists...
    return false;
  }
}
 
public static byte[][]getHexSplits(String startKey, String endKey, int numRegions) {
  byte[][] splits = new byte[numRegions-1][];
  BigInteger lowestKey = newBigInteger(startKey, 16);
  BigInteger highestKey = newBigInteger(endKey, 16);
  BigInteger range =highestKey.subtract(lowestKey);
  BigInteger regionIncrement =range.divide(BigInteger.valueOf(numRegions));
  lowestKey = lowestKey.add(regionIncrement);
  for(int i=0; i < numRegions-1;i++) {
    BigInteger key =lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
    byte[] b = String.format("%016x",key).getBytes();
    splits[i] = b;
  }
  return splits;
}

二、寫表操做

2.1 多HTable併發寫

建立多個HTable客戶端用於寫操做,提升寫數據的吞吐量,一個例子:

  1. static final Configurationconf = HBaseConfiguration.create();  
  2. static final Stringtable_log_name = 「user_log」;  
  3. wTableLog = newHTable[tableN];  
  4. for (int i = 0; i <tableN; i++) {  
  5.     wTableLog[i] = new HTable(conf,table_log_name);  
  6.     wTableLog[i].setWriteBufferSize(5 * 1024 *1024); //5MB  
  7.     wTableLog[i].setAutoFlush(false);  
  8. }  
static final Configurationconf = HBaseConfiguration.create();
static final Stringtable_log_name = 「user_log」;
wTableLog = newHTable[tableN];
for (int i = 0; i <tableN; i++) {
    wTableLog[i] = new HTable(conf,table_log_name);
    wTableLog[i].setWriteBufferSize(5 * 1024 *1024); //5MB
    wTableLog[i].setAutoFlush(false);
}

2.2 HTable參數設置

2.2.1 Auto Flush

經過調用HTable.setAutoFlush(false)方法能夠將HTable寫客戶端的自動flush關閉,這樣能夠批量寫入數據到 HBase,而不是有一條put就執行一次更新,只有當put填滿客戶端寫緩存時,才實際向HBase服務端發起寫請求。默認狀況下auto flush是開啓的。保證最後手動HTable.flushCommits()或HTable.close()

2.2.2 Write Buffer

經過調用HTable.setWriteBufferSize(writeBufferSize)方法能夠設置 HTable客戶端的寫buffer大小,若是新設置的buffer小於當前寫buffer中的數據時,buffer將會被flush到服務端。其 中,writeBufferSize的單位是byte字節數,能夠根據實際寫入數據量的多少來設置該值。

2.2.3 WAL Flag

在HBae中,客戶端向集羣中的RegionServer提交數據時(Put/Delete操做),首先會先寫WAL(Write Ahead Log)日誌(即HLog,一個RegionServer上的全部Region共享一個HLog),只有當WAL日誌寫成功後,再接着寫 MemStore,而後客戶端被通知提交數據成功;若是寫WAL日誌失敗,客戶端則被通知提交失敗。這樣作的好處是能夠作到RegionServer宕機 後的數據恢復。

所以,對於相對不過重要的數據,能夠在Put/Delete操做時,經過調用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函數,放棄寫WAL日誌,從而提升數據寫入的性能。

值得注意的是:謹慎選擇關閉WAL日誌,由於這樣的話,一旦RegionServer宕機,Put/Delete的數據將會沒法根據WAL日誌進行恢復。

2.3 批量寫

經過調用HTable.put(Put)方法能夠將一個指定的row key記錄寫入HBase,一樣HBase提供了另外一個方法:經過調用HTable.put(List<Put>)方法能夠將指定的row key列表,批量寫入多行記錄,這樣作的好處是批量執行,只須要一次網絡I/O開銷,這對於對數據實時性要求高,網絡傳輸RTT高的情景下可能帶來明顯的性能提高。

2.4 多線程併發寫

在客戶端開啓多個HTable寫線程,每一個寫線程負責一個HTable對象的flush操做,這樣結合定時flush和寫 buffer(writeBufferSize),能夠既保證在數據量小的時候,數據能夠在較短期內被flush(如1秒內),同時又保證在數據量大的 時候,寫buffer一滿就及時進行flush。下面給個具體的例子:

  1. for (int i = 0; i <threadN; i++) {  
  2.     Thread th = new Thread() {  
  3.         public void run() {  
  4.             while (true) {  
  5.                 try {  
  6.                     sleep(1000); //1 second  
  7.                 } catch (InterruptedExceptione) {  
  8.                     e.printStackTrace();  
  9.                 }  
  10. synchronized (wTableLog[i]) {  
  11.                     try {  
  12.                         wTableLog[i].flushCommits();  
  13.                     } catch (IOException e) {  
  14.                         e.printStackTrace();  
  15.                     }  
  16.                 }  
  17.             }  
  18. }  
  19.     };  
  20.     th.setDaemon(true);  
  21.     th.start();  
  22. }  
for (int i = 0; i <threadN; i++) {
    Thread th = new Thread() {
        public void run() {
            while (true) {
                try {
                    sleep(1000); //1 second
                } catch (InterruptedExceptione) {
                    e.printStackTrace();
                }
synchronized (wTableLog[i]) {
                    try {
                        wTableLog[i].flushCommits();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
}
    };
    th.setDaemon(true);
    th.start();
}

三、讀表操做

3.1 多HTable併發讀

建立多個HTable客戶端用於讀操做,提升讀數據的吞吐量,一個例子:

  1. static final Configurationconf = HBaseConfiguration.create();  
  2. static final Stringtable_log_name = 「user_log」;  
  3. rTableLog = newHTable[tableN];  
  4. for (int i = 0; i <tableN; i++) {  
  5.     rTableLog[i] = new HTable(conf, table_log_name);  
  6.     rTableLog[i].setScannerCaching(50);  
  7. }  
static final Configurationconf = HBaseConfiguration.create();
static final Stringtable_log_name = 「user_log」;
rTableLog = newHTable[tableN];
for (int i = 0; i <tableN; i++) {
    rTableLog[i] = new HTable(conf, table_log_name);
    rTableLog[i].setScannerCaching(50);
}

3.2 HTable參數設置

3.2.1 Scanner Caching

hbase.client.scanner.caching配置項能夠設置HBase scanner一次從服務端抓取的數據條數,默認狀況下一次一條。經過將其設置成一個合理的值,能夠減小scan過程當中next()的時間開銷,代價是 scanner須要經過客戶端的內存來維持這些被cache的行記錄。

有三個地方能夠進行配置:1)在HBase的conf配置文件中進行配置;2)經過調用HTable.setScannerCaching(int scannerCaching)進行配置;3)經過調用Scan.setCaching(int caching)進行配置。三者的優先級愈來愈高。

3.2.2 Scan AttributeSelection

scan時指定須要的Column Family,能夠減小網絡傳輸數據量,不然默認scan操做會返回整行全部Column Family的數據。

3.2.3 Close ResultScanner

經過scan取完數據後,記得要關閉ResultScanner,不然RegionServer可能會出現問題(對應的Server資源沒法釋放)。

3.3 批量讀

經過調用HTable.get(Get)方法能夠根據一個指定的row key獲取一行記錄,一樣HBase提供了另外一個方法:經過調用HTable.get(List<Get>)方法能夠根據一個指定的rowkey列表,批量獲取多行記錄,這樣作的好處是批量執行,只須要一次網絡I/O開銷,這對於對數據實時性要求高並且網絡傳輸RTT高的情景下可能帶來明顯 的性能提高。

3.4 多線程併發讀

在客戶端開啓多個HTable讀線程,每一個讀線程負責經過HTable對象進行get操做。下面是一個多線程併發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:

  1. public class DataReaderServer{  
  2.      //獲取店鋪一天內各分鐘PV值的入口函數  
  3.      public static ConcurrentHashMap<String,String> getUnitMinutePV(long uid, long startStamp, long endStamp){  
  4.          long min = startStamp;  
  5.          int count = (int)((endStamp -startStamp) / (60*1000));  
  6.          List<String> lst = newArrayList<String>();  
  7.          for (int i = 0; i <= count; i++) {  
  8.             min = startStamp + i * 60 * 1000;  
  9.             lst.add(uid + "_" + min);  
  10.          }  
  11.          return parallelBatchMinutePV(lst);  
  12.      }  
  13.       //多線程併發查詢,獲取分鐘PV值  
  14. private staticConcurrentHashMap<String, String>parallelBatchMinutePV(List<String> lstKeys){  
  15.         ConcurrentHashMap<String, String>hashRet = new ConcurrentHashMap<String, String>();  
  16.         int parallel = 3;  
  17.         List<List<String>>lstBatchKeys  = null;  
  18.         if (lstKeys.size() < parallel ){  
  19.             lstBatchKeys  = new ArrayList<List<String>>(1);  
  20.             lstBatchKeys.add(lstKeys);  
  21.         }  
  22.         else{  
  23.             lstBatchKeys  = newArrayList<List<String>>(parallel);  
  24.             for(int i = 0; i < parallel;i++  ){  
  25.                 List<String> lst = newArrayList<String>();  
  26.                 lstBatchKeys.add(lst);  
  27.             }  
  28.             for(int i = 0 ; i <lstKeys.size() ; i ++ ){  
  29.                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));  
  30.             }  
  31.         }  
  32.         List<Future<ConcurrentHashMap<String, String> >> futures = newArrayList<Future< ConcurrentHashMap<String, String> >>(5);  
  33.         ThreadFactoryBuilder builder = newThreadFactoryBuilder();  
  34.        builder.setNameFormat("ParallelBatchQuery");  
  35.         ThreadFactory factory =builder.build();  
  36.         ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(),factory);  
  37.         for(List<String> keys :lstBatchKeys){  
  38.             Callable<ConcurrentHashMap<String, String> > callable = newBatchMinutePVCallable(keys);  
  39.             FutureTask<ConcurrentHashMap<String, String> > future = (FutureTask<ConcurrentHashMap<String, String> >) executor.submit(callable);  
  40.             futures.add(future);  
  41.         }  
  42.         executor.shutdown();  
  43.         // Wait for all the tasks to finish  
  44.         try {  
  45.           boolean stillRunning = !executor.awaitTermination(  
  46.               5000000, TimeUnit.MILLISECONDS);  
  47.           if (stillRunning) {  
  48.             try {  
  49.                 executor.shutdownNow();  
  50.             } catch (Exception e) {  
  51.                 // TODO Auto-generated catchblock  
  52.                 e.printStackTrace();  
  53.             }  
  54.           }  
  55.         } catch (InterruptedException e) {  
  56.           try {  
  57.              Thread.currentThread().interrupt();  
  58.           } catch (Exception e1) {  
  59.             // TODO Auto-generated catch block  
  60.             e1.printStackTrace();  
  61.           }  
  62.         }  
  63.         // Look for any exception  
  64.         for (Future f : futures) {  
  65.           try {  
  66.               if(f.get() != null)  
  67.               {  
  68.                  hashRet.putAll((ConcurrentHashMap<String, String>)f.get());  
  69.               }  
  70.           } catch (InterruptedException e) {  
  71.             try {  
  72.                 Thread.currentThread().interrupt();  
  73.             } catch (Exception e1) {  
  74.                 // TODO Auto-generated catchblock  
  75.                 e1.printStackTrace();  
  76.             }  
  77.           } catch (ExecutionException e) {  
  78.             e.printStackTrace();  
  79.           }  
  80.         }  
  81.         return hashRet;  
  82.     }  
  83.      //一個線程批量查詢,獲取分鐘PV值  
  84.     protected staticConcurrentHashMap<String, String> getBatchMinutePV(List<String>lstKeys){  
  85.         ConcurrentHashMap<String, String>hashRet = null;  
  86.         List<Get> lstGet = newArrayList<Get>();  
  87.         String[] splitValue = null;  
  88.         for (String s : lstKeys) {  
  89.             splitValue =s.split("_");  
  90.             long uid =Long.parseLong(splitValue[0]);  
  91.             long min =Long.parseLong(splitValue[1]);  
  92.             byte[] key = new byte[16];  
  93.             Bytes.putLong(key, 0, uid);  
  94.             Bytes.putLong(key, 8, min);  
  95.             Get g = new Get(key);  
  96.             g.addFamily(fp);  
  97.             lstGet.add(g);  
  98.         }  
  99.         Result[] res = null;  
  100.         try {  
  101.             res =tableMinutePV[rand.nextInt(tableN)].get(lstGet);  
  102.         } catch (IOException e1) {  
  103.             logger.error("tableMinutePV exception,e=" + e1.getStackTrace());  
  104.         }  
  105.         if (res != null && res.length> 0) {  
  106.             hashRet = newConcurrentHashMap<String, String>(res.length);  
  107.             for (Result re : res) {  
  108.                 if (re != null &&!re.isEmpty()) {  
  109.                     try {  
  110.                         byte[] key =re.getRow();  
  111.                         byte[] value =re.getValue(fp, cp);  
  112.                         if (key != null&& value != null) {  
  113.                            hashRet.put(String.valueOf(Bytes.toLong(key,  
  114.                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes  
  115.                                    .toLong(value)));  
  116.                         }  
  117.                     } catch (Exception e2) {  
  118.                        logger.error(e2.getStackTrace());  
  119.                     }  
  120.                 }  
  121.             }  
  122.         }  
  123.         return hashRet;  
  124.     }  
  125. }  
  126. //調用接口類,實現Callable接口  
  127. class BatchMinutePVCallableimplements Callable<ConcurrentHashMap<String, String>>{  
  128.      private List<String> keys;  
  129.      publicBatchMinutePVCallable(List<String> lstKeys ) {  
  130.          this.keys = lstKeys;  
  131.      }  
  132.      public ConcurrentHashMap<String,String> call() throws Exception {  
  133.          returnDataReadServer.getBatchMinutePV(keys);  
  134.      }  
  135. }  
public class DataReaderServer{
     //獲取店鋪一天內各分鐘PV值的入口函數
     public static ConcurrentHashMap<String,String> getUnitMinutePV(long uid, long startStamp, long endStamp){
         long min = startStamp;
         int count = (int)((endStamp -startStamp) / (60*1000));
         List<String> lst = newArrayList<String>();
         for (int i = 0; i <= count; i++) {
            min = startStamp + i * 60 * 1000;
            lst.add(uid + "_" + min);
         }
         return parallelBatchMinutePV(lst);
     }
      //多線程併發查詢,獲取分鐘PV值
private staticConcurrentHashMap<String, String>parallelBatchMinutePV(List<String> lstKeys){
        ConcurrentHashMap<String, String>hashRet = new ConcurrentHashMap<String, String>();
        int parallel = 3;
        List<List<String>>lstBatchKeys  = null;
        if (lstKeys.size() < parallel ){
            lstBatchKeys  = new ArrayList<List<String>>(1);
            lstBatchKeys.add(lstKeys);
        }
        else{
            lstBatchKeys  = newArrayList<List<String>>(parallel);
            for(int i = 0; i < parallel;i++  ){
                List<String> lst = newArrayList<String>();
                lstBatchKeys.add(lst);
            }
            for(int i = 0 ; i <lstKeys.size() ; i ++ ){
               lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
            }
        }
        List<Future<ConcurrentHashMap<String, String> >> futures = newArrayList<Future< ConcurrentHashMap<String, String> >>(5);
        ThreadFactoryBuilder builder = newThreadFactoryBuilder();
       builder.setNameFormat("ParallelBatchQuery");
        ThreadFactory factory =builder.build();
        ThreadPoolExecutor executor =(ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(),factory);
        for(List<String> keys :lstBatchKeys){
            Callable<ConcurrentHashMap<String, String> > callable = newBatchMinutePVCallable(keys);
            FutureTask<ConcurrentHashMap<String, String> > future = (FutureTask<ConcurrentHashMap<String, String> >) executor.submit(callable);
            futures.add(future);
        }
        executor.shutdown();
        // Wait for all the tasks to finish
        try {
          boolean stillRunning = !executor.awaitTermination(
              5000000, TimeUnit.MILLISECONDS);
          if (stillRunning) {
            try {
                executor.shutdownNow();
            } catch (Exception e) {
                // TODO Auto-generated catchblock
                e.printStackTrace();
            }
          }
        } catch (InterruptedException e) {
          try {
             Thread.currentThread().interrupt();
          } catch (Exception e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
          }
        }
        // Look for any exception
        for (Future f : futures) {
          try {
              if(f.get() != null)
              {
                 hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
              }
          } catch (InterruptedException e) {
            try {
                Thread.currentThread().interrupt();
            } catch (Exception e1) {
                // TODO Auto-generated catchblock
                e1.printStackTrace();
            }
          } catch (ExecutionException e) {
            e.printStackTrace();
          }
        }
        return hashRet;
    }
     //一個線程批量查詢,獲取分鐘PV值
    protected staticConcurrentHashMap<String, String> getBatchMinutePV(List<String>lstKeys){
        ConcurrentHashMap<String, String>hashRet = null;
        List<Get> lstGet = newArrayList<Get>();
        String[] splitValue = null;
        for (String s : lstKeys) {
            splitValue =s.split("_");
            long uid =Long.parseLong(splitValue[0]);
            long min =Long.parseLong(splitValue[1]);
            byte[] key = new byte[16];
            Bytes.putLong(key, 0, uid);
            Bytes.putLong(key, 8, min);
            Get g = new Get(key);
            g.addFamily(fp);
            lstGet.add(g);
        }
        Result[] res = null;
        try {
            res =tableMinutePV[rand.nextInt(tableN)].get(lstGet);
        } catch (IOException e1) {
            logger.error("tableMinutePV exception,e=" + e1.getStackTrace());
        }
        if (res != null && res.length> 0) {
            hashRet = newConcurrentHashMap<String, String>(res.length);
            for (Result re : res) {
                if (re != null &&!re.isEmpty()) {
                    try {
                        byte[] key =re.getRow();
                        byte[] value =re.getValue(fp, cp);
                        if (key != null&& value != null) {
                           hashRet.put(String.valueOf(Bytes.toLong(key,
                                   Bytes.SIZEOF_LONG)), String.valueOf(Bytes
                                   .toLong(value)));
                        }
                    } catch (Exception e2) {
                       logger.error(e2.getStackTrace());
                    }
                }
            }
        }
        return hashRet;
    }
}
//調用接口類,實現Callable接口
class BatchMinutePVCallableimplements Callable<ConcurrentHashMap<String, String>>{
     private List<String> keys;
     publicBatchMinutePVCallable(List<String> lstKeys ) {
         this.keys = lstKeys;
     }
     public ConcurrentHashMap<String,String> call() throws Exception {
         returnDataReadServer.getBatchMinutePV(keys);
     }
}

3.5 緩存查詢結果

對於頻繁查詢HBase的應用場景,能夠考慮在應用程序中作緩存,當有新的查詢請求時,首先在緩存中查找,若是存在則直接返回,再也不查詢HBase;不然對HBase發起讀請求查詢,而後在應用程序中將查詢結果緩存起來。至於緩存的替換策略,能夠考慮LRU等經常使用的策略。

3.6 Blockcache

HBase上Regionserver的內存分爲兩個部分,一部分做爲Memstore,主要用來寫;另一部分做爲BlockCache,主要用於讀。寫請求會先寫入Memstore,Regionserver會給每一個region提供一個Memstore,當Memstore滿64MB之後,會啓動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啓動flush進程,從最大的Memstore開始flush直到低於限制。讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,並把讀的結果放入BlockCache。因爲 BlockCache採用的是LRU策略,所以BlockCache達到上限(heapsize *hfile.block.cache.size * 0.85)後,會啓動淘汰機制,淘汰掉最老的一批數據。一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大於等於heapsize * 0.8,不然HBase不能啓動。默認BlockCache爲0.2,而Memstore爲0.4。對於注重讀響應時間的系統,能夠將 BlockCache設大些,好比設置BlockCache=0.4,Memstore=0.39,以加大緩存的命中率

相關文章
相關標籤/搜索