Maven 依賴:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency>
注意:kafka-clients 版本須要 0.10.1.0 以上,由於調用了新增接口 endOffsets;
lag = logsize - offset
logsize 經過 consumer 的 endOffsets 接口得到;offset 經過 consumer 的 committed 接口得到;
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; public class KafkaConsumeLagMonitor { public static Properties getConsumeProperties(String groupID, String bootstrap_server) { Properties props = new Properties(); props.put("group.id", groupID); props.put("bootstrap.servers", bootstrap_server); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } public static void main(String[] args) { String bootstrap_server = args[0]; String groupID = args[1]; String topic = args[2]; Map<Integer, Long> endOffsetMap = new HashMap<Integer, Long>(); Map<Integer, Long> commitOffsetMap = new HashMap<Integer, Long>(); Properties consumeProps = getConsumeProperties(groupID, bootstrap_server); System.out.println("consumer properties:" + consumeProps); //查詢topic partitions KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(consumeProps); List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>(); List<PartitionInfo> partitionsFor = consumer.partitionsFor(topic); for (PartitionInfo partitionInfo : partitionsFor) { TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition()); topicPartitions.add(topicPartition); } //查詢log size Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions); for (TopicPartition partitionInfo : endOffsets.keySet()) { endOffsetMap.put(partitionInfo.partition(), endOffsets.get(partitionInfo)); } for (Integer partitionId : endOffsetMap.keySet()) { System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s", 
System.currentTimeMillis(), topic, partitionId, endOffsetMap.get(partitionId))); } //查詢消費offset for (TopicPartition topicAndPartition : topicPartitions) { OffsetAndMetadata committed = consumer.committed(topicAndPartition); commitOffsetMap.put(topicAndPartition.partition(), committed.offset()); } //累加lag long lagSum = 0l; if (endOffsetMap.size() == commitOffsetMap.size()) { for (Integer partition : endOffsetMap.keySet()) { long endOffSet = endOffsetMap.get(partition); long commitOffSet = commitOffsetMap.get(partition); long diffOffset = endOffSet - commitOffSet; lagSum += diffOffset; System.out.println("Topic:" + topic + ", groupID:" + groupID + ", partition:" + partition + ", endOffset:" + endOffSet + ", commitOffset:" + commitOffSet + ", diffOffset:" + diffOffset); } System.out.println("Topic:" + topic + ", groupID:" + groupID + ", LAG:" + lagSum); } else { System.out.println("this topic partitions lost"); } consumer.close(); } }
另一個思路可參考 kafka 源碼 kafka.tools.ConsumerOffsetChecker 的實現:offset 直接讀取 zk 節點內容,logsize 經過 consumer 的 getOffsetsBefore 方法獲取,總體來講,較麻煩;