HBase篇--HBase經常使用優化

一.前述html

HBase優化可以讓咱們對調優有必定的理解,固然企業並非全部的優化全都用,優化還要根據業務具體實施。apache

二.具體優化編程

1.表的設計

 1.1 預分區api

 

默認狀況下,在建立HBase表的時候會自動建立一個region分區,當導入數據的時候,全部的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種能夠加快批量寫入速度的方法是經過預先建立一些空的regions這樣當數據寫入HBase時,會按照region分區狀況,在集羣內作數據的負載均衡。數組

public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
throws IOException {
  try {
    admin.createTable(table, splits);
    return true;
  } catch (TableExistsException e) {
    logger.info("table " + table.getNameAsString() + " already exists");
    // the table already exists...
    return false;  
  }
}

public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) { //start:001,endkey:100,10region [001,010]
[011,020]
  byte[][] splits = new byte[numRegions-1][];
  BigInteger lowestKey = new BigInteger(startKey, 16);
  BigInteger highestKey = new BigInteger(endKey, 16);
  BigInteger range = highestKey.subtract(lowestKey);
  BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
  lowestKey = lowestKey.add(regionIncrement);
  for(int i=0; i < numRegions-1;i++) {
    BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
    byte[] b = String.format("%016x", key).getBytes();
    splits[i] = b;
  }
  return splits;
}

 

 1.2  Row Key設計緩存

HBaserow key用來檢索表中的記錄,支持如下三種方式:安全

  • 經過單個row key訪問:即按照某個row key鍵值進行get操做;
  • 經過row keyrange進行scan:即經過設置startRowKeyendRowKey,在這個範圍內進行掃描;
  • 全表掃描:即直接掃描整張表中全部行記錄。

HBase中,row key能夠是任意字符串,最大長度64KB,實際應用中通常爲10~100bytes,存爲byte[]字節數組,通常設計成定長的服務器

row key是按照字典序存儲,所以,設計row key時,要充分利用這個排序特色,將常常一塊兒讀取的數據存儲到一塊,將最近可能會被訪問的數據放在一塊。網絡

舉個例子:若是最近寫入HBase表中的數據是最可能被訪問的,能夠考慮將時間戳做爲row key的一部分,因爲是字典序排序,因此可使用Long.MAX_VALUE - timestamprow key,這樣能保證新寫入的數據在讀取時能夠被快速命中。多線程

Rowkey規則

一、 越小越好

二、 Rowkey的設計是要根據實際業務來

三、 散列性

a) 取反   001  002  100 200

b) Hash

 

1.3 列族的設計

 

不要在一張表裏定義太多的column family。目前Hbase並不能很好的處理超過2~3column family的表。由於某個column familyflush的時候,它鄰近的column family也會因關聯效應被觸發flush,最終致使系統產生更多的I/O。感興趣的同窗能夠對本身的HBase集羣進行實際測試,從獲得的測試結果數據驗證一下。

1.4 In Memory

 

建立表的時候,能夠經過HColumnDescriptor.setInMemory(true)將表放到RegionServer的緩存中,保證在讀取的時候被cache命中。(讀緩存)

1.5 Max Version

建立表的時候,能夠經過HColumnDescriptor.setMaxVersions(int maxVersions)設置表中數據的最大版本,若是隻須要保存最新版本的數據,那麼能夠設置setMaxVersions(1)

 

1.6 Time To Live

 

建立表的時候,能夠經過HColumnDescriptor.setTimeToLive(int timeToLive)設置表中數據的存儲生命期,過時數據將自動被刪除,例如若是隻須要存儲最近兩天的數據,那麼能夠設置setTimeToLive(2 * 24 * 60 * 60)(至關於Linux中的Crontab任務)

1.7 Compact & Split

 

HBase中,數據在更新時首先寫入WAL 日誌(HLog)和內存(MemStore)中,MemStore中的數據是排序的,當MemStore累計到必定閾值時,就會建立一個新的MemStore,而且將老的MemStore添加到flush隊列,由單獨的線程flush到磁盤上,成爲一個StoreFile。於此同時, 系統會在zookeeper中記錄一個redo point,表示這個時刻以前的變動已經持久化了(minor compact)

 

