1,首先 Spring Boot 對 Kafka 的支持也很好,一樣是在配置文件中配置好參數,而後就能夠直接使用。先說一下,整體很簡單,不要怕。
2,我使用的 Maven 依賴以下:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
配置文件(application.yml):
spring:
  kafka:
    # Broker list shared by producer and consumer; replace the placeholder
    # 12.34.56.78 addresses with real broker hosts. Declaring it once here
    # avoids repeating it under producer:/consumer: as the original did.
    bootstrap-servers: 12.34.56.78:9092,12.34.56.78:9092,12.34.56.78:9092,12.34.56.78:9092
    producer:
      retries: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      enable-auto-commit: true
      auto-offset-reset: latest
      auto-commit-interval: 1000
      group-id: gzj
而後在須要往 Kafka 發送數據的地方,也就是生產者,直接注入便可:
// Producer side: inject Spring's auto-configured KafkaTemplate to send messages.
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
消費者,經過 @KafkaListener 監聽:
/**
 * Consumer side: invoked by Spring Kafka for every message on the "gzj"
 * topic; simply echoes the payload to stderr.
 */
@KafkaListener(topics = {"gzj"})
public void receive(String content) {
    System.err.println("Receive:" + content);
}
消費者還有另外一種實現方式:
package com.gzj.demo.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * Description * <p> * </p> * DATE 2018/10/23. * * @author guozhenjiang. */ @Component public class KafkaConsumerTask implements Runnable,InitializingBean { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class); private Thread thread; @Resource(name="kafkaConsumer") private KafkaConsumer<String,String> kafkaConsumer; @Override public void run() { logger.info("消費數據任務啓動"); while(true){ try{ ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000); if(records!=null){ for(ConsumerRecord<String ,String > record:records){ logger.error(record.key()); logger.error(record.topic()); logger.error(record.value()); } } }catch(Exception e){ // logger.error("我也不知道哪兒錯了"); }finally { // logger.error("不放棄"); } } } @Override public void afterPropertiesSet() throws Exception { this.thread=new Thread(this); this.thread.start(); } }
package com.gzj.demo.config; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Arrays; import java.util.Properties; /** * Description * <p> * </p> * DATE 2018/10/23. * * @author guozhenjiang. */ @Configuration @ConfigurationProperties(prefix = "spring.kafka") public class KafkaConnectConfig { @Bean(name = "kafkaConsumer") public KafkaConsumer<String, String> kafkaConsumer() { Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("group.id", "ggg"); props.setProperty("enable.auto.commit", enableAutoCommit); props.setProperty("auto.offset.reset", autoOffsetReset); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("gzj")); return consumer; } @Value("${server.port}") private String port; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.enable-auto-commit}") private String enableAutoCommit; @Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset; public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public String getEnableAutoCommit() { return enableAutoCommit; } public void setEnableAutoCommit(String enableAutoCommit) { 
this.enableAutoCommit = enableAutoCommit; } public String getAutoOffsetReset() { return autoOffsetReset; } public void setAutoOffsetReset(String autoOffsetReset) { this.autoOffsetReset = autoOffsetReset; } }
後一種暫未發現有什麼明顯優勢。兩種方式均可以監聽 Kafka,充當消費者。
3,如今我有兩個消費者。以前一直好奇若是有多個消費者,如何讓它們重複消費或協同消費,據說是經過配置 group id 實現的。親自試驗了一下,確實如此:同一個 group id 內是協同消費,不同 group id 之間是重複消費。
也沒什麼,挺簡單的,有什麼問題能夠提問。