POM apache
<!-- Apache Kafka Java client library (provides the producer and consumer classes used in the examples). -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
生產者:debug
/**
 * Stand-alone smoke test that publishes the integers 1 through 100000, each
 * rendered as a decimal string, to partition 0 of the "test2" topic with a
 * null key.
 *
 * <p>The producer is flushed after every 1000 records and once more after the
 * last record. It is opened in a try-with-resources statement so it is closed
 * even when {@code send} or {@code flush} throws.
 */
public class KafkaProducerTest {
    // Keyed on this class so log output is attributed to the producer test.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);

    /** Topic the test records are written to. */
    private static final String TOPIC = "test2";

    /** Partition every record is written to. */
    private static final int PARTITION = 0;

    /** Total number of records to publish. */
    private static final int MESSAGE_COUNT = 100_000;

    /** Number of records sent between explicit flushes. */
    private static final int FLUSH_INTERVAL = 1_000;

    /**
     * Sends {@link #MESSAGE_COUNT} records and then closes the producer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        LOG.debug("開始");
        try (Producer<String, String> producer =
                new KafkaProducer<>(KafkaParams.getSer9ParamsSender())) {
            for (int i = 1; i <= MESSAGE_COUNT; i++) {
                // Records carry no key; the value is the counter as text.
                producer.send(new ProducerRecord<>(TOPIC, PARTITION, null, Integer.toString(i)));
                if (i % FLUSH_INTERVAL == 0) {
                    producer.flush();
                }
            }
            // Flush whatever is still buffered before the producer is closed.
            producer.flush();
        }
        LOG.debug("結束");
    }
}
消費者:code
/**
 * Stand-alone smoke test that subscribes to the "test2" topic and prints the
 * key and value of every record it receives to standard output.
 *
 * <p>The poll loop never terminates on its own; the program runs until the
 * process is stopped or a call on the consumer throws. The consumer is opened
 * in a try-with-resources statement so it is closed when an exception
 * propagates out of the loop.
 */
public class KafkaConsumerTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);

    /** Topic the consumer subscribes to. */
    private static final String TOPIC = "test2";

    /** Timeout passed to each {@code poll} call (milliseconds per the Kafka client API). */
    private static final long POLL_TIMEOUT_MS = 20L;

    /**
     * Subscribes to {@link #TOPIC} and prints records forever.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        LOG.debug("開始");
        // NOTE(review): the consumer is configured from a helper whose name says
        // "Sender"; confirm it also carries the settings a subscribing consumer
        // needs (e.g. group.id) - its contents are not visible here.
        try (Consumer<String, String> consumer = new KafkaConsumer<>(
                KafkaParams.getSer9ParamsSender(),
                new StringDeserializer(),
                new StringDeserializer())) {
            consumer.subscribe(Arrays.asList(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key:" + record.key() + "---value:" + record.value());
                }
            }
        }
    }
}