StoreFile是隻讀的,一旦建立後就不能夠再修改。所以Hbase的更新實際上是不斷追加的操做。當一個Store中的StoreFile達到必定的閾值後,就會進行一次合併(major compact),將對同一個key的修改合併到一塊兒,造成一個大的StoreFile,當StoreFile的大小達到必定閾值後,又會對 StoreFile進行分割(split),等分爲兩個StoreFile

 

因爲對錶的更新是不斷追加的,處理讀請求時,須要訪問Store中所有的StoreFileMemStore,將它們按照row key進行合併,因爲StoreFileMemStore都是通過排序的,而且StoreFile帶有內存中索引,一般合併過程仍是比較快的。

 

實際應用中,能夠考慮必要時手動進行major compact,將同一個row key的修改進行合併造成一個大的StoreFile。同時,能夠將StoreFile設置大些,減小split的發生。

 

hbase爲了防止小文件(被刷到磁盤的menstore)過多,以保證保證查詢效率,hbase須要在必要的時候將這些小的store file合併成相對較大的store file,這個過程就稱之爲compaction。在hbase中,主要存在兩種類型的compactionminor  compactionmajor compaction

 

minor compaction:的是較小、不多文件的合併。

 

major compaction 的功能是將全部的store file合併成一個,觸發major compaction的可能條件有:major_compact 命令、majorCompact() APIregion server自動運行(相關參數:hbase.hregion.majoucompaction 默認爲24 小時、hbase.hregion.majorcompaction.jetter 默認值爲0.2 防止region server 在同一時間進行major compaction)。

 

hbase.hregion.majorcompaction.jetter參數的做用是:對參數hbase.hregion.majoucompaction 規定的值起到浮動的做用,假如兩個參數都爲默認值240,2,那麼major compact最終使用的數值爲:19.2~28.8 這個範圍。

 

一、 關閉自動major compaction

 

二、 手動編程major compaction

 

Timer類,contab

 

minor compaction的運行機制要複雜一些,它由一下幾個參數共同決定:

 

hbase.hstore.compaction.min :默認值爲 3,表示至少須要三個知足條件的store file時,minor compaction纔會啓動

 

hbase.hstore.compaction.max 默認值爲10,表示一次minor compaction中最多選取10store file

 

hbase.hstore.compaction.min.size 表示文件大小小於該值的store file 必定會加入到minor compactionstore file

 

hbase.hstore.compaction.max.size 表示文件大小大於該值的store file 必定會被minor compaction排除

 

hbase.hstore.compaction.ratio store file 按照文件年齡排序(older to younger),minor compaction老是從older store file開始選擇

 2. 寫表操做

  2.1 HTable併發寫

 

建立多個HTable客戶端用於寫操做,提升寫數據的吞吐量,一個例子:

 

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = 「user_log」;
wTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    wTableLog[i] = new HTable(conf, table_log_name);
    wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
    wTableLog[i].setAutoFlush(false);

 

 

 

 

 

 2.2 HTable參數設置

 

2.2.1 Auto Flush

 

經過調用HTable.setAutoFlush(false)方法能夠將HTable寫客戶端的自動flush關閉,這樣能夠批量寫入數據到HBase,而不是有一條put就執行一次更新,只有當put填滿客戶端寫緩存時,才實際向HBase服務端發起寫請求。默認狀況下auto flush是開啓的。

 

2.2.2 Write Buffer

 

經過調用HTable.setWriteBufferSize(writeBufferSize)方法能夠設置HTable客戶端的寫buffer大小,若是新設置的buffer小於當前寫buffer中的數據時,buffer將會被flush到服務端。其中,writeBufferSize的單位是byte字節數,能夠根據實際寫入數據量的多少來設置該值。

 

2.2.3 WAL Flag(慎用!!!除非導入測試數據)

 

HBae中,客戶端向集羣中的RegionServer提交數據時(Put/Delete操做),首先會先寫WALWrite Ahead Log)日誌(即HLog,一個RegionServer上的全部Region共享一個HLog),只有當WAL日誌寫成功後,再接着寫MemStore,而後客戶端被通知提交數據成功;若是寫WAL日誌失敗,客戶端則被通知提交失敗。這樣作的好處是能夠作到RegionServer宕機後的數據恢復。

 

所以,對於相對不過重要的數據,能夠在Put/Delete操做時,經過調用Put.setWriteToWAL(false)Delete.setWriteToWAL(false)函數,放棄寫WAL日誌,從而提升數據寫入的性能。

 

