Monitoring Kafka topic message counts and per-topic offsets from Java

Here we use the Kafka API from Java to monitor the message-count growth and offsets of selected topics: query each topic's total count on a schedule, then take the difference between successive readings to get the per-interval increment for every topic. Kafka monitoring usually tracks throughput, i.e. the volume of data, and does not care about the message count; well, this post cares exactly about the count, so let's count it. (A sketch of the scheduled delta computation follows the class below.)

Jar dependency used (the legacy kafka_2.10 client, i.e. the Scala 2.10 build that ships the old SimpleConsumer API):

compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.0'

Java code:

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Kafka monitoring: per-topic data consumption status (message counts and offsets)
 *
 * @author LiXuekai on 2020/9/16
 */
public class KafkaMonitorTools {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMonitorTools.class);


    /**
     * Fetch a single offset for the given partition; passing
     * kafka.api.OffsetRequest.LatestTime() returns the partition's log-end offset.
     */
    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Maps.newHashMap();
        // Request at most one offset at or before `whichTime`.
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            LOGGER.error("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * @param brokers broker addresses, each in "host:port" form
     * @param topic   topic name
     * @return map from partition id to that partition's metadata (including its leader)
     */
    public static Map<Integer, PartitionMetadata> findLeader(List<String> brokers, String topic) {
        Map<Integer, PartitionMetadata> map = Maps.newHashMap();
        for (String broker : brokers) {
            SimpleConsumer consumer = null;
            try {
                String[] hostAndPort = broker.split(":");
                consumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 100000, 64 * 1024, "leaderLookup" + new Date().getTime());
                List<String> topics = Lists.newArrayList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        map.put(part.partitionId(), part);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Error communicating with Broker [" + broker + "] to find Leader for [" + topic + ", ] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        return map;
    }

    public static Map<String, Long> monitor(List<String> brokers, List<String> topics) {
        // Return an empty map rather than null so callers can iterate the result safely.
        Map<String, Long> map = Maps.newTreeMap();
        if (brokers == null || brokers.isEmpty() || topics == null || topics.isEmpty()) {
            return map;
        }
        for (String topicName : topics) {
            Map<Integer, PartitionMetadata> metadata = findLeader(brokers, topicName);
            long size = 0L;
            for (Map.Entry<Integer, PartitionMetadata> entry : metadata.entrySet()) {
                int partition = entry.getKey();
                PartitionMetadata meta = entry.getValue();
                if (meta.leader() == null) {
                    LOGGER.warn("No leader found for topic [" + topicName + "] partition [" + partition + "], skipping");
                    continue;
                }
                String clientName = "Client_" + topicName + "_" + partition;
                SimpleConsumer consumer = new SimpleConsumer(meta.leader().host(), meta.leader().port(), 100000, 64 * 1024, clientName);
                try {
                    // Sum the latest (log-end) offsets of all partitions to get the topic's total count.
                    size += getLastOffset(consumer, topicName, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                } finally {
                    consumer.close();
                }
            }
            map.put(topicName, size);
        }
        return map;
    }
}
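
The scheduled delta computation described at the top of the post can be driven by a ScheduledExecutorService: call monitor() at a fixed interval, remember the previous totals, and print the difference. Below is a minimal sketch; the broker address, topic names, and one-minute interval are illustrative placeholders, not values from the original setup.

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class KafkaCountDeltaMonitor {
    public static void main(String[] args) {
        // Illustrative placeholders; substitute your own brokers and topics.
        List<String> brokers = Lists.newArrayList("localhost:9092");
        List<String> topics = Lists.newArrayList("topic-a", "topic-b");

        Map<String, Long> previous = Maps.newHashMap();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            Map<String, Long> current = KafkaMonitorTools.monitor(brokers, topics);
            current.forEach((topic, total) -> {
                Long last = previous.get(topic);
                if (last != null) {
                    // Messages written to this topic during the last interval.
                    System.out.println(topic + " total=" + total + " delta=" + (total - last));
                }
                previous.put(topic, total);
            });
        }, 0, 1, TimeUnit.MINUTES);
    }
}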

Test code:

    @Test
    public void monitor() {
        Map<String, Long> monitor = KafkaMonitorTools.monitor(Lists.newArrayList(server), Lists.newArrayList(topics.split(",")));
        monitor.forEach((k, v) -> System.out.println(k + " " + v));
    }

Screenshot of the run results:
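
As an aside, on newer clients (kafka-clients 0.10.1+) the same log-end offsets can be fetched without the old SimpleConsumer, via KafkaConsumer.endOffsets. A minimal sketch, assuming the newer dependency rather than the 0.8 jar used above:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class EndOffsetCount {
    /** Sums the log-end offsets of every partition of the topic. */
    public static long countTopic(String bootstrapServers, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // One TopicPartition per partition of the topic.
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(info -> new TopicPartition(topic, info.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            return endOffsets.values().stream().mapToLong(Long::longValue).sum();
        }
    }
}

Note that the log-end offset equals the number of messages ever written only while retention has not deleted anything; the count still present on the broker is the end offset minus the beginning offset.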
