HBase Study Notes 1 - How to Write High-Performance Client-Side Java Code

Please credit the original link when reposting: http://www.cnblogs.com/xczyd/p/5577124.html

Customers using HBase often complain that writes are too slow, that concurrency won't scale, and so on. When I ran into this in the past, I would head straight for the HBase cluster's load metrics, looking for performance bottlenecks and the like.

Then a seasoned colleague said: hold on, first take a look at how the user's client code actually accesses the HBase cluster.

So I spent some time reading it.

What I found was alarming. Client code written by customers (and by our own field engineers) is often riddled with basic mistakes, such as:

(1) overusing synchronized;

(2) creating connections and never releasing them;

(3) calling an API repeatedly when a single call would do; if it happens to be a time-consuming API, the performance consequences are easy to imagine;

(4) assorted other oddities...

So this post starts from the HBase Java API and, through source-code analysis and a simple experiment, works out the most appropriate way to call it (mainly for high-concurrency scenarios).

If you are not familiar with the HBase Java API, you may want to read the official documentation first.

Now for the main content:

To interact with an HBase cluster through the Java API, you first create an HTable instance, then use the methods it provides to insert, delete, and query.

To create an HTable object, you first need a configuration instance, Configuration conf, carrying the HBase cluster information. It is typically created like this:

Configuration conf = HBaseConfiguration.create();
// Point the client at the cluster's ZooKeeper quorum address and client port
conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
conf.set("hbase.zookeeper.property.clientPort", "2181");

Once you have conf, an HTable instance can be created through either of the following two HTable constructors:


Method 1: create the HTable instance directly from conf

The corresponding constructor is:

public HTable(Configuration conf, final TableName tableName)
  throws IOException {
    this.tableName = tableName;
    this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
    if (conf == null) {
      this.connection = null;
      return;
    }
    this.connection = HConnectionManager.getConnection(conf);
    this.configuration = conf;

    this.pool = getDefaultExecutor(conf);
    this.finishSetup();
 }

Note the call to HConnectionManager.getConnection. In this constructor, getConnection takes conf as its input parameter and returns an HConnection instance, connection. If you are familiar with ODBC or JDBC, you will recognize the pattern: Java database APIs generally maintain a similar connection or connection pool between the database and the client, and for HBase that role is played by HConnection. One difference from Oracle and friends is that what HConnection actually connects to is not the HBase cluster itself but the ZooKeeper (ZK) ensemble that maintains its key metadata. ZK is not covered here; if it is unfamiliar, simply think of it as a standalone metadata manager. Returning to getConnection, its implementation is as follows:

public static HConnection getConnection(final Configuration conf)
  throws IOException {
    HConnectionKey connectionKey = new HConnectionKey(conf);
    synchronized (CONNECTION_INSTANCES) {
      HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
      if (connection == null) {
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      } else if (connection.isClosed()) {
        HConnectionManager.deleteConnection(connectionKey, true);
        connection = (HConnectionImplementation)createConnection(conf, true);
        CONNECTION_INSTANCES.put(connectionKey, connection);
      }
      connection.incCount();
      return connection;
    }
}

Here, CONNECTION_INSTANCES is of type LinkedHashMap<HConnectionKey, HConnectionImplementation>, where HConnectionImplementation is simply the concrete implementation of HConnection. Three lines matter: the first builds an HConnectionKey instance, connectionKey, from conf; the second looks up CONNECTION_INSTANCES for an existing HConnection instance under that key; the third creates one via createConnection if none exists, and otherwise the HConnection object just found in the map is returned directly.

At the risk of being tedious, look also at HConnectionKey's constructor and its overridden hashCode, shown below:

HConnectionKey(Configuration conf) {
    Map<String, String> m = new HashMap<String, String>();
    if (conf != null) {
      for (String property : CONNECTION_PROPERTIES) {
        String value = conf.get(property);
        if (value != null) {
          m.put(property, value);
        }
      }
    }
    this.properties = Collections.unmodifiableMap(m);

    try {
      UserProvider provider = UserProvider.instantiate(conf);
      User currentUser = provider.getCurrent();
      if (currentUser != null) {
        username = currentUser.getName();
      }
    } catch (IOException ioe) {
      HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
    }
}
public int hashCode() {
    final int prime = 31;
    int result = 1;
    if (username != null) {
      result = username.hashCode();
    }
    for (String property : CONNECTION_PROPERTIES) {
      String value = properties.get(property);
      if (value != null) {
        result = prime * result + value.hashCode();
      }
    }

    return result;
}

As the code shows, the overridden hashCode mixes the hashCode of username together with the hashCodes of the connection-related properties listed in CONNECTION_PROPERTIES. The username comes from currentUser, currentUser from provider, and provider is built from conf, so identical conf contents yield the same username and property values, and therefore the same hash. Since CONNECTION_INSTANCES is a LinkedHashMap, its get method uses HConnectionKey's hashCode (and equals) to decide whether an entry already exists. In essence, getConnection returns a connection keyed on the contents of conf: for every conf with identical contents, only one connection object is ever created.
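
To make the sharing concrete, here is a minimal sketch against the same client API analyzed above (the quorum address and the table names test0/test1 are placeholders):

void sharedConnectionDemo() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    // Both constructors route through HConnectionManager.getConnection(conf),
    // so t1 and t2 share one cached HConnection keyed on the conf contents.
    HTable t1 = new HTable(conf, TableName.valueOf("test0"));
    HTable t2 = new HTable(conf, TableName.valueOf("test1"));

    // Closing a table decrements the shared connection's reference count
    // (see incCount in getConnection above) rather than closing one connection per table.
    t1.close();
    t2.close();
}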


Method 2: explicitly create an HConnection instance via createConnection, then pass it in as an argument to construct the HTable instance

The createConnection method and the corresponding HTable constructor are as follows:

public static HConnection createConnection(Configuration conf) throws IOException {
    UserProvider provider = UserProvider.instantiate(conf);
    return createConnection(conf, false, null, provider.getCurrent());
}

static HConnection createConnection(final Configuration conf, final boolean managed,
    final ExecutorService pool, final User user) throws IOException {
    String className = conf.get("hbase.client.connection.impl",
        HConnectionManager.HConnectionImplementation.class.getName());
    Class<?> clazz = null;
    try {
      clazz = Class.forName(className);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
    try {
      // Default HCM#HCI is not accessible; make it so before invoking.
      Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
          boolean.class, ExecutorService.class, User.class);
      constructor.setAccessible(true);
      return (HConnection) constructor.newInstance(conf, managed, pool, user);
    } catch (Exception e) {
      throw new IOException(e);
    }
}
public HTable(TableName tableName, HConnection connection) throws IOException {
    this.tableName = tableName;
    this.cleanupPoolOnClose = true;
    this.cleanupConnectionOnClose = false;
    this.connection = connection;
    this.configuration = connection.getConfiguration();

    this.pool = getDefaultExecutor(this.configuration);
    this.finishSetup();
 }

As the code shows, this way of constructing an HTable creates a brand-new HConnection instance via reflection, instead of sharing one HConnection instance as Method 1 does.

Worth noting: an HConnection created this way must be explicitly released by calling close() once it is no longer in use; otherwise problems such as port exhaustion can easily follow.
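
In practice that usually means pairing the create with a close in try/finally. A minimal sketch, assuming the same API version as above (the table name test and the row/column values are placeholders):

void explicitConnectionDemo(Configuration conf) throws IOException {
    HConnection conn = HConnectionManager.createConnection(conf);
    try {
        HTable table = new HTable(TableName.valueOf("test"), conn);
        try {
            Put put = new Put(Bytes.toBytes("rowkey-1"));
            put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v"));
            table.put(put);
        } finally {
            table.close(); // cleanupConnectionOnClose is false here, so this leaves conn open
        }
    } finally {
        conn.close(); // must be explicit, or the connection (and its ZK link) leaks
    }
}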


So how do the two approaches perform for inserts, deletes, and queries? Let's start with a code-level analysis. For simplicity, consider what HTable specifically does during a put (insert).

HTable's put method looks like this:

public void put(final Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    doPut(put);
    if (autoFlush) {
      flushCommits();
    }
}

private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    if (ap.hasError()){
      writeAsyncBuffer.add(put);
      backgroundFlushCommits(true);
    }

    validatePut(put);

    currentWriteBufferSize += put.heapSize();
    writeAsyncBuffer.add(put);

    while (currentWriteBufferSize > writeBufferSize) {
      backgroundFlushCommits(false);
    }
}

private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    try {
      do {
        ap.submit(writeAsyncBuffer, true);
      } while (synchronous && !writeAsyncBuffer.isEmpty());

      if (synchronous) {
        ap.waitUntilDone();
      }

      if (ap.hasError()) {
        LOG.debug(tableName + ": One or more of the operations have failed -" +
            " waiting for all operation in progress to finish (successfully or not)");
        while (!writeAsyncBuffer.isEmpty()) {
          ap.submit(writeAsyncBuffer, true);
        }
        ap.waitUntilDone();

        if (!clearBufferOnFail) {
          // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
          //  write buffer. This is a questionable feature kept here for backward compatibility
          writeAsyncBuffer.addAll(ap.getFailedOperations());
        }
        RetriesExhaustedWithDetailsException e = ap.getErrors();
        ap.clearErrors();
        throw e;
      }
    } finally {
      currentWriteBufferSize = 0;
      for (Row mut : writeAsyncBuffer) {
        if (mut instanceof Mutation) {
          currentWriteBufferSize += ((Mutation) mut).heapSize();
        }
      }
    }
}

The call chain is put -> doPut -> backgroundFlushCommits -> ap.submit, where ap is an instance of the AsyncProcess class. Tracing into AsyncProcess, the relevant code is:

public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
    submitLowPriority(rows, atLeastOne, false);
}

public void submitLowPriority(List<? extends Row> rows, boolean atLeastOne, boolean isLowPripority) throws InterruptedIOException {
    if (rows.isEmpty()) {
      return;
    }

    // This looks like we are keying by region but HRegionLocation has a comparator that compares
    // on the server portion only (hostname + port) so this Map collects regions by server.
    Map<HRegionLocation, MultiAction<Row>> actionsByServer = new HashMap<HRegionLocation, MultiAction<Row>>();
    List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());

    long currentTaskCnt = tasksDone.get();
    boolean alreadyLooped = false;

    NonceGenerator ng = this.hConnection.getNonceGenerator();
    do {
      if (alreadyLooped){
        // if, for whatever reason, we looped, we want to be sure that something has changed.
        waitForNextTaskDone(currentTaskCnt);
        currentTaskCnt = tasksDone.get();
      } else {
        alreadyLooped = true;
      }

      // Wait until there is at least one slot for a new task.
      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);

      // Remember the previous decisions about regions or region servers we put in the
      //  final multi.
      Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
      Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();

      int posInList = -1;
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc = findDestLocation(r, posInList);

        if (loc == null) { // loc is null if there is an error such as meta not available.
          it.remove();
        } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
          Action<Row> action = new Action<Row>(r, ++posInList);
          setNonce(ng, r, action);
          retainedActions.add(action);
          addAction(loc, action, actionsByServer, ng);
          it.remove();
        }
      }
    } while (retainedActions.isEmpty() && atLeastOne && !hasError());

    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, isLowPripority);
}

private HRegionLocation findDestLocation(Row row, int posInList) {
  if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
  HRegionLocation loc = null;
  IOException locationException = null;
  try {
    loc = hConnection.locateRegion(this.tableName, row.getRow());
    if (loc == null) {
      locationException = new IOException("#" + id + ", no location found, aborting submit for" +
          " tableName=" + tableName +
          " rowkey=" + Arrays.toString(row.getRow()));
    }
  } catch (IOException e) {
    locationException = e;
  }
  if (locationException != null) {
    // There are multiple retries in locateRegion already. No need to add new.
    // We can't continue with this row, hence it's the last retry.
    manageError(posInList, row, false, locationException, null);
    return null;
  }

  return loc;
}

The key mechanism here is asynchrony: not every put writes straight through to HBase. Instead, puts accumulate in a client-side buffer, and only once the buffered data grows past a threshold (2 MB by default) is a write actually sent; on the server side those writes presumably still queue up for execution, but the details are out of scope here. What can be established is that for the insert/query/delete Java APIs, HConnection only serves to locate the right RegionServer; once located, the operation itself is carried out by the client through RPC calls, independent of how many connections the client has created. Also, locateRegion only performs RPC on a cache miss; otherwise it reads the RegionServer information straight from the cache. See the locateRegion source for details, which are likewise not expanded on here.
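
The client can steer this buffering. A hedged sketch of the relevant knobs on the same era's HTable API (the table name is a placeholder; the 2 MB figure mirrors the default mentioned above):

void bufferedWriteDemo(Configuration conf) throws IOException {
    HTable table = new HTable(conf, TableName.valueOf("test"));
    table.setAutoFlush(false, true);           // stop flushing on every put; clear buffer on failure
    table.setWriteBufferSize(2 * 1024 * 1024); // flush once ~2 MB of puts have accumulated

    for (int i = 0; i < 10000; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v-" + i));
        table.put(put);                        // normally just appends to writeAsyncBuffer
    }

    table.flushCommits();                      // push out whatever is still buffered
    table.close();
}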

This concludes the code analysis, and one thing is now certain: creating large numbers of HConnections via createConnection does nothing to help write performance. On the contrary, since it wastes resources for no benefit, it should be slower than getConnection. How much slower, though, cannot be judged from the code alone.

A simple experiment can put that claim to the test:

Server environment: an HBase cluster of four Linux servers with 64 GB of RAM; a ping averages about 5 ms (to be rigorous, CPU core count and frequency, disk speed, and so on should also be given).

Client environment: an Ubuntu VM on a Mac, with 10 GB of RAM allocated; its CPU, network, and disk I/O are all much slower than a physical machine's, but that does not affect the conclusion.

The experiment code is as follows:

public class HbaseConectionTest {

    public static void main(String[] args) throws Exception{

        Configuration conf = HBaseConfiguration.create();

        conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        ThreadInfo info = new ThreadInfo();
        info.setTableNamePrefix("test");
        info.setColNames("col1,col2");
        info.setTableCount(1);
        info.setConnStrategy("CREATEWITHCONF");//CREATEWITHCONF,CREATEWITHCONN
        info.setWriteStrategy("SEPERATE");//OVERLAP,SEPERATE
        info.setLifeCycle(60000L);

        int threadCount = 100;

        for(int i=0;i<threadCount;i++){
            //createTable(tableNamePrefix+i,colNames,conf);
        }

        // start the writer threads
        for(int i=0;i<threadCount;i++){
            new Thread(new WriteThread(conf,info,i)).start();
        }

        //HBaseAdmin admin = new HBaseAdmin(conf);

        //System.out.println(admin.tableExists("test"));

    }

    public static void createTable(String tableName,String[] colNames,Configuration conf) {
        System.out.println("start create table "+tableName);
        try {

            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            if (hBaseAdmin.tableExists(tableName)) {
                System.out.println(tableName + " is exist");
                //hBaseAdmin.disableTable(tableName);
                //hBaseAdmin.deleteTable(tableName);
                return;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            for(int i=0;i<colNames.length;i++) {
                tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));
            }
            hBaseAdmin.createTable(tableDescriptor);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println("end create table "+tableName);
    }

}

//Configuration describing what each write thread should do
class ThreadInfo {

    private int tableCount;

    String tableNamePrefix;
    String[] colNames;

    //CREATEWITHCONF or CREATEWITHCONN
    String connStrategy;

    //overlap or seperate
    String writeStrategy;

    long lifeCycle;

    public ThreadInfo(){

    }

    public int getTableCount() {
        return tableCount;
    }

    public void setTableCount(int tableCount) {
        this.tableCount = tableCount;
    }

    public String getTableNamePrefix() {
        return tableNamePrefix;
    }

    public void setTableNamePrefix(String tableNamePrefix) {
        this.tableNamePrefix = tableNamePrefix;
    }

    public String[] getColNames() {
        return colNames;
    }

    public void setColNames(String[] colNames) {
        this.colNames = colNames;
    }

    public void setColNames(String colNames) {
        if(colNames == null){
            this.colNames = null;
        }
        else{
            this.colNames = colNames.split(",");
        }
    }

    public String getWriteStrategy() {
        return writeStrategy;
    }

    public void setWriteStrategy(String writeStrategy) {
        this.writeStrategy = writeStrategy;
    }

    public String getConnStrategy() {
        return connStrategy;
    }

    public void setConnStrategy(String connStrategy) {
        this.connStrategy = connStrategy;
    }

    public long getLifeCycle() {
        return lifeCycle;
    }

    public void setLifeCycle(long lifeCycle) {
        this.lifeCycle = lifeCycle;
    }

}

class WriteThread implements Runnable{

    private Configuration conf;
    private ThreadInfo info;
    private int index;

    public WriteThread(Configuration conf,ThreadInfo info,int index){
        this.conf = conf;
        this.info = info;
        this.index = index;
    }

    @Override
    public void run(){

        String threadName = Thread.currentThread().getName();
        int operationCount = 0;

        HTable[] htables = null;
        HConnection conn = null;

        int tableCount = info.getTableCount();

        String tableNamePrefix = info.getTableNamePrefix();
        String[] colNames = info.getColNames();

        String connStrategy = info.getConnStrategy();
        String writeStrategy = info.getWriteStrategy();

        long lifeCycle = info.getLifeCycle();

        System.out.println(threadName+": started with index "+index);

        try{
            if (connStrategy.equals("CREATEWITHCONN")) {

                conn = HConnectionManager.createConnection(conf);

                if (writeStrategy.equals("SEPERATE")) {
                    htables = new HTable[1];
                    htables[0] = new HTable(TableName.valueOf(tableNamePrefix+(index%tableCount)), conn);
                }
                else if(writeStrategy.equals("OVERLAP")) {
                    htables = new HTable[tableCount];
                    for (int i = 0; i < tableCount; i++) {
                        htables[i] = new HTable(TableName.valueOf(tableNamePrefix+i), conn);
                    }
                }
                else{
                    return;
                }
            }
            else if (connStrategy.equals("CREATEWITHCONF")) {

                conn = null;

                if (writeStrategy.equals("SEPERATE")) {
                    htables = new HTable[1];
                    htables[0] = new HTable(conf,TableName.valueOf(tableNamePrefix+(index%tableCount)));
                }
                else if(writeStrategy.equals("OVERLAP")) {
                    htables = new HTable[tableCount];
                    for (int i = 0; i < tableCount; i++) {
                        htables[i] = new HTable(conf,TableName.valueOf(tableNamePrefix+i));
                    }
                }
                else{
                    return;
                }
            }
            else {
                return;
            }

            long start = System.currentTimeMillis();
            long end = System.currentTimeMillis();

            Map<HTable,HColumnDescriptor[]> table_columnFamilies = new HashMap<HTable,HColumnDescriptor[]>();
            for(int i=0;i<htables.length;i++){
                table_columnFamilies.put(htables[i],htables[i].getTableDescriptor().getColumnFamilies());
            }

            while(end-start<=lifeCycle){
                HTable table = htables.length==1?htables[0]:htables[(int)(Math.random()*htables.length)];
                long s1 = System.currentTimeMillis();
                double r = Math.random();
                HColumnDescriptor[] columnFamilies = table_columnFamilies.get(table);
                Put put = generatePut(threadName,columnFamilies,colNames,operationCount);
                table.put(put);
                if(r>0.999){
                    System.out.println(System.currentTimeMillis()-s1);
                }
                operationCount++;
                end = System.currentTimeMillis();
            }

            if(conn != null){
                conn.close();
            }

        }catch(Exception ex){
            ex.printStackTrace();
        }

        System.out.println(threadName+": ended with operation count:"+operationCount);
    }

    private Put generatePut(String threadName,HColumnDescriptor[] columnFamilies,String[] colNames,int operationCount){
        Put put = new Put(Bytes.toBytes(threadName+"_"+operationCount));
        for (int i = 0; i < columnFamilies.length; i++) {
            String familyName = columnFamilies[i].getNameAsString();
            //System.out.println("familyName:"+familyName);
            for(int j=0;j<colNames.length;j++){
                if(familyName.equals(colNames[j])) { //
                    String columnName = familyName+(int)(Math.floor(Math.random()*5+10*j));
                    String val = ""+columnName.hashCode()%100;
                    put.add(Bytes.toBytes(familyName),Bytes.toBytes(columnName),Bytes.toBytes(val));
                }
            }
        }
        //System.out.println(put.toString());
        return put;
    }
}

簡單來講就是先建立一些有兩列的HBase表,而後建立一些線程分別採用getConnection策略和createConnection策略來寫1分鐘的數據。固然寫幾張表,寫多久,寫什麼,怎麼寫均可以調整。好比我這裏就設計了固定寫一張表或者隨機寫一張表幾種邏輯。須要注意一下紅色部分的代碼,這裏預先得到了要寫的HBase表的列信息。作這個動做的緣由是getTableDescriptor是會產生網絡開銷的,建議寫代碼時儘可能少調用,以避免增長沒必要要的額外開銷(事實上這個額外開銷是很巨大的)。

The detailed results are in the table below; exact values vary somewhat with network jitter and the like. In summary: at higher concurrency (more than 30 threads), getConnection is clearly faster than createConnection; at low concurrency (10 threads or fewer), createConnection has a slight edge. Furthermore, with getConnection, writing to a single table is clearly faster than writing to multiple tables under high concurrency, though the effect is not obvious at low concurrency; with createConnection, writing one table is roughly as fast as writing many regardless of concurrency, and even slightly slower.

These observations do not fully agree with the code analysis. The discrepancies are the following two:

(1) Why does createConnection come out ahead with few threads? In theory they should be on par. I have no good explanation for this; it remains an open question.

(2) Why is createConnection so much slower with many threads? My guess is that the server-side ZK ensemble gets overloaded maintaining a large number of connections, so performance drops even though multiple RegionServers are handling the actual writes.

Both points await further investigation. Even so, the recommendation stands: when using the Java API with HBase, especially in high-concurrency scenarios, prefer the getConnection approach for creating HTable objects and avoid keeping unnecessary connections around wasting resources.

thread_count  table_count  conn_strategy  write_strategy  interval  result (ops/thread * threads = total)
  1             1          CONF           OVERLAP         60s       10000 * 1   = 10000
  5             1          CONF           OVERLAP         60s       11000 * 5   = 55000
 10             1          CONF           OVERLAP         60s       12000 * 10  = 120000
 30             1          CONF           OVERLAP         60s       8300 * 30   = 249000
 60             1          CONF           OVERLAP         60s       6000 * 60   = 360000
100             1          CONF           OVERLAP         60s       4700 * 100  = 470000
  1             1          CONN           OVERLAP         60s       12000 * 1   = 12000
  5             1          CONN           OVERLAP         60s       16000 * 5   = 80000
 10             1          CONN           OVERLAP         60s       10000 * 10  = 100000
 30             1          CONN           OVERLAP         60s       2500 * 30   = 75000
 60             1          CONN           OVERLAP         60s       1200 * 60   = 72000
100             1          CONN           OVERLAP         60s       1000 * 100  = 100000
  5             5          CONF           SEPERATE        60s       10600 * 5   = 53000
 10            10          CONF           SEPERATE        60s       11900 * 10  = 119000
 30            30          CONF           SEPERATE        60s       6900 * 30   = 207000
 60            60          CONF           SEPERATE        60s       3650 * 60   = 219000
100           100          CONF           SEPERATE        60s       2500 * 100  = 250000
  5             5          CONN           SEPERATE        60s       14000 * 5   = 70000
 10            10          CONN           SEPERATE        60s       10500 * 10  = 105000
 30            30          CONN           SEPERATE        60s       3250 * 30   = 97500
 60            60          CONN           SEPERATE        60s       1450 * 60   = 87000
100           100          CONN           SEPERATE        60s       930 * 100   = 93000
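
To close, here is a sketch of what that recommendation might look like for a high-concurrency writer (the thread count, table name, and cell values are placeholders; HTable itself is not thread-safe, so each thread builds its own instance from the shared conf):

public static void main(String[] args) {
    final Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
    conf.set("hbase.zookeeper.property.clientPort", "2181");

    for (int i = 0; i < 100; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Same conf in every thread -> one shared, cached HConnection.
                    HTable table = new HTable(conf, TableName.valueOf("test0"));
                    try {
                        Put put = new Put(Bytes.toBytes(Thread.currentThread().getName()));
                        put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                        table.put(put);
                    } finally {
                        table.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}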