Please credit the original link when reposting: http://www.cnblogs.com/xczyd/p/5577124.html
Customers using HBase often complain that writes are too slow, that concurrency won't scale, and so on. In the past, whenever I ran into this, I would go straight to the HBase cluster's load metrics without a second thought, hunting for performance bottlenecks.
Then a seasoned colleague said: hold on, first look at how the user's client code actually accesses the HBase cluster.
So I spent some time reading it.
I was in for a shock. Client code written by customers (and by our own delivery engineers, for that matter) is often riddled with elementary mistakes, such as:
(1) abusing synchronized;
(2) creating connections and never releasing them;
(3) calling an API multiple times when a single call would do; if it happens to be a time-consuming API, you can imagine the resulting performance;
(4) assorted other oddities...
To address this, this post starts from HBase's Java API and, through source-code analysis and a simple experiment, works out the most suitable way to call the Java API (mainly with high-concurrency scenarios in mind).
If you are not familiar with HBase's Java API, take a look at the official documentation first.
Now for the main text:
When interacting with an HBase cluster through the Java API, you first create an HTable instance, then use the methods it provides to insert, delete, and query.
Creating an HTable object requires first creating a Configuration instance conf that carries the HBase cluster information. It is typically built like this:
Configuration conf = HBaseConfiguration.create();
// Set the address and port of the HBase cluster's ZooKeeper quorum
conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
conf.set("hbase.zookeeper.property.clientPort", "2181");
With conf in hand, an HTable instance can be created via either of the following two constructors:
Method 1: create the HTable instance directly from conf
The corresponding constructor is:
public HTable(Configuration conf, final TableName tableName) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
  if (conf == null) {
    this.connection = null;
    return;
  }
  this.connection = HConnectionManager.getConnection(conf);
  this.configuration = conf;
  this.pool = getDefaultExecutor(conf);
  this.finishSetup();
}
Note the call to HConnectionManager.getConnection. In this constructor, getConnection takes conf as its input parameter and retrieves an HConnection instance, connection. If you are familiar with ODBC or JDBC, you will know that Java database clients maintain a similar connection / connection pool between the database and the client. For HBase, HConnection plays that role. One difference from Oracle and friends, though: what HConnection actually connects to is not the HBase cluster itself, but the ZooKeeper (ZK) ensemble that maintains the cluster's key metadata. ZK itself is out of scope here; if you are not familiar with it, simply think of it as a standalone metadata-management component. Back to getConnection; its implementation is as follows:
public static HConnection getConnection(final Configuration conf) throws IOException {
  HConnectionKey connectionKey = new HConnectionKey(conf);
  synchronized (CONNECTION_INSTANCES) {
    HConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
    if (connection == null) {
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    } else if (connection.isClosed()) {
      HConnectionManager.deleteConnection(connectionKey, true);
      connection = (HConnectionImplementation) createConnection(conf, true);
      CONNECTION_INSTANCES.put(connectionKey, connection);
    }
    connection.incCount();
    return connection;
  }
}
Here, CONNECTION_INSTANCES has type LinkedHashMap<HConnectionKey, HConnectionImplementation>, where HConnectionImplementation is simply the concrete implementation of HConnection. The three key steps are: first, an HConnectionKey instance connectionKey is built from conf; second, CONNECTION_INSTANCES is searched for an existing HConnection instance under that connectionKey; third, if none exists (or the cached one has already been closed), createConnection is called to create one and cache it, otherwise the HConnection object just found in the map is returned directly.
At the risk of belaboring the point, also look at HConnectionKey's constructor and its overridden hashCode function:
HConnectionKey(Configuration conf) {
  Map<String, String> m = new HashMap<String, String>();
  if (conf != null) {
    for (String property : CONNECTION_PROPERTIES) {
      String value = conf.get(property);
      if (value != null) {
        m.put(property, value);
      }
    }
  }
  this.properties = Collections.unmodifiableMap(m);
  try {
    UserProvider provider = UserProvider.instantiate(conf);
    User currentUser = provider.getCurrent();
    if (currentUser != null) {
      username = currentUser.getName();
    }
  } catch (IOException ioe) {
    HConnectionManager.LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe);
  }
}
public int hashCode() {
  final int prime = 31;
  int result = 1;
  if (username != null) {
    result = username.hashCode();
  }
  for (String property : CONNECTION_PROPERTIES) {
    String value = properties.get(property);
    if (value != null) {
      result = prime * result + value.hashCode();
    }
  }
  return result;
}
As the code shows, the overridden hashCode seeds its result with the hashCode of username and then mixes in the values of the connection-related properties (CONNECTION_PROPERTIES) extracted from conf. The username comes from currentUser, currentUser from the provider, and the provider is built from conf, so identical confs produce identical usernames. Two Configuration objects with the same connection properties and the same user therefore yield HConnectionKeys with the same hash code (and HConnectionKey overrides equals to match). CONNECTION_INSTANCES is a LinkedHashMap, whose get uses HConnectionKey's hashCode (together with equals) to decide whether the key is already present. In essence, then, getConnection returns a connection object keyed by the content of conf: for every conf with the same content, only one connection is ever handed out.
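To make this concrete, here is a minimal sketch (written against the same pre-1.0 client API analyzed in this post; the quorum address is a placeholder) that checks whether repeated getConnection calls hand back the same instance:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;

public class ConnectionSharingSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX"); // placeholder
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // Same conf -> same HConnectionKey -> same cached HConnection.
        HConnection c1 = HConnectionManager.getConnection(conf);
        HConnection c2 = HConnectionManager.getConnection(conf);
        System.out.println(c1 == c2); // expected: true

        // A structurally identical copy should hit the same cache entry too,
        // since the key hashes only CONNECTION_PROPERTIES plus the username.
        Configuration copy = HBaseConfiguration.create(conf);
        HConnection c3 = HConnectionManager.getConnection(copy);
        System.out.println(c1 == c3); // expected: true

        // Note: every getConnection call bumps a reference count, so real code
        // should balance each call with a matching close/deleteConnection.
    }
}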
Method 2: call createConnection to explicitly create an HConnection instance, then pass it in when constructing the HTable instance
The createConnection method and the corresponding HTable constructor are shown below:
public static HConnection createConnection(Configuration conf) throws IOException {
  UserProvider provider = UserProvider.instantiate(conf);
  return createConnection(conf, false, null, provider.getCurrent());
}

static HConnection createConnection(final Configuration conf, final boolean managed,
    final ExecutorService pool, final User user) throws IOException {
  String className = conf.get("hbase.client.connection.impl",
      HConnectionManager.HConnectionImplementation.class.getName());
  Class<?> clazz = null;
  try {
    clazz = Class.forName(className);
  } catch (ClassNotFoundException e) {
    throw new IOException(e);
  }
  try {
    // Default HCM#HCI is not accessible; make it so before invoking.
    Constructor<?> constructor = clazz.getDeclaredConstructor(Configuration.class,
        boolean.class, ExecutorService.class, User.class);
    constructor.setAccessible(true);
    return (HConnection) constructor.newInstance(conf, managed, pool, user);
  } catch (Exception e) {
    throw new IOException(e);
  }
}
public HTable(TableName tableName, HConnection connection) throws IOException {
  this.tableName = tableName;
  this.cleanupPoolOnClose = true;
  this.cleanupConnectionOnClose = false;
  this.connection = connection;
  this.configuration = connection.getConfiguration();
  this.pool = getDefaultExecutor(this.configuration);
  this.finishSetup();
}
As you can see, this way of constructing an HTable creates a fresh HConnection instance via reflection each time, rather than sharing a single HConnection as method 1 does.
It is worth noting that an HConnection created this way must be released with an explicit close() call once it is no longer needed; otherwise it is easy to run into problems such as exhausted ports.
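As a reminder of that ownership rule, here is a minimal sketch of method 2 with correct cleanup (the conf is the same placeholder as above; table "test0" and family "col1" are assumptions for illustration):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class ExplicitConnectionSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX"); // placeholder
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        HConnection conn = HConnectionManager.createConnection(conf);
        try {
            HTable table = new HTable(TableName.valueOf("test0"), conn); // hypothetical table
            try {
                Put put = new Put(Bytes.toBytes("row1"));
                put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                table.put(put);
            } finally {
                table.close(); // cleanupConnectionOnClose is false here, so this does NOT close conn
            }
        } finally {
            conn.close(); // the caller owns the explicit connection and must release it
        }
    }
}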
So how do the two methods perform on inserts/deletes/reads? Let's start with a code-level analysis. For simplicity, look at what HTable actually does during a put (insert).
HTable's put function is as follows:
public void put(final Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  doPut(put);
  if (autoFlush) {
    flushCommits();
  }
}

private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  if (ap.hasError()) {
    writeAsyncBuffer.add(put);
    backgroundFlushCommits(true);
  }
  validatePut(put);
  currentWriteBufferSize += put.heapSize();
  writeAsyncBuffer.add(put);
  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}

private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
  try {
    do {
      ap.submit(writeAsyncBuffer, true);
    } while (synchronous && !writeAsyncBuffer.isEmpty());
    if (synchronous) {
      ap.waitUntilDone();
    }
    if (ap.hasError()) {
      LOG.debug(tableName + ": One or more of the operations have failed -"
          + " waiting for all operation in progress to finish (successfully or not)");
      while (!writeAsyncBuffer.isEmpty()) {
        ap.submit(writeAsyncBuffer, true);
      }
      ap.waitUntilDone();
      if (!clearBufferOnFail) {
        // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
        // write buffer. This is a questionable feature kept here for backward compatibility
        writeAsyncBuffer.addAll(ap.getFailedOperations());
      }
      RetriesExhaustedWithDetailsException e = ap.getErrors();
      ap.clearErrors();
      throw e;
    }
  } finally {
    currentWriteBufferSize = 0;
    for (Row mut : writeAsyncBuffer) {
      if (mut instanceof Mutation) {
        currentWriteBufferSize += ((Mutation) mut).heapSize();
      }
    }
  }
}
The call sequence is put -> doPut -> backgroundFlushCommits -> ap.submit, where ap is an instance of the class AsyncProcess. Tracing into AsyncProcess, the relevant code is:
public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
  submitLowPriority(rows, atLeastOne, false);
}

public void submitLowPriority(List<? extends Row> rows, boolean atLeastOne, boolean isLowPripority) throws InterruptedIOException {
  if (rows.isEmpty()) {
    return;
  }
  // This looks like we are keying by region but HRegionLocation has a comparator that compares
  // on the server portion only (hostname + port) so this Map collects regions by server.
  Map<HRegionLocation, MultiAction<Row>> actionsByServer = new HashMap<HRegionLocation, MultiAction<Row>>();
  List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
  long currentTaskCnt = tasksDone.get();
  boolean alreadyLooped = false;
  NonceGenerator ng = this.hConnection.getNonceGenerator();
  do {
    if (alreadyLooped) {
      // if, for whatever reason, we looped, we want to be sure that something has changed.
      waitForNextTaskDone(currentTaskCnt);
      currentTaskCnt = tasksDone.get();
    } else {
      alreadyLooped = true;
    }
    // Wait until there is at least one slot for a new task.
    waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
    // Remember the previous decisions about regions or region servers we put in the
    // final multi.
    Map<Long, Boolean> regionIncluded = new HashMap<Long, Boolean>();
    Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();
    int posInList = -1;
    Iterator<? extends Row> it = rows.iterator();
    while (it.hasNext()) {
      Row r = it.next();
      HRegionLocation loc = findDestLocation(r, posInList);
      if (loc == null) {
        // loc is null if there is an error such as meta not available.
        it.remove();
      } else if (canTakeOperation(loc, regionIncluded, serverIncluded)) {
        Action<Row> action = new Action<Row>(r, ++posInList);
        setNonce(ng, r, action);
        retainedActions.add(action);
        addAction(loc, action, actionsByServer, ng);
        it.remove();
      }
    }
  } while (retainedActions.isEmpty() && atLeastOne && !hasError());
  HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
  sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer, isLowPripority);
}

private HRegionLocation findDestLocation(Row row, int posInList) {
  if (row == null) throw new IllegalArgumentException("#" + id + ", row cannot be null");
  HRegionLocation loc = null;
  IOException locationException = null;
  try {
    loc = hConnection.locateRegion(this.tableName, row.getRow());
    if (loc == null) {
      locationException = new IOException("#" + id + ", no location found, aborting submit for"
          + " tableName=" + tableName + " rowkey=" + Arrays.toString(row.getRow()));
    }
  } catch (IOException e) {
    locationException = e;
  }
  if (locationException != null) {
    // There are multiple retries in locateRegion already. No need to add new.
    // We can't continue with this row, hence it's the last retry.
    manageError(posInList, row, false, locationException, null);
    return null;
  }
  return loc;
}
The main mechanism here is asynchronous submission: not every put writes to HBase directly; instead, data accumulates in a client-side buffer until it grows past a threshold (2MB by default) and is then shipped in a single write. (On the server side those writes presumably still queue up for execution; the exact mechanism is out of scope here.) What is certain is that for the insert/get/delete Java APIs, HConnection only serves to locate the RegionServer; once the RegionServer is located, the operations themselves are carried out by the client via RPC, independently of how many connections the client has created. Also, locateRegion only performs RPC on a location-cache miss; otherwise RegionServer information comes straight from the cache. See the source of locateRegion for details, which are likewise not covered here.
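This buffering can be steered from the client side. A short sketch, assuming the same placeholder conf and a hypothetical table "test0" with family "col1" (setAutoFlush, setWriteBufferSize, and flushCommits belong to the same old HTable API quoted above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteBufferSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX"); // placeholder
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        HTable table = new HTable(conf, TableName.valueOf("test0")); // hypothetical table
        table.setAutoFlush(false);                 // buffer puts instead of flushing on every call
        table.setWriteBufferSize(4 * 1024 * 1024); // optionally raise the 2MB default
        for (int i = 0; i < 10000; i++) {
            Put put = new Put(Bytes.toBytes("row" + i));
            put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v" + i));
            table.put(put); // usually returns with no RPC; the data sits in writeAsyncBuffer
        }
        table.flushCommits(); // push out whatever is still buffered
        table.close();
    }
}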
That concludes the code analysis. It establishes that creating large numbers of HConnections via createConnection does nothing to help write performance; on the contrary, since it wastes resources for no benefit, it should be slower than getConnection. How much slower, however, cannot be judged from the code alone.
A simple experiment can verify this claim:
Server environment: an HBase cluster of four Linux servers with 64GB of RAM each, average ping around 5ms (to be rigorous, CPU core count and frequency, disk speed, and so on should also be listed).
Client environment: an Ubuntu VM running on a Mac with 10GB of RAM allocated; its CPU, network, and disk I/O are all considerably slower than a physical machine's, but that does not affect the conclusions.
The experiment code is as follows:
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseConectionTest {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        ThreadInfo info = new ThreadInfo();
        info.setTableNamePrefix("test");
        info.setColNames("col1,col2");
        info.setTableCount(1);
        info.setConnStrategy("CREATEWITHCONF"); // CREATEWITHCONF or CREATEWITHCONN
        info.setWriteStrategy("SEPERATE");      // OVERLAP or SEPERATE
        info.setLifeCycle(60000L);

        int threadCount = 100;
        // Table creation, run once and then left commented out:
        for (int i = 0; i < threadCount; i++) {
            //createTable(tableNamePrefix + i, colNames, conf);
        }
        for (int i = 0; i < threadCount; i++) {
            new Thread(new WriteThread(conf, info, i)).start();
        }
        //HBaseAdmin admin = new HBaseAdmin(conf);
        //System.out.println(admin.tableExists("test"));
    }

    public static void createTable(String tableName, String[] colNames, Configuration conf) {
        System.out.println("start create table " + tableName);
        try {
            HBaseAdmin hBaseAdmin = new HBaseAdmin(conf);
            if (hBaseAdmin.tableExists(tableName)) {
                System.out.println(tableName + " is exist");
                //hBaseAdmin.disableTable(tableName);
                //hBaseAdmin.deleteTable(tableName);
                return;
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            for (int i = 0; i < colNames.length; i++) {
                tableDescriptor.addFamily(new HColumnDescriptor(colNames[i]));
            }
            hBaseAdmin.createTable(tableDescriptor);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println("end create table " + tableName);
    }
}

// Configuration describing what each writer thread should do
class ThreadInfo {
    private int tableCount;
    String tableNamePrefix;
    String[] colNames;
    // CREATEWITHCONF or CREATEWITHCONN
    String connStrategy;
    // OVERLAP or SEPERATE
    String writeStrategy;
    long lifeCycle;

    public ThreadInfo() {
    }

    public int getTableCount() { return tableCount; }
    public void setTableCount(int tableCount) { this.tableCount = tableCount; }

    public String getTableNamePrefix() { return tableNamePrefix; }
    public void setTableNamePrefix(String tableNamePrefix) { this.tableNamePrefix = tableNamePrefix; }

    public String[] getColNames() { return colNames; }
    public void setColNames(String[] colNames) { this.colNames = colNames; }
    public void setColNames(String colNames) {
        if (colNames == null) {
            this.colNames = null;
        } else {
            this.colNames = colNames.split(",");
        }
    }

    public String getWriteStrategy() { return writeStrategy; }
    public void setWriteStrategy(String writeStrategy) { this.writeStrategy = writeStrategy; }

    public String getConnStrategy() { return connStrategy; }
    public void setConnStrategy(String connStrategy) { this.connStrategy = connStrategy; }

    public long getLifeCycle() { return lifeCycle; }
    public void setLifeCycle(long lifeCycle) { this.lifeCycle = lifeCycle; }
}

class WriteThread implements Runnable {
    private Configuration conf;
    private ThreadInfo info;
    private int index;

    public WriteThread(Configuration conf, ThreadInfo info, int index) {
        this.conf = conf;
        this.info = info;
        this.index = index;
    }

    @Override
    public void run() {
        String threadName = Thread.currentThread().getName();
        int operationCount = 0;
        HTable[] htables = null;
        HConnection conn = null;
        int tableCount = info.getTableCount();
        String tableNamePrefix = info.getTableNamePrefix();
        String[] colNames = info.getColNames();
        String connStrategy = info.getConnStrategy();
        String writeStrategy = info.getWriteStrategy();
        long lifeCycle = info.getLifeCycle();
        System.out.println(threadName + ": started with index " + index);
        try {
            if (connStrategy.equals("CREATEWITHCONN")) {
                // Method 2: one explicit connection per thread
                conn = HConnectionManager.createConnection(conf);
                if (writeStrategy.equals("SEPERATE")) {
                    htables = new HTable[1];
                    htables[0] = new HTable(TableName.valueOf(tableNamePrefix + (index % tableCount)), conn);
                } else if (writeStrategy.equals("OVERLAP")) {
                    htables = new HTable[tableCount];
                    for (int i = 0; i < tableCount; i++) {
                        htables[i] = new HTable(TableName.valueOf(tableNamePrefix + i), conn);
                    }
                } else {
                    return;
                }
            } else if (connStrategy.equals("CREATEWITHCONF")) {
                // Method 1: the implicit connection shared per conf
                conn = null;
                if (writeStrategy.equals("SEPERATE")) {
                    htables = new HTable[1];
                    htables[0] = new HTable(conf, TableName.valueOf(tableNamePrefix + (index % tableCount)));
                } else if (writeStrategy.equals("OVERLAP")) {
                    htables = new HTable[tableCount];
                    for (int i = 0; i < tableCount; i++) {
                        htables[i] = new HTable(conf, TableName.valueOf(tableNamePrefix + i));
                    }
                } else {
                    return;
                }
            } else {
                return;
            }

            long start = System.currentTimeMillis();
            long end = System.currentTimeMillis();
            // Fetch each table's column families once, up front:
            // getTableDescriptor() costs a network round-trip per call.
            Map<HTable, HColumnDescriptor[]> table_columnFamilies = new HashMap<HTable, HColumnDescriptor[]>();
            for (int i = 0; i < htables.length; i++) {
                table_columnFamilies.put(htables[i], htables[i].getTableDescriptor().getColumnFamilies());
            }
            while (end - start <= lifeCycle) {
                // Pick one of this thread's tables at random
                HTable table = htables.length == 1 ? htables[0] : htables[(int) (Math.random() * htables.length)];
                long s1 = System.currentTimeMillis();
                double r = Math.random();
                HColumnDescriptor[] columnFamilies = table_columnFamilies.get(table);
                Put put = generatePut(threadName, columnFamilies, colNames, operationCount);
                table.put(put);
                if (r > 0.999) {
                    // Sample roughly 0.1% of puts and print their latency
                    System.out.println(System.currentTimeMillis() - s1);
                }
                operationCount++;
                end = System.currentTimeMillis();
            }
            if (conn != null) {
                conn.close();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        System.out.println(threadName + ": ended with operation count:" + operationCount);
    }

    private Put generatePut(String threadName, HColumnDescriptor[] columnFamilies, String[] colNames, int operationCount) {
        Put put = new Put(Bytes.toBytes(threadName + "_" + operationCount));
        for (int i = 0; i < columnFamilies.length; i++) {
            String familyName = columnFamilies[i].getNameAsString();
            for (int j = 0; j < colNames.length; j++) {
                if (familyName.equals(colNames[j])) {
                    String columnName = familyName + (int) (Math.floor(Math.random() * 5 + 10 * j));
                    String val = "" + columnName.hashCode() % 100;
                    put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(val));
                }
            }
        }
        //System.out.println(put.toString());
        return put;
    }
}
簡單來講就是先建立一些有兩列的HBase表,而後建立一些線程分別採用getConnection策略和createConnection策略來寫1分鐘的數據。固然寫幾張表,寫多久,寫什麼,怎麼寫均可以調整。好比我這裏就設計了固定寫一張表或者隨機寫一張表幾種邏輯。須要注意一下紅色部分的代碼,這裏預先得到了要寫的HBase表的列信息。作這個動做的緣由是getTableDescriptor是會產生網絡開銷的,建議寫代碼時儘可能少調用,以避免增長沒必要要的額外開銷(事實上這個額外開銷是很巨大的)。
The detailed results are shown in the table below; exact values vary somewhat with network fluctuations. Overall, at high concurrency (more than 30 threads) getConnection is clearly faster than createConnection, while at low concurrency (10 threads or fewer) createConnection has a slight edge. Furthermore, with getConnection, writing a single table is clearly faster than writing multiple tables under high concurrency, although the effect is not obvious at low concurrency; with createConnection, writing one table is roughly as fast as writing many at any concurrency level, if anything slightly slower.
These observations do not fully agree with the code analysis. There are two discrepancies:
(1) Why does createConnection win with few threads? In theory they should be on par. I have no good explanation for this; it remains an open question.
(2) Why is createConnection so much slower with many threads? My guess is that ZK on the server side becomes overloaded maintaining a large number of connections, so even with multiple RegionServers handling the actual writes, performance still degrades.
Both open questions need further investigation. Even so, the practical advice stands: when using the Java API with HBase, and especially under high concurrency, prefer the getConnection route for creating HTable objects, and avoid holding on to unnecessary connections that only waste resources.
thread_count | table_count | conn_strategy | write_strategy | interval | result (rows/thread × threads = total rows) |
1 | 1 | CONF | OVERLAP | 60s | 10000*1=10000 |
5 | 1 | CONF | OVERLAP | 60s | 11000*5=55000 |
10 | 1 | CONF | OVERLAP | 60s | 12000*10=120000 |
30 | 1 | CONF | OVERLAP | 60s | 8300*30=249000 |
60 | 1 | CONF | OVERLAP | 60s | 6000*60=360000 |
100 | 1 | CONF | OVERLAP | 60s | 4700*100=470000 |
1 | 1 | CONN | OVERLAP | 60s | 12000*1=12000 |
5 | 1 | CONN | OVERLAP | 60s | 16000*5=80000 |
10 | 1 | CONN | OVERLAP | 60s | 10000*10=100000 |
30 | 1 | CONN | OVERLAP | 60s | 2500*30=75000 |
60 | 1 | CONN | OVERLAP | 60s | 1200*60=72000 |
100 | 1 | CONN | OVERLAP | 60s | 1000*100=100000 |
5 | 5 | CONF | SEPERATE | 60s | 10600*5=53000 |
10 | 10 | CONF | SEPERATE | 60s | 11900*10=119000 |
30 | 30 | CONF | SEPERATE | 60s | 6900*30=207000 |
60 | 60 | CONF | SEPERATE | 60s | 3650*60=219000 |
100 | 100 | CONF | SEPERATE | 60s | 2500*100=250000 |
5 | 5 | CONN | SEPERATE | 60s | 14000*5=70000 |
10 | 10 | CONN | SEPERATE | 60s | 10500*10=105000 |
30 | 30 | CONN | SEPERATE | 60s | 3250*30=97500 |
60 | 60 | CONN | SEPERATE | 60s | 1450*60=87000 |
100 | 100 | CONN | SEPERATE | 60s | 930*100=93000 |
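To close, a sketch of the pattern the numbers above favor for high concurrency (placeholder conf; table "test0" and family "col1" are assumptions): let every thread build its HTable from the same conf, so they all share one implicit HConnection, while each thread keeps its own HTable, since HTable itself is not thread-safe:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HighConcurrencySketch {
    public static void main(String[] args) {
        final Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "XX.XXX.X.XX"); // placeholder
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Method 1: same conf -> one shared HConnection under the hood.
                        HTable table = new HTable(conf, TableName.valueOf("test0")); // hypothetical table
                        try {
                            Put put = new Put(Bytes.toBytes(Thread.currentThread().getName()));
                            put.add(Bytes.toBytes("col1"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                            table.put(put);
                        } finally {
                            table.close();
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}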