In Kafka, a message queue is called a Topic. Because Kafka is distributed, a Topic is made up of multiple Partitions spread across different machines. To further improve read efficiency, Kafka splits each Partition into multiple Segments. This article takes a detailed look at how a Segment appends messages, looks them up, and recovers its indexes.
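As a rough illustration of that layout (a standalone sketch, not Kafka source; the path below is hypothetical), a partition directory can simply be listed: each segment is a .log data file named after its base offset, accompanied by the matching index files.

import java.io.File

object SegmentLayout {
  // Derive each segment's base offset from its .log file name
  def baseOffsets(partitionDir: File): Seq[Long] =
    Option(partitionDir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".log"))
      .map(_.getName.stripSuffix(".log").toLong)
      .sorted
      .toSeq

  def main(args: Array[String]): Unit =
    // Hypothetical partition directory, for illustration only
    baseOffsets(new File("/tmp/kafka-logs/my-topic-0")).foreach(println)
}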
class LogSegment(val log: FileRecords,
                 val index: OffsetIndex,
                 val timeIndex: TimeIndex,
                 val txnIndex: TransactionIndex,
                 val baseOffset: Long,
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {

  def append(firstOffset: Long,
             largestOffset: Long,
             largestTimestamp: Long,
             shallowOffsetOfMaxTimestamp: Long,
             records: MemoryRecords): Unit = {
    if (records.sizeInBytes > 0) {
      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
        .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
      // Current end of the FileRecords file, i.e. the physical position of the new data
      val physicalPosition = log.sizeInBytes()
      if (physicalPosition == 0)
        rollingBasedTimestamp = Some(largestTimestamp)
      // Check that the offsets fall within the representable range
      require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
      // Delegate to FileRecords to append the records
      val appendedBytes = log.append(records)
      trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
      // Update the largest timestamp seen so far and its corresponding offset
      if (largestTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = largestTimestamp
        offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
      }
      // The index is sparse: a new entry is added only once more than indexIntervalBytes
      // of data have been written to the FileRecords since the last entry
      if (bytesSinceLastIndexEntry > indexIntervalBytes) {
        // Add entries to the offset index and the time index
        index.append(firstOffset, physicalPosition)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        bytesSinceLastIndexEntry = 0
      }
      bytesSinceLastIndexEntry += records.sizeInBytes
    }
  }

  // The index stores the offset relative to baseOffset, which must fit into four bytes (an Int)
  def canConvertToRelativeOffset(offset: Long): Boolean = {
    (offset - baseOffset) <= Integer.MAX_VALUE
  }
}
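The sparse-index rule is the key detail in append. The following standalone sketch (the names buildIndex and IndexEntry are assumptions, not Kafka code) simulates it: an index entry maps a logical offset to a physical file position, and a new entry is written only after more than indexIntervalBytes of message data have accumulated since the previous one.

object SparseIndexSketch {
  final case class IndexEntry(offset: Long, position: Int)

  // batches: (firstOffset, sizeInBytes) pairs, in append order
  def buildIndex(batches: Seq[(Long, Int)], indexIntervalBytes: Int): Seq[IndexEntry] = {
    var position = 0                 // physical end of the data file so far
    var bytesSinceLastEntry = 0      // mirrors bytesSinceLastIndexEntry
    val entries = Seq.newBuilder[IndexEntry]
    for ((firstOffset, sizeInBytes) <- batches) {
      if (bytesSinceLastEntry > indexIntervalBytes) {
        entries += IndexEntry(firstOffset, position)
        bytesSinceLastEntry = 0
      }
      bytesSinceLastEntry += sizeInBytes
      position += sizeInBytes
    }
    entries.result()
  }

  def main(args: Array[String]): Unit = {
    // Ten 1000-byte batches with a 4096-byte interval: only one index entry is
    // produced (when the sixth batch arrives) instead of one entry per batch.
    val batches = (0L until 10L).map(i => (i * 10, 1000))
    buildIndex(batches, indexIntervalBytes = 4096).foreach(println)
  }
}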
class LogSegment {

  private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
    // Look up the nearest index entry at or before the target offset
    val mapping = index.lookup(offset)
    // Scan the data file from that position to find the RecordBatch
    log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
  }
}

class FileRecords {

  public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
    // Iterate over the RecordBatches sequentially, starting from startingPosition
    for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
      long offset = batch.lastOffset();
      if (offset >= targetOffset)
        // Stop at the first batch whose lastOffset is >= targetOffset and return
        // its lastOffset, the batch's starting position, and its size in bytes
        return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
    }
    return null;
  }
}
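To make the two-step lookup concrete, here is a standalone sketch (the IndexEntry and Batch types and the lookup/search helpers are assumptions, not Kafka's API): first find the last index entry at or before the target offset, then scan the batches from that physical position until one covers the offset.

object OffsetLookupSketch {
  final case class IndexEntry(offset: Long, position: Int)
  final case class Batch(baseOffset: Long, lastOffset: Long, position: Int, sizeInBytes: Int)

  // Stands in for OffsetIndex.lookup: the greatest indexed offset <= target
  def lookup(index: Seq[IndexEntry], target: Long): IndexEntry =
    index.takeWhile(_.offset <= target).lastOption.getOrElse(IndexEntry(0L, 0))

  // Stands in for FileRecords.searchForOffsetWithSize: linear scan from startPosition
  def search(batches: Seq[Batch], target: Long, startPosition: Int): Option[Batch] =
    batches.filter(_.position >= startPosition).find(_.lastOffset >= target)

  def main(args: Array[String]): Unit = {
    val index   = Seq(IndexEntry(0, 0), IndexEntry(40, 5000))
    val batches = Seq(Batch(0, 19, 0, 2500), Batch(20, 39, 2500, 2500), Batch(40, 59, 5000, 2500))
    val entry   = lookup(index, target = 27)                 // IndexEntry(0, 0)
    println(search(batches, target = 27, entry.position))    // Some(Batch(20, 39, 2500, 2500))
  }
}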
The read method provides a convenient way to look up messages: it reads messages according to the specified conditions.
def read(startOffset: Long,
         maxOffset: Option[Long],
         maxSize: Int,
         maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  if (maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  val logSize = log.sizeInBytes
  // Locate the physical position of startOffset
  val startOffsetAndSize = translateOffset(startOffset)
  // if the start position is already off the end of the log, return null
  if (startOffsetAndSize == null)
    return null

  val startPosition = startOffsetAndSize.position
  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
  val adjustedMaxSize =
    // If at least one complete message must be returned, the limit has to cover
    // the whole RecordBatch that contains startOffset
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize

  if (adjustedMaxSize == 0)
    return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

  val fetchSize: Int = maxOffset match {
    case None =>
      // No maxOffset given, so only the size limits matter
      min((maxPosition - startPosition).toInt, adjustedMaxSize)
    case Some(offset) =>
      if (offset < startOffset)
        return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false)
      // Translate maxOffset to its physical position
      val mapping = translateOffset(offset, startPosition)
      val endPosition =
        if (mapping == null) logSize
        else mapping.position
      // Take the minimum that satisfies all of the constraints
      min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
  }

  FetchDataInfo(offsetMetadata, log.read(startPosition, fetchSize),
    firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
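The interplay of maxSize, maxPosition, maxOffset and minOneMessage boils down to a pure size calculation. The sketch below (parameter names are assumptions, not Kafka's API) computes the fetch size following the same constraints as read.

object FetchSizeSketch {
  def fetchSize(startPosition: Int,
                firstBatchSize: Int,
                maxSize: Int,
                maxPosition: Long,
                endPositionOfMaxOffset: Option[Long],
                minOneMessage: Boolean): Int = {
    // Guarantee at least one complete batch if requested
    val adjustedMaxSize = if (minOneMessage) math.max(maxSize, firstBatchSize) else maxSize
    // The physical upper bound: maxPosition, or the position of maxOffset if smaller
    val endPosition = endPositionOfMaxOffset.fold(maxPosition)(math.min(maxPosition, _))
    math.min((endPosition - startPosition).toInt, adjustedMaxSize)
  }

  def main(args: Array[String]): Unit = {
    // maxSize (1 KB) is smaller than the first batch (4 KB); minOneMessage raises the limit
    println(fetchSize(startPosition = 0, firstBatchSize = 4096, maxSize = 1024,
                      maxPosition = 8192, endPositionOfMaxOffset = None, minOneMessage = true))      // 4096
    // A maxOffset whose batch starts at position 2048 caps the read at 2048 bytes
    println(fetchSize(startPosition = 0, firstBatchSize = 512, maxSize = 8192,
                      maxPosition = 8192, endPositionOfMaxOffset = Some(2048L), minOneMessage = false)) // 2048
  }
}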
When the index files are corrupted, Kafka rebuilds them automatically.
def recover(producerStateManager: ProducerStateManager,
            leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
  // Truncate the offset index file
  index.truncate()
  index.resize(index.maxIndexSize)
  // Truncate the time index file
  timeIndex.truncate()
  timeIndex.resize(timeIndex.maxIndexSize)
  // Truncate the transaction index file
  txnIndex.truncate()
  // Number of valid bytes read from the data file so far
  var validBytes = 0
  var lastIndexEntry = 0
  maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
  try {
    // Iterate over the RecordBatches in the data file
    for (batch <- log.batches.asScala) {
      batch.ensureValid()
      // Update the largest timestamp seen so far and its corresponding offset
      if (batch.maxTimestamp > maxTimestampSoFar) {
        maxTimestampSoFar = batch.maxTimestamp
        offsetOfMaxTimestamp = batch.lastOffset
      }
      // If more than indexIntervalBytes have been read since the last entry, add an index entry
      if (validBytes - lastIndexEntry > indexIntervalBytes) {
        val startOffset = batch.baseOffset
        index.append(startOffset, validBytes)
        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
        lastIndexEntry = validBytes
      }
      // Update validBytes
      validBytes += batch.sizeInBytes()

      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
        leaderEpochCache.foreach { cache =>
          if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign()
            cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
        }
        updateProducerState(producerStateManager, batch)
      }
    }
  } catch {
    case e: CorruptRecordException =>
      logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
        .format(log.file.getAbsolutePath, validBytes, e.getMessage))
  }
  // Check whether the data file has extra bytes beyond the last valid batch
  val truncated = log.sizeInBytes - validBytes
  if (truncated > 0)
    logger.debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")

  log.truncateTo(validBytes)
  index.trimToValidSize()
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
  timeIndex.trimToValidSize()
  truncated
}
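The essence of recover is: count the bytes that validate correctly, rebuild the sparse index along the way, and truncate everything after the first corrupt batch. The following standalone sketch (with an assumed Batch type, not Kafka code) models that loop.

object RecoverSketch {
  final case class Batch(baseOffset: Long, sizeInBytes: Int, valid: Boolean)
  final case class IndexEntry(offset: Long, position: Int)

  // Returns the rebuilt index and the number of valid bytes to keep
  def recover(batches: Seq[Batch], indexIntervalBytes: Int): (Seq[IndexEntry], Int) = {
    var validBytes = 0
    var lastIndexEntry = 0
    val index = Seq.newBuilder[IndexEntry]
    val it = batches.iterator
    var corrupt = false
    while (it.hasNext && !corrupt) {
      val batch = it.next()
      if (!batch.valid) {
        corrupt = true                       // stop at the first corrupt batch, like ensureValid()
      } else {
        if (validBytes - lastIndexEntry > indexIntervalBytes) {
          index += IndexEntry(batch.baseOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += batch.sizeInBytes
      }
    }
    (index.result(), validBytes)             // bytes beyond validBytes would be truncated
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Batch(0, 3000, valid = true), Batch(30, 3000, valid = true),
                      Batch(60, 3000, valid = false))
    // Keeps 6000 valid bytes and drops the corrupt tail
    println(recover(batches, indexIntervalBytes = 2048))
  }
}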
A Segment is made up of the data file FileRecords and the index file OffsetIndex. When new messages are appended, both are updated; when messages are looked up, the index file is put to full use.