LevelDB is a single-machine implementation of the storage ideas behind BigTable and is a very popular storage engine today. In one sentence, LevelDB is simple but not simplistic: simple in that its design is clear and its implementation is compact, with a relatively small code base; not simplistic in that the design and the implementation details reward careful study. This article is a first pass at walking through the LevelDB source code, so it may be incomplete or even incorrect in places. The Java port analyzed here is at https://github.com/dain/leveldb.
It is organized into three parts: the write path, the read path, and compaction.
LevelDB's single-node write performance is strong: with 100-byte values it can reach roughly 70,000 writes per second. To some extent it trades read performance for write performance, and it will soon become clear why writes are faster.
DBImpl.java is the engine implementation. The options object passed to the put interface can carry custom fields and methods, which is one way to extend the engine.
A write first enters makeRoomForWrite(boolean force), with force defaulting to false. It then evaluates the conditions for making room:
if (allowDelay && versions.numberOfFilesInLevel(0) > L0_SLOWDOWN_WRITES_TRIGGER) {
    // We are getting close to hitting a hard limit on the number of
    // L0 files.  Rather than delaying a single write by several
    // seconds when we hit the hard limit, start delaying each
    // individual write by 1ms to reduce latency variance.  Also,
    // this delay hands over some CPU to the compaction thread in
    // case it is sharing the same core as the writer.
    try {
        mutex.unlock();
        Thread.sleep(1);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
    finally {
        mutex.lock();
    }

    // Do not delay a single write more than once
    allowDelay = false;
}
The default level-0 compaction trigger is 4 files, and the slowdown above fires when the level-0 file count exceeds 8. The 1 ms sleep hands CPU (and the mutex) to the compaction thread, since backgroundCall() acquires the same reentrant lock. Next, if the memtable still has room, the rest of makeRoom is skipped; if a compaction is currently running or level 0 has more than 12 files, backgroundCondition.awaitUninterruptibly() is called and the writing thread parks. If the memtable is out of space and everything else is normal, the following happens: close the current log, create a new log, hand the memtable over as the immutable memtable, and create a fresh memtable. It is easy to follow:
// Attempt to switch to a new memtable and trigger compaction of old
Preconditions.checkState(versions.getPrevLogNumber() == 0);

// close the existing log
try {
    log.close();
}
catch (IOException e) {
    throw new RuntimeException("Unable to close log file " + log.getFile(), e);
}

// open a new log
long logNumber = versions.getNextFileNumber();
try {
    this.log = Logs.createLogWriter(new File(databaseDir, Filename.logFileName(logNumber)), logNumber);
}
catch (IOException e) {
    throw new RuntimeException("Unable to open new log file " + new File(databaseDir, Filename.logFileName(logNumber)).getAbsoluteFile(), e);
}

// create a new mem table
immutableMemTable = memTable;
memTable = new MemTable(internalKeyComparator);

// Do not force another compaction there is space available
force = false;

maybeScheduleCompaction();
The type of log writer deserves a closer look. On a 64-bit OS it is MMapLogWriter, otherwise FileChannelLogWriter. Although each update is written to the log first, take MMapLogWriter as the example: the record only goes into the mappedByteBuffer backing the log, which is still memory. When the mmap window runs out of space, unmap() is called to flush; the mapping is off-heap memory, and each piece of direct memory has a corresponding cleaner that performs the flush. So if the machine loses power, records that only made it into (direct) memory cannot be recovered. Log handling is described in more detail later.
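As a side note, here is a minimal, self-contained sketch (plain java.nio only, none of the dain/leveldb classes; the file name is made up) showing why a record that has only reached the mapped buffer is not yet durable:

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapAppendSketch {
    public static void main(String[] args) throws IOException {
        Path log = Path.of("sketch.log");   // hypothetical file name
        try (FileChannel ch = FileChannel.open(log,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // map a fixed-size window up front, the way a mmap-based log writer pre-sizes its region
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            buf.put("record-1".getBytes(StandardCharsets.UTF_8)); // lands in the page cache only
            // without an explicit force(), a power loss here could drop the record even though put() returned
            buf.force();
        }
    }
}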
maybeScheduleCompaction() is then called: writes can trigger compaction, and so can reads. Compaction itself is covered in part three.
Stepping back out: every update to the DB advances the version's lastSequence. Updates are written to the log file first, after being wrapped a little. As is usual when persisting data, the metadata (the sequence number, the lengths of certain fields) goes into a 'header' and the values go into the 'body'. A record has the same two parts; its header stores, in order, the sequence, the number of updates, and the length of the updates. Whenever LevelDB writes an int or long into a record it uses a variable-length encoding: an int normally takes 4 bytes and a long 8, but LevelDB does not store the leading all-zero bytes. Let's look closely, starting with int:
public static void writeVariableLengthInt(int value, SliceOutput sliceOutput) {
    int highBitMask = 0x80;
    if (value < (1 << 7) && value >= 0) {
        sliceOutput.writeByte(value);
    }
    else if (value < (1 << 14) && value > 0) {
        sliceOutput.writeByte(value | highBitMask);
        sliceOutput.writeByte(value >>> 7);
    }
    else if (value < (1 << 21) && value > 0) {
        sliceOutput.writeByte(value | highBitMask);
        sliceOutput.writeByte((value >>> 7) | highBitMask);
        sliceOutput.writeByte(value >>> 14);
    }
    else if (value < (1 << 28) && value > 0) {
        sliceOutput.writeByte(value | highBitMask);
        sliceOutput.writeByte((value >>> 7) | highBitMask);
        sliceOutput.writeByte((value >>> 14) | highBitMask);
        sliceOutput.writeByte(value >>> 21);
    }
    else {
        sliceOutput.writeByte(value | highBitMask);
        sliceOutput.writeByte((value >>> 7) | highBitMask);
        sliceOutput.writeByte((value >>> 14) | highBitMask);
        sliceOutput.writeByte((value >>> 21) | highBitMask);
        sliceOutput.writeByte(value >>> 28);
    }
}
If value < 2^7, the low byte is written directly and the (all-zero) high bytes are dropped. If 2^7 <= value < 2^14, the low 7 bits are written with the high bit of that byte set to 1 via the mask, the remaining bits are written as the next byte, and so on. The core idea is that the high bit of each byte says whether the number continues: a 0 means this byte ends the current number, so the next byte starts a new one. The decoding path for a variable-length int follows directly from this. Next, writing a variable-length long:
public static void writeVariableLengthLong(long value, SliceOutput sliceOutput) {
    // while value more than the first 7 bits set
    while ((value & (~0x7f)) != 0) {
        sliceOutput.writeByte((int) ((value & 0x7f) | 0x80));
        value >>>= 7;
    }
    sliceOutput.writeByte((int) value);
}
The algorithm is straightforward: value & 0x7f takes the low 7 bits, | 0x80 sets the high bit of that byte, then value is shifted right by 7 with zero fill. Two questions came up while reading. First, why test (value & ~0x7f) != 0 rather than something like value & 0x80? Simply because the test has to cover all of the remaining high bits, not just one. Second, why do int and long use two implementations that are essentially the same but written differently? I still do not see a good reason; the loop-based form is clearly more concise and at least as efficient.
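For completeness, a standalone sketch of the matching decode step, written against a plain ByteBuffer rather than the project's SliceInput (the class and method names here are mine, not the library's):

import java.nio.ByteBuffer;

public final class VarInt {
    // Decode a little-endian base-128 varint: each byte carries 7 payload bits,
    // and the high bit says "more bytes follow".
    public static int readVariableLengthInt(ByteBuffer in) {
        int result = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            int b = in.get() & 0xff;
            result |= (b & 0x7f) << shift;
            if ((b & 0x80) == 0) {   // high bit clear: this was the last byte
                return result;
            }
        }
        throw new IllegalArgumentException("malformed varint");
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {(byte) 0xac, 0x02}); // 300 encoded as a varint
        System.out.println(readVariableLengthInt(buf));                   // prints 300
    }
}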
Once the record is wrapped, it is written to the log, and this part is worth reading carefully. The log's default block size is 32 KB. The writer computes the space remaining in the current block and, if the remaining mapped space is insufficient, tries to grow the mapping. Each write into the mappedByteBuffer may land on one or more blocks, and depending on where a record fragment falls within a block there are four chunk types: FULL, FIRST, MIDDLE, and LAST. There are diagrams of this online, and the algorithm is simple: the type is decided by the start and end position of each write. addRecord is implemented as follows:
public synchronized void addRecord(Slice record, boolean force)
        throws IOException {
    Preconditions.checkState(!closed.get(), "Log has been closed");

    SliceInput sliceInput = record.input();

    // used to track first, middle and last blocks
    boolean begin = true;

    // Fragment the record int chunks as necessary and write it.  Note that if record
    // is empty, we still want to iterate once to write a single
    // zero-length chunk.
    do {
        int bytesRemainingInBlock = BLOCK_SIZE - blockOffset;
        Preconditions.checkState(bytesRemainingInBlock >= 0);

        // Switch to a new block if necessary
        if (bytesRemainingInBlock < HEADER_SIZE) {
            if (bytesRemainingInBlock > 0) {
                // Fill the rest of the block with zeros
                // todo lame... need a better way to write zeros
                ensureCapacity(bytesRemainingInBlock);
                mappedByteBuffer.put(new byte[bytesRemainingInBlock]);
            }
            blockOffset = 0;
            bytesRemainingInBlock = BLOCK_SIZE - blockOffset;
        }

        // Invariant: we never leave less than HEADER_SIZE bytes available in a block
        int bytesAvailableInBlock = bytesRemainingInBlock - HEADER_SIZE;
        Preconditions.checkState(bytesAvailableInBlock >= 0);

        // if there are more bytes in the record then there are available in the block,
        // fragment the record; otherwise write to the end of the record
        boolean end;
        int fragmentLength;
        if (sliceInput.available() > bytesAvailableInBlock) {
            end = false;
            fragmentLength = bytesAvailableInBlock;
        }
        else {
            end = true;
            fragmentLength = sliceInput.available();
        }

        // determine block type
        LogChunkType type;
        if (begin && end) {
            type = LogChunkType.FULL;
        }
        else if (begin) {
            type = LogChunkType.FIRST;
        }
        else if (end) {
            type = LogChunkType.LAST;
        }
        else {
            type = LogChunkType.MIDDLE;
        }
So LevelDB writes the log file first and the in-memory structure second. Next the memtable is updated:
updates.forEach(new InsertIntoHandler(memTable, sequenceBegin));
In the Java port the memtable is a ConcurrentSkipListMap. A skip list is a classic space-for-time data structure: insert, delete, update, and lookup all run in O(log n) on average, and it is an imaginative design. Stepping into the handler shows that both put and delete end up calling the memtable's add method; only the valueType differs, as sketched below.
Finally the latest sequence number is returned and the write completes. The write path clearly writes only to memory (plus the mmap-backed log), so as long as compaction keeps up and the level-0 file count stays stable, writes are not slowed down.
A few open questions remain: 1. the log exists for recovery, but when is it discarded? 2. how does recovery read the log? 3. why do the variable-length int and long encoders look different? We will work through these gradually; reading source code with questions in mind works better.
First question: when is a log file discarded? When deleteObsoleteFiles runs, only the current log and the previous one are kept:
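A toy illustration of that idea, using only JDK types and simplified far beyond the real MemTable (which keys on userKey + sequence + type), might look like this:

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

public class MemTableSketch {
    enum ValueType { VALUE, DELETION }

    record Entry(long sequence, ValueType type, String value) {}

    // key -> newest entry; kept to one entry per key purely to stay short
    private final ConcurrentSkipListMap<String, Entry> table = new ConcurrentSkipListMap<>();
    private final AtomicLong sequence = new AtomicLong();

    void put(String key, String value) { add(key, ValueType.VALUE, value); }
    void delete(String key)            { add(key, ValueType.DELETION, null); } // a delete is just another add

    private void add(String key, ValueType type, String value) {
        table.put(key, new Entry(sequence.incrementAndGet(), type, value));
    }

    Entry get(String key) {
        Map.Entry<String, Entry> e = table.ceilingEntry(key); // same lookup primitive the real code uses
        return (e != null && e.getKey().equals(key)) ? e.getValue() : null;
    }
}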
case LOG:
    keep = ((number >= versions.getLogNumber()) ||
            (number == versions.getPrevLogNumber()));
    break;
As for the third question, I have not found a satisfying explanation yet; in my tests the long encoder was actually slightly faster than the int one.
As a storage engine, LevelDB provides a complete read path: first read the memtable, then the immutable memtable if the key was not found, then level 0, and then level L. See DBImpl's get method.
Reading or writing the in-memory memtable or immutable memtable requires acquiring the reentrant lock mutex first and releasing it afterwards. The memtable and the immutable memtable are essentially the same structure; the lookup calls table.ceilingEntry(key) to locate the key, and the skip list internals are worth studying separately. A condensed view of the lookup order is sketched below.
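A self-contained illustration of that order, with plain NavigableMaps standing in for the memtable, the immutable memtable, and the on-disk levels (so none of this is the real API):

import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ReadPathSketch {
    static String get(String key, NavigableMap<String, String> memTable,
                      NavigableMap<String, String> immutable,
                      List<NavigableMap<String, String>> levels) {
        String v = memTable.get(key);                                // 1. active memtable
        if (v == null && immutable != null) v = immutable.get(key);  // 2. memtable being flushed
        if (v == null) {
            for (NavigableMap<String, String> level : levels) {      // 3. level 0, then deeper levels
                v = level.get(key);
                if (v != null) break;
            }
        }
        return v;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> mem = new TreeMap<>(), imm = new TreeMap<>(), l0 = new TreeMap<>();
        l0.put("k", "old");
        mem.put("k", "new");
        System.out.println(get("k", mem, imm, List.of(l0))); // prints "new": newer layers win
    }
}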
Next comes versions.get(lookupKey); step in. readStats records the first file visited at a given level, together with that level. It feeds into compaction, since reading one file too often will eventually trigger a compaction; LevelDB's policy, however, is that "compactions triggered by too many files take priority over compactions triggered by too many seeks", as part three will show. The code below is easy to read: try level 0 first, otherwise walk the levels:
public LookupResult get(LookupKey key) {
    // We can search level-by-level since entries never hop across
    // levels.  Therefore we are guaranteed that if we find data
    // in an smaller level, later levels are irrelevant.
    ReadStats readStats = new ReadStats();
    LookupResult lookupResult = level0.get(key, readStats);
    if (lookupResult == null) {
        for (Level level : levels) {
            lookupResult = level.get(key, readStats);
            if (lookupResult != null) {
                break;
            }
        }
    }
    updateStats(readStats.getSeekFileLevel(), readStats.getSeekFile());
    return lookupResult;
}
Step into level0.get(). Level-0 files overlap, so it first scans all files to find those whose range contains the key, sorts them by age with Collections.sort(fileMetaDataList, NEWEST_FIRST), then iterates over every candidate file and creates an iterator for each. This iterator matters, so step into BlockIterator:
/**
 * Reads the entry at the current data readIndex.
 * After this method, data readIndex is positioned at the beginning of the next entry
 * or at the end of data if there was not a next entry.
 *
 * @return true if an entry was read
 */
private static BlockEntry readEntry(SliceInput data, BlockEntry previousEntry) {
    Preconditions.checkNotNull(data, "data is null");

    // read entry header
    int sharedKeyLength = VariableLengthQuantity.readVariableLengthInt(data);
    int nonSharedKeyLength = VariableLengthQuantity.readVariableLengthInt(data);
    int valueLength = VariableLengthQuantity.readVariableLengthInt(data);

    // read key
    Slice key = Slices.allocate(sharedKeyLength + nonSharedKeyLength);
    SliceOutput sliceOutput = key.output();
    if (sharedKeyLength > 0) {
        Preconditions.checkState(previousEntry != null, "Entry has a shared key but no previous entry was provided");
        sliceOutput.writeBytes(previousEntry.getKey(), 0, sharedKeyLength);
    }
    sliceOutput.writeBytes(data, nonSharedKeyLength);

    // read value
    Slice value = data.readSlice(valueLength);

    return new BlockEntry(key, value);
}
It first reads the header, i.e. three lengths. Because keys inside a file at level >= 1 are sorted, LevelDB stores them compactly: if two adjacent keys share a prefix (calling it just a "prefix" here is a simplification, see the compaction section), the repeated part is not stored twice; only its length is recorded. sharedKeyLength is, as the name says, the length of the shared portion, and nonSharedKeyLength is the rest. These numbers are of course variable-length too, and given the encoder shown earlier the decoding is easy to follow. The previous key is passed in each time precisely so the shared portion can be copied from it. The rest is straightforward: the key is assembled from the two pieces, the value is read, and the entry is returned. A standalone example of this record layout follows.
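To make the layout concrete, here is a small standalone encoder for the <shared><nonShared><valueLen><key suffix><value> format described above; it writes fixed-size ints instead of varints purely to stay short:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class PrefixCompressionSketch {
    // Encode one entry in the shape readEntry() decodes.
    static void writeEntry(DataOutputStream out, String prevKey, String key, String value) throws IOException {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] p = prevKey.getBytes(StandardCharsets.UTF_8);
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        int shared = 0;
        while (shared < Math.min(k.length, p.length) && k[shared] == p[shared]) shared++;
        out.writeInt(shared);                  // shared key length
        out.writeInt(k.length - shared);       // non-shared key length
        out.writeInt(v.length);                // value length
        out.write(k, shared, k.length - shared); // only the suffix is stored
        out.write(v);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeEntry(out, "", "apple001", "v1");         // restart point: nothing shared
        writeEntry(out, "apple001", "apple002", "v2"); // shares "apple00", stores only "2"
        System.out.println("encoded bytes: " + buf.size());
    }
}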
Stepping back out, the caller invokes iterator.seek(key.getInternalKey()). LevelDB wraps keys in several different types, and users can wrap their own keys to extend functionality, for example adding an expiration time or a logical bucket number. The seek deserves a careful look: once it returns, the iterator is already positioned at the matching KV, and a final key comparison is all that is needed to produce the result. So the core is how seek works:
/**
 * Repositions the iterator so the key of the next BlockElement returned greater than or equal to the specified targetKey.
 */
@Override
public void seek(Slice targetKey) {
    if (restartCount == 0) {
        return;
    }

    int left = 0;
    int right = restartCount - 1;

    // binary search restart positions to find the restart position immediately before the targetKey
    while (left < right) {
        int mid = (left + right + 1) / 2;

        seekToRestartPosition(mid);

        if (comparator.compare(nextEntry.getKey(), targetKey) < 0) {
            // key at mid is smaller than targetKey.  Therefore all restart
            // blocks before mid are uninteresting.
            left = mid;
        }
        else {
            // key at mid is greater than or equal to targetKey.  Therefore
            // all restart blocks at or after mid are uninteresting.
            right = mid - 1;
        }
    }

    // linear search (within restart block) for first key greater than or equal to targetKey
    for (seekToRestartPosition(left); nextEntry != null; next()) {
        if (comparator.compare(peek().getKey(), targetKey) >= 0) {
            break;
        }
    }
}
restartCount is the number of restart points (checkpoints) in a block; how it is computed is set aside for a moment. Because LevelDB does not store the repeated prefix of each key, a restart point records the offset of a key that is stored in full (the original post shows a diagram borrowed from the web). With that picture in mind the seek is easy to follow: binary search first narrows down to the restart region that contains the key, say the one starting at record2 when the key lives at record4, then scans forward from record2. The way the restart count is computed is then obvious from the block layout; the source is:
// key restart count is the last int of the block
int restartCount = block.getInt(block.length() - SIZE_OF_INT);

if (restartCount > 0) {
    // restarts are written at the end of the block
    int restartOffset = block.length() - (1 + restartCount) * SIZE_OF_INT;
    Preconditions.checkArgument(restartOffset < block.length() - SIZE_OF_INT, "Block is corrupt: restart offset count is greater than block size");
    restartPositions = block.slice(restartOffset, restartCount * SIZE_OF_INT);

    // data starts at 0 and extends to the restart index
    data = block.slice(0, restartOffset);
}
restartCount is read straight from the last int of the block; restartOffset is then the offset of restart[0], and restartPositions is the byte array holding all restart points (the blue segment in the borrowed diagram). restartPositions is passed into the BlockIterator constructor, and the restart count used in seek is simply its length divided by 4. The block structure and the binary search are details of LevelDB worth understanding well. To avoid confusion: the restart array entries and num_restarts are ints, while records themselves have no fixed length; block metadata tends to live at the tail of the block.
@Override
public BlockIterator iterator() {
    return new BlockIterator(data, restartPositions, comparator);
}
Because the write path only puts records into memory and the files are produced by compaction, quite a few unfamiliar things showed up in this read-path chapter. If the read path is clear, compaction becomes easier to understand; and if you are reading a LevelDB source walkthrough at all, you presumably already know its overall structure. The compaction analysis below will answer the questions left open above.
Compaction is the merge-and-archive process; it is where the LSM-tree structure is actually realized.
Both DBImpl.get() and makeRoomForWrite() call maybeScheduleCompaction(); look at DBImpl.get():
// schedule compaction if necessary
mutex.lock();
try {
    if (versions.needsCompaction()) {
        maybeScheduleCompaction();
    }
}
finally {
    mutex.unlock();
}
When is a compaction needed? Step into needsCompaction(): either of two conditions is enough:
public boolean needsCompaction() {
    return current.getCompactionScore() >= 1 || current.getFileToCompact() != null;
}
The first condition is a score >= 1, which introduces the notion of a compaction score; the second is whether there is a fileToCompact. Next, how the score is computed and how the file to compact is chosen:
private void finalizeVersion(Version version) {
    // Precomputed best level for next compaction
    int bestLevel = -1;
    double bestScore = -1;

    for (int level = 0; level < version.numberOfLevels() - 1; level++) {
        double score;
        if (level == 0) {
            // We treat level-0 specially by bounding the number of files
            // instead of number of bytes for two reasons:
            //
            // (1) With larger write-buffer sizes, it is nice not to do too
            // many level-0 compactions.
            //
            // (2) The files in level-0 are merged on every read and
            // therefore we wish to avoid too many files when the individual
            // file size is small (perhaps because of a small write-buffer
            // setting, or very high compression ratios, or lots of
            // overwrites/deletions).
            score = 1.0 * version.numberOfFilesInLevel(level) / L0_COMPACTION_TRIGGER;
        }
        else {
            // Compute the ratio of current size to size limit.
            long levelBytes = 0;
            for (FileMetaData fileMetaData : version.getFiles(level)) {
                levelBytes += fileMetaData.getFileSize();
            }
            score = 1.0 * levelBytes / maxBytesForLevel(level);
        }

        if (score > bestScore) {
            bestLevel = level;
            bestScore = score;
        }
    }

    version.setCompactionLevel(bestLevel);
    version.setCompactionScore(bestScore);
}
Level 0 is scored differently from the other levels, and the comment explains why it is bounded by file count: 1. the larger the write buffer, the less often level-0 compactions should run; 2. every read has to consult all level-0 files, so too many small level-0 files should be avoided. For the other levels the score is the total size of the files in the level divided by the level's byte limit; maxBytesForLevel starts at 10 MB for level 1 and multiplies by 10 per level, roughly 10^L MB for level L, which makes the score easy to compute. A sketch of that rule follows.
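A sketch of that size rule (the constant and loop follow the implementation as I read it; treat the numbers as illustrative):

public class LevelSizeSketch {
    // 10 MB for level 1, x10 per deeper level.  Level 0 is scored by file count instead,
    // so its value here is unused.
    static double maxBytesForLevel(int level) {
        double result = 10 * 1048576.0;
        while (level > 1) {
            result *= 10;
            level--;
        }
        return result;
    }

    public static void main(String[] args) {
        for (int level = 1; level <= 6; level++) {
            System.out.printf("level %d limit: %.0f MB%n", level, maxBytesForLevel(level) / 1048576.0);
        }
    }
}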
private boolean updateStats(int seekFileLevel, FileMetaData seekFile) {
    if (seekFile == null) {
        return false;
    }

    seekFile.decrementAllowedSeeks();
    if (seekFile.getAllowedSeeks() <= 0 && fileToCompact == null) {
        fileToCompact = seekFile;
        fileToCompactLevel = seekFileLevel;
        return true;
    }
    return false;
}
This function was called during the read path; it receives the first file visited at a particular level. That seekFile's allowed seek count is decremented, and once it drops to zero or below the file becomes fileToCompact. What is a file's initial allowedSeeks value?
int allowedSeeks = (int) (fileMetaData.getFileSize() / 16384);
if (allowedSeeks < 100) {
    allowedSeeks = 100;
}
fileMetaData.setAllowedSeeks(allowedSeeks);
This code appears again later. With the above, needsCompaction is clear. Next, maybeScheduleCompaction:
protected void maybeScheduleCompaction() {
    Preconditions.checkState(mutex.isHeldByCurrentThread());

    if (backgroundCompaction != null) {
        // Already scheduled
    }
    else if (shuttingDown.get()) {
        // DB is being shutdown; no more background compactions
    }
    else if (immutableMemTable == null &&
            manualCompaction == null &&
            !versions.needsCompaction()) {
        // No work to be done
    }
    else {
        backgroundCompaction = compactionExecutor.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    backgroundCall();
                }
                catch (DatabaseShutdownException ignored) {
                }
                catch (Throwable e) {
                    e.printStackTrace();
                    backgroundException = e;
                }
                return null;
            }
        });
    }
}
compactionExecutor = Executors.newSingleThreadExecutor(compactionThreadFactory);
Only one compaction runs at a time (the executor is single-threaded), and if one is already scheduled the method returns immediately. Compaction also acquires the reentrant lock. Enter backgroundCompaction():
protected void backgroundCompaction()
        throws IOException {
    Preconditions.checkState(mutex.isHeldByCurrentThread());

    compactMemTableInternal();

    Compaction compaction;
    if (manualCompaction != null) {
        compaction = versions.compactRange(manualCompaction.level,
                new InternalKey(manualCompaction.begin, MAX_SEQUENCE_NUMBER, ValueType.VALUE),
                new InternalKey(manualCompaction.end, 0, ValueType.DELETION));
    }
    else {
        compaction = versions.pickCompaction();
    }

    if (compaction == null) {
        // no compaction
    }
    else if (manualCompaction == null && compaction.isTrivialMove()) {
        // Move file to next level
        Preconditions.checkState(compaction.getLevelInputs().size() == 1);
        FileMetaData fileMetaData = compaction.getLevelInputs().get(0);
        compaction.getEdit().deleteFile(compaction.getLevel(), fileMetaData.getNumber());
        compaction.getEdit().addFile(compaction.getLevel() + 1, fileMetaData);
        versions.logAndApply(compaction.getEdit());
        // log
    }
    else {
        CompactionState compactionState = new CompactionState(compaction);
        doCompactionWork(compactionState);
        cleanupCompaction(compactionState);
    }

    // manual compaction complete
    if (manualCompaction != null) {
        manualCompaction = null;
    }
}
First the memtable is compacted, then the level and files to compact are computed, then the compaction itself runs. Step by step: compactMemTableInternal() first writes the immutable memtable out to level 0. Look at meta = buildTable(mem, fileNumber): it creates a FileChannel and iterates over all KV pairs. This is worth a careful read, since TableBuilder encapsulates several details; step into add() and finish().
private FileMetaData buildTable(SeekingIterable<InternalKey, Slice> data, long fileNumber)
        throws IOException {
    File file = new File(databaseDir, Filename.tableFileName(fileNumber));
    try {
        InternalKey smallest = null;
        InternalKey largest = null;
        FileChannel channel = new FileOutputStream(file).getChannel();
        try {
            TableBuilder tableBuilder = new TableBuilder(options, channel, new InternalUserComparator(internalKeyComparator));

            for (Entry<InternalKey, Slice> entry : data) {
                // update keys
                InternalKey key = entry.getKey();
                if (smallest == null) {
                    smallest = key;
                }
                largest = key;

                tableBuilder.add(key.encode(), entry.getValue());
            }

            tableBuilder.finish();
        }
        finally {
            try {
                channel.force(true);
            }
            finally {
                channel.close();
            }
        }

        if (smallest == null) {
            return null;
        }
        FileMetaData fileMetaData = new FileMetaData(fileNumber, file.length(), smallest, largest);

        // verify table can be opened
        tableCache.newIterator(fileMetaData);

        pendingOutputs.remove(fileNumber);

        return fileMetaData;
    }
    catch (IOException e) {
        file.delete();
        throw e;
    }
}
Step into tableBuilder.add(k, v); userComparator defaults to BytewiseComparator.
public void add(Slice key, Slice value)
        throws IOException {
    Preconditions.checkNotNull(key, "key is null");
    Preconditions.checkNotNull(value, "value is null");

    Preconditions.checkState(!closed, "table is finished");

    if (entryCount > 0) {
        assert (userComparator.compare(key, lastKey) > 0) : "key must be greater than last key";
    }

    // If we just wrote a block, we can now add the handle to index block
    if (pendingIndexEntry) {
        Preconditions.checkState(dataBlockBuilder.isEmpty(), "Internal error: Table has a pending index entry but data block builder is empty");

        Slice shortestSeparator = userComparator.findShortestSeparator(lastKey, key);

        Slice handleEncoding = BlockHandle.writeBlockHandle(pendingHandle);
        indexBlockBuilder.add(shortestSeparator, handleEncoding);
        pendingIndexEntry = false;
    }

    lastKey = key;
    entryCount++;
    dataBlockBuilder.add(key, value);

    int estimatedBlockSize = dataBlockBuilder.currentSizeEstimate();
    if (estimatedBlockSize >= blockSize) {
        flush();
    }
}
Step into Slice shortestSeparator = userComparator.findShortestSeparator(lastKey, key):
@Override
public Slice findShortestSeparator(
        Slice start,
        Slice limit) {
    // Find length of common prefix
    int sharedBytes = BlockBuilder.calculateSharedBytes(start, limit);

    // Do not shorten if one string is a prefix of the other
    if (sharedBytes < Math.min(start.length(), limit.length())) {
        // if we can add one to the last shared byte without overflow and the two keys differ by more than
        // one increment at this location.
        int lastSharedByte = start.getUnsignedByte(sharedBytes);
        if (lastSharedByte < 0xff && lastSharedByte + 1 < limit.getUnsignedByte(sharedBytes)) {
            Slice result = start.copySlice(0, sharedBytes + 1);
            result.setByte(sharedBytes, lastSharedByte + 1);

            assert (compare(result, limit) < 0) : "start must be less than last limit";
            return result;
        }
    }
    return start;
}
Comparing byte by byte yields sharedBytes, which is also the index of the first differing byte. The key is not shortened if one key is a prefix of the other, and shortening also requires that the first differing byte of the first key, plus one, is still less than the first differing byte of the second key. For example, with start "abc123" and limit "abc456" the conditions hold and the result is "abc2". This also shows that calling the shared portion a "prefix" earlier was an oversimplification. A self-contained version of this rule is sketched below.
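The same rule re-implemented on raw byte arrays, confirming the example above (a paraphrase, not the project's BytewiseComparator):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SeparatorSketch {
    static byte[] shortestSeparator(byte[] start, byte[] limit) {
        int shared = 0;
        while (shared < Math.min(start.length, limit.length) && start[shared] == limit[shared]) shared++;
        if (shared < Math.min(start.length, limit.length)) {     // neither key is a prefix of the other
            int lastByte = start[shared] & 0xff;
            if (lastByte < 0xff && lastByte + 1 < (limit[shared] & 0xff)) {
                byte[] result = Arrays.copyOf(start, shared + 1);
                result[shared] = (byte) (lastByte + 1);          // bump the first differing byte
                return result;
            }
        }
        return start;                                            // cannot shorten safely
    }

    public static void main(String[] args) {
        byte[] sep = shortestSeparator("abc123".getBytes(StandardCharsets.UTF_8),
                                       "abc456".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(sep, StandardCharsets.UTF_8)); // prints "abc2"
    }
}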
This branch only runs when pendingIndexEntry == true. Following the references to pendingIndexEntry shows it is set to true in TableBuilder's flush(), which means the branch fires when the first record of a new block is written: at that point a separator key for the block that just ended is added to indexBlockBuilder. Here lastKey is the last key of the previous block. findShortSuccessor is simple: find the first byte of that key that is not 0xff, increment it, and drop everything after it. The goal, again, is to save space, one of LevelDB's optimization tricks; for example, if lastKey is "aqq" then the short successor is "b". pendingHandle, too, is only reassigned in flush(). Now look at finish() and flush():
public void finish()
        throws IOException {
    Preconditions.checkState(!closed, "table is finished");

    // flush current data block
    flush();

    // mark table as closed
    closed = true;

    // write (empty) meta index block
    BlockBuilder metaIndexBlockBuilder = new BlockBuilder(256, blockRestartInterval, new BytewiseComparator());
    // TODO(postrelease): Add stats and other meta blocks
    BlockHandle metaindexBlockHandle = writeBlock(metaIndexBlockBuilder);

    // add last handle to index block
    if (pendingIndexEntry) {
        Slice shortSuccessor = userComparator.findShortSuccessor(lastKey);

        Slice handleEncoding = BlockHandle.writeBlockHandle(pendingHandle);
        indexBlockBuilder.add(shortSuccessor, handleEncoding);
        pendingIndexEntry = false;
    }

    // write index block
    BlockHandle indexBlockHandle = writeBlock(indexBlockBuilder);

    // write footer
    Footer footer = new Footer(metaindexBlockHandle, indexBlockHandle);
    Slice footerEncoding = Footer.writeFooter(footer);
    position += fileChannel.write(footerEncoding.toByteBuffer());
}
private void flush()
        throws IOException {
    Preconditions.checkState(!closed, "table is finished");
    if (dataBlockBuilder.isEmpty()) {
        return;
    }

    Preconditions.checkState(!pendingIndexEntry, "Internal error: Table already has a pending index entry to flush");

    pendingHandle = writeBlock(dataBlockBuilder);
    pendingIndexEntry = true;
}
Step into private BlockHandle writeBlock(BlockBuilder blockBuilder):
private BlockHandle writeBlock(BlockBuilder blockBuilder)
        throws IOException {
    // close the block
    Slice raw = blockBuilder.finish();

    // attempt to compress the block
    Slice blockContents = raw;
    CompressionType blockCompressionType = CompressionType.NONE;
    if (compressionType == CompressionType.SNAPPY) {
        ensureCompressedOutputCapacity(maxCompressedLength(raw.length()));
        try {
            int compressedSize = Snappy.compress(raw.getRawArray(), raw.getRawOffset(), raw.length(), compressedOutput.getRawArray(), 0);

            // Don't use the compressed data if compressed less than 12.5%,
            if (compressedSize < raw.length() - (raw.length() / 8)) {
                blockContents = compressedOutput.slice(0, compressedSize);
                blockCompressionType = CompressionType.SNAPPY;
            }
        }
        catch (IOException ignored) {
            // compression failed, so just store uncompressed form
        }
    }

    // create block trailer
    BlockTrailer blockTrailer = new BlockTrailer(blockCompressionType, crc32c(blockContents, blockCompressionType));
    Slice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);

    // create a handle to this block
    BlockHandle blockHandle = new BlockHandle(position, blockContents.length());

    // write data and trailer
    position += fileChannel.write(new ByteBuffer[] {blockContents.toByteBuffer(), trailer.toByteBuffer()});

    // clean up state
    blockBuilder.reset();

    return blockHandle;
}
blockBuilder.finish() appends the restart points to the data block, followed by the restart count:
if (entryCount > 0) {
    restartPositions.write(block);
    block.writeInt(restartPositions.size());
}
else {
    block.writeInt(0);
}
The contents are then compressed; if compression saves less than 12.5% the raw form is kept. After that a blockTrailer is created and written right after the block. (The original post shows the on-disk layout between blocks in another borrowed diagram; type is the compression algorithm.) Finally the contents and trailer are written to the channel, and the block's offset (position) and length are returned. A small sketch of the block-plus-trailer layout follows.
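A rough, self-contained sketch of writing one block followed by its 5-byte trailer (1 byte compression type + 4 byte CRC32C); the real writer also folds the type byte into the CRC and masks the result, which is omitted here:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32C;

public class BlockTrailerSketch {
    static void writeBlock(ByteArrayOutputStream file, byte[] blockContents, byte compressionType) throws IOException {
        file.write(blockContents);                       // block data (possibly compressed)
        CRC32C crc = new CRC32C();
        crc.update(blockContents);
        ByteBuffer trailer = ByteBuffer.allocate(5).order(ByteOrder.LITTLE_ENDIAN);
        trailer.put(compressionType);                    // 0 = no compression in this sketch
        trailer.putInt((int) crc.getValue());            // checksum over the block contents
        file.write(trailer.array());
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        writeBlock(file, "some block contents".getBytes(), (byte) 0);
        System.out.println("file length = " + file.size()); // block bytes + 5-byte trailer
    }
}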
Back out of writeBlock, pendingIndexEntry is set to true, marking the start of a new block. Stepping out of flush(), the pace picks up. Snappy and CRC are skipped here, since neither algorithm is specific to LevelDB. Next the metaIndexBlockBuilder is written (it is empty), then the indexBlockBuilder, which stores the separator keys together with each block's real offset and length near the end of the file. Finally the handles to the metaindex block and the index block, i.e. their start positions and lengths, are wrapped in a footer and written at the very end.
Back to TableBuilder's add method, whose core is dataBlockBuilder.add(key, value). blockRestartInterval is the distance between two restart points, i.e. how many prefix-compressed keys can sit between them; it defaults to 16. The process is then easy to follow: KV pairs are appended one after another, checking whether the interval has been exceeded. The on-disk record format was analyzed in the read-path section: for each record, write the shared length, the non-shared length, the value length, the non-shared key bytes, and the value, then update the bookkeeping state:
public void add(Slice key, Slice value) {
    Preconditions.checkNotNull(key, "key is null");
    Preconditions.checkNotNull(value, "value is null");
    Preconditions.checkState(!finished, "block is finished");
    Preconditions.checkPositionIndex(restartBlockEntryCount, blockRestartInterval);

    Preconditions.checkArgument(lastKey == null || comparator.compare(key, lastKey) > 0, "key must be greater than last key");

    int sharedKeyBytes = 0;
    if (restartBlockEntryCount < blockRestartInterval) {
        sharedKeyBytes = calculateSharedBytes(key, lastKey);
    }
    else {
        // restart prefix compression
        restartPositions.add(block.size());
        restartBlockEntryCount = 0;
    }

    int nonSharedKeyBytes = key.length() - sharedKeyBytes;

    // write "<shared><non_shared><value_size>"
    VariableLengthQuantity.writeVariableLengthInt(sharedKeyBytes, block);
    VariableLengthQuantity.writeVariableLengthInt(nonSharedKeyBytes, block);
    VariableLengthQuantity.writeVariableLengthInt(value.length(), block);

    // write non-shared key bytes
    block.writeBytes(key, sharedKeyBytes, nonSharedKeyBytes);

    // write value bytes
    block.writeBytes(value, 0, value.length());

    // update last key
    lastKey = key;

    // update state
    entryCount++;
    restartBlockEntryCount++;
}
Finally, channel.force(true) is executed, the new table is verified to be openable, and buildTable returns. pendingOutputs appears to hold the numbers of files currently being flushed; once a file is successfully written, its entry is removed. The read, write, and compaction paths all track the smallest and largest keys; the logic is simple but nicely written:
if (smallest == null) {
    smallest = key;
}
largest = key;
That covers buildTable. Popping back up the stack: buildTable is called from writeLevel0Table during compaction. Continuing on, once buildTable completes it returns the new file's fileMetaData. Then:
if (meta != null && meta.getFileSize() > 0) {
    Slice minUserKey = meta.getSmallest().getUserKey();
    Slice maxUserKey = meta.getLargest().getUserKey();
    if (base != null) {
        level = base.pickLevelForMemTableOutput(minUserKey, maxUserKey);
    }
    edit.addFile(level, meta);
}
Apparently the new file can also be recorded at a level other than 0; step into pickLevelForMemTableOutput. Its goal is to push the file down to a deeper level x, stopping as soon as the file would overlap level x+1 or would overlap too much data two levels down. This avoids the degenerate case where a file at a low level overlaps nothing in the next level, so a compaction of it could not discard any stale data and would merely move the file from a lower level to a higher one. (If that does happen anyway, the compaction path avoids the pointless file read and rewrite with a trivial move.) The precondition for pushing the file down is that it does not overlap any level-0 file, because when level-0 files are compacted, all mutually overlapping files are collected as inputs; LevelDB prefers as many low-level inputs as possible so that a single compaction does more useful work. A reasonable way of growing the low-level inputs shows up later, so watch for it. A simplified sketch of the placement rule follows.
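A much-simplified sketch of that placement rule, with key ranges reduced to integer intervals, an illustrative overlap threshold in place of the real byte limit, and a cap passed in explicitly:

import java.util.List;

public class PickLevelSketch {
    record Range(int min, int max) {
        boolean overlaps(Range o) { return min <= o.max && o.min <= max; }
    }

    // levels.get(L) holds the key ranges of the files in level L.
    static int pickLevel(Range newFile, List<List<Range>> levels, int maxMemCompactLevel) {
        int level = 0;
        if (levels.get(0).stream().anyMatch(newFile::overlaps)) {
            return 0;                                   // overlaps level 0: stay there
        }
        while (level < maxMemCompactLevel) {
            if (levels.get(level + 1).stream().anyMatch(newFile::overlaps)) {
                break;                                  // next level overlaps: stop at this level
            }
            long grandparentOverlap = levels.get(level + 2).stream().filter(newFile::overlaps).count();
            if (grandparentOverlap > 2) {               // illustrative threshold, not the real byte limit
                break;                                  // too much overlap two levels down
            }
            level++;
        }
        return level;
    }

    public static void main(String[] args) {
        List<List<Range>> levels = List.of(
                List.of(),                      // level 0: empty
                List.of(new Range(100, 200)),   // level 1
                List.of(new Range(100, 200)),   // level 2
                List.of());                     // level 3
        System.out.println(pickLevel(new Range(10, 20), levels, 2)); // prints 2: pushed down until the cap
    }
}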
Stepping out of writeLevel0Table at last: it added a file to some level and changed the log. The per-level file lists therefore need to be reorganized to keep them ordered. Next comes versions.logAndApply(edit); the call depth here is considerable, so step in:
Version version = new Version(this);
Builder builder = new Builder(this, current);
builder.apply(edit);
builder.saveTo(version);

finalizeVersion(version);
...
appendVersion(version);
After sorting and merging, a new version is formed and replaces the current one. First enter builder.apply(edit), then builder.saveTo(version). This is tiring to walk through, since every step should be understood as fully as possible; hopefully the intent comes across. Builder is an inner class of VersionSet:
public void apply(VersionEdit edit) {
    // Update compaction pointers
    for (Entry<Integer, InternalKey> entry : edit.getCompactPointers().entrySet()) {
        Integer level = entry.getKey();
        InternalKey internalKey = entry.getValue();
        versionSet.compactPointers.put(level, internalKey);
    }

    // Delete files
    for (Entry<Integer, Long> entry : edit.getDeletedFiles().entries()) {
        Integer level = entry.getKey();
        Long fileNumber = entry.getValue();
        levels.get(level).deletedFiles.add(fileNumber);
        // todo missing update to addedFiles?
    }

    // Add new files
    for (Entry<Integer, FileMetaData> entry : edit.getNewFiles().entries()) {
        Integer level = entry.getKey();
        FileMetaData fileMetaData = entry.getValue();

        // We arrange to automatically compact this file after
        // a certain number of seeks.  Let's assume:
        //   (1) One seek costs 10ms
        //   (2) Writing or reading 1MB costs 10ms (100MB/s)
        //   (3) A compaction of 1MB does 25MB of IO:
        //         1MB read from this level
        //         10-12MB read from next level (boundaries may be misaligned)
        //         10-12MB written to next level
        // This implies that 25 seeks cost the same as the compaction
        // of 1MB of data.  I.e., one seek costs approximately the
        // same as the compaction of 40KB of data.  We are a little
        // conservative and allow approximately one seek for every 16KB
        // of data before triggering a compaction.
        int allowedSeeks = (int) (fileMetaData.getFileSize() / 16384);
        if (allowedSeeks < 100) {
            allowedSeeks = 100;
        }
        fileMetaData.setAllowedSeeks(allowedSeeks);

        levels.get(level).deletedFiles.remove(fileMetaData.getNumber());
        levels.get(level).addedFiles.add(fileMetaData);
    }
}
Each level keeps a compactPointer, the key at which the next compaction of that level should start; the policy is round-robin, compacting a level's files in order and wrapping back to the first file after reaching the end. The implementation of that shows up later. Since writeLevel0Table just called edit.addFile(level, meta), iterating getNewFiles sets the new file's allowedSeeks and updates levels. Now saveTo:
/**
 * Saves the current state in specified version.
 */
public void saveTo(Version version)
        throws IOException {
    FileMetaDataBySmallestKey cmp = new FileMetaDataBySmallestKey(versionSet.internalKeyComparator);
    for (int level = 0; level < baseVersion.numberOfLevels(); level++) {
        // Merge the set of added files with the set of pre-existing files.
        // Drop any deleted files.  Store the result in *v.
        Collection<FileMetaData> baseFiles = baseVersion.getFiles().asMap().get(level);
        if (baseFiles == null) {
            baseFiles = ImmutableList.of();
        }
        SortedSet<FileMetaData> addedFiles = levels.get(level).addedFiles;
        if (addedFiles == null) {
            addedFiles = ImmutableSortedSet.of();
        }

        // files must be added in sorted order so assertion check in maybeAddFile works
        ArrayList<FileMetaData> sortedFiles = newArrayListWithCapacity(baseFiles.size() + addedFiles.size());
        sortedFiles.addAll(baseFiles);
        sortedFiles.addAll(addedFiles);
        Collections.sort(sortedFiles, cmp);

        for (FileMetaData fileMetaData : sortedFiles) {
            maybeAddFile(version, level, fileMetaData);
        }

        //#ifndef NDEBUG  todo
        // Make sure there is no overlap in levels > 0
        version.assertNoOverlappingFiles();
        //#endif
    }
}
saveTo means merging the added files into the base files. Comparators have appeared many times throughout this walkthrough without ever looking at the compare method itself; it simply compares bytes one by one:
public int compareTo(Slice that) {
    if (this == that) {
        return 0;
    }
    if (this.data == that.data && length == that.length && offset == that.offset) {
        return 0;
    }

    int minLength = Math.min(this.length, that.length);
    for (int i = 0; i < minLength; i++) {
        int thisByte = 0xFF & this.data[this.offset + i];
        int thatByte = 0xFF & that.data[that.offset + i];
        if (thisByte != thatByte) {
            return (thisByte) - (thatByte);
        }
    }
    return this.length - that.length;
}
After sorting the files by smallest key, the sorted files are written into the version. Enter maybeAddFile, which checks whether adjacent files overlap: level 0 is skipped, and for other levels the largest key of the version's last file is compared with the smallest key of the file being added. Easy to follow:
private void maybeAddFile(Version version, int level, FileMetaData fileMetaData)
        throws IOException {
    if (levels.get(level).deletedFiles.contains(fileMetaData.getNumber())) {
        // File is deleted: do nothing
    }
    else {
        List<FileMetaData> files = version.getFiles(level);
        if (level > 0 && !files.isEmpty()) {
            // Must not overlap
            boolean filesOverlap = versionSet.internalKeyComparator.compare(files.get(files.size() - 1).getLargest(), fileMetaData.getSmallest()) >= 0;
            if (filesOverlap) {
                // A memory compaction, while this compaction was running, resulted in a a database state that is
                // incompatible with the compaction.  This is rare and expensive to detect while the compaction is
                // running, so we catch here simply discard the work.
                throw new IOException(String.format("Compaction is obsolete: Overlapping files %s and %s in level %s",
                        files.get(files.size() - 1).getNumber(),
                        fileMetaData.getNumber(),
                        level));
            }
        }
        version.addFile(level, fileMetaData);
    }
}
That completes the analysis of compacting the memtable. LevelDB compacts memory first, and doCompactionWork(), analyzed later, may compact the memtable again. This is only a small part of the picture; there is much more logic ahead, and compaction radiates out into many parts of LevelDB's implementation. Compaction is the most critical piece of LevelDB.
Moving on: searching for references to manualCompaction shows it is effectively a hook LevelDB leaves for users to extend compaction and compact a specified level. Step into compaction = versions.compactRange. Skipping getOverlappingInputs, the key part is setupOtherInputs(level, levelInputs); its comment is clear: grow the set of level-L input files as much as possible without changing the number of level L+1 files picked up. expanded0 is the level-L file set obtained by mapping the level L+1 range back onto level L, and expanded1 maps expanded0's range back onto level L+1 again; comparing expanded1 with the original levelUpInputs tells whether the number of level L+1 files grew. If it did not, expanded0 becomes the level-L input. The grandparents are computed at the same time (see the Compaction constructor); their use comes up later.
public Compaction compactRange(int level, InternalKey begin, InternalKey end) {
    List<FileMetaData> levelInputs = getOverlappingInputs(level, begin, end);
    if (levelInputs.isEmpty()) {
        return null;
    }

    return setupOtherInputs(level, levelInputs);
}
private Compaction setupOtherInputs(int level, List<FileMetaData> levelInputs) {
    Entry<InternalKey, InternalKey> range = getRange(levelInputs);
    InternalKey smallest = range.getKey();
    InternalKey largest = range.getValue();

    List<FileMetaData> levelUpInputs = getOverlappingInputs(level + 1, smallest, largest);

    // Get entire range covered by compaction
    range = getRange(levelInputs, levelUpInputs);
    InternalKey allStart = range.getKey();
    InternalKey allLimit = range.getValue();

    // See if we can grow the number of inputs in "level" without
    // changing the number of "level+1" files we pick up.
    if (!levelUpInputs.isEmpty()) {
        List<FileMetaData> expanded0 = getOverlappingInputs(level, allStart, allLimit);

        if (expanded0.size() > levelInputs.size()) {
            range = getRange(expanded0);
            InternalKey newStart = range.getKey();
            InternalKey newLimit = range.getValue();

            List<FileMetaData> expanded1 = getOverlappingInputs(level + 1, newStart, newLimit);
            if (expanded1.size() == levelUpInputs.size()) {
                // Log(options_->info_log,
                //     "Expanding@%d %d+%d to %d+%d\n",
                //     level,
                //     int(c->inputs_[0].size()),
                //     int(c->inputs_[1].size()),
                //     int(expanded0.size()),
                //     int(expanded1.size()));
                smallest = newStart;
                largest = newLimit;
                levelInputs = expanded0;
                levelUpInputs = expanded1;

                range = getRange(levelInputs, levelUpInputs);
                allStart = range.getKey();
                allLimit = range.getValue();
            }
        }
    }

    // Compute the set of grandparent files that overlap this compaction
    // (parent == level+1; grandparent == level+2)
    List<FileMetaData> grandparents = null;
    if (level + 2 < NUM_LEVELS) {
        grandparents = getOverlappingInputs(level + 2, allStart, allLimit);
    }

    // if (false) {
    //     Log(options_ - > info_log, "Compacting %d '%s' .. '%s'",
    //             level,
    //             EscapeString(smallest.Encode()).c_str(),
    //             EscapeString(largest.Encode()).c_str());
    // }

    Compaction compaction = new Compaction(current, level, levelInputs, levelUpInputs, grandparents);

    // Update the place where we will do the next compaction for this level.
    // We update this immediately instead of waiting for the VersionEdit
    // to be applied so that if the compaction fails, we will try a different
    // key range next time.
    compactPointers.put(level, largest);
    compaction.getEdit().setCompactPointer(level, largest);

    return compaction;
}
Next, compaction = versions.pickCompaction(). The policy: compactions triggered by a level holding too much data take priority over compactions triggered by too many seeks. For a size-triggered compaction, the first file at the level whose largest key exceeds the compaction pointer is chosen; if none is found, the pointer has reached the end and the level's first file is used. If no level has a score above 1 but a seek compaction was triggered, the fileToCompact at fileToCompactLevel becomes the input. setupOtherInputs is run in either case to grow the level-L inputs; the mechanics are the same as above.
public Compaction pickCompaction() {
    // We prefer compactions triggered by too much data in a level over
    // the compactions triggered by seeks.
    boolean sizeCompaction = (current.getCompactionScore() >= 1);
    boolean seekCompaction = (current.getFileToCompact() != null);

    int level;
    List<FileMetaData> levelInputs;
    if (sizeCompaction) {
        level = current.getCompactionLevel();
        Preconditions.checkState(level >= 0);
        Preconditions.checkState(level + 1 < NUM_LEVELS);

        // Pick the first file that comes after compact_pointer_[level]
        levelInputs = newArrayList();
        for (FileMetaData fileMetaData : current.getFiles(level)) {
            if (!compactPointers.containsKey(level) ||
                    internalKeyComparator.compare(fileMetaData.getLargest(), compactPointers.get(level)) > 0) {
                levelInputs.add(fileMetaData);
                break;
            }
        }
        if (levelInputs.isEmpty()) {
            // Wrap-around to the beginning of the key space
            levelInputs.add(current.getFiles(level).get(0));
        }
    }
    else if (seekCompaction) {
        level = current.getFileToCompactLevel();
        levelInputs = ImmutableList.of(current.getFileToCompact());
    }
    else {
        return null;
    }

    // Files in level 0 may overlap each other, so pick up all overlapping ones
    if (level == 0) {
        Entry<InternalKey, InternalKey> range = getRange(levelInputs);
        // Note that the next call will discard the file we placed in
        // c->inputs_[0] earlier and replace it with an overlapping set
        // which will include the picked file.
        levelInputs = getOverlappingInputs(0, range.getKey(), range.getValue());

        Preconditions.checkState(!levelInputs.isEmpty());
    }

    Compaction compaction = setupOtherInputs(level, levelInputs);
    return compaction;
}
With the level and input files decided, the next check handles the trivial-move case: if the level-L input is a single file that overlaps nothing at level L+1 and not too much of the grandparents, it is simply relabeled as a level L+1 file, skipping the read of the input and the rewrite into a new file; a condensed statement of that test is sketched below. After it, we look at doCompactionWork to see how a real compaction is performed.
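A condensed statement of that test (parameter names are descriptive, not copied from the source):

public class TrivialMoveSketch {
    static boolean isTrivialMove(int levelInputCount, int levelUpInputCount,
                                 long grandparentOverlapBytes, long maxGrandparentOverlapBytes) {
        return levelInputCount == 1                                   // exactly one file at level L
                && levelUpInputCount == 0                             // nothing to merge with at level L+1
                && grandparentOverlapBytes <= maxGrandparentOverlapBytes; // stays cheap for the next compaction too
    }

    public static void main(String[] args) {
        // one input file, no overlap one level down, little overlap two levels down -> just relink it
        System.out.println(isTrivialMove(1, 0, 0, 20 * 1024 * 1024)); // true
    }
}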
Before iterating over the input KV pairs, versions.makeInputIterator(compactionState.compaction) builds the iterator. Level 0 and the other levels use different iterator types, so look at each in turn:
Level0Iterator.getNextElement():
@Override
protected Entry<InternalKey, Slice> getNextElement() {
    Entry<InternalKey, Slice> result = null;
    ComparableIterator nextIterator = priorityQueue.poll();
    if (nextIterator != null) {
        result = nextIterator.next();
        if (nextIterator.hasNext()) {
            priorityQueue.add(nextIterator);
        }
    }
    return result;
}
The priorityQueue holds one iterator per file; nextIterator.next() ultimately calls BlockIterator's next, which calls the readEntry listed earlier. The queue orders its element iterators by their current key, so the next entry across multiple input files comes out in key order; after an iterator is polled it is pushed back if it still has elements. Each poll returns the highest-priority element, and both add and poll are O(log n). Note that no comparator is passed when the queue is constructed, so ordering falls back to the elements' natural ordering (they are Comparable); the inner workings of PriorityQueue are left to the reader, but one of its methods makes the point clear:
private void siftDown(int k, E x) {
    if (comparator != null)
        siftDownUsingComparator(k, x);
    else
        siftDownComparable(k, x);
}
Now look at Level0Iterator.ComparableIterator's compareTo method. Because level-0 files overlap each other, the smallest key across all files is needed, whereas LevelIterator needs no priority queue because its files are disjoint and ordered. Note that both iterators have the same inheritance: extends AbstractSeekingIterator<InternalKey, Slice> implements InternalIterator. A standalone sketch of the poll/re-add merge pattern follows the compareTo snippet:
@Override
public int compareTo(ComparableIterator that) {
    int result = comparator.compare(this.nextElement.getKey(), that.nextElement.getKey());
    if (result == 0) {
        result = Ints.compare(this.ordinal, that.ordinal);
    }
    return result;
}
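The poll/re-add pattern is easy to reproduce with plain JDK types; a self-contained k-way merge in the same shape as Level0Iterator.getNextElement():

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class MergeSketch {
    // Wrap each per-file iterator with a peeked head so the queue can order them.
    static class Source {
        final Iterator<Integer> it;
        Integer head;
        Source(Iterator<Integer> it) { this.it = it; head = it.hasNext() ? it.next() : null; }
    }

    public static void main(String[] args) {
        PriorityQueue<Source> queue = new PriorityQueue<>(Comparator.comparing((Source s) -> s.head));
        for (List<Integer> file : List.of(List.of(1, 4, 9), List.of(2, 3, 10), List.of(5))) {
            Source s = new Source(file.iterator());
            if (s.head != null) queue.add(s);
        }
        // Same shape as Level0Iterator.getNextElement(): poll, emit, re-add if not exhausted.
        while (!queue.isEmpty()) {
            Source s = queue.poll();
            System.out.print(s.head + " ");
            s.head = s.it.hasNext() ? s.it.next() : null;
            if (s.head != null) queue.add(s);
        }
        // prints: 1 2 3 4 5 9 10
    }
}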
LevelIterator.getNextElement():
@Override
protected Entry<InternalKey, Slice> getNextElement() {
    // note: it must be here & not where 'current' is assigned,
    // because otherwise we'll have called inputs.next() before throwing
    // the first NPE, and the next time around we'll call inputs.next()
    // again, incorrectly moving beyond the error.
    boolean currentHasNext = false;
    while (true) {
        if (current != null) {
            currentHasNext = current.hasNext();
        }
        if (!(currentHasNext)) {
            if (index < files.size()) {
                current = openNextFile();
            }
            else {
                break;
            }
        }
        else {
            break;
        }
    }
    if (currentHasNext) {
        return current.next();
    }
    else {
        // set current to empty iterator to avoid extra calls to user iterators
        current = null;
        return null;
    }
}

private InternalTableIterator openNextFile() {
    FileMetaData fileMetaData = files.get(index);
    index++;
    return tableCache.newIterator(fileMetaData);
}
current.hasNext() asks whether the current file has more records, and openNextFile opens the files in the list one by one; it is easy to follow. That completes the analysis of how the iterators are obtained and how they walk the files. Back in doCompactionWork, LevelDB again compacts memory first, so that no immutable memtable is still waiting before higher-level files are compacted; a memtable compaction changes the level-0 file count, which in turn affects the order of later compactions.
shouldStopBefore(key) prevents the output from overlapping too many grandparent files, which would make the later compaction of level L+1 take too long. Next comes the decision of whether a key should be dropped; this logic deserves careful scrutiny and still leaves some questions. If the key is not dropped it is flushed to the output table, and since the flush path was analyzed earlier only simple logic remains there, so it is skipped here.
For a user key seen for the first time, the "first occurrence" branch runs and lastSequenceForKey is set to the maximum value; after the checks, lastSequenceForKey becomes the key's own sequence. If the next entry has the same user key, lastSequenceForKey is then necessarily <= smallestSnapshot (smallestSnapshot being the version's current last sequence when no snapshots are held), so the newer entry shadows the older one and the older one is dropped. Entries removed by a DELETION marker are also physically discarded here.
compactionState.compaction.isBaseLevelForKey(key.getUserKey()) decides whether the DELETION marker itself can be dropped. The condition is whether any level deeper than level+1 might still contain this key: it walks the files from level+2 down, returns false as soon as one file's range covers the key, and returns true if none does. levelPointers reduces how many files are examined: because keys arrive in increasing order, files already ruled out for an earlier key never need to be checked again, which improves efficiency. A small worked example of the drop rule appears after the two snippets below.
boolean drop = false;
// todo if key doesn't parse (it is corrupted),
if (false /*!ParseInternalKey(key, &ikey)*/) {
    // do not hide error keys
    currentUserKey = null;
    hasCurrentUserKey = false;
    lastSequenceForKey = MAX_SEQUENCE_NUMBER;
}
else {
    if (!hasCurrentUserKey || internalKeyComparator.getUserComparator().compare(key.getUserKey(), currentUserKey) != 0) {
        // First occurrence of this user key
        currentUserKey = key.getUserKey();
        hasCurrentUserKey = true;
        lastSequenceForKey = MAX_SEQUENCE_NUMBER;
    }

    if (lastSequenceForKey <= compactionState.smallestSnapshot) {
        // Hidden by an newer entry for same user key
        drop = true; // (A)
    }
    else if (key.getValueType() == DELETION &&
            key.getSequenceNumber() <= compactionState.smallestSnapshot &&
            compactionState.compaction.isBaseLevelForKey(key.getUserKey())) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
    }

    lastSequenceForKey = key.getSequenceNumber();
}
// Returns true if the information we have available guarantees that
// the compaction is producing data in "level+1" for which no data exists
// in levels greater than "level+1".
public boolean isBaseLevelForKey(Slice userKey) {
    // Maybe use binary search to find right entry instead of linear search?
    UserComparator userComparator = inputVersion.getInternalKeyComparator().getUserComparator();
    for (int level = this.level + 2; level < NUM_LEVELS; level++) {
        List<FileMetaData> files = inputVersion.getFiles(level);
        while (levelPointers[level] < files.size()) {
            FileMetaData f = files.get(levelPointers[level]);
            if (userComparator.compare(userKey, f.getLargest().getUserKey()) <= 0) {
                // We've advanced far enough
                if (userComparator.compare(userKey, f.getSmallest().getUserKey()) >= 0) {
                    // Key falls in this file's range, so definitely not base level
                    return false;
                }
                break;
            }
            levelPointers[level]++;
        }
    }
    return true;
}
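To make rule (A) concrete, a tiny self-contained walk-through over three versions of one user key, with an assumed snapshot sequence of 100 (the deletion-specific rule needs isBaseLevelForKey and is left out):

public class DropRuleSketch {
    enum Type { VALUE, DELETION }

    record Entry(String userKey, long sequence, Type type) {}

    public static void main(String[] args) {
        long smallestSnapshot = 100;                 // assumed snapshot sequence
        Entry[] input = {                            // entries arrive newest-first per user key
                new Entry("k", 90, Type.VALUE),      // newest surviving version of k
                new Entry("k", 80, Type.VALUE),      // shadowed by seq 90 -> dropped (rule A)
                new Entry("k", 70, Type.DELETION),   // shadowed as well -> dropped
        };
        String currentUserKey = null;
        long lastSequenceForKey = Long.MAX_VALUE;
        for (Entry e : input) {
            if (!e.userKey().equals(currentUserKey)) {
                currentUserKey = e.userKey();
                lastSequenceForKey = Long.MAX_VALUE; // first occurrence of this user key
            }
            boolean drop = lastSequenceForKey <= smallestSnapshot; // an older duplicate is hidden
            lastSequenceForKey = e.sequence();
            System.out.println(e + " drop=" + drop);
        }
    }
}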
That concludes the compaction walkthrough.
Two topics remain: 1. recovery and 2. the cache; they will be filled in later.