@Test public void myConsumer(){ Properties props = new Properties(); props.put("bootstrap.servers", "47.100.131.226:9092,47.100.95.99:9092,47.100.91.36:9092"); props.put("group.id", "etlGroupTEST003"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("max.poll.records", "10"); //props.put("auto.offset.reset", "latest"); // latest : 不一樣group 若沒有已提交的offset 則從最新的offset開始消費 props.put("auto.offset.reset", "earliest"); // earliest : 不一樣group 若沒有已提交的offset 則從頭開始消費 //建立消費者配置對象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String,String>(props); /*TopicPartition[] topicPartitions = new TopicPartition[]{ new TopicPartition("SC.CMCC_Children_login", 0), new TopicPartition("SC.CMCC_Children_login", 1), new TopicPartition("SC.CMCC_Children_login", 2), };*/ //kafkaConsumer.assign(Arrays.asList(topicPartitions)); kafkaConsumer.subscribe(Arrays.asList("SC.CMCC_Children_ui")); //訂閱該主題下所有分區, 各分區內容不一樣 while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); System.out.println("拉取記錄數:"+records.count()); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, partition = %d, value = %s", record.offset(), record.partition(), record.value()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(); } } }