一、配置application.ymljava
spring: kafka: listener: ack-mode: manual bootstrap-servers: localhost:9092 #kafka的ip+post(若是是集羣,用逗號分隔) producer: retries: 0 #當大於0時,會啓用重試機制 batch-size: 16384 #發送前批處理的記錄數 buffer-memory: 33554432 #生產者能夠緩存的總字節數(待發送到服務器的數據) key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的序列化 value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的序列化 consumer: group-id: foo #消費者組(惟一) auto-offset-reset: earliest #當kafka上不存在偏移量或沒有初始偏移量時,須要作什麼 enable-auto-commit: false #若是爲true,則在後臺定時提交消費者偏移量 auto-commit-interval: 100 #若是enable-auto-commit=true,則每毫秒提交100偏移量到kafka key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key的序列化 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的序列化
二、配置Application.javaspring
@SpringBootApplication @EnableKafka public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
三、配置消費者apache
@Component public class Consumer { private Logger logger = LoggerFactory.getLogger(getClass()); @KafkaListener(topics = "myTopic") public void listener(ConsumerRecord<?, ?> cr) throws Exception { String key = (String) cr.key(); logger.info("key:{}", key); String value = (String) cr.value(); logger.info("value:{}", value); } @KafkaListener(topics = "myTopic2") public void listen(String data, Acknowledgment ack) { logger.info("data:{}", data); ack.acknowledge(); } }
四、配置生產者bootstrap
@RunWith(value = SpringJUnit4ClassRunner.class) @SpringBootTest(classes = Application.class) public class ProducerTest { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private KafkaTemplate<String, String> template; @Test public void send() { for (int i = 0; i < 100; i++) { this.template.send("myTopic2", "foo" + i); } logger.info("All received"); } }