Hbase mvcc

region hlog回放時mvcc的應用

region open的時候會根據各個HStore下的全部HFile文件記錄的maxMemstoreTS,找到最大的maxSeqId,而後根據各個HStore記錄的maxSeqId回放HLog日誌(從maxMemstoreTS+1開始)。mvc

數據寫入過程當中的版本管理

HRegion.doMiniBatchMutateapp

// STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
      // locked rows
      miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);

      // We've now grabbed as many mutations off the list as we can
      // Ensure we acquire at least one.
      if (miniBatchOp.getReadyToWriteCount() <= 0) {
        // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
        return;
      }

      lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
      locked = true;

      // STEP 2. Update mini batch of all operations in progress with  LATEST_TIMESTAMP timestamp
      // We should record the timestamp only after we have acquired the rowLock,
      // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
      long now = EnvironmentEdgeManager.currentTime();
      batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);

      // STEP 3. Build WAL edit
      List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);

      // STEP 4. Append the WALEdits to WAL and sync.
      for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
        Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
        walEdit = nonceKeyWALEditPair.getSecond();
        NonceKey nonceKey = nonceKeyWALEditPair.getFirst();

        if (walEdit != null && !walEdit.isEmpty()) {
          writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
              nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
        }

        // Complete mvcc for all but last writeEntry (for replay case)
        if (it.hasNext() && writeEntry != null) {
          mvcc.complete(writeEntry);
          writeEntry = null;
        }
      }

      // STEP 5. Write back to memStore
      // NOTE: writeEntry can be null here
      writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);

      // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
      // complete mvcc for last writeEntry
      batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
      writeEntry = null;
      success = true;

HRegion.doWALAppendide

// Using default cluster id, as this can only happen in the originating cluster.
    // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
    // here instead of WALKeyImpl directly to support legacy coprocessors.
    WALKeyImpl walKey = walEdit.isReplay()?
        new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
          this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
            nonceGroup, nonce, mvcc) :
        new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
            this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
            nonceGroup, nonce, mvcc, this.getReplicationScope());
    if (walEdit.isReplay()) {
      walKey.setOrigLogSeqNum(origLogSeqNum);
    }
    WriteEntry writeEntry = null;
    try {
      long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
      // Call sync on our edit.
      if (txid != 0) {
        sync(txid, durability);
      }
      writeEntry = walKey.getWriteEntry();
    } catch (IOException ioe) {
      if (walKey != null && walKey.getWriteEntry() != null) {
        mvcc.complete(walKey.getWriteEntry());
      }
      throw ioe;
    }
    return writeEntry;

HSHLog:oop

@Override
  public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
      final boolean inMemstore) throws IOException {
    return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
      disruptor.getRingBuffer());
  }

AbstractFSWALpost

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
      WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
      throws IOException {
    if (this.closed) {
      throw new IOException(
          "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
    }
    MutableLong txidHolder = new MutableLong();
    MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
      txidHolder.setValue(ringBuffer.next());
    });
    long txid = txidHolder.longValue();
    ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
    try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
      entry.stampRegionSequenceId(we);
      ringBuffer.get(txid).load(entry);
    } finally {
      ringBuffer.publish(txid);
    }
    return txid;
  }

關鍵點:ui

  • mvcc的begin方法把writePoint自增,並以自增後的writePoint生成一個寫條目放入到寫隊列writeQueue
  • 新建FSWALEntrywal日誌條目並publish到Disruptor隊列中
  • 設置本次寫入相關的Cell的sequenceId爲自增後的writePoint
  • Disruptorsequence id做爲本次事物IDtxid
  • 而後發佈一個SyncFutureDisruptor隊列,等待寫wal日誌完成

FSWALEntrythis

long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
    long regionSequenceId = we.getWriteNumber();
    if (!this.getEdit().isReplay() && inMemstore) {
      for (Cell c : getEdit().getCells()) {
        PrivateCellUtil.setSequenceId(c, regionSequenceId);
      }
    }

    getKey().setWriteEntry(we);
    return regionSequenceId;
  }

MultiVersionConcurrencyControl版本控制

