hbase源碼系列(二)HTable 探祕

  hbase的源碼終於搞一個段落了,在接下來的一個月,着重於把看過的源碼提煉一下,對一些有意思的主題進行分享一下。繼上一篇講了負載均衡以後,這一篇咱們從client開始講吧,從client到master再到region server,按照這個順序來開展,網友也能夠對本身感興趣的部分給我留言或者直接聯繫個人QQ。html

  如今咱們講一下HTable吧,爲何講HTable,由於這是咱們最多見的一個類,這是咱們對hbase中數據的操做的入口。正則表達式

  

1.Put操做

  下面是一個很簡單往hbase插入一條記錄的例子。緩存

HBaseConfiguration conf =  (HBaseConfiguration) HBaseConfiguration.create();
byte[] rowkey = Bytes.toBytes("cenyuhai");
byte[] family = Bytes.toBytes("f");
byte[] qualifier = Bytes.toBytes("name");
byte[] value = Bytes.toBytes("岑玉海");
        
HTable table = new HTable(conf, "test");
Put put = new Put(rowkey);
put.add(family,qualifier,value);
        
table.put(put);
View Code

  咱們日常就是採用這種方式提交的數據,爲了提升重用性採用HTablePool,最新的API推薦使用HConnection.getTable("test")來得到HTable,舊的HTablePool已經被拋棄了。好,咱們下面開始看看HTable內部是如何實現的吧,首先咱們看看它內部有什麼屬性。多線程

  /** 實際提交數據所用的類 */
protected
HConnection connection;/** 須要提交的數據的列表 */ protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
/** flush的size */
private long writeBufferSize; /** 是否自動flush */ private boolean autoFlush; /** 當前的數據的size,達到指定的size就要提交 */ protected long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; private ExecutorService pool; // For Multi
/** 異步提交 */ protected AsyncProcess<Object> ap;
** rpc工廠 */
private RpcRetryingCallerFactory rpcCallerFactory;

  主要是靠上面的這些傢伙來幹活的,這裏面的connection、ap、rpcCallerFactory是用來和後臺通訊的,HTable只是作一個操做,數據進來以後,添加到writeAsyncBuffer,知足條件就flush。負載均衡

  下面看看table.put是怎麼執行的:異步

    doPut(put);
    if (autoFlush) {
      flushCommits();
    }

  執行put操做,若是是autoFush,就提交,先看doPut的過程,若是以前的ap異步提交到有問題,就先進行後臺提交,不過此次是同步的,若是沒有錯誤,就把put添加到隊列當中,而後檢查一下當前的 buffer的大小,超過咱們設置的內容的時候,就flush掉。async

if (ap.hasError()){
      backgroundFlushCommits(true);
}
currentWriteBufferSize += put.heapSize();
writeAsyncBuffer.add(put);
while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
}

  寫下來,讓咱們看看backgroundFlushCommits這個方法吧,它的核心就這麼一句ap.submit(writeAsyncBuffer, true) ,若是出錯了的話,就報錯了。因此網上全部關於客戶端調優的方法裏面無非就這麼幾種:ide

1)關閉autoFlush優化

2)關閉wal日誌ui

3)把writeBufferSize設大一點,通常說是設置成5MB

  通過實踐,就第二條關閉日誌的效果比較明顯,其它的效果都不明顯,由於提交的過程是異步的,因此提交的時候佔用的時間並很少,提交到server端後,server還有一個寫入的隊列,(⊙o⊙)… 讓人想起小米手機那噁心的排隊了。。。因此大規模寫入數據,別期望着用put來解決。。。mapreduce生成hfile,而後用bulk load的方式比較好。

  不廢話了,咱們繼續追蹤ap.submit方法吧,F3進去。

  

      int posInList = -1;
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        //爲row定位
        HRegionLocation loc = findDestLocation(r, 1, posInList);

        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
          // loc is null if there is an error such as meta not available.
          Action<Row> action = new Action<Row>(r, ++posInList);
          retainedActions.add(action);
          addAction(loc, action, actionsByServer);
          it.remove();
        }
      }
View Code

  循環遍歷r,爲每一個r找到它的位置loc,loc是HRegionLocation,裏面記錄着這行記錄所在的目標region所在的位置,loc怎麼得到呢,走進findDestLocation方法裏面,看到了這麼一句。

  

