hbase的源碼終於搞一個段落了,在接下來的一個月,着重於把看過的源碼提煉一下,對一些有意思的主題進行分享一下。繼上一篇講了負載均衡以後,這一篇咱們從client開始講吧,從client到master再到region server,按照這個順序來開展,網友也能夠對本身感興趣的部分給我留言或者直接聯繫個人QQ。html
如今咱們講一下HTable吧,爲何講HTable,由於這是咱們最多見的一個類,這是咱們對hbase中數據的操做的入口。正則表達式
下面是一個很簡單往hbase插入一條記錄的例子。緩存
HBaseConfiguration conf = (HBaseConfiguration) HBaseConfiguration.create(); byte[] rowkey = Bytes.toBytes("cenyuhai"); byte[] family = Bytes.toBytes("f"); byte[] qualifier = Bytes.toBytes("name"); byte[] value = Bytes.toBytes("岑玉海"); HTable table = new HTable(conf, "test"); Put put = new Put(rowkey); put.add(family,qualifier,value); table.put(put);
咱們日常就是採用這種方式提交的數據,爲了提升重用性採用HTablePool,最新的API推薦使用HConnection.getTable("test")來得到HTable,舊的HTablePool已經被拋棄了。好,咱們下面開始看看HTable內部是如何實現的吧,首先咱們看看它內部有什麼屬性。多線程
/** 實際提交數據所用的類 */
protected HConnection connection;/** 須要提交的數據的列表 */ protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
/** flush的size */ private long writeBufferSize; /** 是否自動flush */ private boolean autoFlush; /** 當前的數據的size,達到指定的size就要提交 */ protected long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; private ExecutorService pool; // For Multi
/** 異步提交 */ protected AsyncProcess<Object> ap;
** rpc工廠 */ private RpcRetryingCallerFactory rpcCallerFactory;
主要是靠上面的這些傢伙來幹活的,這裏面的connection、ap、rpcCallerFactory是用來和後臺通訊的,HTable只是作一個操做,數據進來以後,添加到writeAsyncBuffer,知足條件就flush。負載均衡
下面看看table.put是怎麼執行的:異步
doPut(put); if (autoFlush) { flushCommits(); }
執行put操做,若是是autoFush,就提交,先看doPut的過程,若是以前的ap異步提交到有問題,就先進行後臺提交,不過此次是同步的,若是沒有錯誤,就把put添加到隊列當中,而後檢查一下當前的 buffer的大小,超過咱們設置的內容的時候,就flush掉。async
if (ap.hasError()){ backgroundFlushCommits(true); } currentWriteBufferSize += put.heapSize(); writeAsyncBuffer.add(put); while (currentWriteBufferSize > writeBufferSize) { backgroundFlushCommits(false); }
寫下來,讓咱們看看backgroundFlushCommits這個方法吧,它的核心就這麼一句ap.submit(writeAsyncBuffer, true) ,若是出錯了的話,就報錯了。因此網上全部關於客戶端調優的方法裏面無非就這麼幾種:ide
1)關閉autoFlush優化
2)關閉wal日誌ui
3)把writeBufferSize設大一點,通常說是設置成5MB
通過實踐,就第二條關閉日誌的效果比較明顯,其它的效果都不明顯,由於提交的過程是異步的,因此提交的時候佔用的時間並很少,提交到server端後,server還有一個寫入的隊列,(⊙o⊙)… 讓人想起小米手機那噁心的排隊了。。。因此大規模寫入數據,別期望着用put來解決。。。mapreduce生成hfile,而後用bulk load的方式比較好。
不廢話了,咱們繼續追蹤ap.submit方法吧,F3進去。
int posInList = -1; Iterator<? extends Row> it = rows.iterator(); while (it.hasNext()) { Row r = it.next(); //爲row定位 HRegionLocation loc = findDestLocation(r, 1, posInList); if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) { // loc is null if there is an error such as meta not available. Action<Row> action = new Action<Row>(r, ++posInList); retainedActions.add(action); addAction(loc, action, actionsByServer); it.remove(); } }
循環遍歷r,爲每一個r找到它的位置loc,loc是HRegionLocation,裏面記錄着這行記錄所在的目標region所在的位置,loc怎麼得到呢,走進findDestLocation方法裏面,看到了這麼一句。
loc = hConnection.locateRegion(this.tableName, row.getRow());
經過表名和rowkey,使用HConnection就能夠定位到它的位置,這裏就先不講定位了,稍後放一節出來說,請看這一篇《Client如何找到正確的Region Server》,不然篇幅太長了,這裏咱們只須要記住,提交操做,是要知道它對應的region在哪裏的。
定位到它的位置以後,它把loc添加到了actionsByServer,一個region server對應一組操做。(插句題外話爲何這裏叫action呢,其實咱們熟知的Put、Delete,以及不經常使用的Append、Increment都是繼承自Row的,在接口傳遞時候,其實都是視爲一種操做,到了後臺以後,才作區分)。
接下來,就是多線程的rpc提交了。
MultiServerCallable<Row> callable = createCallable(loc, multiAction);
......
res = createCaller(callable).callWithoutRetries(callable);
再深挖一點,把它們的實現都扒出來吧。
protected MultiServerCallable<Row> createCallable(final HRegionLocation location, final MultiAction<Row> multi) { return new MultiServerCallable<Row>(hConnection, tableName, location, multi); } protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { return rpcCallerFactory.<MultiResponse> newCaller(); }
ok,看到了,先構造一個MultiServerCallable,而後再經過rpcCallerFactory作最後的call操做。
好了,到這裏再總結一下put操做吧,前面寫得有點兒凌亂了。
(1)把put操做添加到writeAsyncBuffer隊列裏面,符合條件(自動flush或者超過了閥值writeBufferSize)就經過AsyncProcess異步批量提交。
(2)在提交以前,咱們要根據每一個rowkey找到它們歸屬的region server,這個定位的過程是經過HConnection的locateRegion方法得到的,而後再把這些rowkey按照HRegionLocation分組。
(3)經過多線程,一個HRegionLocation構造MultiServerCallable<Row>,而後經過rpcCallerFactory.<MultiResponse> newCaller()執行調用,忽略掉失敗從新提交和錯誤處理,客戶端的提交操做到此結束。
對於Delete,咱們也能夠經過如下代碼執行一個delete操做
Delete del = new Delete(rowkey); table.delete(del);
這個操做比較乾脆,new一個RegionServerCallable<Boolean>,直接走rpc了,爽快啊。
RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection, tableName, delete.getRow()) { public Boolean call() throws IOException { try { MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), delete); MutateResponse response = getStub().mutate(null, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } } }; rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
這裏面注意一下這行MutateResponse response = getStub().mutate(null, request);
getStub()返回的是一個ClientService.BlockingInterface接口,實現這個接口的類是HRegionServer,這樣子咱們就知道它在服務端執行了HRegionServer裏面的mutate方法。
get操做也和delete同樣簡單
Get get = new Get(rowkey); Result row = table.get(get);
get操做也沒幾行代碼,仍是直接走的rpc
public Result get(final Get get) throws IOException { RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection, getName(), get.getRow()) { public Result call() throws IOException { return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get); } }; return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); }
注意裏面的ProtobufUtil.get操做,它實際上是構建了一個GetRequest,須要的參數是regionName和get,而後走HRegionServer的get方法,返回一個GetResponse
public static Result get(final ClientService.BlockingInterface client, final byte[] regionName, final Get get) throws IOException { GetRequest request = RequestConverter.buildGetRequest(regionName, get); try { GetResponse response = client.get(null, request); if (response == null) return null; return toResult(response.getResult()); } catch (ServiceException se) { throw getRemoteException(se); } }
針對put、delete、get都有相應的操做的方式:
1.Put(list)操做,不少童鞋覺得這個能夠提升寫入速度,其實無效。。。爲啥?由於你構造了一個list進去,它再遍歷一下list,執行doPut操做。。。。反而還慢點。
2.delete和get的批量操做走的都是connection.processBatchCallback(actions, tableName, pool, results, callback),具體的實如今HConnectionManager的靜態類HConnectionImplementation裏面,結果咱們驚人的發現:
AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf); asyncProcess.submitAll(list); asyncProcess.waitUntilDone();
它走的仍是put同樣的操做,既然是同樣的,何苦代碼寫得那麼繞呢?
如今講一下scan吧,這個操做相對複雜點。仍是老規矩,先上一下代碼吧。
Scan scan = new Scan(); //scan.setTimeRange(new Date("20140101").getTime(), new Date("20140429").getTime()); scan.setBatch(10); scan.setCaching(10); scan.setStartRow(Bytes.toBytes("cenyuhai-00000-20140101")); scan.setStopRow(Bytes.toBytes("cenyuhai-zzzzz-201400429")); //若是設置爲READ_COMMITTED,它會取當前的時間做爲讀的檢查點,在這個時間點以後的就排除掉了 scan.setIsolationLevel(IsolationLevel.READ_COMMITTED); RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("pattern")); ResultScanner resultScanner = table.getScanner(scan); Result result = null; while ((result = resultScanner.next()) != null) { //本身處理去吧... }
這個是帶正則表達式的模糊查詢的scan查詢,Scan這個類是包括咱們查詢全部須要的參數,batch和caching的設置,在個人另一篇文章裏面有寫《hbase客戶端設置緩存優化查詢》。
Scan查詢的時候,設置StartRow和StopRow但是重頭戲,假設我這裏要查我01月01日到04月29日總共發了多少業務,中間是業務類型,可是我多是全部的都查,或者只查一部分,在全部都查的狀況下,我就不能設置了,那可是StartRow和StopRow我不能空着啊,因此這裏能夠填00000-zzzzz,只要保證它在這個區間就能夠了,而後咱們加了一個RowFilter,而後引入了正則表達式,以前好多人一直在問啊問的,不過我這個例子,其實不要也能夠,由於是查全部業務的,在StartRow和StopRow之間的均可以要。
好的,咱們接着看,F3進入getScanner方法
if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection); } return new ClientScanner(getConfiguration(), scan, getName(), this.connection);
這個scan還分大小, 不要緊,咱們進入ClientScanner看一下吧, 在ClientScanner的構造方法裏面發現它會去調用nextScanner去初始化一個ScannerCallable。好的,咱們接着來到ScannerCallable裏面,這裏須要注意的是它的兩個方法,prepare和call方法。在prepare裏面它主要乾了兩個事情,得到region的HRegionLocation和ClientService.BlockingInterface接口的實例,以前說過這個繼承這個接口的只有Region Server的實現類。
public void prepare(final boolean reload) throws IOException { this.location = connection.getRegionLocation(tableName, row, reload); //HConnection.getClient()這個方法簡直就是神器啊 setStub(getConnection().getClient(getLocation().getServerName())); }
ok,咱們下面看看call方法吧
public Result [] call() throws IOException { // 第一次走的地方,開啓scanner if (scannerId == -1L) { this.scannerId = openScanner(); } else { Result [] rrs = null; ScanRequest request = null; try { request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); ScanResponse response = null;
// 準備用controller去攜帶返回的數據,這樣的話就不用進行protobuf的序列化了
PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
controller.setPriority(getTableName()); response = getStub().scan(controller, request); nextCallSeq++; long timestamp = System.currentTimeMillis(); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response); } catch (IOException e) {
}
} return rrs; } return null; }
在call方法裏面,咱們能夠看得出來,實例化ScanRequest,而後調用scan方法的時候把PayloadCarryingRpcController傳過去,這裏跟蹤了一下,若是設置了codec的就從PayloadCarryingRpcController裏面返回結果,不然從response裏面返回。
好的,下面看next方法吧。
@Override public Result next() throws IOException { if (cache.size() == 0) { Result [] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching;
// 設置獲取數據的條數
callable.setCaching(this.caching); boolean skipFirst = false; boolean retryAfterOutOfOrderException = true; do { if (skipFirst) { // 上次讀的最後一個,此次就不讀了,直接跳過就是了 callable.setCaching(1); values = this.caller.callWithRetries(callable); callable.setCaching(this.caching); skipFirst = false; }
values = this.caller.callWithRetries(callable); if (values != null && values.length > 0) { for (Result rs : values) { //緩存起來 cache.add(rs); for (Cell kv : rs.rawCells()) {//計算出keyvalue的大小,而後減去 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize(); } countdown--; this.lastResult = rs; } } // Values == null means server-side filter has determined we must STOP } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
//緩存裏面有就從緩存裏面取
if (cache.size() > 0) { return cache.poll(); } return null; }
從next方法裏面能夠看出來,它是一次取caching條數據,而後下一次獲取的時候,先把上次獲取的最後一個給排除掉,再獲取下來保存在cache當中,只要緩存不空,就一直在緩存裏面取。
好了,至此Scan到此結束。