值得注意的是:謹慎選擇關閉WAL日誌,由於這樣的話,一旦RegionServer宕機,Put/Delete的數據將會沒法根據WAL日誌進行恢復。

 

2.3 批量寫

 

經過調用HTable.put(Put)方法能夠將一個指定的row key記錄寫入HBase,一樣HBase提供了另外一個方法:經過調用HTable.put(List<Put>)方法能夠將指定的row key列表,批量寫入多行記錄,這樣作的好處是批量執行,只須要一次網絡I/O開銷,這對於對數據實時性要求高,網絡傳輸RTT高的情景下可能帶來明顯的性能提高。

 

2.4 多線程併發寫

 

在客戶端開啓多個HTable寫線程,每一個寫線程負責一個HTable對象的flush操做,這樣結合定時flush和寫bufferwriteBufferSize),能夠既保證在數據量小的時候,數據能夠在較短期內被flush(如1秒內),同時又保證在數據量大的時候,寫buffer一滿就及時進行flush。下面給個具體的例子:

 

 

for (int i = 0; i < threadN; i++) {
    Thread th = new Thread() {
        public void run() {
            while (true) {
                try {
                    sleep(1000); //1 second
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
synchronized (wTableLog[i]) {
                    try {
                    wTableLog[i].flushCommits();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
}
    };
    th.setDaemon(true);
    th.start();
}

 

 

3. 讀表操做

3.1 HTable併發讀

建立多個HTable客戶端用於讀操做,提升讀數據的吞吐量,一個例子:

 

static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = 「user_log」;
rTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
    rTableLog[i] = new HTable(conf, table_log_name);
    rTableLog[i].setScannerCaching(50);
}

 

3.2 HTable參數設置

3.2.1 Scanner Caching

hbase.client.scanner.caching配置項能夠設置HBase scanner一次從服務端抓取的數據條數,默認狀況下一次一條。經過將其設置成一個合理的值,能夠減小scan過程當中next()的時間開銷,代價是scanner須要經過客戶端的內存來維持這些被cache的行記錄。

有三個地方能夠進行配置:1)在HBaseconf配置文件中進行配置;(通常不用次全局配置!!!)2)經過調用HTable.setScannerCaching(int scannerCaching)進行配置;3)經過調用Scan.setCaching(int caching)進行配置。三者的優先級愈來愈高。

3.2.2 Scan Attribute Selection

scan時指定須要的Column Family,能夠減小網絡傳輸數據量,不然默認scan操做會返回整行全部Column Family的數據。

3.2.3 Close ResultScanner

經過scan取完數據後,記得要關閉ResultScanner,不然RegionServer可能會出現問題(對應的Server資源沒法釋放)。

3.3 批量讀

經過調用HTable.get(Get)方法能夠根據一個指定的row key獲取一行記錄,一樣HBase提供了另外一個方法:經過調用HTable.get(List<Get>)方法能夠根據一個指定的row key列表,批量獲取多行記錄,這樣作的好處是批量執行,只須要一次網絡I/O開銷,這對於對數據實時性要求高並且網絡傳輸RTT高的情景下可能帶來明顯的性能提高。

3.4 多線程併發讀

在客戶端開啓多個HTable讀線程,每一個讀線程負責經過HTable對象進行get操做。下面是一個多線程併發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:

 

public class DataReaderServer {
     //獲取店鋪一天內各分鐘PV值的入口函數
     public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){
         long min = startStamp;
         int count = (int)((endStamp - startStamp) / (60*1000));
         List<String> lst = new ArrayList<String>();
         for (int i = 0; i <= count; i++) {
            min = startStamp + i * 60 * 1000;
            lst.add(uid + "_" + min);
         }
         return parallelBatchMinutePV(lst);
     }
      //多線程併發查詢,獲取分鐘PV值