final AtomicLong readPoint = new AtomicLong(0); // 能夠用來和Cell sequenceId作對比,判斷該Cell是否可見
  final AtomicLong writePoint = new AtomicLong(0); // 每一個事物寫自增1
  private final Object readWaiters = new Object();

  public WriteEntry begin(Runnable action) {
    synchronized (writeQueue) {
      long nextWriteNumber = writePoint.incrementAndGet();
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      action.run();
      return e;
    }
  }

  public void completeAndWait(WriteEntry e) {
    if (!complete(e)) {
      waitForRead(e);
    }
  }

  public boolean complete(WriteEntry writeEntry) {
    synchronized (writeQueue) {
      writeEntry.markCompleted();
      long nextReadValue = NONE;
      boolean ranOnce = false;
      while (!writeQueue.isEmpty()) {
        ranOnce = true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("Invariant in complete violated, nextReadValue="
                + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("There is no first!");
      }

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          readPoint.set(nextReadValue);
          readWaiters.notifyAll();
        }
      }
      return readPoint.get() >= writeEntry.getWriteNumber();
    }
  }


  /**
   * Wait for the global readPoint to advance up to the passed in write entry number.
   */
  void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    int count = 0;
    synchronized (readWaiters) {
      while (readPoint.get() < e.getWriteNumber()) {
        if (count % 100 == 0 && count > 0) {
          LOG.warn("STUCK: " + this);
        }
        count++;
        try {
          readWaiters.wait(10);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  • MultiVersionConcurrencyControl類中定義了readPoint,writePoint兩個成員變量
  • 每個事物操做,writePoint自增1並建立一條WriteEntry加入到writeQueue隊列(LinkedList<WriteEntry>)中。
  • complete(WriteEntry writeEntry)方法把傳入的writeEntry標記爲已完成,並從隊列首部不斷移除已經完成的WriteEntry條目,並把readPoint更新爲最後一個已經完成的writeEntrywriteNumber,返回當前readPoint跟上或者超過了傳入的writeEntry
  • waitForRead(WriteEntry e)自旋等待直到該寫條目e完成。

讀取過程當中的版本控制

Scan類能夠設置事物隔離級別:日誌

@Override
  public Scan setIsolationLevel(IsolationLevel level) {
    return (Scan) super.setIsolationLevel(level);
  }
public enum IsolationLevel {

  READ_COMMITTED(1),
  READ_UNCOMMITTED(2);

  IsolationLevel(int value) {}

}

StoreFileScannercode

@Override
  public boolean seek(Cell key) throws IOException {
    if (seekCount != null) seekCount.increment();

    try {
      try {
        if(!seekAtOrAfter(hfs, key)) {
          this.cur = null;
          return false;
        }

        setCurrentCell(hfs.getCell());

        if (!hasMVCCInfo && this.reader.isBulkLoaded()) {
          return skipKVsNewerThanReadpoint();
        } else {
          return !hasMVCCInfo ? true : skipKVsNewerThanReadpoint();
        }
      } finally {
        realSeekDone = true;
      }
    } catch (FileNotFoundException e) {
      throw e;
    } catch (IOException ioe) {
      throw new IOException("Could not seek " + this + " to key " + key, ioe);
    }
  }

SegmentScanner

protected void updateCurrent() {
    Cell next = null;

    try {
      while (iter.hasNext()) {
        next = iter.next();
        if (next.getSequenceId() <= this.readPoint) {
          current = next;
          return;// skip irrelevant versions
        }
        // for backwardSeek() stay in the boundaries of a single row
        if (stopSkippingKVsIfNextRow &&
            segment.compareRows(next, stopSkippingKVsRow) > 0) {
          current = null;
          return;
        }
      } // end of while

      current = null; // nothing found
    } finally {
      if (next != null) {
        // in all cases, remember the last KV we iterated to, needed for reseek()
        last = next;
      }
    }
  }

RegionScannerImpl

IsolationLevel isolationLevel = scan.getIsolationLevel();
      long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
      synchronized (scannerReadPoints) {
        if (mvccReadPoint > 0) {
          this.readPt = mvccReadPoint;
        } else if (nonce == HConstants.NO_NONCE || rsServices == null
            || rsServices.getNonceManager() == null) {
          this.readPt = getReadPoint(isolationLevel);
        } else {
          this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
        }
        scannerReadPoints.put(this, this.readPt);
      }

HRegion

public long getReadPoint(IsolationLevel isolationLevel) {
    if (isolationLevel != null && isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
      // This scan can read even uncommitted transactions
      return Long.MAX_VALUE;
    }
    return mvcc.getReadPoint();
  }

StoreFileScannerSegmentScannerseek的過程當中會根據CellsequenceId和mvcc的readPoint進行比較判斷是否須要skipCell

備註

hbase version:2.1.7

相關文章
相關標籤/搜索