kafka存儲-segment

介紹

消息隊列在kafka中被稱爲Topic。由於kafka的分佈式,Topic會有多個Partition組成,分佈在不一樣的機器上。kafka爲了進一步的增長讀取效率,會將Partition分爲多個Segment。這篇文章將詳細的介紹Segment的消息的添加,查找和索引的恢復。app

添加消息

  • 添加消息到文件,調用FileRecords的append方法
  • 更新記錄到索引文件,調用OffsetIndex的append方法
class LogSegment(val log: FileRecords,
                 val index: OffsetIndex,
                 val timeIndex: TimeIndex,
                 val txnIndex: TransactionIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  def append(firstOffset: Long,  largestOffset: Long, largestTimestamp: Long,
                         shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
          .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
      // 獲取FileRecord文件的末尾
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)
      // 檢查offset的範圍
      require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
      // 調用FileRecords添加records
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
      // 更新最大的timestamp和對應的offset.
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
      }
      // index記錄是當對應的FileRecrods中的記錄大於indexIntervalBytes時,會添加新的index紀錄
      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
        // 添加index紀錄
        index.append(firstOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

  // 文件中存儲的是offset與baseOffset的差值,只能用四個字節表示
  def canConvertToRelativeOffset(offset: Long): Boolean = {
    (offset - baseOffset) <= Integer.MAX_VALUE
  }

}

查找消息位置

  • 從索引文件查找對應的索引記錄,調用OffsetIndex的lookup方法
  • 從文件中查找對應的RecordBatch,調用FileReocrds的searchForOffsetWithSize方法
class LogSegment  {

  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
    // 查找對應的索引記錄
    val mapping = index.lookup(offset)
    // 從文件中查找RecordBatch
    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
  }
}

class FileRecords {
    public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
        // 從指定的位置startingPosition,開始順序遍歷RecordBatch
        for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
            long offset = batch.lastOffset();
            if (offset >= targetOffset)
                // 直到找到第一個,batch的tlastOffset大於argetOffset
                // 返回 lastOffset, batch的開始位置,數據大小
                return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
        }
        return null;
    }
}

讀取消息

read方法提供了方便查找消息的做用, 它可以指定的條件中讀取消息。分佈式

  • 消息的offset的範圍startOffset和maxOffset
  • 消息在物理文件中的位置不能大於maxPosition
  • 返回的數據大小不能大於maxSize
  • 是否須要讀取一條完整的記錄,minOneMessage
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

    val logSize = log.sizeInBytes
    // 查找startOffset的位置信息
    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    val adjustedMaxSize =
      // 若是須要保證讀取一條完整的數據,則至少須要startOffset的對應的ReocrdBatch的大小
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    if (adjustedMaxSize == 0)
      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    val fetchSize: Int = maxOffset match {
      case None =>
        // 若是沒有指定maxOffset,則只須要考慮大小。
        min((maxPosition - startPosition).toInt, adjustedMaxSize)
      case Some(offset) =>
        if (offset < startOffset)
          return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
        // 獲取maxPosition對應的位置
        val mapping = translateOffset(offset, startPosition)
        val endPosition =
          if (mapping == null)
            logSize
          else
            mapping.position
        // 查找知足全部條件,即之間的最小值
        min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
    }

    FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize),
      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
  }

索引文件恢復

當索引文件被損壞時,kafka會自動重建索引。fetch

def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
    // 截斷索引文件
    index.truncate()
    index.resize(index.maxIndexSize)
    // 截斷時間索引文件
    timeIndex.truncate()
    timeIndex.resize(timeIndex.maxIndexSize)
    // 截斷事務索引文件
    txnIndex.truncate()
    // 文件的讀取位置
    var validBytes = 0
    var lastIndexEntry = 0
    maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
    try {
      // 遍歷數據文件的RecordBatch
      for (batch <- log.batches.asScala) {
        batch.ensureValid()

        // 更新最大的timestamp和對應的offset
        if (batch.maxTimestamp > maxTimestampSoFar) {
          maxTimestampSoFar = batch.maxTimestamp
          offsetOfMaxTimestamp = batch.lastOffset
        }

        // 若是數據間隔大小超過指定數值indexIntervalBytes,則添加索引記錄
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          val startOffset = batch.baseOffset
          index.append(startOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
          lastIndexEntry = validBytes
        }
        // 更新validBytes
        validBytes += batch.sizeInBytes()

        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
          leaderEpochCache.foreach { cache =>
            if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
          }
          updateProducerState(producerStateManager, batch)
        }
      }
    } catch {
      case e: CorruptRecordException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
          .format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }

    // 檢查數據文件,是否有多於的數據
    val truncated = log.sizeInBytes - validBytes
    if (truncated > 0)
      logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")
    log.truncateTo(validBytes)
    index.trimToValidSize()
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
    timeIndex.trimToValidSize()
    truncated
  }

歸納

Segment是由數據文件FileReocrds和索引文件IndexOffset共同組成。當添加新的消息時,會更新二者。當查找消息時,也會充分的利用索引文件。ui

相關文章
相關標籤/搜索