批量put和單條put

HBase 1.1.5 源碼:

HTable類:

@Override
  public void put(final Put put) throws IOException {
    // Queue the single Put in the buffered mutator.
    getBufferedMutator().mutate(put);
    if (!autoFlush) {
      return;
    }
    // autoFlush is set, so call flushCommits() before returning.
    flushCommits();
  }

  /**
   * {@inheritDoc}
   *
   * <p>Passes the whole list to the buffered mutator, then calls
   * {@code flushCommits()} when {@code autoFlush} is set.
   *
   * @throws IOException propagated from the buffered mutator or from {@code flushCommits()}
   */
  @Override
  public void put(final List<Put> puts) throws IOException {
    // Queue the batch in the buffered mutator.
    getBufferedMutator().mutate(puts);
    if (!autoFlush) {
      return;
    }
    flushCommits();
  }

BufferedMutatorImpl類:

@Override
  public synchronized void mutate(Mutation m)
      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    // Single-mutation entry point: all of the work is done by doMutate.
    doMutate(m);
  }

  @Override
  public synchronized void mutate(List<? extends Mutation> ms)
      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    // The list is not processed as a unit: every element goes through the
    // same doMutate call that the single-mutation overload uses.
    for (final Mutation mutation : ms) {
      doMutate(mutation);
    }
  }
/**
 * Buffers one mutation and calls {@code backgroundFlushCommits} when the
 * tracked buffer size exceeds {@code writeBufferSize}.
 *
 * <p>Steps, in order: reject the call if this mutator is closed; reject
 * anything that is neither a {@code Put} nor a {@code Delete}; if
 * {@code ap.hasError()} is true, add the mutation to the buffer and call
 * {@code backgroundFlushCommits(true)}; validate the mutation when it is a
 * {@code Put}; add its heap size to {@code currentWriteBufferSize} and add
 * it to {@code writeAsyncBuffer}; finally call
 * {@code backgroundFlushCommits(false)} for as long as
 * {@code currentWriteBufferSize} is greater than {@code writeBufferSize}.
 *
 * @param m the mutation to buffer; must be a {@code Put} or a {@code Delete}
 * @throws IllegalStateException if {@code closed} is set
 * @throws IllegalArgumentException if {@code m} is neither a {@code Put} nor a {@code Delete}
 * @throws InterruptedIOException declared by this method; presumably raised by the flush calls
 * @throws RetriesExhaustedWithDetailsException declared by this method; presumably raised by
 *     the flush calls
 */
private void doMutate(Mutation m) throws InterruptedIOException,
    RetriesExhaustedWithDetailsException {
  if (closed) {
    throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
  }
  if (!(m instanceof Put) && !(m instanceof Delete)) {
    throw new IllegalArgumentException("Pass a Delete or a Put");
  }

  // This behavior is highly non-intuitive... it does not protect us against
  // 94-incompatible behavior, which is a timing issue because hasError, the below code
  // and setter of hasError are not synchronized. Perhaps it should be removed.
  // NOTE(review): m is added to writeAsyncBuffer here and again unconditionally
  // further down, and in this branch it is added before validatePut runs. If
  // backgroundFlushCommits(true) returns normally, m appears to be enqueued
  // twice -- confirm against backgroundFlushCommits before relying on this path.
  if (ap.hasError()) {
    writeAsyncBuffer.add(m);
    backgroundFlushCommits(true);
  }

  if (m instanceof Put) {
    validatePut((Put) m);
  }

  currentWriteBufferSize += m.heapSize(); // add this mutation's heap size to the running total
  writeAsyncBuffer.add(m);

  // While the accumulated buffer size is above the threshold, commit (flush).
  // NOTE(review): the loop only ends if backgroundFlushCommits lowers
  // currentWriteBufferSize (or throws) -- presumably it does; confirm.
  while (currentWriteBufferSize > writeBufferSize) {
    backgroundFlushCommits(false);
  }
}
相關文章
相關標籤/搜索