loc = hConnection.locateRegion(this.tableName, row.getRow());

  經過表名和rowkey,使用HConnection就能夠定位到它的位置,這裏就先不講定位了,稍後放一節出來說,請看這一篇《Client如何找到正確的Region Server》,不然篇幅太長了,這裏咱們只須要記住,提交操做,是要知道它對應的region在哪裏的。

  定位到它的位置以後,它把loc添加到了actionsByServer,一個region server對應一組操做。(插句題外話爲何這裏叫action呢,其實咱們熟知的Put、Delete,以及不經常使用的Append、Increment都是繼承自Row的,在接口傳遞時候,其實都是視爲一種操做,到了後臺以後,才作區分)。

  接下來,就是多線程的rpc提交了。

MultiServerCallable<Row> callable = createCallable(loc, multiAction);
......
res = createCaller(callable).callWithoutRetries(callable);

  再深挖一點,把它們的實現都扒出來吧。

  protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
      final MultiAction<Row> multi) {
    return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
  }

  protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
    return rpcCallerFactory.<MultiResponse> newCaller();
  }

  ok,看到了,先構造一個MultiServerCallable,而後再經過rpcCallerFactory作最後的call操做。

  好了,到這裏再總結一下put操做吧,前面寫得有點兒凌亂了。

  (1)把put操做添加到writeAsyncBuffer隊列裏面,符合條件(自動flush或者超過了閥值writeBufferSize)就經過AsyncProcess異步批量提交。

  (2)在提交以前,咱們要根據每一個rowkey找到它們歸屬的region server,這個定位的過程是經過HConnection的locateRegion方法得到的,而後再把這些rowkey按照HRegionLocation分組。

  (3)經過多線程,一個HRegionLocation構造MultiServerCallable<Row>,而後經過rpcCallerFactory.<MultiResponse> newCaller()執行調用,忽略掉失敗從新提交和錯誤處理,客戶端的提交操做到此結束。

  

2.Delete操做

  對於Delete,咱們也能夠經過如下代碼執行一個delete操做

Delete del = new Delete(rowkey);
table.delete(del);

  這個操做比較乾脆,new一個RegionServerCallable<Boolean>,直接走rpc了,爽快啊。

RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
        tableName, delete.getRow()) {
      public Boolean call() throws IOException {
        try {
          MutateRequest request = RequestConverter.buildMutateRequest(
            getLocation().getRegionInfo().getRegionName(), delete);
          MutateResponse response = getStub().mutate(null, request);
          return Boolean.valueOf(response.getProcessed());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    };
rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
View Code

  這裏面注意一下這行MutateResponse response = getStub().mutate(null, request);

  getStub()返回的是一個ClientService.BlockingInterface接口,實現這個接口的類是HRegionServer,這樣子咱們就知道它在服務端執行了HRegionServer裏面的mutate方法。

3.Get操做

  get操做也和delete同樣簡單

  

Get get = new Get(rowkey);
Result row = table.get(get);

  get操做也沒幾行代碼,仍是直接走的rpc

public Result get(final Get get) throws IOException {
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      public Result call() throws IOException {
        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
      }
    };
    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}
View Code

  注意裏面的ProtobufUtil.get操做,它實際上是構建了一個GetRequest,須要的參數是regionName和get,而後走HRegionServer的get方法,返回一個GetResponse

public static Result get(final ClientService.BlockingInterface client,
      final byte[] regionName, final Get get) throws IOException {
    GetRequest request =
      RequestConverter.buildGetRequest(regionName, get);
    try {
      GetResponse response = client.get(null, request);
      if (response == null) return null;
      return toResult(response.getResult());
    } catch (ServiceException se) {
      throw getRemoteException(se);
    }
}
View Code

 

 4.批量操做

  

  針對put、delete、get都有相應的操做的方式:

  1.Put(list)操做,不少童鞋覺得這個能夠提升寫入速度,其實無效。。。爲啥?由於你構造了一個list進去,它再遍歷一下list,執行doPut操做。。。。反而還慢點。

  2.delete和get的批量操做走的都是connection.processBatchCallback(actions, tableName, pool, results, callback),具體的實如今HConnectionManager的靜態類HConnectionImplementation裏面,結果咱們驚人的發現:

AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
asyncProcess.submitAll(list);
asyncProcess.waitUntilDone();

  它走的仍是put同樣的操做,既然是同樣的,何苦代碼寫得那麼繞呢?

5.查詢操做

  如今講一下scan吧,這個操做相對複雜點。仍是老規矩,先上一下代碼吧。

        Scan scan = new Scan();
        //scan.setTimeRange(new Date("20140101").getTime(), new Date("20140429").getTime());
        scan.setBatch(10);
        scan.setCaching(10);
        scan.setStartRow(Bytes.toBytes("cenyuhai-00000-20140101"));
        scan.setStopRow(Bytes.toBytes("cenyuhai-zzzzz-201400429"));
        //若是設置爲READ_COMMITTED,它會取當前的時間做爲讀的檢查點,在這個時間點以後的就排除掉了
        scan.setIsolationLevel(IsolationLevel.READ_COMMITTED);
        RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("pattern"));
        ResultScanner resultScanner = table.getScanner(scan);
        Result result = null;
        while ((result = resultScanner.next()) != null) {
            //本身處理去吧...
        }

 

  這個是帶正則表達式的模糊查詢的scan查詢,Scan這個類是包括咱們查詢全部須要的參數,batch和caching的設置,在個人另一篇文章裏面有寫《hbase客戶端設置緩存優化查詢》

