hbase2.0 源碼閱讀:Put/Delete流程

本文主要介紹hbase寫入流程。hbase的寫入主要有put、delete、append 3種操做。java

hbase是一個分佈式文件系統,底層依賴的是hdfs。delete時並非和mysql同樣立馬進行物理刪除,而是追加一個寫入操做,操做類型爲DELETE,和PUT的流程幾乎徹底相同。mysql

先了解hbase寫入的一個大體流程sql

Client API -> RPC -> server IPC -> write WAL -> write memStore -> flush(ifNeed)

- Table類定義了 Hbase Client 擁有的API,HTable是Table的實現類,看代碼可從Htable看起。Client生成一個PUT對象,而後調用HTable的put方法apache

public void put(final Put put) throws IOException {

//請求RPC前進行驗證,驗證是否有須要提交的列,及提交內容是否超過設定大小(默認大小爲10485760(10M))。 驗證失敗拋出異常。 validatePut(put); ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(this.connection, getName(), put.getRow(), this.rpcControllerFactory.newController(), put.getPriority()) { @Override protected Void rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), put); //使用probubuf協議序列化,並提交RPC請求 doMutate(request); return null; } }; rpcCallerFactory.<Void> newCaller(this.writeRpcTimeoutMs).callWithRetries(callable, this.operationTimeoutMs); }  

  客戶端在提交RPC請求以前會進行一次校驗,校驗內容爲1).是否有列, put.isEmpty()。 2). put對象大小是否超過設定值(默認最大值爲10485760(10M),客戶端配置參數爲MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"),校驗不經過時拋出異常 throw new IllegalArgumentException("No columns to insert" )  或者 throw new IllegalArgumentException("KeyValue size too large")bash

// validate for well-formedness
  public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {

//校驗內容1).是否有提交列 if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); }
//校驗內容2).是否超過客戶端默認配置10485760,超過拋出異常 if (maxKeyValueSize > 0) { for (List<Cell> list : put.getFamilyCellMap().values()) { for (Cell cell : list) { if (KeyValueUtil.length(cell) > maxKeyValueSize) { throw new IllegalArgumentException("KeyValue size too large"); } } } } }

  客戶端的RPC請求,是由 ClientServiceCallable.doMutate 提交,RPC通訊使用的是谷歌的protobuf協議。服務器端IPC實現類爲RSRpcServices類,由mutate方法實現。服務器

在mutate方法中,會將protobuf流反序列化爲PUT、APPEND、INCREMENT、DELETE對象。在進行PUT對象處理時會檢查相關表是否有協處理器,若是沒有即調用HRegion的PUT方法進行處理。 RSRpcServices.mutate部分實現代碼以下:mvc

case PUT:

//反序列化成PUT對象 Put put = ProtobufUtil.toPut(mutation, cellScanner);
//服務器端也有校驗提交內容大小的限制。默認值同客戶端,即10485760(10M) checkCellSizeLimit(region, put); // Throws an exception when violated spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); quota.addMutation(put);
//檢查是否有協處理器 if (request.hasCondition()) { Condition condition = request.getCondition(); byte[] row = condition.getRow().toByteArray(); byte[] family = condition.getFamily().toByteArray(); byte[] qualifier = condition.getQualifier().toByteArray(); CompareOperator compareOp = CompareOperator.valueOf(condition.getCompareType().name()); ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); TimeRange timeRange = condition.hasTimeRange() ? ProtobufUtil.toTimeRange(condition.getTimeRange()) : TimeRange.allTime(); if (region.getCoprocessorHost() != null) { processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, compareOp, comparator, put); } if (processed == null) { boolean result = region.checkAndMutate(row, family, qualifier, compareOp, comparator, timeRange, put); if (region.getCoprocessorHost() != null) { result = region.getCoprocessorHost().postCheckAndPut(row, family, qualifier, compareOp, comparator, put, result); } processed = result; } } else {
//若是沒有,提交給HRegion.put()方法處理 region.put(put); processed = Boolean.TRUE; } break;

  HRegion在進行put操做前會檢查一次region中的memStorm是否超過上限(checkResources()),若是超過了會進行一次flushapp

void checkResources() throws RegionTooBusyException {
    // If catalog region, do not impose resource constraints or block updates.
    //若是操做的是Meta表,則不處理
    if (this.getRegionInfo().isMetaRegion()) return;
    //當Region的MemSize大於blockingMemStoreSize時,進行一次flush(requestFlush0(FlushLifeCycleTracker.DUMMY);),本次flush是阻塞的,其它寫入請求先暫停
    MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
    //region size = onHead size + offHead size (不是dataSize)
    // blockingMemStoreSize =flushSize(默認值128M) * mult (默認值4)
if (mss.getHeapSize() + mss.getOffHeapSize() > this.blockingMemStoreSize) {
    blockedRequestsCount.increment();
    requestFlush();
    // Don't print current limit because it will vary too much. The message is used as a key
    // over in RetriesExhaustedWithDetailsException processing.
    throw new RegionTooBusyException("Over memstore limit=" +
      org.apache.hadoop.hbase.procedure2.util.StringUtils.humanSize(this.blockingMemStoreSize) +
      ", regionName=" +
        (this.getRegionInfo() == null? "unknown": this.getRegionInfo().getEncodedName()) +
        ", server=" + (this.getRegionServerServices() == null? "unknown":
            this.getRegionServerServices().getServerName()));
  }
}

// blockingMemStoreSize 值初始化
void setHTableSpecificConf() {
  //flushSize能夠在建表時在htableDescriptor制定
  if (this.htableDescriptor == null) return;
  long flushSize = this.htableDescriptor.getMemStoreFlushSize();
  //未指定時讀取配配hbase.hregion.memstore.flush.size.
  // 默認值DEFAULT_MEMSTORE_FLUSH_SIZE = 1024 * 1024 * 128L 即128M
  if (flushSize <= 0) {
    flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
  }
  this.memstoreFlushSize = flushSize;
  //hbase.hregion.memstore.block.multiplier
  // 默認值 DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER = 4
  long mult = conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
      HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
  //若是超過blockingMemStoreSize則進行阻塞處理
  this.blockingMemStoreSize = this.memstoreFlushSize * mult;
}

  檢查完region 的memStore 以後,即進入寫入流程。忽略掉其它一些檢查,進入doMiniBatchMutate方法(核心操做)分佈式

步驟1:  對BatchOperation對象上鎖,返回的是一個表示正在處理中的對象MiniBatchOperationInProgress
步驟2:更新全部操做對象的時間戳,確保是最新的。
步驟3:  初始化或構造 WAL edit對象
步驟4:將WALEdits對象提交併持久化(即寫WAL)
步驟5:寫memStore
步驟6:完成寫入操做 ide

 
 
/**
   * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
   * In here we also handle replay of edits on region recover.
   * 在這裏還處理 region恢復時重放寫入操做(WAL操做的回放)
   * @return Change in size brought about by applying <code>batchOp</code>
   * 返回處理的數量
   */
  private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
    boolean success = false;
    WALEdit walEdit = null;
    WriteEntry writeEntry = null;
    boolean locked = false;
    // We try to set up a batch in the range [batchOp.nextIndexToProcess,lastIndexExclusive)
    MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
    /** Keep track of the locks we hold so we can release them in finally clause */
    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
    try {
      // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
      // locked rows
      // 對BatchOperation對象上鎖,返回的是一個表示正在處理中的對象MiniBatchOperationInProgress
      miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);

      // We've now grabbed as many mutations off the list as we can
      // Ensure we acquire at least one.
      if (miniBatchOp.getReadyToWriteCount() <= 0) {
        // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
        return;
      }

      lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
      locked = true;

      // STEP 2. Update mini batch of all operations in progress with  LATEST_TIMESTAMP timestamp
      // We should record the timestamp only after we have acquired the rowLock,
      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
      // 步驟2:更新全部操做對象的時間戳,確保是最新的。
      long now = EnvironmentEdgeManager.currentTime();
      batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);

      // STEP 3. Build WAL edit
      // 步驟3:初始化或構造 WAL edit對象
      List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);

      // STEP 4. Append the WALEdits to WAL and sync.
      //步驟4: 將WALEdits對象提交併持久化(即寫WAL)
      for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
        Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
        walEdit = nonceKeyWALEditPair.getSecond();
        NonceKey nonceKey = nonceKeyWALEditPair.getFirst();

        if (walEdit != null && !walEdit.isEmpty()) {
          writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
              nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
        }

        // Complete mvcc for all but last writeEntry (for replay case)
        if (it.hasNext() && writeEntry != null) {
          mvcc.complete(writeEntry);
          writeEntry = null;
        }
      }

      // STEP 5. Write back to memStore
      // 步驟5: 寫memStore
      // NOTE: writeEntry can be null here
      writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);

      // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
      // complete mvcc for last writeEntry
      batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
      writeEntry = null;
      success = true;
    } finally {
      // Call complete rather than completeAndWait because we probably had error if walKey != null
      if (writeEntry != null) mvcc.complete(writeEntry);

      if (locked) {
        this.updatesLock.readLock().unlock();
      }
      releaseRowLocks(acquiredRowLocks);

      final int finalLastIndexExclusive =
          miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
      final boolean finalSuccess = success;
      batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
        batchOp.retCodeDetails[i] =
            finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
        return true;
      });

      batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);

      batchOp.nextIndexToProcess = finalLastIndexExclusive;
    }
  }

 

 

至此HBASE的put, delete流程即算完畢。

相關文章
相關標籤/搜索