Implementing a Kafka Consumer in Java
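
The class below demonstrates five offset-commit strategies for a Java Kafka consumer: automatic commit, manual synchronous commit, manual asynchronous commit, asynchronous commit with a callback, and a mixed synchronous/asynchronous commit.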

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class MyConsumer {
  private static KafkaConsumer<String,String> consumer;
  private static Properties kfkProperties;

  static{
    kfkProperties = new Properties();
    kfkProperties.put("bootstrap.servers","slave1:9092");
    kfkProperties.put("group.id","kafkatest");
    kfkProperties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    kfkProperties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  }
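
  // The static block above supplies the minimum consumer configuration:
  // the broker to bootstrap from, the consumer group id, and String
  // deserializers for the record key and value.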

  /**
   * consumer 1: automatic offset commit
   */
  private static void generalConsumerMessageAutoCommit() {
    kfkProperties.put("enable.auto.commit", true);
    consumer = new KafkaConsumer<>(kfkProperties);
    // Instead of subscribing to the topic, assign partition 1 explicitly
    // and rewind to its beginning; subscribe() would instead let the group
    // coordinator balance partitions across consumers.
    // consumer.subscribe(Collections.singletonList("kafkatest"));
    String topic = "kafkatest";
    TopicPartition partition = new TopicPartition(topic, 1);
    List<TopicPartition> lists = new ArrayList<>();
    lists.add(partition);
    consumer.assign(lists);
    consumer.seekToBeginning(lists);
    // consumer.seek(partition, 0);
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(8000);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.timestamp() + "," + record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() + "," + record.value());
        }
        try {
          Thread.sleep(1000);
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    } finally {
      consumer.close();
    }
  }

  /**
   * consumer 2: manual synchronous offset commit
   */
  private static void generalConsumerMessageManualCommitSync() {
    kfkProperties.put("enable.auto.commit", false);
    consumer = new KafkaConsumer<>(kfkProperties);
    consumer.subscribe(Collections.singletonList("kafkatest"));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(80);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.timestamp() + "," + record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() + "," + record.value());
      }
      try {
        // Block until the offsets are committed or the commit fails for good.
        consumer.commitSync();
      } catch (CommitFailedException e) {
        System.out.println("commit failed msg: " + e.getMessage());
      }
    }
  }
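
  // commitSync retries on transient failures and only surfaces
  // CommitFailedException when the commit can no longer succeed, e.g. after
  // the group has rebalanced the partitions away from this consumer.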

  /**
   * consumer 3: asynchronous offset commit
   */
  private static void generalConsumerMessageManualCommitAsync() {
    kfkProperties.put("enable.auto.commit", false);
    consumer = new KafkaConsumer<>(kfkProperties);
    consumer.subscribe(Collections.singletonList("kafkatest"));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(80);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.timestamp() + "," + record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() + "," + record.value());
      }
      // Fire-and-forget: commitAsync does not retry, so a failed commit is
      // simply superseded by the next one.
      consumer.commitAsync();
    }
  }

  /**
   * consumer 4: asynchronous offset commit with a callback
   */
  private static void generalConsumerMessageManualCommitAsyncWithCallBack() {
    kfkProperties.put("enable.auto.commit", false);
    consumer = new KafkaConsumer<>(kfkProperties);
    consumer.subscribe(Collections.singletonList("kafkatest"));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(80);
      for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.timestamp() + "," + record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() + "," + record.value());
      }
      // The callback receives the offsets that were sent and, if the commit
      // failed, the exception.
      consumer.commitAsync((offsets, e) -> {
        if (null != e) {
          System.out.println("commit async callback error: " + e.getMessage());
          System.out.println(offsets);
        }
      });
    }
  }
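
  // Because commitAsync never retries, an application that wants the low
  // latency of async commits while running but a reliable final commit on
  // shutdown can combine both, as consumer 5 below does.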
  /**
   * consumer 5: mixed sync/async commit
   */
  private static void generalMixSyncAndAsyncCommit() {
    kfkProperties.put("enable.auto.commit", false);
    consumer = new KafkaConsumer<>(kfkProperties);
    consumer.subscribe(Collections.singletonList("kafkatest"));
    try {
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(80);
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.timestamp() + "," + record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() + "," + record.value());
        }
        consumer.commitAsync();
      }
    } catch (Exception e) {
      System.out.println("commit async error: " + e.getMessage());
    } finally {
      try {
        // One final synchronous commit before closing, so the last processed
        // offsets are not lost.
        consumer.commitSync();
      } finally {
        consumer.close();
      }
    }
  }

  /**
   * main method for testing
   *
   * @param args
   */
  public static void main(String[] args) {
    MyConsumer.generalConsumerMessageAutoCommit();
  }
}
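
In short: automatic commit is the simplest but can re-deliver or lose records around a crash or rebalance; commitSync is the safest but blocks the poll loop on every commit; commitAsync keeps the loop fast at the cost of not retrying failed commits; and the mixed pattern takes the best of both, committing asynchronously while running and synchronously once on shutdown.

One caveat: the poll(long) overload used above is deprecated since kafka-clients 2.0 in favor of poll(Duration). A minimal sketch of the auto-commit consumer against the newer API (the class name DurationPollConsumer is just for illustration; it assumes the same slave1:9092 broker and kafkatest topic as above):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DurationPollConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "slave1:9092");
    props.put("group.id", "kafkatest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "true");

    // try-with-resources closes the consumer if the loop ever exits.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("kafkatest"));
      while (true) {
        // Unlike poll(long), poll(Duration) also bounds time spent blocked
        // waiting for metadata.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.partition() + "," + record.offset() + "," + record.value());
        }
      }
    }
  }
}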
