Kafka的配置 親測

@Test
public void  myConsumer(){

   Properties props = new Properties();
   props.put("bootstrap.servers", "47.100.131.226:9092,47.100.95.99:9092,47.100.91.36:9092");
   props.put("group.id", "etlGroupTEST003");
   props.put("enable.auto.commit", "true");
   props.put("auto.commit.interval.ms", "1000");

   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
   props.put("max.poll.records", "10");
   //props.put("auto.offset.reset", "latest");    // latest : 不一樣group 若沒有已提交的offset  則從最新的offset開始消費
   props.put("auto.offset.reset", "earliest"); // earliest : 不一樣group 若沒有已提交的offset  則從頭開始消費

   //建立消費者配置對象
   KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String,String>(props);

   /*TopicPartition[] topicPartitions = new TopicPartition[]{
         new TopicPartition("SC.CMCC_Children_login", 0),
         new TopicPartition("SC.CMCC_Children_login", 1),
         new TopicPartition("SC.CMCC_Children_login", 2),
   };*/
   //kafkaConsumer.assign(Arrays.asList(topicPartitions));
   kafkaConsumer.subscribe(Arrays.asList("SC.CMCC_Children_ui")); //訂閱該主題下所有分區, 各分區內容不一樣
   while (true) {
      ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
      System.out.println("拉取記錄數:"+records.count());
      for (ConsumerRecord<String, String> record : records) {
         System.out.printf("offset = %d, partition = %d, value = %s", record.offset(), record.partition(), record.value());
         try {
            Thread.sleep(100);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         System.out.println();
      }
   }

}
相關文章
相關標籤/搜索