private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
        ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>();
        int parallel = 3;
        List<List<String>> lstBatchKeys  = null;
        if (lstKeys.size() < parallel ){
            lstBatchKeys  = new ArrayList<List<String>>(1);
            lstBatchKeys.add(lstKeys);
        }
        else{
            lstBatchKeys  = new ArrayList<List<String>>(parallel);
            for(int i = 0; i < parallel; i++  ){
                List<String> lst = new ArrayList<String>();
                lstBatchKeys.add(lst);
            }

            for(int i = 0 ; i < lstKeys.size() ; i ++ ){
                lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
            }
        }
        
        List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5);
        
        ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
        builder.setNameFormat("ParallelBatchQuery");
        ThreadFactory factory = builder.build();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
        
        for(List<String> keys : lstBatchKeys){
            Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys);
            FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
            futures.add(future);
        }
        executor.shutdown();
        
        // Wait for all the tasks to finish
        try {
          boolean stillRunning = !executor.awaitTermination(
              5000000, TimeUnit.MILLISECONDS);
          if (stillRunning) {
            try {
                executor.shutdownNow();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
          }
        } catch (InterruptedException e) {
          try {
              Thread.currentThread().interrupt();
          } catch (Exception e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
          }
        }
        
        // Look for any exception
        for (Future f : futures) {
          try {
              if(f.get() != null)
              {
                  hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
              }
          } catch (InterruptedException e) {
            try {
                 Thread.currentThread().interrupt();
            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
          } catch (ExecutionException e) {
            e.printStackTrace();
          }
        }
        
        return hashRet;
    }
     //一個線程批量查詢,獲取分鐘PV值
    protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
        ConcurrentHashMap<String, String> hashRet = null;
        List<Get> lstGet = new ArrayList<Get>();
        String[] splitValue = null;
        for (String s : lstKeys) {
            splitValue = s.split("_");
            long uid = Long.parseLong(splitValue[0]);
            long min = Long.parseLong(splitValue[1]);
            byte[] key = new byte[16];
            Bytes.putLong(key, 0, uid);
            Bytes.putLong(key, 8, min);
            Get g = new Get(key);
            g.addFamily(fp);
            lstGet.add(g);
        }
        Result[] res = null;
        try {
            res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
        } catch (IOException e1) {
            logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
        }

        if (res != null && res.length > 0) {
            hashRet = new ConcurrentHashMap<String, String>(res.length);
            for (Result re : res) {
                if (re != null && !re.isEmpty()) {
                    try {
                        byte[] key = re.getRow();
                        byte[] value = re.getValue(fp, cp);
                        if (key != null && value != null) {
                            hashRet.put(String.valueOf(Bytes.toLong(key,
                                    Bytes.SIZEOF_LONG)), String.valueOf(Bytes
                                    .toLong(value)));
                        }
                    } catch (Exception e2) {
                        logger.error(e2.getStackTrace());
                    }
                }
            }
        }

        return hashRet;
    }
}
//調用接口類,實現Callable接口
class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{
     private List<String> keys;

     public BatchMinutePVCallable(List<String> lstKeys ) {
         this.keys = lstKeys;
     }

     public ConcurrentHashMap<String, String> call() throws Exception {
         return DataReadServer.getBatchMinutePV(keys);
     }

 

 

3.5 緩存查詢結果

對於頻繁查詢HBase的應用場景,能夠考慮在應用程序中作緩存,當有新的查詢請求時,首先在緩存中查找,若是存在則直接返回,再也不查詢HBase;不然對HBase發起讀請求查詢,而後在應用程序中將查詢結果緩存起來。至於緩存的替換策略,能夠考慮LRU等經常使用的策略。

3.6 Blockcache !!!經常使用,設置讀緩存,在服務器端

HBaseRegionserver的內存分爲兩個部分,一部分做爲Memstore,主要用來寫;另一部分做爲BlockCache,主要用於讀。

寫請求會先寫入MemstoreRegionserver會給每一個region提供一個Memstore,當Memstore滿64MB之後,會啓動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啓動flush進程,從最大的Memstore開始flush直到低於限制。

讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,並把讀的結果放入BlockCache。因爲BlockCache採用的是LRU策略,所以BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)後,會啓動淘汰機制,淘汰掉最老的一批數據。

一個Regionserver上有一個BlockCacheNMemstore它們的大小之和不能大於等於heapsize * 0.8,不然HBase不能啓動。默認BlockCache0.2,而Memstore0.4對於注重讀響應時間的系統,能夠將 BlockCache設大些,好比設置BlockCache=0.4Memstore=0.39,以加大緩存的命中率

 

HTableHTablePool使用注意事項

HTableHTablePool都是HBase客戶端API的一部分可使用它們對HBase表進行CRUD操做。下面結合在項目中的應用狀況,對兩者使用過程當中的注意事項作一下歸納總結。

Configuration conf = HBaseConfiguration.create();

