Java使用kafka的API來監控kafka的某些topic的數據量增量,offset,定時查總量以後,而後計算差值,而後就能夠算單位間隔的每一個topic的增量,kafka監控通常都是監控的吞吐量,即數據量的大小,而不在乎這個count,數量。額,這個就是在乎count。統計一下count。java
使用的jar依賴apache
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'
Java代碼api
import com.google.common.collect.Lists; import com.google.common.collect.Maps; import kafka.api.PartitionOffsetRequestInfo; import kafka.common.TopicAndPartition; import kafka.javaapi.OffsetResponse; import kafka.javaapi.PartitionMetadata; import kafka.javaapi.TopicMetadata; import kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.List; import java.util.Map; /** * kafka監控 topic的數據消費狀況 * * @author LiXuekai on 2020/9/16 */ public class KafkaMonitorTools { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMonitorTools.class); public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) { TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap(); requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1)); kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName); OffsetResponse response = consumer.getOffsetsBefore(request); if (response.hasError()) { LOGGER.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition)); return 0; } long[] offsets = response.offsets(topic, partition); return offsets[0]; } /** * @param brokers broker 地址 * @param topic topic * @return map<分區, 分區count信息> */ public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) { Map<Integer, PartitionMetadata> map = Maps.newHashMap(); for (String broker : brokers) { SimpleConsumer consumer = null; try { String[] hostAndPort = broker.split(":"); consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime()); List<String> topics = Lists.newArrayList(topic); TopicMetadataRequest req = new TopicMetadataRequest(topics); kafka.javaapi.TopicMetadataResponse resp = consumer.send(req); List<TopicMetadata> metaData = resp.topicsMetadata(); for (TopicMetadata item : metaData) { for (PartitionMetadata part : item.partitionsMetadata()) { map.put(part.partitionId(), part); } } } catch (Exception e) { LOGGER.error("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + ", ] Reason: " + e); } finally { if (consumer != null) consumer.close(); } } return map; } public static Map<String, Long> monitor(List<String> brokers, List<String> topics) { if (brokers == null || brokers.isEmpty()) { return null; } if (topics == null || topics.isEmpty()) { return null; } Map<String, Long> map = Maps.newTreeMap(); for (String topicName : topics) { Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topicName); long size = 0L; for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) { int partition = entry.getKey(); String leadBroker = entry.getValue().leader().host(); String clientName = "Client_" + topicName + "_" + partition; SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000, 64 * 1024, clientName); long readOffset = getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName); size += readOffset; consumer.close(); } map.put(topicName, size); } return map; } }
測試代碼:測試
@Test public void monitor() { Map<String, Long> monitor = KafkaMonitorTools.monitor(Lists.newArrayList(server), Lists.newArrayList(topics.split(","))); monitor.forEach((k, v)-> System.out.println(k + " " + v)); }
運行結果截圖:fetch