1. List all topics
kafka-topics --zookeeper zk01:2181/kafka --list
2. Create a topic (with partitions and replication factor)
kafka-topics --zookeeper zk01:2181/kafka --create --topic test --partitions 1 --replication-factor 1
3. Describe a topic
kafka-topics --zookeeper zk01:2181/kafka --describe --topic test
4. Change the number of partitions: partitions can only be added, never removed
kafka-topics --zookeeper zk01:2181/kafka --alter --topic utopic --partitions 15
5. Delete a topic
kafka-topics --zookeeper zk01:2181/kafka --delete --topic test
delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion (or the brokers have to be restarted).
You also need to tick "allow delete" in the management UI, otherwise it is only a "pseudo" delete.
6. Produce messages (typed in manually, one line at a time)
kafka-console-producer --broker-list kafka01:9092 --topic test
7. Produce messages (piped in, e.g. streaming the contents of a file)
tail -F /root/log.log | kafka-console-producer --broker-list kafka01:9092 --topic test
8. Consume messages
kafka-console-consumer --zookeeper zk01:2181/kafka --topic test --from-beginning
9. Check the consumer position
kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181/kafka --group testgroup
kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181/kafka --group testgroup --topic test
or:
kafka-consumer-offset-checker --zookeeper zk01/kafka --group testgroup --topic test
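On newer client versions the same check can also be done programmatically. A rough sketch (not part of the original commands) that computes per-partition lag for a group, assuming a 0.10.1+ kafka-clients dependency (needed for endOffsets) and that the group testgroup has already committed offsets for topic test; the class name GroupLagChecker is just illustrative:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

public class GroupLagChecker {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka01:9092");
        props.put("group.id", "testgroup"); // the group whose position we want to inspect
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Collect all partitions of the topic
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("test")) {
                partitions.add(new TopicPartition("test", info.partition()));
            }
            // Log end offset per partition
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp); // last committed offset of this group
                long position = committed == null ? 0L : committed.offset();
                System.out.println(tp + " committed=" + position + " logEnd=" + endOffsets.get(tp)
                        + " lag=" + (endOffsets.get(tp) - position));
            }
        }
    }
}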
1. Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2. Custom producer partitioner
package com.bigdata.hadoop.kafka.producer;

import kafka.producer.DefaultPartitioner;
import kafka.utils.VerifiableProperties;

/**
 * Custom partitioner for the old producer API: pins every record to partition 2.
 */
public class KafkaPartitioner extends DefaultPartitioner {

    public KafkaPartitioner(VerifiableProperties props) {
        super(props);
    }

    public int partition(Object obj, int numPartitions) {
        // return Math.abs(obj.hashCode() % numPartitions);
        return 2;
    }
}
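The class above plugs into the old Scala producer. The new (0.9+) producer instead expects a partitioner implementing org.apache.kafka.clients.producer.Partitioner, so passing the class above to it will not work. A minimal sketch of the equivalent "always partition 2" assignment for the new API; the class name NewApiKafkaPartitioner is just illustrative:

package com.bigdata.hadoop.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Partitioner for the new producer API; like the class above, it pins every record to partition 2.
 */
public class NewApiKafkaPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // int numPartitions = cluster.partitionsForTopic(topic).size();
        // return Math.abs(key.hashCode() % numPartitions);
        return 2;
    }

    @Override
    public void close() {
        // nothing to release
    }
}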
3. Old and new producer APIs (the broker hostnames do not have to be mapped in the client's hosts file)
package com.bigdata.hadoop.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Test of the old and new Kafka producer APIs.
 */
public class ProducerTest {

    public static void main(String[] args) {
        // Old API
        Properties oldProps = new Properties();
        oldProps.setProperty("metadata.broker.list", "cloudera:9092");
        oldProps.setProperty("request.required.acks", "-1");
        oldProps.setProperty("partitioner.class", KafkaPartitioner.class.getName());
        Producer<byte[], byte[]> oldProducer = new Producer<byte[], byte[]>(new ProducerConfig(oldProps));
        for (int i = 0; i < 1000; i++) {
            oldProducer.send(new KeyedMessage<>("one", ("msg one:" + i).getBytes()));
            oldProducer.send(new KeyedMessage<>("two", ("msg two:" + i).getBytes()));
        }
        oldProducer.close();

        // New API => Kafka 0.9 and above
        Properties newProps = new Properties();
        newProps.setProperty("bootstrap.servers", "cloudera:9092");
        newProps.setProperty("key.serializer", StringSerializer.class.getName());
        newProps.setProperty("value.serializer", StringSerializer.class.getName());
        // Note: the new producer expects a partitioner implementing
        // org.apache.kafka.clients.producer.Partitioner (see the sketch above)
        newProps.put("partitioner.class", KafkaPartitioner.class);
        KafkaProducer<String, String> newProducer = new KafkaProducer<String, String>(newProps);
        for (int i = 0; i < 100000; i++) {
            newProducer.send(new ProducerRecord<>("one", "msg one:" + i));
            newProducer.send(new ProducerRecord<>("two", "msg two:" + i));
        }
        newProducer.close(); // flush buffered records before the JVM exits
    }
}
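One caveat with the new producer: send() is asynchronous and only returns a Future<RecordMetadata>, so delivery failures stay invisible unless the Future is checked or a callback is registered. A small sketch of both the synchronous and the callback style, reusing the topic "one" from the example above; the class name ProducerAckTest is just illustrative:

package com.bigdata.hadoop.kafka.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerAckTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "cloudera:9092");
        props.setProperty("acks", "all"); // wait for all in-sync replicas
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Synchronous send: block until the broker acknowledges (or an exception is thrown)
        RecordMetadata md = producer.send(new ProducerRecord<>("one", "sync msg")).get();
        System.out.println("written to " + md.topic() + "-" + md.partition() + "@" + md.offset());

        // Asynchronous send with a callback: runs on the producer's I/O thread
        producer.send(new ProducerRecord<>("one", "async msg"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    exception.printStackTrace();
                }
            }
        });

        // Flush and close, otherwise buffered records may be lost when the JVM exits
        producer.close();
    }
}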
4. Custom consumer partition assignor => the default, kafka.consumer.RangeAssignor, is usually good enough
package com.bigdata.hadoop.kafka.consumer;

import kafka.common.TopicAndPartition;
import kafka.consumer.AssignmentContext;
import kafka.consumer.ConsumerThreadId;
import kafka.consumer.RangeAssignor;
import kafka.utils.Pool;

/**
 * Custom partition assignor for the old consumer API; rarely needed in practice.
 */
public class PartitionAssignor extends RangeAssignor {

    public Pool<String, scala.collection.mutable.Map<TopicAndPartition, ConsumerThreadId>> assign(final AssignmentContext ctx) {
        return null;
    }
}
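For the new (0.9+) consumer there is no subclassing involved; the assignment strategy is chosen through the partition.assignment.strategy property instead. A minimal sketch, assuming the built-in round-robin assignor is wanted instead of the default org.apache.kafka.clients.consumer.RangeAssignor; the class name AssignorConfigTest is just illustrative:

package com.bigdata.hadoop.kafka.consumer;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class AssignorConfigTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cloudera:9092");
        props.put("group.id", "testGroup1");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Default is the range assignor; switch the whole group to round robin here
        props.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("one"));
        consumer.close();
    }
}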
5. Old and new consumer APIs (the broker hostnames must be mapped in the hosts file)
package com.bigdata.hadoop.kafka.consumer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;

/**
 * Test of the old and new Kafka consumer APIs.
 */
public class ConsumerTest {

    public static void main(String[] args) {
        /**
         * Old consumer API (0.8).
         */
        Properties oldProps = new Properties();
        oldProps.setProperty("zookeeper.connect", "192.168.50.10:2181");
        oldProps.setProperty("group.id", "testGroup");
        oldProps.setProperty("auto.offset.reset", "smallest"); // smallest or largest
        if (false) { // Kerberos switch
            oldProps.setProperty("security.protocol", "SASL_PLAINTEXT");
        }
        HashMap<String, Integer> topicPartitionsMap = new HashMap<String, Integer>();
        topicPartitionsMap.put("one", 1);
        ConsumerConnector oldConsumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(oldProps));
        Map<String, List<KafkaStream<byte[], byte[]>>> topicAndParStream = oldConsumer.createMessageStreams(topicPartitionsMap);
        for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> entry : topicAndParStream.entrySet()) {
            for (KafkaStream<byte[], byte[]> kafkaStream : entry.getValue()) {
                // Best handled in its own thread, so that each thread reads one partition and throughput improves
                ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
                new Thread(() -> {
                    while (it.hasNext()) {
                        System.out.println(new String(it.next().message()));
                    }
                }).start();
            }
        }

        /**
         * New API => Kafka 0.9 and above; much more capable.
         */
        Map<String, Object> newProps = new HashMap<>();
        newProps.put("bootstrap.servers", "hosts1:9092,hosts2:9092");
        newProps.put("group.id", "testGroup1");
        newProps.put("auto.offset.reset", "earliest"); // earliest or latest
        newProps.put("key.deserializer", StringDeserializer.class);
        newProps.put("value.deserializer", StringDeserializer.class);
        if (false) { // Kerberos switch
            newProps.put("security.protocol", "SASL_PLAINTEXT");
        }
        KafkaConsumer<String, String> newConsumer = new KafkaConsumer<>(newProps);

        // Subscribe to topics
        LinkedList<String> topics = new LinkedList<>();
        topics.add("wl_008");
        topics.add("fj_008");
        newConsumer.subscribe(topics);

        // Fetch partition info for each topic, including the leader
        Map<String, List<PartitionInfo>> topicMap = newConsumer.listTopics();
        List<PartitionInfo> onePartitions = newConsumer.partitionsFor("wl_008");

        // Collect all partitions
        LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
        for (String topic : topics) {
            for (PartitionInfo parTemp : newConsumer.partitionsFor(topic)) {
                topicPartitions.add(new TopicPartition(topic, parTemp.partition()));
            }
        }

        // Earliest and latest offset per partition => 0.10 and above
        Map<TopicPartition, Long> beginningOffsets = newConsumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> endOffsets = newConsumer.endOffsets(topicPartitions);

        // If the client version does not support the calls above, use consumer.assign();
        // note: consumer.unsubscribe() must be called first, otherwise the two conflict
        newConsumer.unsubscribe();
        newConsumer.assign(topicPartitions);
        newConsumer.seekToBeginning(topicPartitions); // seek to the beginning to read the beginning offsets
        HashMap<TopicPartition, Long> beginOffsetsMap = new HashMap<>();
        for (TopicPartition temp : topicPartitions) {
            beginOffsetsMap.put(temp, newConsumer.position(temp));
        }
        newConsumer.seekToEnd(topicPartitions); // seek to the end to read the end offsets
        HashMap<TopicPartition, Long> endOffsetsMap = new HashMap<>();
        for (TopicPartition temp : topicPartitions) {
            endOffsetsMap.put(temp, newConsumer.position(temp));
        }

        // Jump to the beginning and consume from there
        newConsumer.seekToBeginning(topicPartitions);
        // Jump to the end and consume from there
        newConsumer.seekToEnd(topicPartitions);
        // Consume from a specific offset
        newConsumer.seek(new TopicPartition("wl_008", 0), 100L);
        newConsumer.seek(new TopicPartition("fj_008", 0), 100L);
        newConsumer.seek(new TopicPartition("fj_008", 1), 100L);

        // Other features
        Set<TopicPartition> assignment = newConsumer.assignment();
        Map<TopicPartition, OffsetAndTimestamp> timestampMap = newConsumer.offsetsForTimes(beginOffsetsMap);
        newConsumer.pause(topicPartitions);  // pause partitions
        newConsumer.resume(topicPartitions); // resume partitions

        // Consume records
        while (true) {
            ConsumerRecords<String, String> records = newConsumer.poll(100L);
            for (ConsumerRecord<String, String> temp : records) {
                System.out.println(temp.value());
            }
        }
    }
}
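The example above leaves offset management to the consumer's automatic commits. When at-least-once processing is needed, auto-commit can be disabled and offsets committed manually after each batch. A minimal sketch along those lines, reusing the topics from the example above; the class name ManualCommitConsumer is just illustrative:

package com.bigdata.hadoop.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hosts1:9092,hosts2:9092");
        props.put("group.id", "testGroup1");
        props.put("enable.auto.commit", "false"); // take over offset management
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("wl_008", "fj_008"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value()); // process the record first...
                }
                consumer.commitSync(); // ...then commit, so a crash replays at most one batch
            }
        } finally {
            consumer.close();
        }
    }
}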
6. Old API: fetch the earliest and latest Kafka offsets
package com.bigdata.hadoop.kafka.utils;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.Map.Entry;

/**
 * Fetch the readable offset range (earliest and latest) of every topic partition from the brokers.
 * 1. First find the leader of every topic partition.
 * 2. Then connect to that leader to fetch the earliest and latest offsets.
 * Note: the clientID can be any value, but a meaningful one makes troubleshooting easier.
 */
public class KafkaHeplerOldAPI {

    private static Logger logger = LoggerFactory.getLogger(KafkaHeplerOldAPI.class);

    /**
     * Find the leaders on the brokers.
     * key:   topic
     * value: metadata of all of the topic's partitions, including the leader
     * The last SimpleConsumer argument, the clientID, can be any value; usually a meaningful one is chosen.
     */
    public static HashMap<String, List<PartitionMetadata>> findLeader(String brokerHost, int port, List<String> topicList) {
        HashMap<String, List<PartitionMetadata>> map = new HashMap<>();
        SimpleConsumer consumer = null;
        try {
            consumer = new SimpleConsumer(brokerHost, port, 100000, 64 * 1024, "findLeader" + System.currentTimeMillis());
            TopicMetadataRequest req = new TopicMetadataRequest(topicList);
            TopicMetadataResponse resp = consumer.send(req);
            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                map.put(item.topic(), item.partitionsMetadata());
            }
        } catch (Exception e) {
            logger.error("Failed to find leaders via brokerHost:{}, brokerPort:{}, topics:{}!", brokerHost, port, String.join("|", topicList));
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
        return map;
    }

    /**
     * Fetch the offset of a given topic/partition from its leader (host and port).
     * clientName can be any value; usually a meaningful one is chosen.
     * Note: all partitions of this leader could be packed into a single requestInfo so that
     * everything is fetched in one call, reducing leader connections, but then a list would
     * have to be returned and matched up. The current approach is more straightforward; the
     * downside is one connection per partition, although in practice each call is very cheap.
     */
    public static long getOffsetByLeader(String leadHost, int leadPort, String topic, int partition, boolean isLast) {
        String clientName = "Client_" + topic + "_" + partition;
        long indexFlag = isLast ? kafka.api.OffsetRequest.LatestTime() : kafka.api.OffsetRequest.EarliestTime();
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(indexFlag, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        SimpleConsumer consumer = new SimpleConsumer(leadHost, leadPort, 100000, 64 * 1024, clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        consumer.close();
        if (response.hasError()) {
            logger.error("getOffsetByLeader failed for topicAndPartition:[{}]!", topic + "_" + partition);
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Fetch the offsets (earliest or latest).
     * First find the leader of every partition of every topic.
     */
    private static Map<TopicPartition, Long> getOffsetOneBroker(String kafkaHost, int kafkaPort, List<String> topicList, boolean isLast) {
        Map<TopicPartition, Long> partionAndOffset = new HashMap<>();
        // Partition metadata for every topic; PartitionMetadata contains the leader
        HashMap<String, List<PartitionMetadata>> metadatas = findLeader(kafkaHost, kafkaPort, topicList);
        for (Entry<String, List<PartitionMetadata>> entry : metadatas.entrySet()) {
            String topic = entry.getKey();
            for (PartitionMetadata temp : entry.getValue()) {
                long offset = getOffsetByLeader(temp.leader().host(), temp.leader().port(), topic, temp.partitionId(), isLast);
                partionAndOffset.put(new TopicPartition(topic, temp.partitionId()), offset);
            }
        }
        return partionAndOffset;
    }

    /**
     * Try every broker in the bootstrapServers string in turn.
     * Strictly speaking a single reachable broker is enough, because any broker that can be
     * connected to can return all of the leaders.
     */
    private static Map<TopicPartition, Long> getOffsetAllHosts(String bootstrapServers, List<String> topicList, boolean isLast) {
        String[] servers = bootstrapServers.split(",");
        List<String> kafkaHosts = new ArrayList<String>();
        List<Integer> kafkaPorts = new ArrayList<Integer>();
        for (String server : servers) {
            String[] hostAndPort = server.split(":");
            kafkaHosts.add(hostAndPort[0]);
            kafkaPorts.add(Integer.parseInt(hostAndPort[1]));
        }
        Map<TopicPartition, Long> partionAndOffset = null;
        for (int i = 0, size = kafkaHosts.size(); i < size; i++) {
            partionAndOffset = getOffsetOneBroker(kafkaHosts.get(i), kafkaPorts.get(i), topicList, isLast);
            if (partionAndOffset.size() > 0) {
                break;
            }
        }
        return partionAndOffset;
    }

    /**
     * Helps work around kafka.common.OffsetOutOfRangeException: it occurs when the offset that
     * streaming has recorded in ZooKeeper is smaller than the earliest offset still valid in Kafka.
     *
     * The earliest offset starts at 0 and grows as expired data (default retention 7 days) is deleted.
     * If Kafka holds no readable data, the earliest and latest offsets are identical.
     *
     * @param bootstrapServers Kafka config, e.g. hosts1:9092,hosts2:9092,hosts3:9092
     */
    public static Map<TopicPartition, Long> getEarliestOffset(String bootstrapServers, List<String> topicList) throws Exception {
        return getOffsetAllHosts(bootstrapServers, topicList, false);
    }

    /**
     * Initialise to the latest data.
     *
     * @param bootstrapServers Kafka config, e.g. hosts1:9092,hosts2:9092,hosts3:9092
     */
    public static Map<TopicPartition, Long> getLastestOffset(String bootstrapServers, List<String> topicList) throws Exception {
        return getOffsetAllHosts(bootstrapServers, topicList, true);
    }

    public static void main(String[] args) throws Exception {
        List<String> topicList = Arrays.asList("one,two".split(","));
        Map<TopicPartition, Long> lastestOffset = getLastestOffset("cloudera:9092", topicList);
        Map<TopicPartition, Long> earliestOffset = getEarliestOffset("cloudera:9092", topicList);
    }
}
7. New API: fetch the earliest and latest Kafka offsets
package com.bigdata.hadoop.kafka.utils;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Fetch the readable offset range (earliest and latest) of every topic partition from the brokers.
 */
public class KafkaHeplerNewAPI {

    private static Logger logger = LoggerFactory.getLogger(KafkaHeplerNewAPI.class);

    /**
     * Fetch the latest offsets.
     *
     * @param consumer
     * @param topics
     * @return
     */
    public static Map<TopicPartition, Long> getLastOffsets(KafkaConsumer<String, String> consumer, List<String> topics) {
        // Collect all partitions
        LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
        for (String topic : topics) {
            for (PartitionInfo parTemp : consumer.partitionsFor(topic)) {
                topicPartitions.add(new TopicPartition(topic, parTemp.partition()));
            }
        }
        // Latest offset per partition => 0.10 and above
        Map<TopicPartition, Long> result = consumer.endOffsets(topicPartitions);
        // If the client version does not support the call above, use this instead:
        // Map<TopicPartition, Long> result = new HashMap<>();
        // consumer.assign(topicPartitions);
        // consumer.seekToEnd(topicPartitions); // important: seek to the end to read the latest offsets
        // for (TopicPartition temp : topicPartitions) {
        //     result.put(temp, consumer.position(temp));
        // }
        return result;
    }

    /**
     * Fetch the earliest offsets.
     *
     * @param consumer
     * @param topics
     * @return
     */
    public static Map<TopicPartition, Long> getEarliestOffsets(KafkaConsumer<String, String> consumer, List<String> topics) {
        // Collect all partitions
        LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
        for (String topic : topics) {
            for (PartitionInfo parTemp : consumer.partitionsFor(topic)) {
                topicPartitions.add(new TopicPartition(topic, parTemp.partition()));
            }
        }
        // Earliest offset per partition => 0.10 and above
        Map<TopicPartition, Long> result = consumer.beginningOffsets(topicPartitions);
        // If the client version does not support the call above, use this instead:
        // Map<TopicPartition, Long> result = new HashMap<>();
        // consumer.assign(topicPartitions);
        // consumer.seekToBeginning(topicPartitions); // important: seek to the beginning to read the earliest offsets
        // for (TopicPartition temp : topicPartitions) {
        //     result.put(temp, consumer.position(temp));
        // }
        return result;
    }

    public static void main(String[] args) throws Exception {
        List<String> topicList = Arrays.asList("one,two".split(","));
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "cloudera:9092");
        props.put("group.id", "testGroup1");
        props.put("auto.offset.reset", "earliest"); // earliest or latest
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        if (false) { // Kerberos switch
            props.put("security.protocol", "SASL_PLAINTEXT");
        }
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        Map<TopicPartition, Long> lastestOffset = getLastOffsets(consumer, topicList);
        Map<TopicPartition, Long> earliestOffset = getEarliestOffsets(consumer, topicList);
    }
}