region open 的時候，會讀取各個 HStore 下全部 HFile 文件記錄的 maxSeqId 和 maxMemstoreTS，取其中的最大值（以 max(maxSeqId, maxMemstoreTS+1) 推進 MVCC）；然後根據各個 HStore 記錄的 maxSeqId 回放 HLog 日誌（只回放 sequenceId 大於該 HStore maxSeqId 的條目，即從 maxSeqId+1 開始）。
HRegion.doMiniBatchMutate：
// STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with // locked rows miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks); // We've now grabbed as many mutations off the list as we can // Ensure we acquire at least one. if (miniBatchOp.getReadyToWriteCount() <= 0) { // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? return; } lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount()); locked = true; // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp long now = EnvironmentEdgeManager.currentTime(); batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks); // STEP 3. Build WAL edit List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp); // STEP 4. Append the WALEdits to WAL and sync. for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) { Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next(); walEdit = nonceKeyWALEditPair.getSecond(); NonceKey nonceKey = nonceKeyWALEditPair.getFirst(); if (walEdit != null && !walEdit.isEmpty()) { writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now, nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum()); } // Complete mvcc for all but last writeEntry (for replay case) if (it.hasNext() && writeEntry != null) { mvcc.complete(writeEntry); writeEntry = null; } } // STEP 5. Write back to memStore // NOTE: writeEntry can be null here writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry); // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and // complete mvcc for last writeEntry batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry); writeEntry = null; success = true;
HRegion.doWALAppend：
// Using default cluster id, as this can only happen in the originating cluster. // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey // here instead of WALKeyImpl directly to support legacy coprocessors. WALKeyImpl walKey = walEdit.isReplay()? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc) : new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds, nonceGroup, nonce, mvcc, this.getReplicationScope()); if (walEdit.isReplay()) { walKey.setOrigLogSeqNum(origLogSeqNum); } WriteEntry writeEntry = null; try { long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true); // Call sync on our edit. if (txid != 0) { sync(txid, durability); } writeEntry = walKey.getWriteEntry(); } catch (IOException ioe) { if (walKey != null && walKey.getWriteEntry() != null) { mvcc.complete(walKey.getWriteEntry()); } throw ioe; } return writeEntry;
FSHLog：
/**
 * Appends {@code edits} for region {@code hri} under {@code key} by stamping a sequence id
 * and publishing the entry to this log's Disruptor ring buffer.
 *
 * @param hri region the edits belong to
 * @param key WAL key; its MVCC is used to begin the write entry
 * @param edits the edits to append
 * @param inMemstore whether the edit's cells also go to the memstore (controls whether
 *     cell sequence ids are stamped)
 * @return the transaction id (the ring-buffer sequence) of the appended entry
 * @throws IOException if the log is closed
 */
@Override
public long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
    final boolean inMemstore) throws IOException {
  return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
      disruptor.getRingBuffer());
}
AbstractFSWAL：
/**
 * Begins an MVCC write entry for {@code key}, claims a ring-buffer slot for the append,
 * stamps the entry's region sequence id, and publishes it to the ring buffer.
 *
 * @param hri region the edits belong to
 * @param key WAL key whose MVCC is used
 * @param edits the edits to append
 * @param inMemstore whether the cells also go to the memstore
 * @param ringBuffer ring buffer the entry is published to
 * @return txid, i.e. the ring-buffer sequence claimed for this append
 * @throws IOException if the log is already closed
 */
protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
    WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
    throws IOException {
  if (this.closed) {
    throw new IOException(
        "Cannot append; log is closed, regionName = " + hri.getRegionNameAsString());
  }
  MutableLong txidHolder = new MutableLong();
  // The callback runs inside MVCC.begin() while it holds the writeQueue lock, so claiming
  // the next ring-buffer slot is atomic with incrementing writePoint: txid order matches
  // MVCC write-number order.
  MultiVersionConcurrencyControl.WriteEntry we = key.getMvcc().begin(() -> {
    txidHolder.setValue(ringBuffer.next());
  });
  long txid = txidHolder.longValue();
  // The current RPC, if it is a ServerCall carrying a cell scanner; otherwise null.
  // Presumably kept so the RPC's cell data stays referenced until the entry is written --
  // confirm.
  ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
      .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
  try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
    FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
    // Sets cell sequence ids to the MVCC write number and attaches `we` to the key.
    entry.stampRegionSequenceId(we);
    ringBuffer.get(txid).load(entry);
  } finally {
    // A claimed Disruptor sequence must always be published, even if loading failed,
    // otherwise consumers waiting on that sequence would stall.
    ringBuffer.publish(txid);
  }
  return txid;
}
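關鍵點：
1. begin 方法把 writePoint 自增，並以自增後的 writePoint 生成一個寫條目（WriteEntry）放入寫隊列 writeQueue 中；
2. 生成 FSWALEntry（WAL 日誌條目）並 publish 到 Disruptor 隊列中，Cell 的 sequenceId 被設置爲自增後的 writePoint；
3. Disruptor 的 sequence id 作爲本次事務 ID（txid）。由於 begin 的回調在 writeQueue 鎖內執行 ringBuffer.next()，txid 的順序與 writePoint 的順序一致；
4. 調用 sync 時，發布 SyncFuture 到 Disruptor 隊列，等待 WAL 日誌寫完成。

FSWALEntry：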
/**
 * Stamps this entry with the region sequence id taken from the given MVCC write entry.
 *
 * <p>When the edit is not a replay and its cells are destined for the memstore, every cell
 * gets the write number as its sequence id. In all cases the write entry is attached to the
 * WAL key.
 *
 * @param we the MVCC write entry begun for this append
 * @return the write number used as the region sequence id
 * @throws IOException if setting a cell's sequence id fails
 */
long stampRegionSequenceId(MultiVersionConcurrencyControl.WriteEntry we) throws IOException {
  final long writeNumber = we.getWriteNumber();
  final boolean stampCells = !this.getEdit().isReplay() && inMemstore;
  if (stampCells) {
    for (Cell cell : getEdit().getCells()) {
      PrivateCellUtil.setSequenceId(cell, writeNumber);
    }
  }
  getKey().setWriteEntry(we);
  return writeNumber;
}
MultiVersionConcurrencyControl（多版本並發控制，MVCC）：
// Excerpt of MultiVersionConcurrencyControl members (HBase 2.1.7). writeQueue, NONE, LOG and
// WriteEntry are declared outside this excerpt.

// Read point: compared against a Cell's sequenceId to decide whether that Cell is visible.
final AtomicLong readPoint = new AtomicLong(0);
// Write point: incremented by 1 for every write transaction begun.
final AtomicLong writePoint = new AtomicLong(0);
// Monitor that readers wait on for readPoint to advance.
private final Object readWaiters = new Object();

/**
 * Begins a write: increments writePoint, creates a WriteEntry with the new number, appends
 * it to writeQueue and runs {@code action}, all while holding the writeQueue monitor.
 *
 * @param action callback run under the writeQueue lock (e.g. to claim a slot elsewhere in
 *     the same order as write numbers)
 * @return the new write entry
 */
public WriteEntry begin(Runnable action) {
  synchronized (writeQueue) {
    long nextWriteNumber = writePoint.incrementAndGet();
    WriteEntry e = new WriteEntry(nextWriteNumber);
    writeQueue.add(e);
    action.run();
    return e;
  }
}

/**
 * Completes {@code e} and, if readPoint has not yet reached e's write number, blocks until
 * it does.
 */
public void completeAndWait(WriteEntry e) {
  if (!complete(e)) {
    waitForRead(e);
  }
}

/**
 * Marks {@code writeEntry} completed and advances readPoint past every completed entry at
 * the head of writeQueue.
 *
 * @param writeEntry an entry previously returned by begin()
 * @return true if readPoint is now at or beyond writeEntry's write number
 * @throws RuntimeException if writeQueue is empty (presumably the entry was already removed,
 *     e.g. completed twice -- confirm), or if queued write numbers are not consecutive
 */
public boolean complete(WriteEntry writeEntry) {
  synchronized (writeQueue) {
    writeEntry.markCompleted();
    // NONE is declared outside this excerpt; the `> 0` checks below treat it as
    // "no entry removed yet".
    long nextReadValue = NONE;
    boolean ranOnce = false;
    while (!writeQueue.isEmpty()) {
      ranOnce = true;
      WriteEntry queueFirst = writeQueue.getFirst();
      if (nextReadValue > 0) {
        // Entries are removed in order, so each head must be exactly one past the last.
        if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("Invariant in complete violated, nextReadValue="
              + nextReadValue + ", writeNumber=" + queueFirst.getWriteNumber());
        }
      }
      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber();
        writeQueue.removeFirst();
      } else {
        // An earlier write is still in flight; readPoint cannot pass it.
        break;
      }
    }
    if (!ranOnce) {
      throw new RuntimeException("There is no first!");
    }
    if (nextReadValue > 0) {
      // Publish the new read point and wake threads blocked in waitForRead().
      synchronized (readWaiters) {
        readPoint.set(nextReadValue);
        readWaiters.notifyAll();
      }
    }
    return readPoint.get() >= writeEntry.getWriteNumber();
  }
}

/**
 * Wait for the global readPoint to advance up to the passed in write entry number.
 *
 * <p>Waits on readWaiters in 10 ms slices and logs a "STUCK" warning every 100 wake-ups.
 * Interrupts do not end the wait; the interrupt flag is restored after the loop exits.
 */
void waitForRead(WriteEntry e) {
  boolean interrupted = false;
  int count = 0;
  synchronized (readWaiters) {
    while (readPoint.get() < e.getWriteNumber()) {
      if (count % 100 == 0 && count > 0) {
        LOG.warn("STUCK: " + this);
      }
      count++;
      try {
        readWaiters.wait(10);
      } catch (InterruptedException ie) {
        // We were interrupted... finish the loop -- i.e. cleanup -- and then
        // on our way out, reset the interrupt flag.
        interrupted = true;
      }
    }
  }
  if (interrupted) {
    Thread.currentThread().interrupt();
  }
}
MultiVersionConcurrencyControl 類中定義了 readPoint、writePoint 兩個成員變量。begin(Runnable action) 方法把 writePoint 自增 1，並創建一條 WriteEntry 加入到 writeQueue 隊列（LinkedList<WriteEntry>）中。complete(WriteEntry writeEntry) 方法把傳入的 writeEntry 標記爲已完成，並從隊列首部不斷移除已經完成的 WriteEntry 條目，把 readPoint 更新爲最後一個被移除的條目（即隊首連續已完成條目中的最後一個）的 writeNumber，然後返回當前 readPoint 是否已經跟上或者超過了傳入 writeEntry 的 writeNumber。waitForRead(WriteEntry e) 在 readWaiters 上循環等待（每次 wait 10 毫秒），直到 readPoint 推進到 e 的 writeNumber，即 e 及其之前的寫條目都已完成。

Scan 類可以設置事務隔離級別：
/**
 * Sets the transaction isolation level used by this scan.
 *
 * @param level the isolation level to read with
 * @return this scan, typed as {@code Scan} so calls can be chained
 */
@Override
public Scan setIsolationLevel(IsolationLevel level) {
  // The parent implementation does the work; only the return type is narrowed here.
  final Scan self = (Scan) super.setIsolationLevel(level);
  return self;
}
/**
 * Transaction isolation levels a read can request.
 */
public enum IsolationLevel {
  /** Read only data whose MVCC write has completed (bounded by the MVCC read point). */
  READ_COMMITTED(1),
  /** Read everything, including writes whose MVCC transaction has not completed yet. */
  READ_UNCOMMITTED(2);

  /**
   * @param value numeric code of the level; this trimmed enum does not retain it
   */
  IsolationLevel(final int value) {
    // Intentionally empty.
  }
}
StoreFileScanner：
/**
 * Seeks this scanner to the first cell at or after {@code key}.
 *
 * <p>If the file carries MVCC information, or it is a bulk-loaded file, cells newer than the
 * scanner's read point are skipped after positioning. {@code realSeekDone} is set in every
 * case, before any exception is translated.
 *
 * @param key the cell to seek to
 * @return false if there is no cell at or after {@code key}; otherwise the outcome of
 *     skipping newer cells, or true when no skipping is needed
 * @throws FileNotFoundException rethrown unchanged
 * @throws IOException wrapping any other I/O failure, with the scanner and key in the message
 */
@Override
public boolean seek(Cell key) throws IOException {
  if (seekCount != null) {
    seekCount.increment();
  }
  try {
    try {
      if (!seekAtOrAfter(hfs, key)) {
        this.cur = null;
        return false;
      }
      setCurrentCell(hfs.getCell());
      // Files with MVCC info, and bulk-loaded files without it, need read-point filtering.
      if (hasMVCCInfo || this.reader.isBulkLoaded()) {
        return skipKVsNewerThanReadpoint();
      }
      return true;
    } finally {
      realSeekDone = true;
    }
  } catch (FileNotFoundException e) {
    throw e;
  } catch (IOException ioe) {
    throw new IOException("Could not seek " + this + " to key " + key, ioe);
  }
}
SegmentScanner
protected void updateCurrent() { Cell next = null; try { while (iter.hasNext()) { next = iter.next(); if (next.getSequenceId() <= this.readPoint) { current = next; return;// skip irrelevant versions } // for backwardSeek() stay in the boundaries of a single row if (stopSkippingKVsIfNextRow && segment.compareRows(next, stopSkippingKVsRow) > 0) { current = null; return; } } // end of while current = null; // nothing found } finally { if (next != null) { // in all cases, remember the last KV we iterated to, needed for reseek() last = next; } } }
RegionScannerImpl
// Excerpt from RegionScannerImpl (HBase 2.1.7; presumably its constructor). It chooses the
// MVCC read point this scanner will use and registers it in scannerReadPoints. scan, nonce,
// nonceGroup, rsServices and scannerReadPoints come from outside this excerpt.
IsolationLevel isolationLevel = scan.getIsolationLevel();
long mvccReadPoint = PackagePrivateFieldAccessor.getMvccReadPoint(scan);
// Choosing and registering the read point happen under the scannerReadPoints lock;
// presumably so that readers of that map never miss a scanner being set up -- confirm.
synchronized (scannerReadPoints) {
  if (mvccReadPoint > 0) {
    // The Scan already carries an explicit read point: use it as-is.
    this.readPt = mvccReadPoint;
  } else if (nonce == HConstants.NO_NONCE || rsServices == null
      || rsServices.getNonceManager() == null) {
    // No usable nonce context: derive the read point from the isolation level.
    this.readPt = getReadPoint(isolationLevel);
  } else {
    // Use the MVCC value the nonce manager returns for this (nonceGroup, nonce) operation.
    this.readPt = rsServices.getNonceManager().getMvccFromOperationContext(nonceGroup, nonce);
  }
  scannerReadPoints.put(this, this.readPt);
}
HRegion
public long getReadPoint(IsolationLevel isolationLevel) { if (isolationLevel != null && isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions return Long.MAX_VALUE; } return mvcc.getReadPoint(); }
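StoreFileScanner 和 SegmentScanner 在 seek 的過程中，會比較 Cell 的 sequenceId 和 MVCC 的 readPoint，判斷是否需要跳過（skip）該 Cell：sequenceId 大於 readPoint 的 Cell 對當前讀操作不可見。在 READ_UNCOMMITTED 隔離級別下，readPoint 爲 Long.MAX_VALUE，因此所有 Cell 都可見。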
HBase version: 2.1.7