Resetting a Kafka consumer's offset to re-read data
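A consumer can re-read old messages by committing an explicit offset for its group before subscribing. The consumer below resets partition 0 of topic mykafka to offset 2 this way; the producer at the end of the post supplies test messages.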

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumer2 {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "one");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", 5000);
        props.put("session.timeout.ms", 50000);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With auto.offset.reset=earliest, a group with no committed offset starts from the
        // beginning of the topic; the default is latest, i.e. the offset of the newest message.
        // With latest, the consumer only sees messages produced after it starts.
//        props.put("auto.offset.reset", "earliest");
        props.put("auto.offset.reset", "latest");
        KafkaConsumer<String, String> consumer =  new KafkaConsumer(props);
        try {
            String topicName = "mykafka";
            // Reset the committed offset; several topics and partitions can be set at once.
            // Because this commit happens before subscribe(), the next poll() resumes from
            // the committed offset (2) instead of falling back to auto.offset.reset.
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(2));
            consumer.commitSync(offsets);
            // start the subscription
            consumer.subscribe(Arrays.asList(topicName));
            while (true) {
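                // poll(long) blocks up to the given timeout; Long.MAX_VALUE waits until records arrive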
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    long offset = record.offset();
                    int partition = record.partition();
                    String topic = record.topic();
                    String f="-------->>>topic=%s,offset=%s,partition=%s,value=%s";
                    System.out.println(String.format(f,topic,offset,partition,value));
                }
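                // synchronously commit the offsets of the batch just processed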
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(consumer != null){
                consumer.close();
            }
        }
    }
}
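An alternative to committing offsets before subscribing is to assign the partition manually and call seek(), which moves the consumer's position directly. A minimal sketch of that approach, reusing the topic, partition, and target offset from above (imports as in KafkaConsumer2):

public class KafkaConsumerSeek {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "one");
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            TopicPartition tp = new TopicPartition("mykafka", 0);
            // assign() skips group rebalancing entirely, so seek() may be called at once
            consumer.assign(Arrays.asList(tp));
            consumer.seek(tp, 2);
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("offset=%s,value=%s", record.offset(), record.value()));
            }
            // persists the new position for group "one"
            consumer.commitSync();
        } finally {
            consumer.close();
        }
    }
}

On brokers from 0.11 onwards the same reset is also available from the command line via the kafka-consumer-groups.sh tool's --reset-offsets option, provided the group has no active members. The producer below generates the test messages consumed above.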
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducer2 {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("retries", 3);
        props.put("linger.ms", 1);
        props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",  "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        while (true){
            String topicName = "mykafka";
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    topicName, System.currentTimeMillis()+"", "今每天氣不錯喲yoyo=======>");
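            // asynchronous send; the callback runs when the broker acks or the send fails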
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null)
                        System.out.println("the producer encountered an error: " + e.getMessage());
                    else {
                        String f="----------->>>partition=%s,offset=%s";
                        System.out.println(String.format(f,metadata.partition(),metadata.offset()));
                    }
                }
            });
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        // unreachable while the send loop runs forever; producer.close() would flush
        // buffered records and release resources on shutdown
        // producer.close();
    }
}
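send() is asynchronous: it returns a Future<RecordMetadata>, and the callback above fires once the broker responds. When a blocking send is preferable, calling get() on the returned Future waits for the acknowledgement. A minimal sketch (class name and message are illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerSync {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // send() returns a Future<RecordMetadata>; get() blocks until the broker acks
        RecordMetadata metadata = producer.send(
                new ProducerRecord<String, String>("mykafka", "key", "synchronous hello")).get();
        System.out.println(String.format("partition=%s,offset=%s",
                metadata.partition(), metadata.offset()));
        producer.close();
    }
}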