abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) extends Logging { // 每條記錄的大小 protected def entrySize: Int protected val lock = new ReentrantLock @volatile protected var mmap: MappedByteBuffer = { val newlyCreated = file.createNewFile() val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r") try { //若是文件是新建的,則分配空間。空間大小爲,最接近maxIndexSize的entrySize的倍數 if(newlyCreated) { if(maxIndexSize < entrySize) throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize) raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize)) } // 獲取file的大小。注意文件大小每次初始化爲maxIndexSize,可是當文件關閉時, // 會截斷掉多餘的數據,因此文件的大小不是同樣的 val len = raf.length() val idx = { // 實例MappedByteBuffer if (writable), 0, len) else, 0, len) } // 設置MappedByteBuffer的position值 if(newlyCreated) idx.position(0) else // 指向文件的最後位置(必須爲entrySize的倍數) idx.position(roundDownToExactMultiple(idx.limit, entrySize)) idx } finally { // 關閉RandomAccessFile文件。只要MappedByteBuffer沒被垃圾回收,文件實際上就不會關閉 CoreUtils.swallow(raf.close()) } } private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
def swallow(log: (Object, Throwable) => Unit, action: => Unit) { try { action } catch { case e: Throwable => log(e.getMessage(), e) } }
// 根據mmap.limit和entrySize,計算出entry的最大值 @volatile private[this] var _maxEntries = mmap.limit / entrySize // 計算如今entry的數量 @volatile protected var _entries = mmap.position / entrySize // 是否數據存儲已滿 def isFull: Boolean = _entries >= _maxEntries def maxEntries: Int = _maxEntries def entries: Int = _entries
sealed trait IndexEntry { // We always use Long for both key and value to avoid boxing. def indexKey: Long def indexValue: Long } case class OffsetPosition(offset: Long, position: Int) extends IndexEntry { override def indexKey = offset override def indexValue = position.toLong }
class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) { override def entrySize = 8 // 添加紀錄 def append(offset: Long, position: Int) { inLock(lock) { require(!isFull, "Attempt to append to a full index (size = " + _entries + ").") if (_entries == 0 || offset > _lastOffset) { debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName)) // 計算對應baseOffset的偏移量,寫進內存映射中 mmap.putInt((offset - baseOffset).toInt) // 將position寫進內存映射中 mmap.putInt(position) // 更新_entries _entries += 1 // 更新_lastOffset _lastOffset = offset require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") } else { throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." .format(offset, entries, _lastOffset, file.getAbsolutePath)) } } }
// targetOffset爲要查找的offset def lookup(targetOffset: Long): OffsetPosition = { maybeLock(lock) { val idx = mmap.duplicate // 查找offset小於targetOffset的最大項位置 val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY) if(slot == -1) OffsetPosition(baseOffset, 0) else parseEntry(idx, slot).asInstanceOf[OffsetPosition] } } // 計算第n個entry的開始位置,而後讀取int值,即offsetDelta private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize) // 計算第n個entry的開始位置,而後略過offsetDelta,而後讀取int值,即position private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4) // 返回第n個entry的數據,OffsetPosition的實例 override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = { // 這裏注意到,offset計算是baseOffset +offsetDelta OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n)) // 返回小於或等於target的記錄中,值最大的一個 protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = indexSlotRangeFor(idx, target, searchEntity)._1 // 二分法查找。返回結果的兩個數值,都是最接近target的。第一個值小於或等於target,第二個值大於或等於target private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = { // check if the index is empty if(_entries == 0) return (-1, -1) // 若是target比第一個entry的offfset還要小,說明不存在 if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0) return (-1, 0) var lo = 0 var hi = _entries - 1 while(lo < hi) { val mid = ceil(hi/2.0 + lo/2.0).toInt val found = parseEntry(idx, mid) val compareResult = compareIndexEntry(found, target, searchEntity) if(compareResult > 0) hi = mid - 1 else if(compareResult < 0) // lo位置始終是小於target lo = mid else return (mid, mid) } (lo, if (lo == _entries - 1) -1 else lo + 1) }
def resize(newSize: Int) { inLock(lock) { val raf = new RandomAccessFile(file, "rw") // 計算newSize最多恰好容納entrySize的大小 val roundedNewSize = roundDownToExactMultiple(newSize, entrySize) // 記錄當前的position val position = mmap.position if (OperatingSystem.IS_WINDOWS) forceUnmap(mmap); try { // 若是roundedNewSize小於當前文件的大小,等同於文件截斷。 // 反之,等同於添加文件容量 raf.setLength(roundedNewSize) // 更新mmap mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) _maxEntries = mmap.limit / entrySize // 回覆當前的postion mmap.position(position) } finally { // 關閉文件 CoreUtils.swallow(raf.close()) } } } // 清空文件 override def truncate() = truncateToEntries(0) // 只保留entries個記錄 private def truncateToEntries(entries: Int) { inLock(lock) { // 更新屬性 _entries = entries mmap.position(_entries * entrySize) _lastOffset = lastEntry.offset } }