1. Kafka shell and API

1) Shell operations

  • 1. List all topics

    kafka-topics --zookeeper zk01:2181/kafka --list
  • 2. Create a topic (with partitions and replication factor)

    kafka-topics --zookeeper zk01:2181/kafka  --create --topic test --partitions 1  --replication-factor 1
  • 3. Describe a topic

    kafka-topics --zookeeper zk01:2181/kafka --describe --topic test
  • 4. Change the partition count: it can only be increased, never decreased

    kafka-topics --zookeeper zk01:2181/kafka --alter --topic utopic --partitions 15
  • 5. Delete a topic

    kafka-topics --zookeeper zk01:2181/kafka --delete  --topic test
      delete.topic.enable=true must be set in server.properties; otherwise the topic is only marked for deletion (unless the brokers are restarted).

    The option that allows topic deletion must also be ticked in the management page; otherwise it is only a "fake" delete.

  • 6. Send messages (typed in manually, line by line)

    kafka-console-producer --broker-list kafka01:9092 --topic test
  • 7. Send messages (piped in, streaming a file's contents)

    tail -F /root/log.log | kafka-console-producer --broker-list kafka01:9092 --topic test

  • 8. Consume messages

    kafka-console-consumer --zookeeper zk01:2181/kafka --topic test --from-beginning
  • 9. Check consumer offsets

    kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181/kafka --group testgroup
    
      kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181/kafka --group testgroup --topic test
    
      or
      kafka-consumer-offset-checker --zookeeper zk01/kafka --group testgroup --topic test

2) Java API

  • 1. Maven dependency

    <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.10</artifactId>
         <version>0.9.0.0</version>
         <exclusions>
            <exclusion>
               <groupId>junit</groupId>
               <artifactId>junit</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
  • 2. Custom producer partitioner (old API; a partitioner for the new API is sketched after this block)

    package com.bigdata.hadoop.kafka.producer;
    
      import kafka.producer.DefaultPartitioner;
      import kafka.utils.VerifiableProperties;
    
      public class KafkaPartitioner extends DefaultPartitioner {
          public KafkaPartitioner(VerifiableProperties props) {
              super(props);
          }
    
          public int partition(Object obj, int numPartitions) {
      //        return Math.abs(obj.hashCode() % numPartitions);
              // demo: always route messages to partition 2
              return 2;
          }
      }
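
    For the new (0.9+) producer API, partitioner.class must name a class that implements org.apache.kafka.clients.producer.Partitioner; the old-API partitioner above cannot be reused there. Below is a minimal sketch under that assumption (the class name NewApiPartitioner is illustrative, not part of the original project):

    package com.bigdata.hadoop.kafka.producer;
    
      import org.apache.kafka.clients.producer.Partitioner;
      import org.apache.kafka.common.Cluster;
    
      import java.util.Map;
    
      /**
       * Sketch of a custom partitioner for the new producer API (Kafka 0.9+).
       */
      public class NewApiPartitioner implements Partitioner {
    
          @Override
          public void configure(Map<String, ?> configs) {
              // nothing to configure in this sketch
          }
    
          @Override
          public int partition(String topic, Object key, Object value, byte[] keyBytes, byte[] valueBytes, Cluster cluster) {
              int numPartitions = cluster.partitionsForTopic(topic).size();
              // hash the key when present, otherwise fall back to partition 0
              return key == null ? 0 : (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
          }
    
          @Override
          public void close() {
              // nothing to release
          }
      }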
  • 3. Old and new producer APIs (does not require mapping the Kafka broker hostnames in the local hosts file)

    package com.bigdata.hadoop.kafka.producer;
    
      import com.bigdata.hadoop.kafka.consumer.ConsumerTest;
      import kafka.javaapi.producer.Producer;
      import kafka.producer.KeyedMessage;
      import kafka.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;
    
      import java.util.Properties;
    
      /**
       * Test of the old and new Kafka producer APIs
       */
      public class ProducerTest {
          public static void main(String[] args) {
              // Old API
              Properties oldProps = new Properties();
              oldProps.setProperty("metadata.broker.list", "cloudera:9092");
              oldProps.setProperty("request.required.acks", "-1");
              oldProps.setProperty("partitioner.class", KafkaPartitioner.class.getName());
    
              Producer<byte[], byte[]> oldProducer = new Producer<byte[], byte[]>(new ProducerConfig(oldProps));
              for (int i = 0; i < 1000; i++) {
                  oldProducer.send(new KeyedMessage<>("one", ("msg one:" + i).getBytes()));
                  oldProducer.send(new KeyedMessage<>("two", ("msg two:" + i).getBytes()));
              }
              oldProducer.close();
    
              // New API => Kafka 0.9 and above
              Properties newProps = new Properties();
              newProps.setProperty("bootstrap.servers", "cloudera:9092");
              newProps.setProperty("key.serializer", StringSerializer.class.getName());
              newProps.setProperty("value.serializer", StringSerializer.class.getName());
              newProps.put("partitioner.class", KafkaPartitioner.class);
              KafkaProducer<String, String> newProducer = new KafkaProducer<String, String>(newProps);
              for (int i = 0; i < 100000; i++) {
                  newProducer.send(new ProducerRecord<>("one", "msg one:" + i));
                  newProducer.send(new ProducerRecord<>("two", "msg two:" + i));
              }
              newProducer.close();  // flush buffered records before the JVM exits
          }
      }
  • 4. Custom consumer partition assignor => the default kafka.consumer.RangeAssignor is usually fine

    package com.bigdata.hadoop.kafka.consumer;
    
      import kafka.common.TopicAndPartition;
      import kafka.consumer.AssignmentContext;
      import kafka.consumer.ConsumerThreadId;
      import kafka.consumer.RangeAssignor;
      import kafka.utils.Pool;
    
      /**
       * Custom partition-assignment strategy; generally not needed
       */
      public class PartitionAssignor extends RangeAssignor {
    
          // Stub only: a real implementation would return the partition -> consumer-thread mapping for the given context.
          public Pool<String, scala.collection.mutable.Map<TopicAndPartition, ConsumerThreadId>> assign(final AssignmentContext ctx) {
              return null;
          }
    
      }
  • 5. Old and new consumer APIs (the broker hostnames must be mapped in the local hosts file)

    package com.bigdata.hadoop.kafka.consumer;
    
      import kafka.consumer.Consumer;
      import kafka.consumer.ConsumerConfig;
      import kafka.consumer.ConsumerIterator;
      import kafka.consumer.KafkaStream;
      import kafka.javaapi.consumer.ConsumerConnector;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
      import org.apache.kafka.common.PartitionInfo;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.StringDeserializer;
    
      import java.util.*;
    
      /**
       * Test of the old and new Kafka consumer APIs
       */
      public class ConsumerTest {
          public static void main(String[] args) {
              /**
             * Old consumer API, Kafka 0.8
               */
              Properties oldProps = new Properties();
              oldProps.setProperty("zookeeper.connect", "192.168.50.10:2181");
              oldProps.setProperty("group.id", "testGroup");
              oldProps.setProperty("auto.offset.reset", "smallest");  // smallest  或者 largest
              if (false) {  // Kerberos toggle
                  oldProps.setProperty("security.protocol", "SASL_PLAINTEXT");
              }
    
              HashMap<String, Integer> topicPartitionsMap = new HashMap<String, Integer>();
              topicPartitionsMap.put("one", 1);
              ConsumerConnector oldConsumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(oldProps));
              Map<String, List<KafkaStream<byte[], byte[]>>> topicAndParStream = oldConsumer.createMessageStreams(topicPartitionsMap);
    
            for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> entry : topicAndParStream.entrySet()) {
                  for (KafkaStream<byte[], byte[]> kafkaStream : entry.getValue()) {
                    // best handled in its own thread so that each thread reads exactly one partition, improving read throughput
                      ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
                      new Thread(() -> {
                          while (it.hasNext()) {
                              System.out.println(new String(it.next().message()));
                          }
                      }).start();
                  }
              }
    
    
              /**
             * New API => Kafka 0.9 and above; much richer functionality
               */
              Map<String, Object> newProps = new HashMap<>();
              newProps.put("bootstrap.servers", "hosts1:9092,hosts2:9092");
              newProps.put("group.id", "testGroup1");
              newProps.put("auto.offset.reset", "earliest");  // earliest  或者 latest
              newProps.put("key.deserializer", StringDeserializer.class);
              newProps.put("value.deserializer", StringDeserializer.class);
              if (false) {  // Kerberos toggle
                  newProps.put("security.protocol", "SASL_PLAINTEXT");
              }
    
              KafkaConsumer<String, String> newConsumer = new KafkaConsumer<>(newProps);
    
              // subscribe to the topics
              LinkedList<String> topics = new LinkedList<>();
              topics.add("wl_008");
              topics.add("fj_008");
              newConsumer.subscribe(topics);
    
              // fetch partition metadata for each topic, including the leaders
              Map<String, List<PartitionInfo>> topicMap = newConsumer.listTopics();
              List<PartitionInfo> onePartitions = newConsumer.partitionsFor("wl_008");
    
              // collect all TopicPartitions
              LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
              for (String topic : topics) {
                  for (PartitionInfo parTemp : newConsumer.partitionsFor(topic)) {
                      topicPartitions.add(new TopicPartition(topic, parTemp.partition()));
                  }
              }
    
              // earliest and latest offsets per partition => requires Kafka 0.10+
              Map<TopicPartition, Long> beginningOffsets = newConsumer.beginningOffsets(topicPartitions);
              Map<TopicPartition, Long> endOffsets = newConsumer.endOffsets(topicPartitions);
    
              // if your version does not support the calls above, use consumer.assign(); note: consumer.unsubscribe() must be called first, otherwise the two conflict
              newConsumer.unsubscribe();
              newConsumer.assign(topicPartitions);
    
              newConsumer.seekToBeginning(topicPartitions);  // seek to the beginning to read the beginning offsets
              HashMap<TopicPartition, Long> beginOffsetsMap = new HashMap<>();
              for (TopicPartition temp : topicPartitions) {
                  beginOffsetsMap.put(temp, newConsumer.position(temp));
              }
    
              newConsumer.seekToEnd(topicPartitions);  // seek to the end to read the end offsets
              HashMap<TopicPartition, Long> endOffsetsMap = new HashMap<>();
              for (TopicPartition temp : topicPartitions) {
                  endOffsetsMap.put(temp, newConsumer.position(temp));
              }
    
              // jump to the earliest position and consume from there
              newConsumer.seekToBeginning(topicPartitions);
    
              // jump to the latest position and consume from there
              newConsumer.seekToEnd(topicPartitions);
    
              // start consuming from a specific offset
              newConsumer.seek(new TopicPartition("wl_008", 0), 100L);
              newConsumer.seek(new TopicPartition("fj_008", 0), 100L);
              newConsumer.seek(new TopicPartition("fj_008", 1), 100L);
    
              // other features
              Set<TopicPartition> assignment = newConsumer.assignment();
              // note: offsetsForTimes expects timestamps in milliseconds as values (Kafka 0.10.1+); passing offsets here is only illustrative
              Map<TopicPartition, OffsetAndTimestamp> timestampMap = newConsumer.offsetsForTimes(beginOffsetsMap);
              newConsumer.pause(topicPartitions);  // pause these partitions
              newConsumer.resume(topicPartitions);  // resume these partitions
    
              // poll and consume records
              while (true) {
                  ConsumerRecords<String, String> records = newConsumer.poll(100L);
                  for (ConsumerRecord<String, String> temp : records) {
                      System.out.println(temp.value());
                  }
              }
          }
      }
  • 6. Old API: get the earliest and latest Kafka offsets

    package com.bigdata.hadoop.kafka.utils;
    
      import kafka.api.PartitionOffsetRequestInfo;
      import kafka.common.TopicAndPartition;
      import kafka.javaapi.*;
      import kafka.javaapi.consumer.SimpleConsumer;
      import org.apache.kafka.common.TopicPartition;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
    
      import java.util.*;
      import java.util.Map.Entry;
    
      /**
       * Fetch the readable offset range (earliest and latest value) of every partition for each topic on the Kafka brokers.
       * 1. First find the leader of every topic partition.
       * 2. Then connect to that leader to fetch the earliest and latest offsets.
       * Note: the clientID value is arbitrary, but a meaningful value makes troubleshooting easier.
       */
      public class KafkaHeplerOldAPI {
    
          private static Logger logger = LoggerFactory.getLogger(KafkaHeplerOldAPI.class);
    
          /**
           * Find the partition leaders via one broker.
           * key:   topic
           * value: all partition metadata of that topic, including the leader
           * The last SimpleConsumer argument, clientID, is arbitrary; a meaningful value is usually chosen.
           */
          public static HashMap<String, List<PartitionMetadata>> findLeader(String brokerHost, int port, List<String> topicList) {
              HashMap<String, List<PartitionMetadata>> map = new HashMap<>();
              SimpleConsumer consumer = null;
              try {
                  consumer = new SimpleConsumer(brokerHost, port, 100000, 64 * 1024, "findLeader" + System.currentTimeMillis());
                  TopicMetadataRequest req = new TopicMetadataRequest(topicList);
                  TopicMetadataResponse resp = consumer.send(req);
    
                  List<TopicMetadata> metaData = resp.topicsMetadata();
                  for (TopicMetadata item : metaData) {
                      map.put(item.topic(), item.partitionsMetadata());
                  }
              } catch (Exception e) {
                  logger.error("經過brokerhost:{}, brokerPort:{}, topic:{} 來查找leader失敗!", brokerHost, port, String.join("|", topicList));
              } finally {
                  if (consumer != null) {
                      consumer.close();
                  }
              }
              return map;
          }
    
          /**
           * Fetch the offset of a given topic partition from its leader (by host and port).
           * clientName is arbitrary; a meaningful value is usually chosen.
           * Note: all of this leader's partitions could be put into requestInfo and fetched in a single request,
           * which saves leader connections but returns a list that then has to be unpacked accordingly;
           * a sketch of that batched variant follows this class.
           * The per-partition approach used here is more direct; the drawback is one connection per partition, although each call is very cheap in practice.
           */
          public static long getOffsetByLeader(String leadHost, int leadPort, String topic, int partition, boolean isLast) {
              String clientName = "Client_" + topic + "_" + partition;
              long indexFlag = isLast ? kafka.api.OffsetRequest.LatestTime() : kafka.api.OffsetRequest.EarliestTime();
    
              Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
              requestInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(indexFlag, 1));
              kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    
              SimpleConsumer consumer = new SimpleConsumer(leadHost, leadPort, 100000, 64 * 1024, clientName);
              OffsetResponse response = consumer.getOffsetsBefore(request);
              consumer.close();
    
              if (response.hasError()) {
                  logger.error("getOffsetByLeader 中獲取topicAndPartition:[{}] 失敗!", topic + "_" + partition);
                  return 0;
              }
    
              long[] offsets = response.offsets(topic, partition);
              return offsets[0];
          }
    
          /**
           * Fetch offsets, either the earliest or the latest.
           * First resolves the leader of every partition of each topic.
           */
          private static Map<TopicPartition, Long> getOffsetOneBroker(String kafkaHost, int kafkaPort, List<String> topicList, boolean isLast) {
              Map<TopicPartition, Long> partionAndOffset = new HashMap<>();
    
              // fetch the partition metadata of each topic; PartitionMetadata contains the leader
              HashMap<String, List<PartitionMetadata>> metadatas = findLeader(kafkaHost, kafkaPort, topicList);
    
              for (Entry<String, List<PartitionMetadata>> entry : metadatas.entrySet()) {
                  String topic = entry.getKey();
                  for (PartitionMetadata temp : entry.getValue()) {
                      long offset = getOffsetByLeader(temp.leader().host(), temp.leader().port(), topic, temp.partitionId(), isLast);
                      partionAndOffset.put(new TopicPartition(topic, temp.partitionId()), offset);
                  }
              }
              return partionAndOffset;
          }
    
          /**
           * Walk through the brokers in the given bootstrapServers list one by one.
           * In fact a single reachable broker is enough, because any broker that can be reached can return all the leaders.
           */
          private static Map<TopicPartition, Long> getOffsetAllHosts(String bootstrapServers, List<String> topicList, boolean isLast) {
              String[] servers = bootstrapServers.split(",");
              List<String> kafkaHosts = new ArrayList<String>();
              List<Integer> kafkaPorts = new ArrayList<Integer>();
    
              for (String server : servers) {
                  String[] hostAndPort = server.split(":");
                  kafkaHosts.add(hostAndPort[0]);
                  kafkaPorts.add(Integer.parseInt(hostAndPort[1]));
              }
    
              Map<TopicPartition, Long> partionAndOffset = null;
              for (int i = 0, size = kafkaHosts.size(); i < size; i++) {
                  partionAndOffset = getOffsetOneBroker(kafkaHosts.get(i), kafkaPorts.get(i), topicList, isLast);
    
                  if (partionAndOffset.size() > 0) {
                      break;
                  }
              }
              return partionAndOffset;
          }
    
          /**
           * Helps work around kafka.common.OffsetOutOfRangeException:
           * it occurs when the Kafka offset recorded in ZooKeeper by a streaming job is smaller than Kafka's earliest valid offset.
           * <p>
           * The earliest offset starts at 0 and grows as expired data (7-day retention by default) is deleted.
           * If there is no readable data in Kafka, the earliest and latest offsets are equal.
           * (A sketch of clamping stored offsets against the earliest offsets follows item 7 below.)
           *
           * @param bootstrapServers Kafka brokers, e.g. hosts1:9092,hosts2:9092,hosts3:9092
           */
          public static Map<TopicPartition, Long> getEarliestOffset(String bootstrapServers, List<String> topicList) throws Exception {
              return getOffsetAllHosts(bootstrapServers, topicList, false);
          }
    
          /**
           * Initialize to the latest data.
           *
           * @param bootstrapServers Kafka brokers, e.g. hosts1:9092,hosts2:9092,hosts3:9092
           */
          public static Map<TopicPartition, Long> getLastestOffset(String bootstrapServers, List<String> topicList) throws Exception {
              return getOffsetAllHosts(bootstrapServers, topicList, true);
          }
    
          public static void main(String[] args) throws Exception {
              List<String> topicList = Arrays.asList("one,two".split(","));
              Map<TopicPartition, Long> lastestOffset = getLastestOffset("cloudera:9092", topicList);
              Map<TopicPartition, Long> earliestOffset = getEarliestOffset("cloudera:9092", topicList);
          }
      }
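
    The Javadoc on getOffsetByLeader above mentions a batched variant: put all of a leader's partitions into one requestInfo map so that a single OffsetRequest returns them all. A minimal sketch of that variant is shown below, using the same old SimpleConsumer API; the class and method names are illustrative, not part of the original project.

    package com.bigdata.hadoop.kafka.utils;
    
      import kafka.api.PartitionOffsetRequestInfo;
      import kafka.common.TopicAndPartition;
      import kafka.javaapi.OffsetResponse;
      import kafka.javaapi.consumer.SimpleConsumer;
      import org.apache.kafka.common.TopicPartition;
    
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
    
      /**
       * Sketch: ask one leader for the offsets of several partitions of a topic in a single request.
       */
      public class KafkaLeaderBatchOffsets {
    
          public static Map<TopicPartition, Long> getOffsetsByLeader(String leadHost, int leadPort, String topic,
                                                                     List<Integer> partitions, boolean isLast) {
              String clientName = "Client_" + topic + "_batch";
              long indexFlag = isLast ? kafka.api.OffsetRequest.LatestTime() : kafka.api.OffsetRequest.EarliestTime();
    
              // one request entry per partition hosted on this leader
              Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
              for (int partition : partitions) {
                  requestInfo.put(new TopicAndPartition(topic, partition), new PartitionOffsetRequestInfo(indexFlag, 1));
              }
              kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    
              SimpleConsumer consumer = new SimpleConsumer(leadHost, leadPort, 100000, 64 * 1024, clientName);
              OffsetResponse response = consumer.getOffsetsBefore(request);
              consumer.close();
    
              // unpack one offset per partition from the single response
              Map<TopicPartition, Long> result = new HashMap<>();
              for (int partition : partitions) {
                  long[] offsets = response.offsets(topic, partition);
                  result.put(new TopicPartition(topic, partition), offsets.length > 0 ? offsets[0] : 0L);
              }
              return result;
          }
      }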
  • 7. New API: get the earliest and latest Kafka offsets

    package com.bigdata.hadoop.kafka.utils;
    
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.PartitionInfo;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
    
      import java.util.*;
    
      /**
       * Fetch the readable offset range (earliest and latest value) of every topic partition on the Kafka brokers.
       */
      public class KafkaHeplerNewAPI {
    
          private static Logger logger = LoggerFactory.getLogger(KafkaHeplerNewAPI.class);
    
          /**
           * Fetch the latest offsets.
           *
           * @param consumer
           * @param topics
           * @return
           */
          public static Map<TopicPartition, Long> getLastOffsets(KafkaConsumer<String, String> consumer, List<String> topics) {
    
              // collect all TopicPartitions
              LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
              for (String topic : topics) {
                  for (PartitionInfo parTemp : consumer.partitionsFor(topic)) {
                      topicPartitions.add(new TopicPartition(topic, parTemp.partition()));
                  }
              }
    
              // latest offset of each partition => requires Kafka 0.10+
              Map<TopicPartition, Long> result = consumer.endOffsets(topicPartitions);
    
              // if the client API does not support the call above, use the approach below
      //        Map<TopicPartition, Long> result = new HashMap<>();
      //        consumer.assign(topicPartitions);
      //        consumer.seekToEnd(topicPartitions);  // important: reposition to the end first, otherwise position() will not return the latest offset
      //        for (TopicPartition temp : topicPartitions) {
      //            result.put(temp, consumer.position(temp));
      //        }
    
              return result;
          }
    
          /**
           * Fetch the earliest offsets.
           *
           * @param consumer
           * @param topics
           * @return
           */
          public static Map<TopicPartition, Long> getEarliestOffsets(KafkaConsumer<String, String> consumer, List<String> topics) {
              // collect all TopicPartitions
              LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
              for (String topic : topics) {
                  for (PartitionInfo parTemp : consumer.partitionsFor(topic)) {
                      topicPartitions.add(new TopicPartition(topic, parTemp.partition()));
                  }
              }
    
              // earliest offset of each partition => requires Kafka 0.10+
              Map<TopicPartition, Long> result = consumer.beginningOffsets(topicPartitions);
    
              // if the client API does not support the call above, use the approach below
      //        Map<TopicPartition, Long> result = new HashMap<>();
      //        consumer.assign(topicPartitions);
      //        consumer.seekToBeginning(topicPartitions);  // important: reposition to the beginning first, otherwise position() will not return the earliest offset
      //        for (TopicPartition temp : topicPartitions) {
      //            result.put(temp, consumer.position(temp));
      //        }
    
              return result;
          }
    
          public static void main(String[] args) throws Exception {
              List<String> topicList = Arrays.asList("one,two".split(","));
              Map<String, Object> props = new HashMap<>();
              props.put("bootstrap.servers", "cloudera:9092");
              props.put("group.id", "testGroup1");
              props.put("auto.offset.reset", "earliest");  // earliest  或者 latest
              props.put("key.deserializer", StringDeserializer.class);
              props.put("value.deserializer", StringDeserializer.class);
              if (false) {  // Kerberos toggle
                  props.put("security.protocol", "SASL_PLAINTEXT");
              }
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    
              Map<TopicPartition, Long> lastestOffset = getLastOffsets(consumer, topicList);
              Map<TopicPartition, Long> earliestOffset = getEarliestOffsets(consumer, topicList);
          }
      }
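
    The old-API helper's Javadoc (item 6) notes that OffsetOutOfRangeException appears when offsets stored externally (for example in ZooKeeper by a streaming job) fall below Kafka's earliest valid offset. A minimal sketch of using the earliest-offset map from either helper to clamp such stored offsets is shown below; the class and method names are illustrative, not part of the original project.

    package com.bigdata.hadoop.kafka.utils;
    
      import org.apache.kafka.common.TopicPartition;
    
      import java.util.HashMap;
      import java.util.Map;
    
      /**
       * Sketch: clamp externally stored offsets so they never fall below the earliest readable offset,
       * which is what triggers kafka.common.OffsetOutOfRangeException.
       */
      public class OffsetClamp {
    
          public static Map<TopicPartition, Long> clampToEarliest(Map<TopicPartition, Long> storedOffsets,
                                                                  Map<TopicPartition, Long> earliestOffsets) {
              Map<TopicPartition, Long> result = new HashMap<>();
              for (Map.Entry<TopicPartition, Long> entry : storedOffsets.entrySet()) {
                  Long earliest = earliestOffsets.get(entry.getKey());
                  long floor = earliest == null ? 0L : earliest;
                  // keep the stored offset unless the data it points to has already been deleted from Kafka
                  result.put(entry.getKey(), Math.max(entry.getValue(), floor));
              }
              return result;
          }
      }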