This article takes a look at Kafka's consumer.timeout.ms property.
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala
/** a string that uniquely identifies a set of consumers within the same consumer group */
val groupId = props.getString("group.id")

/** consumer id: generated automatically if not set.
 *  Set this explicitly for only testing purpose. */
val consumerId: Option[String] = Option(props.getString("consumer.id", null))

/** the socket timeout for network requests. Its value should be at least fetch.wait.max.ms. */
val socketTimeoutMs = props.getInt("socket.timeout.ms", SocketTimeout)
require(fetchWaitMaxMs <= socketTimeoutMs, "socket.timeout.ms should always be at least fetch.wait.max.ms" +
  " to prevent unnecessary socket timeouts")

/** the socket receive buffer for network requests */
val socketReceiveBufferBytes = props.getInt("socket.receive.buffer.bytes", SocketBufferSize)

/** the number of byes of messages to attempt to fetch */
val fetchMessageMaxBytes = props.getInt("fetch.message.max.bytes", FetchSize)

/** the number threads used to fetch data */
val numConsumerFetchers = props.getInt("num.consumer.fetchers", NumConsumerFetchers)

/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
val autoCommitEnable = props.getBoolean("auto.commit.enable", AutoCommit)

/** the frequency in ms that the consumer offsets are committed to zookeeper */
val autoCommitIntervalMs = props.getInt("auto.commit.interval.ms", AutoCommitInterval)

/** max number of message chunks buffered for consumption, each chunk can be up to fetch.message.max.bytes */
val queuedMaxMessages = props.getInt("queued.max.message.chunks", MaxQueuedChunks)

/** max number of retries during rebalance */
val rebalanceMaxRetries = props.getInt("rebalance.max.retries", MaxRebalanceRetries)

/** the minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block */
val fetchMinBytes = props.getInt("fetch.min.bytes", MinFetchBytes)

/** the maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy fetch.min.bytes */
val fetchWaitMaxMs = props.getInt("fetch.wait.max.ms", MaxFetchWaitMs)

/** backoff time between retries during rebalance */
val rebalanceBackoffMs = props.getInt("rebalance.backoff.ms", zkSyncTimeMs)

/** backoff time to refresh the leader of a partition after it loses the current leader */
val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)

/** backoff time to reconnect the offsets channel or to retry offset fetches/commits */
val offsetsChannelBackoffMs = props.getInt("offsets.channel.backoff.ms", OffsetsChannelBackoffMs)

/** socket timeout to use when reading responses for Offset Fetch/Commit requests. This timeout will also be used for
 *  the ConsumerMetdata requests that are used to query for the offset coordinator. */
val offsetsChannelSocketTimeoutMs = props.getInt("offsets.channel.socket.timeout.ms", OffsetsChannelSocketTimeoutMs)

/** Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during
 *  shut-down. It does not apply to commits from the auto-commit thread. It also does not apply to attempts to query
 *  for the offset coordinator before committing offsets. i.e., if a consumer metadata request fails for any reason,
 *  it is retried and that retry does not count toward this limit. */
val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)

/** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase

/** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
 *  is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
 *  given consumer group, it is safe to turn this off after all instances within that group have been migrated to
 *  the new jar that commits offsets to the broker (instead of directly to ZooKeeper). */
val dualCommitEnabled = props.getBoolean("dual.commit.enabled", if (offsetsStorage == "kafka") true else false)

/* what to do if an offset is out of range.
   smallest : automatically reset the offset to the smallest offset
   largest : automatically reset the offset to the largest offset
   anything else: throw exception to the consumer */
val autoOffsetReset = props.getString("auto.offset.reset", AutoOffsetReset)

/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs)

/**
 * Client id is specified by the kafka consumer client, used to distinguish different clients
 */
val clientId = props.getString("client.id", groupId)

/** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)

/** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)
The companion object defines the defaults:

val RefreshMetadataBackoffMs = 200
val SocketTimeout = 30 * 1000
val SocketBufferSize = 64*1024
val FetchSize = 1024 * 1024
val MaxFetchSize = 10*FetchSize
val NumConsumerFetchers = 1
val DefaultFetcherBackoffMs = 1000
val AutoCommit = true
val AutoCommitInterval = 60 * 1000
val MaxQueuedChunks = 2
val MaxRebalanceRetries = 4
val AutoOffsetReset = OffsetRequest.LargestTimeString
val ConsumerTimeoutMs = -1
val MinFetchBytes = 1
val MaxFetchWaitMs = 100
val MirrorTopicsWhitelist = ""
val MirrorTopicsBlacklist = ""
val MirrorConsumerNumThreads = 1
val OffsetsChannelBackoffMs = 1000
val OffsetsChannelSocketTimeoutMs = 10000
val OffsetsCommitMaxRetries = 5
val OffsetsStorage = "zookeeper"
val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
val ExcludeInternalTopics = true
val DefaultPartitionAssignmentStrategy = "range" /* select between "range", and "roundrobin" */
val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
val DefaultClientId = ""
The doc comment on consumerTimeoutMs reads "throw a timeout exception to the consumer if no message is available for consumption after the specified interval", and its default value is -1.
With -1, the consumer blocks and waits indefinitely when no message is available. Note that the waiting happens in ConsumerIterator's hasNext method, not in its next method.
kafka_2.10-0.8.2.2-sources.jar!/kafka/utils/IteratorTemplate.scala

def hasNext(): Boolean = {
  if(state == FAILED)
    throw new IllegalStateException("Iterator is in failed state")
  state match {
    case DONE => false
    case READY => true
    case _ => maybeComputeNext()
  }
}

protected def makeNext(): T

def maybeComputeNext(): Boolean = {
  state = FAILED
  nextItem = makeNext()
  if(state == DONE) {
    false
  } else {
    state = READY
    true
  }
}
hasNext delegates to the subclass's makeNext method, the classic template method pattern.
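The control flow of IteratorTemplate can be mirrored in plain Java. The following is a hypothetical simplification for illustration only (PrefetchingIterator and its members are made-up names, not Kafka classes): hasNext does the real work of pre-fetching the next item, and next merely hands it out.

import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical simplification of the kafka.utils.IteratorTemplate idea:
// hasNext() pre-computes nextItem; next() just returns what was prepared.
public abstract class PrefetchingIterator<T> implements Iterator<T> {
    private enum State { READY, NOT_READY, DONE, FAILED }

    private State state = State.NOT_READY;
    private T nextItem;

    // Subclasses fetch the next element here, or call allDone() when exhausted.
    protected abstract T makeNext();

    protected T allDone() {
        state = State.DONE;
        return null;
    }

    @Override
    public boolean hasNext() {
        if (state == State.FAILED)
            throw new IllegalStateException("Iterator is in failed state");
        switch (state) {
            case DONE:  return false;
            case READY: return true;
            default:    return maybeComputeNext();
        }
    }

    private boolean maybeComputeNext() {
        state = State.FAILED;   // assume failure; reset on success
        nextItem = makeNext();  // this is where any blocking happens
        if (state == State.DONE)
            return false;
        state = State.READY;
        return true;
    }

    @Override
    public T next() {
        if (!hasNext())         // next() also funnels through hasNext()
            throw new NoSuchElementException();
        state = State.NOT_READY;
        return nextItem;
    }
}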
kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerIterator.scala
protected def makeNext(): MessageAndMetadata[K, V] = {
  var currentDataChunk: FetchedDataChunk = null
  // if we don't have an iterator, get one
  var localCurrent = current.get()
  if(localCurrent == null || !localCurrent.hasNext) {
    if (consumerTimeoutMs < 0)
      currentDataChunk = channel.take
    else {
      currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
      if (currentDataChunk == null) {
        // reset state to make the iterator re-iterable
        resetState()
        throw new ConsumerTimeoutException
      }
    }
    if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
      debug("Received the shutdown command")
      return allDone
    } else {
      currentTopicInfo = currentDataChunk.topicInfo
      val cdcFetchOffset = currentDataChunk.fetchOffset
      val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
      if (ctiConsumeOffset < cdcFetchOffset) {
        error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
          .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
        currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
      }
      localCurrent = currentDataChunk.messages.iterator
      current.set(localCurrent)
    }
    // if we just updated the current chunk and it is empty that means the fetch size is too small!
    if(currentDataChunk.messages.validBytes == 0)
      throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
        "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
        .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
  }
  var item = localCurrent.next()
  // reject the messages that have already been consumed
  while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
    item = localCurrent.next()
  }
  consumedOffset = item.nextOffset
  item.message.ensureValid() // validate checksum of message to ensure it is valid
  new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
}
This method is what prepares nextItem in advance.
When consumerTimeoutMs is less than 0, it calls channel.take; otherwise it calls channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS).
The channel here is a BlockingQueue[FetchedDataChunk].
When no chunk can be fetched before the timeout, a ConsumerTimeoutException is thrown.
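The difference between the two branches is just standard java.util.concurrent semantics: take blocks until an element arrives, while poll gives up after the timeout and returns null. A minimal stand-alone sketch of that choice (TakeVsPollDemo is a made-up name, and the queue holds Strings instead of FetchedDataChunks):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-alone demo of the take-vs-poll decision made in makeNext().
public class TakeVsPollDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        long consumerTimeoutMs = 1000; // pretend this came from consumer.timeout.ms

        String chunk;
        if (consumerTimeoutMs < 0) {
            // consumer.timeout.ms = -1: take() blocks until an element arrives
            chunk = channel.take();
        } else {
            // consumer.timeout.ms >= 0: poll() gives up after the timeout and returns null
            chunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS);
            if (chunk == null) {
                // this is the branch where ConsumerIterator throws ConsumerTimeoutException
                System.out.println("no chunk within " + consumerTimeoutMs + "ms -> would throw ConsumerTimeoutException");
                return;
            }
        }
        System.out.println("got chunk: " + chunk);
    }
}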
ConsumerIterator also overrides next():

override def next(): MessageAndMetadata[K, V] = {
  val item = super.next()
  if(consumedOffset < 0)
    throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
  currentTopicInfo.resetConsumeOffset(consumedOffset)
  val topic = currentTopicInfo.topic
  trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
  consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
  consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
  item
}
This next method first calls the superclass's next method.
kafka_2.10-0.8.2.2-sources.jar!/kafka/utils/IteratorTemplate.scala
def next(): T = {
  if(!hasNext())
    throw new NoSuchElementException()
  state = NOT_READY
  if(nextItem == null)
    throw new IllegalStateException("Expected item but none found.")
  nextItem
}
And the superclass's next itself calls hasNext first, i.e., the next element is always prepared ahead of time. So no matter how the iterator is driven, the blocking happens inside hasNext.
Let's verify this with a small demo built on the old high-level consumer API:

Properties props = new Properties();
props.put("zookeeper.connect", zk);
// props.put("auto.offset.reset", "smallest");
props.put("group.id", group);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "10000");
// sets the timeout of ConsumerIterator's hasNext; if unset, hasNext blocks forever until a new message arrives
props.put("consumer.timeout.ms", "10000");
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector
        .createMessageStreams(topicCountMap);
consumerMap.get(topic).stream().forEach(stream -> {
    pool.submit(new Runnable() {
        @Override
        public void run() {
            // whether it.hasNext() blocks depends on consumer.timeout.ms, which defaults to -1
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            try {
                while (it.hasNext()) {
                    System.out.println(Thread.currentThread().getName() + " hello");
                    // it is hasNext that throws the exception, not next
                    System.out.println(Thread.currentThread().getName() + ":" + new String(it.next().message()));
                }
            } catch (ConsumerTimeoutException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " end");
        }
    });
});

With no new message arriving within 10 seconds, each thread's hasNext throws:
kafka.consumer.ConsumerTimeoutException
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
	at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
pool-2-thread-3 end
	at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
	at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
	at com.example.demo.NativeConsumer$1.run(NativeConsumer.java:49)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
pool-2-thread-2 end
pool-2-thread-4 end
pool-2-thread-1 end
(the remaining threads print the same ConsumerTimeoutException stack trace)
This only reproduces the ConsumerTimeoutException thrown by calling hasNext directly. The way to think about it: hasNext prepares nextItem ahead of time, so as long as hasNext returns true, next will generally have a value to return.
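As a closing note, a finite consumer.timeout.ms is handy for loops that must be able to stop cleanly: since makeNext calls resetState() before throwing (see the source above), the iterator remains usable after a ConsumerTimeoutException, so the loop can catch it, check a shutdown flag, and call hasNext again. A sketch under that assumption (StoppableConsumerLoop, running, and handle are made-up names for illustration):

import java.util.concurrent.atomic.AtomicBoolean;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;

// Hypothetical helper: with a finite consumer.timeout.ms, hasNext() throws
// ConsumerTimeoutException instead of blocking forever, so this loop can
// periodically re-check a shutdown flag.
public class StoppableConsumerLoop implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public StoppableConsumerLoop(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    public void shutdown() {
        running.set(false);
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (running.get()) {
            try {
                while (it.hasNext()) { // returns (or throws) within consumer.timeout.ms
                    handle(new String(it.next().message()));
                    if (!running.get()) return;
                }
            } catch (ConsumerTimeoutException e) {
                // no message within consumer.timeout.ms; the iterator has been
                // reset, so loop around and re-check the running flag
            }
        }
    }

    private void handle(String message) {
        System.out.println(message);
    }
}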