工作流程:通过 ZooKeeper 集群获取 Kafka 集群信息,进而进行 Kafka 相关操作
package com.zhiwei.kafka.base;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.security.JaasUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka topic admin client that operates on the cluster indirectly through
 * ZooKeeper, using the legacy (deprecated) {@code kafka.admin.AdminUtils} API.
 *
 * <p>Each public operation opens a fresh ZooKeeper connection and closes it in
 * a {@code finally} block, so a failed Kafka call can no longer leak the
 * connection. Not thread-safe: the {@code zkUtils} field is shared mutable state.
 *
 * @author ZHIWEI.YANG
 */
@SuppressWarnings("deprecation")
@Slf4j
public class OldKafkaClient {

    private String zkUrl = "localhost:2181";
    private Integer sessionTimeout = 3000;
    private Integer connectTimeout = 3000;
    private ZkUtils zkUtils = null;

    /**
     * @param zkUrl ZooKeeper connect string (host:port); falls back to
     *              {@code localhost:2181} when null or empty
     */
    public OldKafkaClient(String zkUrl) {
        if (!StringUtils.isEmpty(zkUrl)) {
            this.zkUrl = zkUrl;
        }
    }

    /** Demo: create a topic, update its config, query it, then delete it. */
    public static void main(String[] args) {
        OldKafkaClient client = new OldKafkaClient("debian:2181");
        String topicName = "mytopoc";

        // Create the topic.
        client.creatTopic(topicName, 1, 1, null);

        Map<String, String> properties = new HashMap<String, String>();
        // Add a topic-level property.
        properties.put("min.cleanable.dirty.ratio", "0.3");
        // To delete a topic-level property, pass its key in the delete list instead:
        // client.updateTopic(topicName, properties, Arrays.asList("max.message.bytes"));
        client.updateTopic(topicName, properties, null);

        // Query topic properties.
        log.info("topic:{}, properties:{}", topicName, client.queryTopic(topicName));

        // Delete the topic.
        client.deleteTopic(topicName);
    }

    /** Opens a new ZooKeeper session; the caller is responsible for closing it. */
    private void getZkUtils() {
        zkUtils = ZkUtils.apply(zkUrl, sessionTimeout, connectTimeout,
                JaasUtils.isZkSecurityEnabled());
    }

    /**
     * Creates a topic. (Method name kept as-is — including the historical
     * misspelling — for backward compatibility with existing callers.)
     *
     * @param topicName         topic to create
     * @param partitionNum      number of partitions
     * @param replicationFactor replication factor
     * @param topicConfig       optional topic-level config; may be null
     */
    public void creatTopic(String topicName, Integer partitionNum,
                           Integer replicationFactor, Properties topicConfig) {
        getZkUtils();
        try {
            AdminUtils.createTopic(zkUtils, topicName, partitionNum, replicationFactor,
                    topicConfig == null ? new Properties() : topicConfig,
                    RackAwareMode.Enforced$.MODULE$);
        } finally {
            // Always release the ZK connection, even if topic creation throws.
            zkUtils.close();
        }
    }

