kafka存儲-日誌索引

介紹

Partition是由多個Segment組成,Segment又是由數據文件,索引文件組成。app

數據文件是以.log結尾,索引文件是由.index結尾。dom

OffsetIndex

OffsetIndex表示的就是一個索引文件ide

AbstractIndex

OffsetIndex繼承AbstractIndex,它使用了內存映射MappedByteBuffer讀取索引文件。函數

abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean)
    extends Logging {

  // 每條記錄的大小
  protected def entrySize: Int

  protected val lock = new ReentrantLock

  @volatile
  protected var mmap: MappedByteBuffer = {
    val newlyCreated = file.createNewFile()
    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
    try {
      //若是文件是新建的,則分配空間。空間大小爲,最接近maxIndexSize的entrySize的倍數
      if(newlyCreated) {
        if(maxIndexSize < entrySize)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
      }
      // 獲取file的大小。注意文件大小每次初始化爲maxIndexSize,可是當文件關閉時,
      // 會截斷掉多餘的數據,因此文件的大小不是同樣的
      val len = raf.length()
      val idx = {
        // 實例MappedByteBuffer
        if (writable)
          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
        else
          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
      }
      // 設置MappedByteBuffer的position值
      if(newlyCreated)
        idx.position(0)
      else
        // 指向文件的最後位置(必須爲entrySize的倍數)
        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
      idx
    } finally {
      // 關閉RandomAccessFile文件。只要MappedByteBuffer沒被垃圾回收,文件實際上就不會關閉
      CoreUtils.swallow(raf.close())
    }
  }

  private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)

swallow接受傳遞的函數action,執行action。若是有異常,僅僅記錄下來,不拋出。post

def swallow(log: (Object, Throwable) => Unit, action: => Unit) {
    try {
      action
    } catch {
      case e: Throwable => log(e.getMessage(), e)
    }
  }
// 根據mmap.limit和entrySize,計算出entry的最大值
  @volatile
  private[this] var _maxEntries = mmap.limit / entrySize

  // 計算如今entry的數量
  @volatile
  protected var _entries = mmap.position / entrySize

  // 是否數據存儲已滿
  def isFull: Boolean = _entries >= _maxEntries

  def maxEntries: Int = _maxEntries

  def entries: Int = _entries

OffsetPosition

OffsetPosition有兩個屬性ui

  • offset,表示record的偏移量
  • position,表示對應數據文件的物理位置
sealed trait IndexEntry {
  // We always use Long for both key and value to avoid boxing.
  def indexKey: Long
  def indexValue: Long
}

case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
  override def indexKey = offset
  override def indexValue = position.toLong
}

OffsetIndex 添加記錄

OffsetIndex的每條記錄的大小爲8byte。this

  • offsetDelta,消息offset對應於baseOffset的差值
  • position,對應數據文件的物理位置
class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
    extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize, writable) {

  override def entrySize = 8

  // 添加紀錄  
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
      if (_entries == 0 || offset > _lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        // 計算對應baseOffset的偏移量,寫進內存映射中
        mmap.putInt((offset - baseOffset).toInt)
         // 將position寫進內存映射中
        mmap.putInt(position)
        // 更新_entries
        _entries += 1
        // 更新_lastOffset
        _lastOffset = offset
        require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, _lastOffset, file.getAbsolutePath))
      }
    }
  }

OffsetIndex 查找記錄

OffsetIndex是做爲數據文件的索引存在的。固然它只是存儲了數據文件的一部分。當兩條數據在數據文件的物理位置,相差大於必定的數值(由indexInterval配置),就會添加一條索引記錄。固然既然做爲索引,下面詳細講解索引的查找過程。scala

// targetOffset爲要查找的offset
  def lookup(targetOffset: Long): OffsetPosition = {
    maybeLock(lock) {
      val idx = mmap.duplicate
      // 查找offset小於targetOffset的最大項位置
      val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else
        parseEntry(idx, slot).asInstanceOf[OffsetPosition]
    }
  }
  // 計算第n個entry的開始位置,而後讀取int值,即offsetDelta
  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
  // 計算第n個entry的開始位置,而後略過offsetDelta,而後讀取int值,即position
  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
  // 返回第n個entry的數據,OffsetPosition的實例
  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
      // 這裏注意到,offset計算是baseOffset +offsetDelta
      OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))

  // 返回小於或等於target的記錄中,值最大的一個
  protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int =
    indexSlotRangeFor(idx, target, searchEntity)._1

  // 二分法查找。返回結果的兩個數值,都是最接近target的。第一個值小於或等於target,第二個值大於或等於target
  private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
    // check if the index is empty
    if(_entries == 0)
      return (-1, -1)

    // 若是target比第一個entry的offfset還要小,說明不存在
    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      return (-1, 0)

    var lo = 0
    var hi = _entries - 1
    while(lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = parseEntry(idx, mid)
      val compareResult = compareIndexEntry(found, target, searchEntity)
      if(compareResult > 0)
        hi = mid - 1
      else if(compareResult < 0)
        // lo位置始終是小於target
        lo = mid
      else
        return (mid, mid)
    }

    (lo, if (lo == _entries - 1) -1 else lo + 1)
  }

文件大小調整

def resize(newSize: Int) {
    inLock(lock) {
      val raf = new RandomAccessFile(file, "rw")
      // 計算newSize最多恰好容納entrySize的大小
      val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
      // 記錄當前的position
      val position = mmap.position

      if (OperatingSystem.IS_WINDOWS)
        forceUnmap(mmap);
      try {
        // 若是roundedNewSize小於當前文件的大小,等同於文件截斷。
        // 反之,等同於添加文件容量
        raf.setLength(roundedNewSize)
        // 更新mmap
        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
        _maxEntries = mmap.limit / entrySize
        // 回覆當前的postion
        mmap.position(position)
      } finally {
        // 關閉文件
        CoreUtils.swallow(raf.close())
      }
    }
  }

  // 清空文件
  override def truncate() = truncateToEntries(0)

  // 只保留entries個記錄
  private def truncateToEntries(entries: Int) {
    inLock(lock) {
      // 更新屬性
      _entries = entries
      mmap.position(_entries * entrySize)
      _lastOffset = lastEntry.offset
    }
  }

歸納

OffsetIndex是數據文件的索引,目的是爲了提升查找的效率。OffsetIndex爲了節省空間,只是間隔性的記錄一些數據的索引。debug

OffsetIndex爲了提升讀取索引文件的速度,底層改用了內存映射的機制。code

OffsetIndex是根據數據的offset來查找數據文件的物理位置。它會根據offset,查找出小於或等於offset,而且最接近offset的值。

相關文章
相關標籤/搜索