public class KafkaConsumer2 { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("group.id", "one"); props.put("enable.auto.commit", true); props.put("auto.commit.interval.ms", 5000); props.put("session.timeout.ms", 50000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //設置使用最開始的offset偏移量爲該group.id的最先。若是不設置,則會是latest即該topic最新一個消息的offset //若是採用latest,消費者只能得道其啓動後,生產者生產的消息 // props.put("auto.offset.reset", "earliest"); props.put("auto.offset.reset", "latest"); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); try { String topicName = "mykafka"; //重置offset,可同時設置多個topic和partition Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>(); hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(2)); consumer.commitSync(hashMaps); //啓動訂閱 consumer.subscribe(Arrays.asList(topicName)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (ConsumerRecord<String, String> record : records) { String value = record.value(); long offset = record.offset(); int partition = record.partition(); String topic = record.topic(); String f="-------->>>topic=%s,offset=%s,partition=%s,value=%s"; System.out.println(String.format(f,topic,offset,partition,value)); } consumer.commitSync(); } } catch (Exception e) { e.printStackTrace(); }finally { if(consumer != null){ consumer.close(); } } } }
public class KafkaProudcer2 { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "127.0.0.1:9092"); props.put("retries", 3); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); while (true){ String topicName = "mykafka"; ProducerRecord<String, String> record = new ProducerRecord<String, String>( topicName, System.currentTimeMillis()+"", "今每天氣不錯喲yoyo=======>"); producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) System.out.println("the producer has a error:" + e.getMessage()); else { String f="----------->>>partition=%s,offset=%s"; System.out.println(String.format(f,metadata.partition(),metadata.offset())); } } }); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } } // producer.close(); } }