Hbase1.1.5源碼:java
HTable類:ide
@Override public void put(final Put put) throws IOException { getBufferedMutator().mutate(put);//放入bufferMutator中 if (autoFlush) { flushCommits(); } } /** * {@inheritDoc} * @throws IOException */ @Override public void put(final List<Put> puts) throws IOException { getBufferedMutator().mutate(puts);////放入bufferMutator中 if (autoFlush) { flushCommits(); } }
BufferedMutatorImpl類:ui
@Override public synchronized void mutate(Mutation m) throws InterruptedIOException, RetriesExhaustedWithDetailsException { doMutate(m); } @Override public synchronized void mutate(List<? extends Mutation> ms) throws InterruptedIOException, RetriesExhaustedWithDetailsException { for (Mutation m : ms) { doMutate(m); //調用單條的mutate方法 } }
private void doMutate(Mutation m) throws InterruptedIOException, RetriesExhaustedWithDetailsException { if (closed) { throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); } if (!(m instanceof Put) && !(m instanceof Delete)) { throw new IllegalArgumentException("Pass a Delete or a Put"); } // This behavior is highly non-intuitive... it does not protect us against // 94-incompatible behavior, which is a timing issue because hasError, the below code // and setter of hasError are not synchronized. Perhaps it should be removed. if (ap.hasError()) { writeAsyncBuffer.add(m); backgroundFlushCommits(true); } if (m instanceof Put) { validatePut((Put) m); } currentWriteBufferSize += m.heapSize(); // 計算字節大小,並累加 writeAsyncBuffer.add(m); // 若是當前的數據字節大小大於閾值,則進行commit while (currentWriteBufferSize > writeBufferSize) { backgroundFlushCommits(false); } }