The previous article discussed the difference between ConsumerFetcherManager's MaxLag and ConsumerOffsetChecker's lag value. However, the MaxLag value itself was not explained thoroughly, so this article digs deeper into how to make ConsumerFetcherManager's MaxLag show a non-zero value.
kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherThread.scala
override def doWork() {
  inLock(partitionMapLock) {
    if (partitionMap.isEmpty)
      partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
    partitionMap.foreach {
      case((topicAndPartition, offset)) =>
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
          offset, fetchSize)
    }
  }

  val fetchRequest = fetchRequestBuilder.build()
  if (!fetchRequest.requestInfo.isEmpty)
    processFetchRequest(fetchRequest)
}
Note that the fetchRequest is built here.
In this partitionMap, the key is the TopicAndPartition and the value is the largest offset that has been fetched locally.
Each fetch builds the fetchRequest from the largest locally fetched offset together with the fetch size.
kafka_2.10-0.8.2.2-sources.jar!/kafka/api/FetchRequest.scala
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int) = {
  requestMap.put(TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize))
  this
}
You can see that the offset and fetchSize here determine where this fetcher starts reading in the partition's log and the maximum number of bytes it pulls from the broker per partition.
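To make this concrete, here is a minimal sketch, not taken from the Kafka source, of issuing a single fetch with SimpleConsumer built the same way: the offset picks the start position and fetchSize caps the bytes returned. The broker address, client id, and topic/partition/offset values are hypothetical.

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// Hypothetical broker address and example topic/partition/offset values
val consumer = new SimpleConsumer("localhost", 9092, 30000, 64 * 1024, "demo-client")

// offset: where to start reading in the partition log; fetchSize: max bytes returned for this partition
val request = new FetchRequestBuilder()
  .clientId("demo-client")
  .addFetch("mtopic", 1, 258L, 1024 * 1024)
  .build()

val response = consumer.fetch(request)
val messageSet = response.messageSet("mtopic", 1)
messageSet.foreach(mo => println(s"offset=${mo.offset} nextOffset=${mo.nextOffset}"))
consumer.close()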
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name,
                                      clientId = config.clientId,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true) {
  //...
}
The fetchSize used here comes from config.fetchMessageMaxBytes.
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala
class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
  //...
  /** the number of byes of messages to attempt to fetch */
  val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)
}

object ConsumerConfig extends Config {
  val RefreshMetadataBackoffMs = 200
  val SocketTimeout = 30 * 1000
  val SocketBufferSize = 64*1024
  val FetchSize = 1024 * 1024
  val MaxFetchSize = 10*FetchSize
  val NumConsumerFetchers = 1
  val DefaultFetcherBackoffMs = 1000
  val AutoCommit = true
  val AutoCommitInterval = 60 * 1000
  val MaxQueuedChunks = 2
  val MaxRebalanceRetries = 4
  val AutoOffsetReset = OffsetRequest.LargestTimeString
  val ConsumerTimeoutMs = -1
  val MinFetchBytes = 1
  val MaxFetchWaitMs = 100
  val MirrorTopicsWhitelist = ""
  val MirrorTopicsBlacklist = ""
  val MirrorConsumerNumThreads = 1
  val OffsetsChannelBackoffMs = 1000
  val OffsetsChannelSocketTimeoutMs = 10000
  val OffsetsCommitMaxRetries = 5
  val OffsetsStorage = "zookeeper"
  val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
  val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
  val ExcludeInternalTopics = true
  val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
  val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
  val DefaultClientId = ""
  //...
}
This fetchSize defaults to 1024 * 1024 = 1048576, i.e. each fetch pulls at most 1048576 bytes (1 MB) per partition. Note that it is a byte limit, not a message count.
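A quick way to confirm the default is to build a ConsumerConfig without setting the property; this is only a sketch, and the ZooKeeper address and group id below are placeholders.

import java.util.Properties
import kafka.consumer.ConsumerConfig

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // hypothetical ZooKeeper address
props.put("group.id", "mgroup")
// fetch.message.max.bytes is left unset, so the default FetchSize (1024 * 1024) applies
val config = new ConsumerConfig(props)
println(config.fetchMessageMaxBytes) // prints 1048576, i.e. 1 MB per partition per fetch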
private def processFetchRequest(fetchRequest: FetchRequest) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var response: FetchResponse = null
  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    response = simpleConsumer.fetch(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning.get) {
        warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
        partitionMapLock synchronized {
          partitionsWithError ++= partitionMap.keys
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (response != null) {
    // process fetched data
    inLock(partitionMapLock) {
      response.data.foreach {
        case(topicAndPartition, partitionData) =>
          val (topic, partitionId) = topicAndPartition.asTuple
          val currentOffset = partitionMap.get(topicAndPartition)
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
            partitionData.error match {
              case ErrorMapping.NoError =>
                try {
                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                  val validBytes = messages.validBytes
                  // if this fetch returned no messages, newOffset falls back to the current local max offset
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) => m.nextOffset
                    case None => currentOffset.get
                  }
                  partitionMap.put(topicAndPartition, newOffset)
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherStats.byteRate.mark(validBytes)
                  // the call below puts the fetched data into the per-partition queue
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                } catch {
                  case ime: InvalidMessageException =>
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    //    should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                  case e: Throwable =>
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                      .format(topic, partitionId, currentOffset.get), e)
                }
              case ErrorMapping.OffsetOutOfRangeCode =>
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, newOffset)
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentOffset.get, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =>
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =>
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    ErrorMapping.exceptionFor(partitionData.error).getClass))
                  partitionsWithError += topicAndPartition
                }
            }
          }
      }
    }
  }

  if(partitionsWithError.size > 0) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
  val pti = partitionMap(topicAndPartition)
  if (pti.getFetchOffset != fetchOffset)
    throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
      .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
  pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
}
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/PartitionTopicInfo.scala
/**
 * Enqueue a message set for processing.
 */
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if(size > 0) {
    val next = messages.shallowIterator.toSeq.last.nextOffset
    trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
    chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
    fetchedOffset.set(next)
    debug("updated fetch offset of (%s) to %d".format(this, next))
    consumerTopicStats.getConsumerTopicStats(topic).byteRate.mark(size)
    consumerTopicStats.getConsumerAllTopicStats().byteRate.mark(size)
  } else if(messages.sizeInBytes > 0) {
    chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
  }
}
If the fetched message set is empty, nothing is put into the queue.
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ZookeeperConsumerConnector.scala
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")

  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)

  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList

  val dirs = new ZKGroupDirs(config.groupId)
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  reinitializeConsumer(topicCount, queuesAndStreams)

  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
The queue is created here, with capacity config.queuedMaxMessages.
/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes*/
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)

val MaxQueuedChunks = 2
By default the queue can hold at most 2 FetchedDataChunk instances.
And each FetchedDataChunk holds at most fetchSize bytes of messages, i.e. 1024 * 1024 bytes.
In other words, each consumer thread's chunkQueue buffers at most 2 * 1024 * 1024 bytes of messages by default. Once that limit is reached, enqueue blocks, which throttles the overall fetch rate.
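The back pressure comes from LinkedBlockingQueue.put, which blocks once the queue already holds queuedMaxMessages chunks. A stripped-down sketch of that mechanism, with FetchedDataChunk replaced by a plain String purely for illustration:

import java.util.concurrent.LinkedBlockingQueue

// stand-in for the per-stream chunkQueue with queued.max.message.chunks = 2
val chunkQueue = new LinkedBlockingQueue[String](2)

chunkQueue.put("chunk-1") // accepted
chunkQueue.put("chunk-2") // accepted, queue is now full
// a third put() would block here until the consumer thread drains a chunk,
// which is exactly what slows down the fetcher thread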
To make MaxLag show a value, shrink fetch.message.max.bytes. For example:
props.put("fetch.message.max.bytes","10"); props.put("queued.max.message.chunks","1");
With these settings each fetch pulls at most 10 bytes per partition. Suppose the current lag looks like this:
Group   Topic   Pid  Offset  logSize  Lag   Owner
mgroup  mtopic  0    353     8727     8374  demo-1514550322182-6d67873d-0
mgroup  mtopic  1    258     8702     8444  demo-1514550322182-6d67873d-1
mgroup  mtopic  2    307     8615     8308  demo-1514550322182-6d67873d-2
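For reference, output of this shape comes from the ConsumerOffsetChecker tool; a sketch of invoking it from code (the ZooKeeper address is a placeholder):

// ConsumerOffsetChecker prints group/topic/partition offsets, logSize, and lag
kafka.tools.ConsumerOffsetChecker.main(Array(
  "--zookeeper", "localhost:2181",
  "--group", "mgroup",
  "--topic", "mtopic"))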
After one fetch:
val newOffset = messages.shallowIterator.toSeq.lastOption match {
  case Some(m: MessageAndOffset) => m.nextOffset
  case None => currentOffset.get
}
partitionMap.put(topicAndPartition, newOffset)
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
Here nextOffset = offset + 1, i.e. the largest fetched offset plus one = 259. The hw (high watermark) is 8702, so the lag value is 8702 - 259 = 8443.
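Plugging the numbers in, a worked check using the figures for partition 1 from the table above:

// re-doing the MaxLag arithmetic for partition 1 of mtopic
val currentOffset = 258L                 // offset already fetched locally before this fetch
val newOffset     = currentOffset + 1    // nextOffset of the single fetched message: 259
val hw            = 8702L                // high watermark returned in the fetch response
val maxLag        = hw - newOffset       // 8702 - 259 = 8443, the value MaxLag reports
println(maxLag)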
To reproduce this, the consumer thread was made to throw an exception and exit after pulling one message.
In production, be sure to tune these two parameters according to message size and available memory; otherwise it is easy to trigger an OOM.
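As a rough sizing aid, a sketch under the default settings (numStreams is a hypothetical stream count): the buffered data is bounded by roughly queued.max.message.chunks * fetch.message.max.bytes per stream.

// rough upper bound on memory held in chunk queues for one consumer process
val fetchMessageMaxBytes = 1024 * 1024   // fetch.message.max.bytes
val queuedMaxChunks      = 2             // queued.max.message.chunks
val numStreams           = 3             // hypothetical, e.g. one stream per partition

val perStreamBytes = fetchMessageMaxBytes.toLong * queuedMaxChunks
val totalBytes     = perStreamBytes * numStreams
println(s"per stream ≈ $perStreamBytes bytes, total ≈ $totalBytes bytes (about 6 MB here)")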
As for ConsumerFetcherManager's MaxLag, it is only somewhat helpful for monitoring when the two parameters above are set sensibly (the smaller the chunkQueue, the better MaxLag reflects how far the consumer's processing lags; otherwise it only reflects how far the client fetcher thread's pulling lags behind the broker. Setting them too small, however, forces frequent fetches and hurts consumption, so tune them moderately for your situation). In practice, since these parameters are rarely changed, ConsumerOffsetChecker's lag value remains the accurate way to monitor consumer lag.