springboot+kafka(ip地址瞎寫的)

1,首先springboot對kafka的支持也很好,一樣是在配置文件中配置好參數,而後就能夠直接使用。先說一下,很簡單,,,不要怕java

2,我用的依賴是spring

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

配置文件apache

kafka:
  bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
  producer:
    retries: 1
    batch-size: 16384
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
  consumer:
    bootstrap-servers:  12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
    enable-auto-commit: true
    auto-offset-reset: latest
    auto-commit-interval: 1000
    group-id: gzj

而後在須要往kafka發送數據的地方,也就是生產者,直接注入便可bootstrap

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

消費者,監聽springboot

@KafkaListener(topics = {"gzj"})
public void receive(String content){
    System.err.println("Receive:" + content);
}

消費者還有另外一種方法,app

package com.gzj.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Component
public class KafkaConsumerTask implements Runnable,InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class);
    private Thread thread;
    @Resource(name="kafkaConsumer")
    private KafkaConsumer<String,String> kafkaConsumer;

    @Override
    public void run() {
        logger.info("消費數據任務啓動");
        while(true){
            try{
                ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000);
                if(records!=null){
                    for(ConsumerRecord<String ,String > record:records){
                        logger.error(record.key());
                        logger.error(record.topic());
                        logger.error(record.value());
                    }
                }
            }catch(Exception e){
               // logger.error("我也不知道哪兒錯了");
            }finally {
               // logger.error("不放棄");
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.thread=new Thread(this);
        this.thread.start();
    }

}

 

package com.gzj.demo.config;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.Properties;

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaConnectConfig {

    @Bean(name = "kafkaConsumer")
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "ggg");
        props.setProperty("enable.auto.commit", enableAutoCommit);
        props.setProperty("auto.offset.reset", autoOffsetReset);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("gzj"));

        return consumer;
    }
    @Value("${server.port}")
    private String port;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }


    public String getEnableAutoCommit() {
        return enableAutoCommit;
    }

    public void setEnableAutoCommit(String enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    public String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }
}

後一種暫未發現有什麼優勢。均可以實現監聽kafka,充當消費者ide

 

3,如今我有兩個消費者,以前一直好奇若是多個消費者,如何讓他們重複消費,或協同消費,據說是經過配置groupid,親自試驗了一下,確實是,同一個groupid裏是協同的,不通的是重複的。this

也沒什麼,挺簡單的,有什麼問題能夠提問,開源中國的app我下好了,應該常常登陸server

相關文章
相關標籤/搜索