Scan查詢的時候,設置StartRow和StopRow但是重頭戲,假設我這裏要查我01月01日到04月29日總共發了多少業務,中間是業務類型,可是我多是全部的都查,或者只查一部分,在全部都查的狀況下,我就不能設置了,那可是StartRow和StopRow我不能空着啊,因此這裏能夠填00000-zzzzz,只要保證它在這個區間就能夠了,而後咱們加了一個RowFilter,而後引入了正則表達式,以前好多人一直在問啊問的,不過我這個例子,其實不要也能夠,由於是查全部業務的,在StartRow和StopRow之間的均可以要。

  好的,咱們接着看,F3進入getScanner方法

if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection);
}
return new ClientScanner(getConfiguration(), scan, getName(), this.connection);

  這個scan還分大小, 不要緊,咱們進入ClientScanner看一下吧, 在ClientScanner的構造方法裏面發現它會去調用nextScanner去初始化一個ScannerCallable。好的,咱們接着來到ScannerCallable裏面,這裏須要注意的是它的兩個方法,prepare和call方法。在prepare裏面它主要乾了兩個事情,得到region的HRegionLocation和ClientService.BlockingInterface接口的實例,以前說過這個繼承這個接口的只有Region Server的實現類。

  public void prepare(final boolean reload) throws IOException {
    this.location = connection.getRegionLocation(tableName, row, reload);    //HConnection.getClient()這個方法簡直就是神器啊
    setStub(getConnection().getClient(getLocation().getServerName()));
  }

  ok,咱們下面看看call方法吧

  public Result [] call() throws IOException {
     // 第一次走的地方,開啓scanner
      if (scannerId == -1L) {
        this.scannerId = openScanner();
      } else {
        Result [] rrs = null;
        ScanRequest request = null;
        try {
          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
          ScanResponse response = null;       
      // 準備用controller去攜帶返回的數據,這樣的話就不用進行protobuf的序列化了
      PayloadCarryingRpcController controller
= new PayloadCarryingRpcController();
      controller.setPriority(getTableName()); response
= getStub().scan(controller, request); nextCallSeq++; long timestamp = System.currentTimeMillis(); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response);     } catch (IOException e) {              
}     
    }
    return rrs; } return null; }

 

   在call方法裏面,咱們能夠看得出來,實例化ScanRequest,而後調用scan方法的時候把PayloadCarryingRpcController傳過去,這裏跟蹤了一下,若是設置了codec的就從PayloadCarryingRpcController裏面返回結果,不然從response裏面返回。

  好的,下面看next方法吧。

    @Override
    public Result next() throws IOException { if (cache.size() == 0) {
        Result [] values = null;
        long remainingResultSize = maxScannerResultSize;
        int countdown = this.caching;      
     // 設置獲取數據的條數
     callable.setCaching(this.caching); boolean skipFirst = false; boolean retryAfterOutOfOrderException = true; do {       if (skipFirst) {          // 上次讀的最後一個,此次就不讀了,直接跳過就是了 callable.setCaching(1); values = this.caller.callWithRetries(callable); callable.setCaching(this.caching); skipFirst = false; }
       values = this.caller.callWithRetries(callable);    if (values != null && values.length > 0) { for (Result rs : values) {          //緩存起來 cache.add(rs); for (Cell kv : rs.rawCells()) {//計算出keyvalue的大小,而後減去 remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize(); } countdown--; this.lastResult = rs; } } // Values == null means server-side filter has determined we must STOP } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));  
     //緩存裏面有就從緩存裏面取
     if (cache.size() > 0) { return cache.poll(); }      return null; }

  從next方法裏面能夠看出來,它是一次取caching條數據,而後下一次獲取的時候,先把上次獲取的最後一個給排除掉,再獲取下來保存在cache當中,只要緩存不空,就一直在緩存裏面取。

  好了,至此Scan到此結束。

相關文章
相關標籤/搜索