In practice we often need to query the offset range of a given partition of a topic.
Command line:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 --topic xxxtopic --time -2
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 --topic xxxtopic --time -1
The special meaning of -1 and -2:
public class ListOffsetRequest extends AbstractRequest {
    public static final long EARLIEST_TIMESTAMP = -2L;
    public static final long LATEST_TIMESTAMP = -1L;
}
Inside the consumer, the call chain is (a usage sketch follows the list):
KafkaConsumer.endOffsets(Collection<TopicPartition>)
KafkaConsumer.beginningOffsets(Collection<TopicPartition>)
Fetcher.beginningOrEndOffset(Collection<TopicPartition>, long, long)
Fetcher.retrieveOffsetsByTimes(Map<TopicPartition, Long>, long, boolean)
Fetcher.sendListOffsetRequests(boolean, Map<TopicPartition, Long>)
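For reference, the same range can be read through the public consumer API. A minimal sketch, assuming a reachable broker at the placeholder address used above (xxxx:9092 and xxxtopic are the same placeholders):

// Sketch: beginningOffsets corresponds to --time -2 (EARLIEST_TIMESTAMP),
// endOffsets to --time -1 (LATEST_TIMESTAMP).
import java.util.*;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetRange {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxx:9092");   // placeholder broker
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Collect all partitions of the topic.
            List<TopicPartition> partitions = consumer.partitionsFor("xxxtopic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);

            for (TopicPartition tp : partitions)
                System.out.printf("%s: [%d, %d)%n", tp, earliest.get(tp), latest.get(tp));
        }
    }
}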
// Group the partitions by node.
final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
    TopicPartition tp = entry.getKey();
    PartitionInfo info = metadata.fetch().partition(tp);
    if (info == null) {
        metadata.add(tp.topic());
        log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
        return RequestFuture.staleMetadata();
    } else if (info.leader() == null) {
        log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", tp);
        return RequestFuture.leaderNotAvailable();
    } else {
        Node node = info.leader();
        Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
        if (topicData == null) {
            topicData = new HashMap<>();
            timestampsToSearchByNode.put(node, topicData);
        }
        topicData.put(entry.getKey(), entry.getValue());
    }
}

final RequestFuture<Map<TopicPartition, OffsetData>> listOffsetRequestsFuture = new RequestFuture<>();
final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
    sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps)
            .addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>() {
                @Override
                public void onSuccess(Map<TopicPartition, OffsetData> value) {
                    synchronized (listOffsetRequestsFuture) {
                        fetchedTimestampOffsets.putAll(value);
                        if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone())
                            listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    synchronized (listOffsetRequestsFuture) {
                        // This may cause all the requests to be retried, but should be rare.
                        if (!listOffsetRequestsFuture.isDone())
                            listOffsetRequestsFuture.raise(e);
                    }
                }
            });
}
return listOffsetRequestsFuture;
In short: for each partition, find the leader node and send it a ListOffsetRequest. The request locates offsets by timestamp.
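The public API built on this path is KafkaConsumer.offsetsForTimes. A small sketch, reusing the consumer and partitions from the sketch above (the one-hour lookback is purely illustrative):

// Sketch: offsetsForTimes maps each requested timestamp to the earliest offset whose
// message timestamp is >= it; internally this goes through
// Fetcher.retrieveOffsetsByTimes / sendListOffsetRequests shown above.
// OffsetAndTimestamp is org.apache.kafka.clients.consumer.OffsetAndTimestamp.
Map<TopicPartition, Long> timestamps = new HashMap<>();
for (TopicPartition tp : partitions)
    timestamps.put(tp, System.currentTimeMillis() - 3600_000L); // one hour ago

Map<TopicPartition, OffsetAndTimestamp> byTime = consumer.offsetsForTimes(timestamps);
byTime.forEach((tp, ot) -> {
    if (ot != null) // null when no message at or after the timestamp exists
        System.out.printf("%s -> offset %d at timestamp %d%n", tp, ot.offset(), ot.timestamp());
});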
On the broker side, the request is handled by:
KafkaApis.handleListOffsetRequestV1AndAbove(request: RequestChannel.Request)
This value is expected to be maintained already as data is produced; the handler just reads it:
val lastFetchableOffset = offsetRequest.isolationLevel match {
  case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
  case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
}
This spot nicely reflects the difference between LEO, LSO, and the high watermark: LEO is the physical end of the log, a READ_UNCOMMITTED consumer is capped at the high watermark, and a READ_COMMITTED consumer at the LSO.
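The isolation level is just a consumer config; with read_committed, endOffsets reports the LSO instead of the high watermark. A hedged illustration (broker address and partitions as in the earlier sketches):

// Sketch: isolation.level decides which "end" the broker reports back.
// read_uncommitted (the default) -> high watermark; read_committed -> LSO.
Properties txProps = new Properties();
txProps.put("bootstrap.servers", "xxxx:9092");    // placeholder broker, as above
txProps.put("isolation.level", "read_committed");
txProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
txProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

// endOffsets on this consumer returns localReplica.lastStableOffset,
// i.e. the READ_COMMITTED branch of the match above.
try (KafkaConsumer<byte[], byte[]> readCommitted = new KafkaConsumer<>(txProps)) {
    Map<TopicPartition, Long> lso = readCommitted.endOffsets(partitions);
}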
kafka.log.Log.fetchOffsetsByTimestamp(targetTimestamp: Long)
Again, this value (logStartOffset) is maintained as the log is written, so the EARLIEST case can return immediately:
@threadsafe
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
  // ......
  if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
    return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
First, determine the target segment: takeWhile collects the segments whose largest timestamp is still below the target, so the next segment, if any, is the first one that can contain it (a Java rendering follows the snippet):
val targetSeg = {
  // Get all the segments whose largest timestamp is smaller than target timestamp
  val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
  // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
  if (earlierSegs.length < segmentsCopy.length)
    Some(segmentsCopy(earlierSegs.length))
  else
    None
}
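Rendered in Java, the selection is just a scan for the first segment whose largest timestamp reaches the target; a hypothetical sketch, where Segment is a stand-in for kafka.log.LogSegment:

// Segments are in log order, so the first one whose largestTimestamp >= target
// is the earliest segment that can contain the answer.
interface Segment { long largestTimestamp(); }

static Segment findTargetSegment(List<Segment> segments, long targetTimestamp) {
    for (Segment seg : segments) {
        if (seg.largestTimestamp() >= targetTimestamp)
            return seg;   // same as segmentsCopy(earlierSegs.length)
    }
    return null;          // same as None: every segment ends before the target
}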
Then search the segment's index by timestamp:
LogSegment.findOffsetByTimestamp(timestamp: Long, startingOffset: Long)
First locate the entry in the time index, then binary-search:
// LogSegment.scala
val timestampOffset = timeIndex.lookup(timestamp)
val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position

// AbstractIndex.scala
/**
 * Lookup lower and upper bounds for the given target.
 */
private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
  // check if the index is empty
  if(_entries == 0)
    return (-1, -1)

  // check if the target offset is smaller than the least offset
  if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
    return (-1, 0)

  // binary search for the entry
  var lo = 0
  var hi = _entries - 1
  while(lo < hi) {
    val mid = ceil(hi/2.0 + lo/2.0).toInt
    val found = parseEntry(idx, mid)
    val compareResult = compareIndexEntry(found, target, searchEntity)
    if(compareResult > 0)
      hi = mid - 1
    else if(compareResult < 0)
      lo = mid
    else
      return (mid, mid)
  }

  (lo, if (lo == _entries - 1) -1 else lo + 1)
}
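Stripped of Kafka's index plumbing, indexSlotRangeFor is a lower/upper-bound binary search; a standalone sketch of the same idea over a sorted long array (all names here are mine, not Kafka's):

// Return the slot of the largest entry <= target (lower bound) and the slot of the
// smallest entry > target (upper bound), using -1 when no such slot exists.
static int[] slotRangeFor(long[] entries, long target) {
    if (entries.length == 0) return new int[]{-1, -1};   // empty index
    if (entries[0] > target) return new int[]{-1, 0};    // target below the least entry
    int lo = 0, hi = entries.length - 1;
    while (lo < hi) {
        int mid = (lo + hi + 1) >>> 1;                   // round up so lo always advances
        if (entries[mid] > target) hi = mid - 1;
        else lo = mid;                                   // entries[mid] <= target
    }
    return new int[]{lo, lo == entries.length - 1 ? -1 : lo + 1};
}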