本文主要討論一下kafka consumer offset lag的監控apache
object ConsumerOffsetChecker extends Logging { private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map() private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map() private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map() private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = { //... } private def processPartition(zkClient: ZkClient, group: String, topic: String, pid: Int) { //... } private def processTopic(zkClient: ZkClient, group: String, topic: String) { topicPidMap.get(topic) match { case Some(pids) => pids.sorted.foreach { pid => processPartition(zkClient, group, topic, pid) } case None => // ignore } } private def printBrokerInfo() { println("BROKER INFO") for ((bid, consumerOpt) <- consumerMap) consumerOpt match { case Some(consumer) => println("%s -> %s:%d".format(bid, consumer.host, consumer.port)) case None => // ignore } } def main(args: Array[String]) { //... try { zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) val topicList = topics match { case Some(x) => x.split(",").view.toList case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir + "/owners").toList } topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*) val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs) debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) debug("Received offset fetch response %s.".format(offsetFetchResponse)) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic) // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) try { val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong offsetMap.put(topicAndPartition, offset) } catch { case z: ZkNoNodeException => if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir)) offsetMap.put(topicAndPartition,-1) else throw z } } else if (offsetAndMetadata.error == ErrorMapping.NoError) offsetMap.put(topicAndPartition, offsetAndMetadata.offset) else { println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) } } channel.disconnect() println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner")) topicList.sorted.foreach { topic => processTopic(zkClient, group, topic) } if (options.has("broker-info")) printBrokerInfo() for ((_, consumerOpt) <- consumerMap) consumerOpt match { case Some(consumer) => consumer.close() case None => // ignore } } catch { case t: Throwable => println("Exiting due to: %s.".format(t.getMessage)) } finally { for (consumerOpt <- consumerMap.values) { consumerOpt match { case Some(consumer) => consumer.close() case None => // ignore } } if (zkClient != null) zkClient.close() if (channel != null) channel.disconnect() } } }
object ConsumerGroupCommand extends Logging { //... def main(args: Array[String]) { val opts = new ConsumerGroupCommandOptions(args) if (args.length == 0) CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.") // should have exactly one action val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _) if (actions != 1) CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete") opts.checkArgs() val consumerGroupService = { if (opts.useOldConsumer) { System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n") new ZkConsumerGroupService(opts) } else { System.err.println("Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).\n") new KafkaConsumerGroupService(opts) } } try { if (opts.options.has(opts.listOpt)) consumerGroupService.listGroups().foreach(println(_)) else if (opts.options.has(opts.describeOpt)) { val (state, assignments) = consumerGroupService.describeGroup() val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head assignments match { case None => // applies to both old and new consumer printError(s"The consumer group '$groupId' does not exist.") case Some(assignments) => if (opts.useOldConsumer) printAssignment(assignments, false) else state match { case Some("Dead") => printError(s"Consumer group '$groupId' does not exist.") case Some("Empty") => System.err.println(s"Consumer group '$groupId' has no active members.") printAssignment(assignments, true) case Some("PreparingRebalance") | Some("AwaitingSync") => System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.") printAssignment(assignments, true) case Some("Stable") => printAssignment(assignments, true) case other => // the control should never reach here throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.") } } } else if (opts.options.has(opts.deleteOpt)) { consumerGroupService match { case service: ZkConsumerGroupService => service.deleteGroups() case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.") } } } catch { case e: Throwable => printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e)) } finally { consumerGroupService.close() } } }
ObjectName oName = new ObjectName("kafka.producer:*"); Set<ObjectName> metricsBeans = mBeanServer.queryNames(oName, null); for (ObjectName mBeanName : metricsBeans) { MBeanInfo metricsBean = mBeanServer.getMBeanInfo(mBeanName); MBeanAttributeInfo[] metricsAttrs = metricsBean.getAttributes(); for (MBeanAttributeInfo metricsAttr : metricsAttrs) { //get value Object value = mBeanServer.getAttribute(mBeanName, metricsAttr.getName()); //process ... } }