    /**
     * Deletes a topic (marks it for deletion on the broker).
     *
     * @param topicName topic to delete
     */
    public void deleteTopic(String topicName) {
        getZkUtils();
        try {
            AdminUtils.deleteTopic(zkUtils, topicName);
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Fetches the topic-level configuration of a topic.
     *
     * @param topicName topic to query
     * @return the topic's config entries as a string map (possibly empty)
     */
    public Map<String, String> queryTopic(String topicName) {
        getZkUtils();
        try {
            Properties props = AdminUtils.fetchEntityConfig(zkUtils,
                    ConfigType.Topic(), topicName);
            // Copy into a typed map instead of unchecked-casting the raw
            // Properties (whose keys/values are Object) to Map<String, String>.
            Map<String, String> result = new HashMap<String, String>();
            for (String key : props.stringPropertyNames()) {
                result.put(key, props.getProperty(key));
            }
            return result;
        } finally {
            zkUtils.close();
        }
    }

    /**
     * Updates a topic's configuration: merges {@code addProperties} in and
     * removes the keys listed in {@code deleteProps}.
     *
     * @param topicName     topic to modify
     * @param addProperties properties to add/overwrite; may be null
     * @param deleteProps   property keys to remove; may be null
     */
    public void updateTopic(String topicName, Map<String, String> addProperties,
                            List<String> deleteProps) {
        getZkUtils();
        try {
            Properties props = AdminUtils.fetchEntityConfig(zkUtils,
                    ConfigType.Topic(), topicName);
            if (addProperties != null) {
                props.putAll(addProperties);
            }
            if (deleteProps != null) {
                for (String key : deleteProps) {
                    props.remove(key);
                }
            }
            AdminUtils.changeTopicConfig(zkUtils, topicName, props);
        } finally {
            zkUtils.close();
        }
    }
}
工作流程:直连 Kafka Bootstrap Server,直接操作 Kafka 集群
package com.zhiwei.kafka.base;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * Kafka topic admin client that connects directly to the broker's bootstrap
 * server via the modern {@link AdminClient} API (no ZooKeeper involved).
 *
 * <p>A single {@code AdminClient} is created per instance; call {@link #close()}
 * when done.
 *
 * @author ZHIWEI.YANG
 */
@Slf4j
public class NewKafkaClient {

    private String host = "localhost";
    private Integer port = 9092;
    private AdminClient adminClient;
    private short replicationFactor = 1;
    private Integer numPartitions = 1;
    private Map<String, Object> conf = new HashMap<String, Object>();

    {
        // Default bootstrap address; overwritten by the (host, port) constructor.
        conf.put("bootstrap.servers", host + ":" + port);
    }

    /** Connects to the default {@code localhost:9092}. */
    public NewKafkaClient() {
        // BUG FIX: the original no-arg constructor never created the client,
        // so every subsequent call threw NullPointerException.
        adminClient = AdminClient.create(conf);
    }

    /**
     * @param host broker host
     * @param port broker port
     */
    public NewKafkaClient(String host, Integer port) {
        this.host = host;
        this.port = port;
        conf.put("bootstrap.servers", host + ":" + port);
        adminClient = AdminClient.create(conf);
    }

    /**
     * @param conf extra client configuration, merged over the defaults
     */
    public NewKafkaClient(Map<String, Object> conf) {
        this.conf.putAll(conf);
        // BUG FIX: create the client from the merged config (this.conf), not the
        // raw parameter, so the default bootstrap.servers entry is not dropped.
        adminClient = AdminClient.create(this.conf);
    }

    /** Demo: create a topic, describe it, then list all topics. */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        NewKafkaClient client = new NewKafkaClient("centos", 9092);
        String topicName = "mytopic";
        log.info("create topic:{}, result:{}", topicName, client.createSingleTopic(topicName));
        log.info("topic:{}, description:{}", topicName, client.descriptTopic(topicName));
        log.info("all topics:{}", client.queryAllTopic());
    }

    /**
     * Lists the names of all topics in the cluster.
     *
     * @return topic names; empty list when none exist or the query fails
     */
    public List<String> queryAllTopic() {
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        KafkaFuture<Collection<TopicListing>> kafkaFuture = listTopicsResult.listings();
        // Return an empty list (never null) so callers can iterate safely.
        List<String> topics = new ArrayList<String>();
        try {
            Collection<TopicListing> collections = kafkaFuture.get();
            if (collections != null) {
                for (TopicListing topicListing : collections) {
                    topics.add(topicListing.name());
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status instead of swallowing it.
            Thread.currentThread().interrupt();
            log.error("interrupted while listing topics", e);
        } catch (ExecutionException e) {
            log.error("failed to list topics", e);
        }
        return topics;
    }

    /**
     * Creates one topic with the default partition count and replication factor.
     *
     * @param name topic name
     * @return true when creation completed successfully
     */
    public boolean createSingleTopic(String name) {
        NewTopic newTopic = new NewTopic(name, numPartitions, replicationFactor);
        List<NewTopic> newTopics = new ArrayList<NewTopic>(1);
        newTopics.add(newTopic);
        return createTopics(newTopics, null);
    }

    /**
     * Creates the given topics, waiting for the operation to finish.
     *
     * @param newTopics           topics to create
     * @param createTopicsOptions options; a 1000 ms timeout is used when null
     * @return true when all topics were created; false on failure
     */
    public boolean createTopics(List<NewTopic> newTopics, CreateTopicsOptions createTopicsOptions) {
        if (createTopicsOptions == null) {
            createTopicsOptions = new CreateTopicsOptions();
            createTopicsOptions.timeoutMs(1000);
        }
        CreateTopicsResult results = adminClient.createTopics(newTopics, createTopicsOptions);
        try {
            // BUG FIX: the original returned future.isDone() immediately after
            // submitting the async request, which is almost always false and
            // says nothing about success. Block until completion instead.
            results.all().get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("interrupted while creating topics", e);
            return false;
        } catch (ExecutionException e) {
            log.error("failed to create topics", e);
            return false;
        }
    }

    /**
     * Deletes the given topics, waiting for the operation to finish.
     *
     * @param name topic names; null or empty is a no-op that returns true
     * @return true when all deletions completed; false on failure
     */
    public boolean deleteTopic(String... name) {
        if (name == null || name.length == 0) {
            return true;
        }
        List<String> topics = Arrays.asList(name);
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topics);
        try {
            // Same fix as createTopics: wait for the async result instead of
            // returning isDone() on a just-created future.
            deleteTopicsResult.all().get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("interrupted while deleting topics", e);
            return false;
        } catch (ExecutionException e) {
            log.error("failed to delete topics", e);
            return false;
        }
    }

    /**
     * Describes the given topics. (Method name kept as-is for backward
     * compatibility with existing callers.)
     *
     * @param topics topic names to describe
     * @return topic name → description
     * @throws InterruptedException if the calling thread is interrupted
     * @throws ExecutionException   if the broker request fails
     */
    public Map<String, TopicDescription> descriptTopic(String... topics)
            throws InterruptedException, ExecutionException {
        DescribeTopicsResult describeTopicsResult =
                adminClient.describeTopics(Arrays.asList(topics));
        return describeTopicsResult.all().get();
    }

    /** Releases the underlying {@link AdminClient}. Safe to call repeatedly. */
    public void close() {
        if (adminClient != null) {
            adminClient.close();
        }
    }
}