【Kafka】- Java 客戶端

舊版本

工作流程:通過ZK集羣獲取Kafka集羣信息,進而進行Kafka相關操作

package com.zhiwei.kafka.base;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.security.JaasUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Topic administration client for legacy Kafka releases that are managed through ZooKeeper.
 *
 * <p>Each public operation opens its own ZooKeeper session via {@link ZkUtils}, performs a
 * single {@link AdminUtils} call and closes the session again, also when the call fails.
 *
 * @author ZHIWEI.YANG (original version dated 2018-05-09)
 */
@SuppressWarnings("deprecation")
@Slf4j
public class OldKafkaClient {

    private String zkUrl = "localhost:2181";
    private Integer sessionTimeout = 3000;
    private Integer connectTimeout = 3000;

    /**
     * Creates a client for the given ZooKeeper connect string.
     *
     * @param zkUrl ZooKeeper connect string; when {@code null} or empty the default
     *              {@code localhost:2181} is kept
     */
    public OldKafkaClient(String zkUrl) {
        if (!StringUtils.isEmpty(zkUrl)) {
            this.zkUrl = zkUrl;
        }
    }

    /**
     * Demo: creates a topic, changes one topic-level property, prints the topic
     * configuration and finally deletes the topic.
     */
    public static void main(String[] args) {
        OldKafkaClient client = new OldKafkaClient("debian:2181");
        String topicName = "mytopoc";

        // Create the topic.
        client.creatTopic(topicName, 1, 1, null);

        // Topic-level properties to add or overwrite.
        Map<String, String> properties = new HashMap<String, String>();
        properties.put("min.cleanable.dirty.ratio", "0.3");

        // To remove a topic-level property (e.g. "max.message.bytes"), pass its key
        // in the third argument instead of null.
        client.updateTopic(topicName, properties, null);

        // Query the topic properties.
        log.info("topic:{}, 屬性:{}", topicName, client.queryTopic(topicName));

        // Delete the topic.
        client.deleteTopic(topicName);
    }

    /**
     * Opens a new ZooKeeper session using the configured URL and timeouts.
     * The caller owns the returned handle and must close it.
     */
    private ZkUtils openZkUtils() {
        return ZkUtils.apply(zkUrl, sessionTimeout, connectTimeout, JaasUtils.isZkSecurityEnabled());
    }

    /**
     * Creates a topic.
     *
     * @param topicName         name of the topic to create
     * @param partitionNum      number of partitions
     * @param replicationFactor replication factor
     * @param topicConfig       topic-level configuration; {@code null} means no overrides
     */
    public void creatTopic(String topicName, Integer partitionNum, Integer replicationFactor, Properties topicConfig) {
        Properties effectiveConfig = topicConfig == null ? new Properties() : topicConfig;
        ZkUtils zk = openZkUtils();
        try {
            AdminUtils.createTopic(zk, topicName, partitionNum, replicationFactor,
                    effectiveConfig, RackAwareMode.Enforced$.MODULE$);
        } finally {
            // Release the ZooKeeper session even if topic creation fails.
            zk.close();
        }
    }

    /**
     * Deletes a topic.
     *
     * @param topicName name of the topic to delete
     */
    public void deleteTopic(String topicName) {
        ZkUtils zk = openZkUtils();
        try {
            AdminUtils.deleteTopic(zk, topicName);
        } finally {
            zk.close();
        }
    }

    /**
     * Reads the topic-level configuration of a topic.
     *
     * @param topicName name of the topic to look up
     * @return the fetched configuration wrapped in a fixed-size map view
     */
    @SuppressWarnings("unchecked")
    public Map<String, String> queryTopic(String topicName) {
        ZkUtils zk = openZkUtils();
        try {
            Properties props = AdminUtils.fetchEntityConfig(zk, ConfigType.Topic(), topicName);
            return MapUtils.fixedSizeMap(props);
        } finally {
            zk.close();
        }
    }

