spring kafka開發

一、配置application.ymljava

spring:
  kafka:
    listener:
      ack-mode: manual
    bootstrap-servers: localhost:9092 #kafka的ip+post(若是是集羣,用逗號分隔)
    producer:
      retries: 0 #當大於0時,會啓用重試機制
      batch-size: 16384 #發送前批處理的記錄數
      buffer-memory: 33554432 #生產者能夠緩存的總字節數(待發送到服務器的數據)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的序列化
    consumer:
      group-id: foo #消費者組(惟一)
      auto-offset-reset: earliest #當kafka上不存在偏移量或沒有初始偏移量時,須要作什麼
      enable-auto-commit: false #若是爲true,則在後臺定時提交消費者偏移量
      auto-commit-interval: 100 #若是enable-auto-commit=true,則每毫秒提交100偏移量到kafka
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #key的序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #value的序列化

二、配置Application.javaspring

@SpringBootApplication
@EnableKafka
public class Application {
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
}

三、配置消費者apache

@Component
public class Consumer {
	private Logger logger = LoggerFactory.getLogger(getClass());

	@KafkaListener(topics = "myTopic")
	public void listener(ConsumerRecord<?, ?> cr) throws Exception {
		String key = (String) cr.key();
		logger.info("key:{}", key);
		String value = (String) cr.value();
		logger.info("value:{}", value);
	}

	@KafkaListener(topics = "myTopic2")
	public void listen(String data, Acknowledgment ack) {
		logger.info("data:{}", data);
		ack.acknowledge();
	}
}

四、配置生產者bootstrap

@RunWith(value = SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class ProducerTest {
	private Logger logger = LoggerFactory.getLogger(getClass());

	@Autowired
	private KafkaTemplate<String, String> template;

	@Test
	public void send() {
		for (int i = 0; i < 100; i++) {
			this.template.send("myTopic2", "foo" + i);
		}
		logger.info("All received");
	}
}
相關文章
相關標籤/搜索