



    ldb的單機寫操做性能好,100byte爲value tps能夠達到7w左右。ldb在必定程度上能夠說是犧牲了讀的性能,保證了寫的性能。很快就會知道爲何寫更快。git

    DBImpl.java 是引擎的實現,put接口中的option對象能夠自定義字段和方法,擴展引擎。github

    首先進入方法makeRoomForWrite(boolean force),默認force參數爲false。接着判斷makeRoom的條件:算法

if (allowDelay && versions.numberOfFilesInLevel(0) > L0_SLOWDOWN_WRITES_TRIGGER) {
                // We are getting close to hitting a hard limit on the number of
                // L0 files.  Rather than delaying a single write by several
                // seconds when we hit the hard limit, start delaying each
                // individual write by 1ms to reduce latency variance.  Also,
                // this delay hands over some CPU to the compaction thread in
                // case it is sharing the same core as the writer.
                try {
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                finally {

                // Do not delay a single write more than once
                allowDelay = false;

    默認的level0層文件數是4,緩衝寫的觸發條件是level0文件數>8。會將CPU資源交給compaction操做,由於在backgroundCall()時獲取了重入鎖。接着,若是memTable還有空間,則跳過makeRoom,若是此時正在執行Compaction或者level0文件數超過12個,則執行backgroundCondition.awaitUninterruptibly();暫停當前線程。 若是memTable沒有足夠空間,其餘條件又正常,則執行以下過程,關閉當前log日誌,建立新log日誌,memTable賦值給ImmutableTable,建立新memTable,很是好理解:數組

// Attempt to switch to a new memtable and trigger compaction of old
                Preconditions.checkState(versions.getPrevLogNumber() == 0);

                // close the existing log
                try {
                catch (IOException e) {
                    throw new RuntimeException("Unable to close log file " + log.getFile(), e);

                // open a new log
                long logNumber = versions.getNextFileNumber();
                try {
                    this.log = Logs.createLogWriter(new File(databaseDir, Filename.logFileName(logNumber)), logNumber);
                catch (IOException e) {
                    throw new RuntimeException("Unable to open new log file " +
                            new File(databaseDir, Filename.logFileName(logNumber)).getAbsoluteFile(), e);

                // create a new mem table
                immutableMemTable = memTable;
                memTable = new MemTable(internalKeyComparator);

                // Do not force another compaction there is space available
                force = false;




    f7出來,,每次對DB的更新都會更新version的LastSequence。這些更新操做會先寫入日誌文件。寫入以前對更新操做執行一些包裝操做。其實要記錄數據,每每將數據的元數據,例如suquence、某些字段的長度寫入‘header’中,將值寫入‘body’。record也包含這兩個部分,header中依次保存sequence、updates size、updates length。ldb在record中寫入一個int或long型數字,都是變長存儲的。本來int 4個字節,long 8個字節,但ldb不會將前面全零的byte存下來。讓咱們來仔細看看,先看int:app

public static void writeVariableLengthInt(int value, SliceOutput sliceOutput)
        int highBitMask = 0x80;
        if (value < (1 << 7) && value >= 0) {
        else if (value < (1 << 14) && value > 0) {
            sliceOutput.writeByte(value | highBitMask);
            sliceOutput.writeByte(value >>> 7);
        else if (value < (1 << 21) && value > 0) {
            sliceOutput.writeByte(value | highBitMask);
            sliceOutput.writeByte((value >>> 7) | highBitMask);
            sliceOutput.writeByte(value >>> 14);
        else if (value < (1 << 28) && value > 0) {
            sliceOutput.writeByte(value | highBitMask);
            sliceOutput.writeByte((value >>> 7) | highBitMask);
            sliceOutput.writeByte((value >>> 14) | highBitMask);
            sliceOutput.writeByte(value >>> 21);
        else {
            sliceOutput.writeByte(value | highBitMask);
            sliceOutput.writeByte((value >>> 7) | highBitMask);
            sliceOutput.writeByte((value >>> 14) | highBitMask);
            sliceOutput.writeByte((value >>> 21) | highBitMask);
            sliceOutput.writeByte(value >>> 28);

    若是value < pow(2, 7),那麼直接將低位byte寫入,高位捨棄。若是pow(2, 7) < value < pow(2, 14),先或掩碼將低位byte最高位置1,再將剩餘的高位byte繼續寫入output,捨棄更高位,以此類推。其核心思想就是用最高位0仍是1決定一個變長數字的實際長度,若是爲0,則是一個新數字的開始。所以能夠理解讀變長int的過程。接下來是寫變長long型:less

public static void writeVariableLengthLong(long value, SliceOutput sliceOutput)
        // while value more than the first 7 bits set
        while ((value & (~0x7f)) != 0) {
            sliceOutput.writeByte((int) ((value & 0x7f) | 0x80));
            value >>>= 7;
        sliceOutput.writeByte((int) value);

    這個算法使顯然的,value & 0x7f 取後7位,再 | 0x80 將最高位置爲1,接着將value右移7位,高位補0。看的時候,,我再想兩個很愚蠢的問題- -、第一爲何是& (~0x7f)),而不是& 0x80...這個問題簡直是傻逼,我徹底忽略了位操做的位數要對齊。第二個問題,,爲何int和long要用兩種本質相同,而形式不一樣的實現呢,仍是不理解,明顯第二種更簡潔,並且效率更高。


public synchronized void addRecord(Slice record, boolean force)
            throws IOException
        Preconditions.checkState(!closed.get(), "Log has been closed");

        SliceInput sliceInput = record.input();

        // used to track first, middle and last blocks
        boolean begin = true;

        // Fragment the record int chunks as necessary and write it.  Note that if record
        // is empty, we still want to iterate once to write a single
        // zero-length chunk.
        do {
            int bytesRemainingInBlock = BLOCK_SIZE - blockOffset;
            Preconditions.checkState(bytesRemainingInBlock >= 0);

            // Switch to a new block if necessary
            if (bytesRemainingInBlock < HEADER_SIZE) {
                if (bytesRemainingInBlock > 0) {
                    // Fill the rest of the block with zeros
                    // todo lame... need a better way to write zeros
                    mappedByteBuffer.put(new byte[bytesRemainingInBlock]);
                blockOffset = 0;
                bytesRemainingInBlock = BLOCK_SIZE - blockOffset;

            // Invariant: we never leave less than HEADER_SIZE bytes available in a block
            int bytesAvailableInBlock = bytesRemainingInBlock - HEADER_SIZE;
            Preconditions.checkState(bytesAvailableInBlock >= 0);

            // if there are more bytes in the record then there are available in the block,
            // fragment the record; otherwise write to the end of the record
            boolean end;
            int fragmentLength;
            if (sliceInput.available() > bytesAvailableInBlock) {
                end = false;
                fragmentLength = bytesAvailableInBlock;
            else {
                end = true;
                fragmentLength = sliceInput.available();

            // determine block type
            LogChunkType type;
            if (begin && end) {
                type = LogChunkType.FULL;
            else if (begin) {
                type = LogChunkType.FIRST;
            else if (end) {
                type = LogChunkType.LAST;
            else {
                type = LogChunkType.MIDDLE;


updates.forEach(new InsertIntoHandler(memTable, sequenceBegin));


    最後會返回最新的sequence。寫操做返回。咱們能夠清晰的看到,ldb的寫操做是直接寫內存,只要Compaction不出現問題,保證level 0層文件保持穩定,就不會影響寫。

    但仍是有幾個遺留的問題:1.log日誌適用於recover的,但何時廢棄 2.恢復過程是如何讀log日誌的 3.變長int和long爲何形式不一樣,這些問題咱們慢慢研究,帶着問題看源碼,效果更好。??


case LOG:
                    keep = ((number >= versions.getLogNumber()) ||
                            (number == versions.getPrevLogNumber()));



    做爲引擎,ldb中提供完整的讀操做。所謂完整的讀操做是指,先從memTable中讀,若是沒讀到則讀immutableTable,還沒讀到則讀level0,進而讀level L。參見DBImpl的get方法。



public LookupResult get(LookupKey key)
        // We can search level-by-level since entries never hop across
        // levels.  Therefore we are guaranteed that if we find data
        // in an smaller level, later levels are irrelevant.
        ReadStats readStats = new ReadStats();
        LookupResult lookupResult = level0.get(key, readStats);
        if (lookupResult == null) {
            for (Level level : levels) {
                lookupResult = level.get(key, readStats);
                if (lookupResult != null) {
        updateStats(readStats.getSeekFileLevel(), readStats.getSeekFile());
        return lookupResult;

    點進去,level0.get()。咱們知道level0的文件是overlapping的,因此先遍歷全部文件,找到包含指定key的文件。而後對其按照時間進行排序 Collections.sort(fileMetaDataList, NEWEST_FIRST);接着遍歷全部包含key的文件,爲每一個文件建立iterator,這個iterator是蠻關鍵的,因此點進去看BlockIterator:

     * Reads the entry at the current data readIndex.
     * After this method, data readIndex is positioned at the beginning of the next entry
     * or at the end of data if there was not a next entry.
     * @return true if an entry was read
    private static BlockEntry readEntry(SliceInput data, BlockEntry previousEntry)
        Preconditions.checkNotNull(data, "data is null");

        // read entry header
        int sharedKeyLength = VariableLengthQuantity.readVariableLengthInt(data);
        int nonSharedKeyLength = VariableLengthQuantity.readVariableLengthInt(data);
        int valueLength = VariableLengthQuantity.readVariableLengthInt(data);

        // read key
        Slice key = Slices.allocate(sharedKeyLength + nonSharedKeyLength);
        SliceOutput sliceOutput = key.output();
        if (sharedKeyLength > 0) {
            Preconditions.checkState(previousEntry != null, "Entry has a shared key but no previous entry was provided");
            sliceOutput.writeBytes(previousEntry.getKey(), 0, sharedKeyLength);
        sliceOutput.writeBytes(data, nonSharedKeyLength);

        // read value
        Slice value = data.readSlice(valueLength);

        return new BlockEntry(key, value);

    首先read header,讀取三個長度。由於在level >= 1 文件中key都是有序的,因此ldb對key進行優化存儲,若是兩個能夠有相同的前綴(此處,僅描述成相同的前綴,詳見compaction),ldb不會將重複的部分重複存儲,而是經過記錄長度的方式,壓縮空間。sharedKeyLength顧名思義是key共享部分的長度,nonSharedKeyLength顯而易見。很顯然,這些數字也是變長的,前面說過寫變長變量的方法,讀取則很好理解了。每次都會傳入前一個key,是爲了獲取共享部分。後面的寫操做更顯然,分兩次拼接key,讀取value,返回entry。


     * Repositions the iterator so the key of the next BlockElement returned greater than or equal to the specified targetKey.
    public void seek(Slice targetKey)
        if (restartCount == 0) {

        int left = 0;
        int right = restartCount - 1;

        // binary search restart positions to find the restart position immediately before the targetKey
        while (left < right) {
            int mid = (left + right + 1) / 2;


            if (comparator.compare(nextEntry.getKey(), targetKey) < 0) {
                // key at mid is smaller than targetKey.  Therefore all restart
                // blocks before mid are uninteresting.
                left = mid;
            else {
                // key at mid is greater than or equal to targetKey.  Therefore
                // all restart blocks at or after mid are uninteresting.
                right = mid - 1;

        // linear search (within restart block) for first key greater than or equal to targetKey
        for (seekToRestartPosition(left); nextEntry != null; next()) {
            if (comparator.compare(peek().getKey(), targetKey) >= 0) {



// key restart count is the last int of the block
        int restartCount = block.getInt(block.length() - SIZE_OF_INT);

        if (restartCount > 0) {
            // restarts are written at the end of the block
            int restartOffset = block.length() - (1 + restartCount) * SIZE_OF_INT;
            Preconditions.checkArgument(restartOffset < block.length() - SIZE_OF_INT, "Block is corrupt: restart offset count is greater than block size");
            restartPositions = block.slice(restartOffset, restartCount * SIZE_OF_INT);

            // data starts at 0 and extends to the restart index
            data = block.slice(0, restartOffset);


    public BlockIterator iterator()
        return new BlockIterator(data, restartPositions, comparator);




    在DBImpl.get()、makeRoomForWrite()中都會調用maybeScheduleCompaction();看看DBImpl.get() :

// schedule compaction if necessary
        try {
            if (versions.needsCompaction()) {
        finally {


public boolean needsCompaction()
        return current.getCompactionScore() >= 1 || current.getFileToCompact() != null;


private void finalizeVersion(Version version)
        // Precomputed best level for next compaction
        int bestLevel = -1;
        double bestScore = -1;

        for (int level = 0; level < version.numberOfLevels() - 1; level++) {
            double score;
            if (level == 0) {
                // We treat level-0 specially by bounding the number of files
                // instead of number of bytes for two reasons:
                // (1) With larger write-buffer sizes, it is nice not to do too
                // many level-0 compactions.
                // (2) The files in level-0 are merged on every read and
                // therefore we wish to avoid too many files when the individual
                // file size is small (perhaps because of a small write-buffer
                // setting, or very high compression ratios, or lots of
                // overwrites/deletions).
                score = 1.0 * version.numberOfFilesInLevel(level) / L0_COMPACTION_TRIGGER;
            else {
                // Compute the ratio of current size to size limit.
                long levelBytes = 0;
                for (FileMetaData fileMetaData : version.getFiles(level)) {
                    levelBytes += fileMetaData.getFileSize();
                score = 1.0 * levelBytes / maxBytesForLevel(level);

            if (score > bestScore) {
                bestLevel = level;
                bestScore = score;


    level-0和其餘層次的分數計算方法是不一樣的,註釋中寫的很清楚,level-0採用文件數的緣由是1.寫緩衝越大,越不適合作level-0層的歸檔;2.level-0層文件每次讀都會執行歸檔,因此會避免過多的小的level-0層文件。其餘level的算法是計算層內文件的總的大小,maxBytesForLevel的算法是單個文件大小爲10MB,leve L層,最多容許容納pow(10, L)個文件,很好算出結果。

private boolean updateStats(int seekFileLevel, FileMetaData seekFile)
        if (seekFile == null) {
            return false;

        if (seekFile.getAllowedSeeks() <= 0 && fileToCompact == null) {
            fileToCompact = seekFile;
            fileToCompactLevel = seekFileLevel;
            return true;
        return false;


int allowedSeeks = (int) (fileMetaData.getFileSize() / 16384);
                if (allowedSeeks < 100) {
                    allowedSeeks = 100;


protected void maybeScheduleCompaction()

        if (backgroundCompaction != null) {
            // Already scheduled
        else if (shuttingDown.get()) {
            // DB is being shutdown; no more background compactions
        else if (immutableMemTable == null &&
                manualCompaction == null &&
                !versions.needsCompaction()) {
            // No work to be done
        else {
            backgroundCompaction = compactionExecutor.submit(new Callable<Void>()
                public Void call()
                        throws Exception
                    try {
                    catch (DatabaseShutdownException ignored) {
                    } catch (Throwable e) {
                    	backgroundException = e;
                    return null;
compactionExecutor = Executors.newSingleThreadExecutor(compactionThreadFactory);


protected void backgroundCompaction()
            throws IOException


        Compaction compaction;
        if (manualCompaction != null) {
            compaction = versions.compactRange(manualCompaction.level,
                    new InternalKey(manualCompaction.begin, MAX_SEQUENCE_NUMBER, ValueType.VALUE),
                    new InternalKey(manualCompaction.end, 0, ValueType.DELETION));
        } else {
            compaction = versions.pickCompaction();

        if (compaction == null) {
            // no compaction
        } else if (manualCompaction == null && compaction.isTrivialMove()) {
            // Move file to next level
            Preconditions.checkState(compaction.getLevelInputs().size() == 1);
            FileMetaData fileMetaData = compaction.getLevelInputs().get(0);
            compaction.getEdit().deleteFile(compaction.getLevel(), fileMetaData.getNumber());
            compaction.getEdit().addFile(compaction.getLevel() + 1, fileMetaData);
            // log
        } else {
            CompactionState compactionState = new CompactionState(compaction);

        // manual compaction complete
        if (manualCompaction != null) {
            manualCompaction = null;

    首先歸檔內存,再計算歸檔的層次和文件,而後執行compaction。一步一步分析,首先是compactMemTableInternal();compactMemTableInternal先將immutableTable寫入level-0。看meta = buildTable(mem, fileNumber); 建立FIleChannel,遍歷全部KV,這裏須要仔細看看,tableBuilder中作了一些細節邏輯的封裝,進入add()、finish()看看。

private FileMetaData buildTable(SeekingIterable<InternalKey, Slice> data, long fileNumber)
            throws IOException
        File file = new File(databaseDir, Filename.tableFileName(fileNumber));
        try {
            InternalKey smallest = null;
            InternalKey largest = null;
            FileChannel channel = new FileOutputStream(file).getChannel();
            try {
                TableBuilder tableBuilder = new TableBuilder(options, channel, new InternalUserComparator(internalKeyComparator));

                for (Entry<InternalKey, Slice> entry : data) {
                    // update keys
                    InternalKey key = entry.getKey();
                    if (smallest == null) {
                        smallest = key;
                    largest = key;

                    tableBuilder.add(key.encode(), entry.getValue());

            finally {
                try {
                finally {

            if (smallest == null) {
                return null;
            FileMetaData fileMetaData = new FileMetaData(fileNumber, file.length(), smallest, largest);

            // verify table can be opened


            return fileMetaData;

        catch (IOException e) {
            throw e;

    進入tableBuilder.add(k ,v): userComparator默認使用BytewiseComparator。

public void add(Slice key, Slice value)
            throws IOException
        Preconditions.checkNotNull(key, "key is null");
        Preconditions.checkNotNull(value, "value is null");

        Preconditions.checkState(!closed, "table is finished");

        if (entryCount > 0) {
        	assert (userComparator.compare(key, lastKey) > 0) : "key must be greater than last key";

        // If we just wrote a block, we can now add the handle to index block
        if (pendingIndexEntry) {
            Preconditions.checkState(dataBlockBuilder.isEmpty(), "Internal error: Table has a pending index entry but data block builder is empty");

            Slice shortestSeparator = userComparator.findShortestSeparator(lastKey, key);

            Slice handleEncoding = BlockHandle.writeBlockHandle(pendingHandle);
            indexBlockBuilder.add(shortestSeparator, handleEncoding);
            pendingIndexEntry = false;

        lastKey = key;
        dataBlockBuilder.add(key, value);

        int estimatedBlockSize = dataBlockBuilder.currentSizeEstimate();
        if (estimatedBlockSize >= blockSize) {

     Slice shortestSeparator = userComparator.findShortestSeparator(lastKey, key); 點進去:

    public Slice findShortestSeparator(
            Slice start,
            Slice limit)
        // Find length of common prefix
        int sharedBytes = BlockBuilder.calculateSharedBytes(start, limit);
        // Do not shorten if one string is a prefix of the other
        if (sharedBytes < Math.min(start.length(), limit.length())) {
            // if we can add one to the last shared byte without overflow and the two keys differ by more than
            // one increment at this location.
            int lastSharedByte = start.getUnsignedByte(sharedBytes);
            if (lastSharedByte < 0xff && lastSharedByte + 1 < limit.getUnsignedByte(sharedBytes)) {
            	Slice result = start.copySlice(0, sharedBytes + 1);
                result.setByte(sharedBytes, lastSharedByte + 1);

                assert (compare(result, limit) < 0) : "start must be less than last limit";
                return result;
        return start;

    經過依次對比byte,算出sharedBytes,這個數字也對應的第一個不相同的byte的下標。接着,判斷一個key是否爲另外一個key的前綴,若是是的不壓縮。而且要求第一個key的第一個不一樣byte+1 < 第二個key的第一個不一樣byte。舉例:若是start是"abc123",limit是"abc456",則符合壓縮的要求,返回的result爲"abc2"。前面說過的「前綴」實際上是不完整的。

    這段邏輯是在pendingIndexEntry == true的前提下執行的,看pendingIndexEntry的引用,發如今TableBuilder的flush()中將其置爲true,意味着,當向一個新的block中寫入記錄時,纔會觸發這段邏輯,將該block起始的能夠寫入indexBlockBuilder。這裏的lastKey指的是上一個塊最後一個key,findShortestSuccessor的邏輯很簡單,找到最後這個key第一個不等於0xff的byte,並返回前面全部的bytes。這樣作的目的是,節省空間!!這是ldb的一個優化策略。舉例:若是lastKey是"aqq",則shortSuccessor是"b"。且pendingHandle也只是在flush()時被從新賦值的。先看finish()和flush():

public void finish()
            throws IOException
        Preconditions.checkState(!closed, "table is finished");

        // flush current data block

        // mark table as closed
        closed = true;

        // write (empty) meta index block
        BlockBuilder metaIndexBlockBuilder = new BlockBuilder(256, blockRestartInterval, new BytewiseComparator());
        // TODO(postrelease): Add stats and other meta blocks
        BlockHandle metaindexBlockHandle = writeBlock(metaIndexBlockBuilder);

        // add last handle to index block
        if (pendingIndexEntry) {
            Slice shortSuccessor = userComparator.findShortSuccessor(lastKey);

            Slice handleEncoding = BlockHandle.writeBlockHandle(pendingHandle);
            indexBlockBuilder.add(shortSuccessor, handleEncoding);
            pendingIndexEntry = false;

        // write index block
        BlockHandle indexBlockHandle = writeBlock(indexBlockBuilder);

        // write footer
        Footer footer = new Footer(metaindexBlockHandle, indexBlockHandle);
        Slice footerEncoding = Footer.writeFooter(footer);
        position += fileChannel.write(footerEncoding.toByteBuffer());
private void flush()
            throws IOException
        Preconditions.checkState(!closed, "table is finished");
        if (dataBlockBuilder.isEmpty()) {

        Preconditions.checkState(!pendingIndexEntry, "Internal error: Table already has a pending index entry to flush");

        pendingHandle = writeBlock(dataBlockBuilder);
        pendingIndexEntry = true;

    進到 private BlockHandle writeBlock(BlockBuilder blockBuilder) :

private BlockHandle writeBlock(BlockBuilder blockBuilder)
            throws IOException
        // close the block
        Slice raw = blockBuilder.finish();

        // attempt to compress the block
        Slice blockContents = raw;
        CompressionType blockCompressionType = CompressionType.NONE;
        if (compressionType == CompressionType.SNAPPY) {
            try {
                int compressedSize = Snappy.compress(raw.getRawArray(), raw.getRawOffset(), raw.length(), compressedOutput.getRawArray(), 0);

                // Don't use the compressed data if compressed less than 12.5%,
                if (compressedSize < raw.length() - (raw.length() / 8)) {
                    blockContents = compressedOutput.slice(0, compressedSize);
                    blockCompressionType = CompressionType.SNAPPY;
            catch (IOException ignored) {
                // compression failed, so just store uncompressed form

        // create block trailer
        BlockTrailer blockTrailer = new BlockTrailer(blockCompressionType, crc32c(blockContents, blockCompressionType));
        Slice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);

        // create a handle to this block
        BlockHandle blockHandle = new BlockHandle(position, blockContents.length());

        // write data and trailer
        position += fileChannel.write(new ByteBuffer[]{blockContents.toByteBuffer(), trailer.toByteBuffer()});

        // clean up state

        return blockHandle;


if (entryCount > 0) {
            else {


    從writeBlock出來,將pendingIndexEntry賦值爲true,標誌開始寫新的Block。從flush() f7出來,感受進度快多了,開心。固然Snappy和crc的算法是跳過了。。這兩個算法都不是ldb特有的,暫時放一放。接着寫metaIndexBlockBuilder,metaIndexBlockBuilder是空的。接着寫indexBlockBuilder,將lastKey、block的真實的offset和length寫入文件底部。在block的最後寫入metaIndexBlockBuilder和indexBlockBuilder的索引,即他們的開始位置和長度,封裝成footer寫入。

    再回頭看TableBuilder的add方法。它的核心是dataBlockBuilder.add(key, value);blockRestartInterval指的是兩個重啓點的間距,即能容納多少個不完整的key。blockRestartInterval默認是16。這個過程就很是好理解了,迭代存儲KV,判斷是否超過了間隔。存文件的格式,在讀操做已經詳細分析過了。對於一條記錄,依次寫入共享長度、非共享長度、value長度、非共享key、value。再更新狀態信息:

public void add(Slice key, Slice value)
        Preconditions.checkNotNull(key, "key is null");
        Preconditions.checkNotNull(value, "value is null");
        Preconditions.checkState(!finished, "block is finished");
        Preconditions.checkPositionIndex(restartBlockEntryCount, blockRestartInterval);

        Preconditions.checkArgument(lastKey == null || comparator.compare(key, lastKey) > 0, "key must be greater than last key");

        int sharedKeyBytes = 0;
        if (restartBlockEntryCount < blockRestartInterval) {
            sharedKeyBytes = calculateSharedBytes(key, lastKey);
        else {
            // restart prefix compression
            restartBlockEntryCount = 0;

        int nonSharedKeyBytes = key.length() - sharedKeyBytes;

        // write "<shared><non_shared><value_size>"
        VariableLengthQuantity.writeVariableLengthInt(sharedKeyBytes, block);
        VariableLengthQuantity.writeVariableLengthInt(nonSharedKeyBytes, block);
        VariableLengthQuantity.writeVariableLengthInt(value.length(), block);

        // write non-shared key bytes
        block.writeBytes(key, sharedKeyBytes, nonSharedKeyBytes);

        // write value bytes
        block.writeBytes(value, 0, value.length());

        // update last key
        lastKey = key;

        // update state

    最後finally,執行channal.force(true); 再驗證該文件是否能夠被打開,最後返回。buildTable過程結束。pendingOutputs變量應該是存儲正在刷盤的文件名,若是成功寫入文件,則將其刪除。在讀寫和conpact過程當中都有計算最小最大值,雖然這個邏輯簡單,但我以爲,,寫的仍是蠻好的。。:

if (smallest == null) {
                        smallest = key;
                    largest = key;


if (meta != null && meta.getFileSize() > 0) {
            Slice minUserKey = meta.getSmallest().getUserKey();
            Slice maxUserKey = meta.getLargest().getUserKey();
            if (base != null) {
                level = base.pickLevelForMemTableOutput(minUserKey, maxUserKey);
            edit.addFile(level, meta);

    貌似這個文件也可能記錄在其餘level中,進pickLevelForMemTableOutput看下。其目的是爲了將文件放入level x,且與level x+1有交集,但重疊的部分不要太大。這是爲了不,若是低層次某個文件與它下一層的全部文件沒有交集,則沒法達到歸檔刪除無效數據的做用,直接將文件從低層次移動到高層次而已。其實若是真的發生了這種狀況,compact也會避免這種無效的讀寫文件的。pickLevelForMemTableOutput的前提是這個文件跟level0層文件沒有交集,緣由是,在歸檔level0層文件時,會先找出全部相互覆蓋的文件,做爲input,ldb的策略是,低層次的input越多越好,這樣一次compaction的效率更高。後面也會有合理增長低層次input的算法,請留意。


Version version = new Version(this);
        Builder builder = new Builder(this, current);



public void apply(VersionEdit edit)
            // Update compaction pointers
            for (Entry<Integer, InternalKey> entry : edit.getCompactPointers().entrySet()) {
                Integer level = entry.getKey();
                InternalKey internalKey = entry.getValue();
                versionSet.compactPointers.put(level, internalKey);

            // Delete files
            for (Entry<Integer, Long> entry : edit.getDeletedFiles().entries()) {
                Integer level = entry.getKey();
                Long fileNumber = entry.getValue();
                // todo missing update to addedFiles?

            // Add new files
            for (Entry<Integer, FileMetaData> entry : edit.getNewFiles().entries()) {
                Integer level = entry.getKey();
                FileMetaData fileMetaData = entry.getValue();

                // We arrange to automatically compact this file after
                // a certain number of seeks.  Let's assume:
                //   (1) One seek costs 10ms
                //   (2) Writing or reading 1MB costs 10ms (100MB/s)
                //   (3) A compaction of 1MB does 25MB of IO:
                //         1MB read from this level
                //         10-12MB read from next level (boundaries may be misaligned)
                //         10-12MB written to next level
                // This implies that 25 seeks cost the same as the compaction
                // of 1MB of data.  I.e., one seek costs approximately the
                // same as the compaction of 40KB of data.  We are a little
                // conservative and allow approximately one seek for every 16KB
                // of data before triggering a compaction.
                int allowedSeeks = (int) (fileMetaData.getFileSize() / 16384);
                if (allowedSeeks < 100) {
                    allowedSeeks = 100;


    每一層都會保留compactPointer做爲compaction的指針,即從哪裏開始對該層次進行歸檔,ldb的歸檔策略是輪詢的,即會對低層次的文件按順序進行歸檔,若是達到最後,則返回第一個文件。後面會看到這種機制的實現。剛剛在writeLevel0Table中調了edit.addFile(level, meta);因此當遍歷getNewFiles時,會設置新文件的allowedSeeks,更新levels,再看saveTo:

         * Saves the current state in specified version.
        public void saveTo(Version version)
                throws IOException
            FileMetaDataBySmallestKey cmp = new FileMetaDataBySmallestKey(versionSet.internalKeyComparator);
            for (int level = 0; level < baseVersion.numberOfLevels(); level++) {

                // Merge the set of added files with the set of pre-existing files.
                // Drop any deleted files.  Store the result in *v.

                Collection<FileMetaData> baseFiles = baseVersion.getFiles().asMap().get(level);
                if (baseFiles == null) {
                    baseFiles = ImmutableList.of();
                SortedSet<FileMetaData> addedFiles = levels.get(level).addedFiles;
                if (addedFiles == null) {
                    addedFiles = ImmutableSortedSet.of();

                // files must be added in sorted order so assertion check in maybeAddFile works
                ArrayList<FileMetaData> sortedFiles = newArrayListWithCapacity(baseFiles.size() + addedFiles.size());
                Collections.sort(sortedFiles, cmp);

                for (FileMetaData fileMetaData : sortedFiles) {
                    maybeAddFile(version, level, fileMetaData);

                //#ifndef NDEBUG  todo
                // Make sure there is no overlap in levels > 0


public int compareTo(Slice that)
        if (this == that) {
            return 0;
        if (this.data == that.data && length == that.length && offset == that.offset) {
            return 0;

        int minLength = Math.min(this.length, that.length);
        for (int i = 0; i < minLength; i++) {
            int thisByte = 0xFF & this.data[this.offset + i];
            int thatByte = 0xFF & that.data[that.offset + i];
            if (thisByte != thatByte) {
                return (thisByte) - (thatByte);
        return this.length - that.length;


private void maybeAddFile(Version version, int level, FileMetaData fileMetaData)
                throws IOException
            if (levels.get(level).deletedFiles.contains(fileMetaData.getNumber())) {
                // File is deleted: do nothing
            else {
                List<FileMetaData> files = version.getFiles(level);
                if (level > 0 && !files.isEmpty()) {
                    // Must not overlap
                    boolean filesOverlap = versionSet.internalKeyComparator.compare(files.get(files.size() - 1).getLargest(), fileMetaData.getSmallest())  >= 0;
                    if (filesOverlap) {
                        // A memory compaction, while this compaction was running, resulted in a a database state that is
                        // incompatible with the compaction.  This is rare and expensive to detect while the compaction is
                        // running, so we catch here simply discard the work.
                        throw new IOException(String.format("Compaction is obsolete: Overlapping files %s and %s in level %s",
                                files.get(files.size() - 1).getNumber(),
                                fileMetaData.getNumber(), level));
                version.addFile(level, fileMetaData);


    接着看辣。manualCompaction,搜索它的引用,發現這個實際上是ldb爲用戶預留的一個接口實現,可讓用戶對compaction進行功能擴展,實現對指定層次的歸檔。  compaction = versions.compactRange,點進去看。getOverlappingInputs略過,關鍵看setupOtherInputs(level, levelInputs); setup的註釋也寫得很是清楚,在不擴展level+1層的輸入文件的同時,儘量擴大level層的輸入文件。expanded0是level+1層範圍映射回level的文件範圍,expanded1是expanded0再映射回level+1的範圍,經過對比expanded1和原來的levelUpInputs,來判斷是否增長了level+1的文件數,若是沒增長,則將expanded0做爲level的輸入。同時還會計算grandparents,進入Compaction的構造方法,關於grandparents用法會在後面提到。

public Compaction compactRange(int level, InternalKey begin, InternalKey end)
        List<FileMetaData> levelInputs = getOverlappingInputs(level, begin, end);
        if (levelInputs.isEmpty()) {
            return null;

        return setupOtherInputs(level, levelInputs);
private Compaction setupOtherInputs(int level, List<FileMetaData> levelInputs)
        Entry<InternalKey, InternalKey> range = getRange(levelInputs);
        InternalKey smallest = range.getKey();
        InternalKey largest = range.getValue();

        List<FileMetaData> levelUpInputs = getOverlappingInputs(level + 1, smallest, largest);

        // Get entire range covered by compaction
        range = getRange(levelInputs, levelUpInputs);
        InternalKey allStart = range.getKey();
        InternalKey allLimit = range.getValue();

        // See if we can grow the number of inputs in "level" without
        // changing the number of "level+1" files we pick up.
        if (!levelUpInputs.isEmpty()) {

            List<FileMetaData> expanded0 = getOverlappingInputs(level, allStart, allLimit);

            if (expanded0.size() > levelInputs.size()) {
                range = getRange(expanded0);
                InternalKey newStart = range.getKey();
                InternalKey newLimit = range.getValue();

                List<FileMetaData> expanded1 = getOverlappingInputs(level + 1, newStart, newLimit);
                if (expanded1.size() == levelUpInputs.size()) {
//              Log(options_->info_log,
//                  "Expanding@%d %d+%d to %d+%d\n",
//                  level,
//                  int(c->inputs_[0].size()),
//                  int(c->inputs_[1].size()),
//                  int(expanded0.size()),
//                  int(expanded1.size()));
                    smallest = newStart;
                    largest = newLimit;
                    levelInputs = expanded0;
                    levelUpInputs = expanded1;

                    range = getRange(levelInputs, levelUpInputs);
                    allStart = range.getKey();
                    allLimit = range.getValue();


        // Compute the set of grandparent files that overlap this compaction
        // (parent == level+1; grandparent == level+2)
        List<FileMetaData> grandparents = null;
        if (level + 2 < NUM_LEVELS) {
            grandparents = getOverlappingInputs(level + 2, allStart, allLimit);

//        if (false) {
//            Log(options_ - > info_log, "Compacting %d '%s' .. '%s'",
//                    level,
//                    EscapeString(smallest.Encode()).c_str(),
//                    EscapeString(largest.Encode()).c_str());
//        }

        Compaction compaction = new Compaction(current, level, levelInputs, levelUpInputs, grandparents);

        // Update the place where we will do the next compaction for this level.
        // We update this immediately instead of waiting for the VersionEdit
        // to be applied so that if the compaction fails, we will try a different
        // key range next time.
        compactPointers.put(level, largest);
        compaction.getEdit().setCompactPointer(level, largest);

        return compaction;

    再進入compaction = versions.pickCompaction();ldb的策略是,文件大小過大而觸發歸檔的優先於讀文件過多而觸發歸檔。從level層找到第一個比compact指針大的文件,若是沒找到,則說明已經到達最後,則將第一個文件加入輸入。若是沒有某個層次分數超過1,且觸發了seekCompaction,則將getFileToCompactLevel做爲輸入。一樣會執行setupOtherInputs,擴大level層的輸入。機制都是類似的。

public Compaction pickCompaction()
        // We prefer compactions triggered by too much data in a level over
        // the compactions triggered by seeks.
        boolean sizeCompaction = (current.getCompactionScore() >= 1);
        boolean seekCompaction = (current.getFileToCompact() != null);

        int level;
        List<FileMetaData> levelInputs;
        if (sizeCompaction) {
            level = current.getCompactionLevel();
            Preconditions.checkState(level >= 0);
            Preconditions.checkState(level + 1 < NUM_LEVELS);

            // Pick the first file that comes after compact_pointer_[level]
            levelInputs = newArrayList();
            for (FileMetaData fileMetaData : current.getFiles(level)) {
                if (!compactPointers.containsKey(level) ||
                        internalKeyComparator.compare(fileMetaData.getLargest(), compactPointers.get(level)) > 0) {
            if (levelInputs.isEmpty()) {
                // Wrap-around to the beginning of the key space
        else if (seekCompaction) {
            level = current.getFileToCompactLevel();
            levelInputs = ImmutableList.of(current.getFileToCompact());
        else {
            return null;

        // Files in level 0 may overlap each other, so pick up all overlapping ones
        if (level == 0) {
            Entry<InternalKey, InternalKey> range = getRange(levelInputs);
            // Note that the next call will discard the file we placed in
            // c->inputs_[0] earlier and replace it with an overlapping set
            // which will include the picked file.
            levelInputs = getOverlappingInputs(0, range.getKey(), range.getValue());


        Compaction compaction = setupOtherInputs(level, levelInputs);
        return compaction;




    protected Entry<InternalKey, Slice> getNextElement()
        Entry<InternalKey, Slice> result = null;
        ComparableIterator nextIterator = priorityQueue.poll();
        if (nextIterator != null) {
            result = nextIterator.next();
            if (nextIterator.hasNext()) {
        return result;


private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
            siftDownComparable(k, x);

    再看Level0Iterator.ComparableInterator的compareTo方法,由於level0多個文件是相互覆蓋的,因此須要全部文件中最小的key,而LevelIterator則沒有用到優先級隊列,由於文件是有序的。咱們注意到這兩個Iterator繼承和實現都是相同的,均爲extends AbstractSeekingIterator<InternalKey, Slice> implements InternalIterator:

        public int compareTo(ComparableIterator that)
            int result = comparator.compare(this.nextElement.getKey(), that.nextElement.getKey());
            if (result == 0) {
                result = Ints.compare(this.ordinal, that.ordinal);
            return result;


    protected Entry<InternalKey, Slice> getNextElement()
        // note: it must be here & not where 'current' is assigned,
        // because otherwise we'll have called inputs.next() before throwing
        // the first NPE, and the next time around we'll call inputs.next()
        // again, incorrectly moving beyond the error.
        boolean currentHasNext = false;
        while (true) {
            if (current != null) {
                currentHasNext = current.hasNext();
            if (!(currentHasNext)) {
                if (index < files.size()) {
                    current = openNextFile();
                else {
            else {
        if (currentHasNext) {
            return current.next();
        else {
            // set current to empty iterator to avoid extra calls to user iterators
            current = null;
            return null;

    private InternalTableIterator openNextFile()
        FileMetaData fileMetaData = files.get(index);
        return tableCache.newIterator(fileMetaData);

    current.hasNext();指的是當前文件是否還有記錄,,而openNextFile則依次打開list files的文件。實際上是很好理解的。至此,獲取Iterator的過程解析結束,在解析的過程當中,也分析了Iterator是如何迭代文件的。繼續回到doCompactionWork。ldb還會優先將內存歸檔,防止在執行更高層次的文件歸檔以前,還有immutableTable須要歸檔,由於內存的歸檔會影響level0的文件數,進而影響後面的歸檔順序。


    若是是一個獨立的key,不重複,首先會進入First occurrence,將lastSequenceForKey賦值爲最大值。執行完一個判斷,lastSequenceForKey被置爲key的sequence。若是第二個key與前面相同,lastSequenceForKey必定是 <= smallestSnapshot。由於smallestSnapshot是version的當前最大sequence。因此新的key就覆蓋了老的相同的key。DELETE標籤執行的刪除操做也是在這裏刪除的。


boolean drop = false;
                // todo if key doesn't parse (it is corrupted),
                if (false /*!ParseInternalKey(key, &ikey)*/) {
                    // do not hide error keys
                    currentUserKey = null;
                    hasCurrentUserKey = false;
                    lastSequenceForKey = MAX_SEQUENCE_NUMBER;
                else {
                    if (!hasCurrentUserKey || internalKeyComparator.getUserComparator().compare(key.getUserKey(), currentUserKey) != 0) {
                        // First occurrence of this user key
                        currentUserKey = key.getUserKey();
                        hasCurrentUserKey = true;
                        lastSequenceForKey = MAX_SEQUENCE_NUMBER;

                    if (lastSequenceForKey <= compactionState.smallestSnapshot) {
                        // Hidden by an newer entry for same user key
                        drop = true; // (A)
                    else if (key.getValueType() == DELETION &&
                            key.getSequenceNumber() <= compactionState.smallestSnapshot &&
                            compactionState.compaction.isBaseLevelForKey(key.getUserKey())) {
                        // For this user key:
                        // (1) there is no data in higher levels
                        // (2) data in lower levels will have larger sequence numbers
                        // (3) data in layers that are being compacted here and have
                        //     smaller sequence numbers will be dropped in the next
                        //     few iterations of this loop (by rule (A) above).
                        // Therefore this deletion marker is obsolete and can be dropped.
                        drop = true;

                    lastSequenceForKey = key.getSequenceNumber();
// Returns true if the information we have available guarantees that
    // the compaction is producing data in "level+1" for which no data exists
    // in levels greater than "level+1".
    public boolean isBaseLevelForKey(Slice userKey)
        // Maybe use binary search to find right entry instead of linear search?
        UserComparator userComparator = inputVersion.getInternalKeyComparator().getUserComparator();
        for (int level = this.level + 2; level < NUM_LEVELS; level++) {
            List<FileMetaData> files = inputVersion.getFiles(level);
            while (levelPointers[level] < files.size()) {
                FileMetaData f = files.get(levelPointers[level]);
                if (userComparator.compare(userKey, f.getLargest().getUserKey()) <= 0) {
                    // We've advanced far enough
                    if (userComparator.compare(userKey, f.getSmallest().getUserKey()) >= 0) {
                        // Key falls in this file's range, so definitely not base level
                        return false;
        return true;


    還有兩個問題1.recover 2.cache,之後慢慢補充。??