    /**
     * Updates the topic-level configuration of a topic: the current configuration is
     * fetched, {@code addProperties} are put on top of it, the keys in {@code deleteProps}
     * are removed, and the result is written back.
     *
     * @param topicName     name of the topic to modify
     * @param addProperties properties to add or overwrite; may be {@code null}
     * @param deleteProps   keys of properties to remove; may be {@code null}
     */
    public void updateTopic(String topicName, Map<String, String> addProperties, List<String> deleteProps) {
        ZkUtils zk = openZkUtils();
        try {
            Properties props = AdminUtils.fetchEntityConfig(zk, ConfigType.Topic(), topicName);

            if (addProperties != null) {
                props.putAll(addProperties);
            }
            if (deleteProps != null) {
                for (String key : deleteProps) {
                    props.remove(key);
                }
            }
            AdminUtils.changeTopicConfig(zk, topicName, props);
        } finally {
            zk.close();
        }
    }
}

新版本

工作流程:直連Kafka Bootstrap Server,直接操作Kafka集羣

package com.zhiwei.kafka.base;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * @author 做者 ZHIWEI.YANG
 * @function 功能 Kafka主題客戶端
 * @time 時間 2018年5月9日-下午4:24:18
 */
@Slf4j
public class NewKafkaClient {

    private String host = "localhost";
    private Integer port = 9092;
    private AdminClient adminClient;
    private short replicationFactor = 1;
    private Integer numPartitions = 1;
    private Map<String, Object> conf = new HashMap<String, Object>();

    {
        conf.put("bootstrap.servers", host + ":" + port);
    }

    public NewKafkaClient() {

    }

    public NewKafkaClient(String host, Integer port) {

        this.host = host;
        this.port = port;
        conf.put("bootstrap.servers", host + ":" + port);
        adminClient = AdminClient.create(conf);
    }

    public NewKafkaClient(Map<String, Object> conf) {
        this.conf.putAll(conf);
        adminClient = AdminClient.create(conf);
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        NewKafkaClient client = new NewKafkaClient("centos", 9092);
        String topicName = "mytopic";

       log.info("建立topic:{}, 結果:{}",topicName, client.createSingleTopic(topicName));
       log.info("topic:{}, 屬性:{}", topicName, client.descriptTopic(topicName));
       log.info("全部topic:{}", client.queryAllTopic());
    }

    /**
     * 查詢主題名
     *
     * @return
     */
    public List<String> queryAllTopic() {

        ListTopicsResult listTopicsResult = adminClient.listTopics();
        KafkaFuture<Collection<TopicListing>> kafkaFuture = listTopicsResult.listings();

        Collection<TopicListing> collections;
        List<String> topics = null;
        try {
            collections = kafkaFuture.get();
            if (collections != null && collections.size() != 0) {
                topics = new ArrayList<String>(collections.size());
                for (TopicListing topicListing : collections) {
                    topics.add(topicListing.name());
                }
            }

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return topics;
    }

    public boolean createSingleTopic(String name) {

        NewTopic newTopic = new NewTopic(name, numPartitions, replicationFactor);
        List<NewTopic> newTopics = new ArrayList<NewTopic>(1);
        newTopics.add(newTopic);
        return createTopics(newTopics, null);
    }

    public boolean createTopics(List<NewTopic> newTopics, CreateTopicsOptions createTopicsOptions) {

        if (createTopicsOptions == null) {
            createTopicsOptions = new CreateTopicsOptions();
            createTopicsOptions.timeoutMs(1000);
        }

        CreateTopicsResult results = adminClient.createTopics(newTopics, createTopicsOptions);
        KafkaFuture<Void> kafkaFuture = results.all();
        return kafkaFuture.isDone();
    }

    public boolean deleteTopic(String... name) {

        if (name == null || name.length == 0) {
            return true;
        }

        List<String> topics = Arrays.asList(name);
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topics);
        return deleteTopicsResult.all().isDone();
    }

    public Map<String, TopicDescription> descriptTopic(String... topics) throws InterruptedException, ExecutionException {
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(topics));
        return describeTopicsResult.all().get();

    }

    public void close() {
        if (adminClient != null) {
            adminClient.close();
        }
    }
}
相關文章
相關標籤/搜索