try (Connection connection = ConnectionFactory.createConnection(conf)) {

  try (Table table = connection.getTable(TableName.valueOf(tablename)) {

    // use table as needed, the table returned is lightweight

  }

}

 

HTable

HTableHBase客戶端與HBase服務端通信的Java API對象,客戶端能夠經過HTable對象與服務端進行CRUD操做(增刪改查)。它的建立很簡單:

Configuration conf = HBaseConfiguration.create();

HTable table = new HTable(conf, "tablename");

//TODO CRUD Operation……

HTable使用時的一些注意事項:

1.   規避HTable對象的建立開銷

由於客戶端建立HTable對象後,須要進行一系列的操做:檢查.META.表確認指定名稱的HBase表是否存在,表是否有效等等,整個時間開銷比較重,可能會耗時幾秒鐘之長,所以最好在程序啓動時一次性建立完成須要的HTable對象,若是使用Java API,通常來講是在構造函數中進行建立,程序啓動後直接重用。

2.   HTable對象不是線程安全的

HTable對象對於客戶端讀寫數據來講不是線程安全的,所以多線程時,要爲每一個線程單首創建複用一個HTable對象,不一樣對象間不要共享HTable對象使用,特別是在客戶端auto flash被置爲false時,因爲存在本地write buffer,可能致使數據不一致。

3.   HTable對象之間共享Configuration

HTable對象共享Configuration對象,這樣的好處在於:

  • 共享ZooKeeper的鏈接:每一個客戶端須要與ZooKeeper創建鏈接,查詢用戶的table regions位置,這些信息能夠在鏈接創建後緩存起來共享使用;
  • 共享公共的資源:客戶端須要經過ZooKeeper查找-ROOT-.META.表,這個須要網絡傳輸開銷,客戶端緩存這些公共資源後可以減小後續的網絡傳輸開銷,加快查找過程速度。

所以,與如下這種方式相比:

HTable table1 = new HTable("table1");

HTable table2 = new HTable("table2");

下面的方式更有效些:

Configuration conf = HBaseConfiguration.create();//共用一個配置

HTable table1 = new HTable(conf, "table1");

HTable table2 = new HTable(conf, "table2");

備註:即便是高負載的多線程程序,也並無發現由於共享Configuration而致使的性能問題;若是你的實際狀況中不是如此,那麼能夠嘗試不共享Configuration

HTablePool

HTablePool能夠解決HTable存在的線程不安全問題,同時經過維護固定數量的HTable對象,可以在程序運行期間複用這些HTable資源對象。

Configuration conf = HBaseConfiguration.create();

HTablePool pool = new HTablePool(conf, 10);

1.   HTablePool能夠自動建立HTable對象,並且對客戶端來講使用上是徹底透明的,能夠避免多線程間數據併發修改問題。

2.   HTablePool中的HTable對象之間是公用Configuration鏈接的,可以能夠減小網絡開銷。

HTablePool的使用很簡單:每次進行操做前,通HTablePoolgetTable方法取得一個HTable對象,而後進行put/get/scan/delete等操做,最後經過HTablePoolputTable方法將HTable對象放回到HTablePool中。

下面是個使用HTablePool的簡單例子:

 

public void createUser(String username, String firstName, String lastName, String email, String password, String roles) throws IOException {

  HTable table = rm.getTable(UserTable.NAME);

  Put put = new Put(Bytes.toBytes(username));

  put.add(UserTable.DATA_FAMILY, UserTable.FIRSTNAME,

  Bytes.toBytes(firstName));

  put.add(UserTable.DATA_FAMILY, UserTable.LASTNAME,

    Bytes.toBytes(lastName));

  put.add(UserTable.DATA_FAMILY, UserTable.EMAIL, Bytes.toBytes(email));

  put.add(UserTable.DATA_FAMILY, UserTable.CREDENTIALS,

    Bytes.toBytes(password));

  put.add(UserTable.DATA_FAMILY, UserTable.ROLES, Bytes.toBytes(roles));

  table.put(put);

  table.flushCommits();

  rm.putTable(table);

}

 

 

 

 補充:

HbaseDBMS比較:

查詢數據不靈活:

一、 不能使用column之間過濾查詢

二、 不支持全文索引。使用solrhbase整合完成全文搜索。

a) 使用MR批量讀取hbase中的數據,solr裏面創建索引(no  store)之保存rowkey的值。

b) 根據關鍵詞從索引中搜索到rowkey(分頁)

c) 根據rowkeyhbase查詢全部數據

相關文章
相關標籤/搜索