HBase行鎖原理及實現

  hbase mutation操做,好比delete put等,都須要先獲取行鎖,而後再進行操做,在獲取行鎖時,是經過HRegion.getRowLockInternal(byte[] row, boolean waitForLock)進行的,所以,咱們先大致瀏覽一下這個方法的流程,以下。能夠看到,該方法中主要涉及到行鎖相關的內容爲RowLock和RowLockContext兩個類。這兩個都是HRegion的內部類,下面詳細看一下這兩個類是咋實現的。java

protected RowLock getRowLockInternal(byte[] row, boolean waitForLock) throws IOException {
    HashedBytes rowKey = new HashedBytes(row);
    RowLockContext rowLockContext = new RowLockContext(rowKey);

    // loop until we acquire the row lock (unless !waitForLock)
    while (true) {
      RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
      if (existingContext == null) {
        // Row is not already locked by any thread, use newly created context.
        break;
      } else if (existingContext.ownedByCurrentThread()) {
        // Row is already locked by current thread, reuse existing context instead.
        rowLockContext = existingContext;
        break;
      } else {
        if (!waitForLock) {
          return null;
        }
        try {
          // Row is already locked by some other thread, give up or wait for it
          if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
            throw new IOException("Timed out waiting for lock for row: " + rowKey);
          }
        } catch (InterruptedException ie) {
          LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
          InterruptedIOException iie = new InterruptedIOException();
          iie.initCause(ie);
          throw iie;
        }
      }
    }

    // allocate new lock for this thread
    return rowLockContext.newLock();
  }

  首先看RowLock類,該類主要邏輯是release方法,是用來釋放行鎖的。同時有一個布爾類型參數release,默認爲false,表明該行鎖是否被釋放掉了。app

public static class RowLock {
    @VisibleForTesting final RowLockContext context;
    private boolean released = false;

    @VisibleForTesting RowLock(RowLockContext context) {
      this.context = context;
    }

    /**
     * Release the given lock.  If there are no remaining locks held by the current thread
     * then unlock the row and allow other threads to acquire the lock.
     * @throws IllegalArgumentException if called by a different thread than the lock owning thread
     */
    public void release() {
      if (!released) {
        context.releaseLock();
        released = true;
      }
    }
  }

  可是在RowLock中,並無看到實際涉及到鎖的信息,這是咋回事呢,別急,細細看下release方法,裏面有一個context,是RowLockContext類型。同時其構造方法中也傳了一個context對象,所以懷疑是在RowLockContext中new出了一個rowlock,進RowLockContext中看下:less

@VisibleForTesting class RowLockContext {
    private final HashedBytes row;
  //經過計數以及CountDownLatch實現對行鎖的condition。這裏之因此將countdownlatch設置爲一,是由於hbase本身也不知道到底有多少condition來競爭鎖,因此加一個計數lockCount,
  //當lockCount爲零時,再把latch.coutDown。不然會在getRowLockInternal中await。
private final CountDownLatch latch = new CountDownLatch(1); private final Thread thread; private int lockCount = 0; RowLockContext(HashedBytes row) { this.row = row; this.thread = Thread.currentThread(); } boolean ownedByCurrentThread() { return thread == Thread.currentThread(); } RowLock newLock() { lockCount++; return new RowLock(this); } void releaseLock() { if (!ownedByCurrentThread()) { throw new IllegalArgumentException("Lock held by thread: " + thread + " cannot be released by different thread: " + Thread.currentThread()); } lockCount--; if (lockCount == 0) { // no remaining locks by the thread, unlock and allow other threads to access RowLockContext existingContext = lockedRows.remove(row); if (existingContext != this) { throw new RuntimeException( "Internal row lock state inconsistent, should not happen, row: " + row); } latch.countDown(); } } }

  經過計數以及CountDownLatch實現對行鎖的condition。這裏之因此將countdownlatch設置爲一,是由於hbase本身也不知道到底有多少condition來競爭鎖,因此加一個計數lockCount,
當lockCount爲零時,再把latch.coutDown。不然會在getRowLockInternal中await。
  在HRegion中還有一個關鍵的成員變量: lockedrows,用來存儲當前已經獲取了行鎖的全部行信息,key爲rowkey,value爲RowLockContext。
// map from a locked row to the context for that lock including:
  // - CountDownLatch for threads waiting on that row
  // - the thread that owns the lock (allow reentrancy)
  // - reference count of (reentrant) locks held by the thread
  // - the row itself
  private final ConcurrentHashMap<HashedBytes, RowLockContext> lockedRows =
      new ConcurrentHashMap<HashedBytes, RowLockContext>();
  好啦,行鎖涉及到的內容,咱們都大致瀏覽了,再從getRowLockInternal中開始通一遍邏輯:
  1. 根據rowkey構建RowLockContext對象
  2. while循環,直到獲取到行鎖,或者wait超時
    1. 首先判斷lockedrows中是否有該rowkey的行鎖信息,此處利用的是concurrentMap的putIfAbsent
      1. 若是不存在,覺得着這行鎖尚未其餘線程拿到,將行鎖信息加入到lockedrows中,直接break跳出循環,而後now一個行鎖。
      2. 若是存在,則覺得着該行鎖已經被佔有了,邏輯以下
        1. 判斷持有該行鎖的線程是不是本身自己,若是是,則直接覆蓋rowLockContext,跳出循環
        2. 判斷是否須要wait 行鎖,經過參數waitForLock,若是不wait直接return;若是wait,則調用latch.await等待,若是超時則拋出異常。
    2. 若是跳出了循環,則意味着獲取成功,則newLock並返回。

   上面是獲取行鎖的流程,釋放行鎖呢,是經過HRegion的releaseRowLocks方式實現,咱們看下代碼:oop

 /**
   * If the given list of row locks is not null, releases all locks.
   */
  public void releaseRowLocks(List<RowLock> rowLocks) {
    if (rowLocks != null) {
      for (RowLock rowLock : rowLocks) {
        rowLock.release();
      }
      rowLocks.clear();
    }
  }

  可見是調用RowLock.release實現,該方法代碼在上面有,具體的邏輯以下:ui

      在lockedrows中將該行鎖刪除。this

      判斷release是否爲false,若是爲false,則調用context.releaseLock,context.releaseLock邏輯以下spa

        首先判斷釋放該行鎖的線程是不是該行鎖的持有者,若不是則拋出異常線程

        將count--;對象

        若是count==0了,則直接調用latch.countDown,這個方法會觸發其餘線程去獲取行鎖。當count==0了也就是說該線程已經不須要改行鎖,已經釋放blog

        將release設置爲true。

注意

這裏在getRowLockInternal中,只要lockedRows.putIfAbsent(rowKey, rowLockContext)成功,其餘線程將不會獲取成功,由concurrentMap保證。

相關文章
相關標籤/搜索