spring boot 集成kafka (多線程,消費者使用kafka的原生api實現,由於@KakfkaListener修改groupId無效)

application-test.properties算法

 1 #kafka
 2 kafka.consumer.zookeeper.connect=*:2181
 3 kafka.consumer.servers=*:9092
 4 kafka.consumer.enable.auto.commit=true
 5 kafka.consumer.session.timeout=6000
 6 kafka.consumer.auto.commit.interval=1000
 7 #保證每一個組一個消費者消費同一條消息,若設置爲earliest,那麼會從頭開始讀partition(none)
 8 kafka.consumer.auto.offset.reset=latest
 9 kafka.consumer.concurrency=10
10 
11 kafka.producer.servers=*:9092
12 kafka.producer.retries=0
13 kafka.producer.batch.size=4096
14 #//往kafka服務器提交消息間隔時間,0則當即提交不等待
15 kafka.producer.linger=1
16 kafka.producer.buffer.memory=40960

啓動類apache

@SpringBootApplication
@EnableScheduling
public class Application {

    @Autowired
    private KafkaSender kafkaSender;

    public static void main(String[] args) {
        SpringApplication.run(Application .class, args);
    }


    //而後每隔1分鐘執行一次
    @Scheduled(fixedRate = 1000 * 60)
    public void testKafka() throws Exception {
        kafkaSender.sendTest();
    }
}

生產者:安全

 1 @Component
 2 public class KafkaSender {
 3 
 4     @Resource
 5     KafkaConsumerPool consumerPool;
 6 
 7     /**
 8      *  這裏須要放到程序啓動完成以後執行 TODO
 9      */
10     @PostConstruct
11     void d(){
12 
13         ConsumerGroup consumerThread = new ConsumerGroup("gropu-1","access_data",consumerConfig);
14         ConsumerGroup consumerThread2 = new ConsumerGroup("gropu-2","access_data", consumerConfig);
15 
16         /**
17          * 各起兩個消費者 ,Kafka consumer是非線程安全的 Consumer 須要一個new 的
18          */
19         consumerPool.SubmitConsumerPool(new Consumer(consumerThread));
20         consumerPool.SubmitConsumerPool(new Consumer(consumerThread));
21 
22         consumerPool.SubmitConsumerPool(new Consumer(consumerThread2));
23         consumerPool.SubmitConsumerPool(new Consumer(consumerThread2));
24     }
25 
26 
27     @Resource
28     KafkaConsumerConfig consumerConfig;
29 
30     @Autowired
31     private KafkaTemplate kafkaTemplate;
32 
33     @Autowired
34     private KafkaTopics kafkaTopics;
35 
36     /**
37      * 發送消息到kafka
38      *
39      */
40     public void sendTest() throws InterruptedException, IOException, KeeperException {
41 
42         /**
43          * topic='access_data'
44          */
45         kafkaTemplate.send("access_data",""+ System.currentTimeMillis());
46         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
47         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
48         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
49         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
50         kafkaTemplate.send("access_data",""+System.currentTimeMillis());
51     }
52 
53 
54 }
KafkaProducerConfig
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
KafkaConsumerConfig
 1 @Configuration
 2 @EnableKafka
 3 public class KafkaConsumerConfig {
 4 
 5     @Value("${kafka.consumer.zookeeper.connect}")
 6     public String zookeeperConnect;
 7     @Value("${kafka.consumer.servers}")
 8     public  String servers;
 9     @Value("${kafka.consumer.enable.auto.commit}")
10     public  boolean enableAutoCommit;
11     @Value("${kafka.consumer.session.timeout}")
12     public  String sessionTimeout;
13     @Value("${kafka.consumer.auto.commit.interval}")
14     public  String autoCommitInterval;
15     @Value("${kafka.consumer.auto.offset.reset}")
16     public  String autoOffsetReset;
17     @Value("${kafka.consumer.concurrency}")
18     public  int concurrency;
19 
20 
21     public String getZookeeperConnect() {
22         return zookeeperConnect;
23     }
24 
25     public void setZookeeperConnect(String zookeeperConnect) {
26         this.zookeeperConnect = zookeeperConnect;
27     }
28 
29     public String getServers() {
30         return servers;
31     }
32 
33     public void setServers(String servers) {
34         this.servers = servers;
35     }
36 
37     public boolean isEnableAutoCommit() {
38         return enableAutoCommit;
39     }
40 
41     public void setEnableAutoCommit(boolean enableAutoCommit) {
42         this.enableAutoCommit = enableAutoCommit;
43     }
44 
45     public String getSessionTimeout() {
46         return sessionTimeout;
47     }
48 
49     public void setSessionTimeout(String sessionTimeout) {
50         this.sessionTimeout = sessionTimeout;
51     }
52 
53     public String getAutoCommitInterval() {
54         return autoCommitInterval;
55     }
56 
57     public void setAutoCommitInterval(String autoCommitInterval) {
58         this.autoCommitInterval = autoCommitInterval;
59     }
60 
61     public String getAutoOffsetReset() {
62         return autoOffsetReset;
63     }
64 
65     public void setAutoOffsetReset(String autoOffsetReset) {
66         this.autoOffsetReset = autoOffsetReset;
67     }
68 
69     public int getConcurrency() {
70         return concurrency;
71     }
72 
73     public void setConcurrency(int concurrency) {
74         this.concurrency = concurrency;
75     }
76 }
Consumer
/**
 * 實際消費者,繼承了ShutdownableThread ,要多加幾個消費者直接繼承實現便可
 *
 * @create 2017-11-06 12:42
 * @update 2017-11-06 12:42
 **/
public class Consumer extends ShutdownableThread {

    /**
     * kafka 消費者
     */
    private  KafkaConsumer consumer;

    /**
     *  topic
     */
    private  String topic;

    /**
     *  組id
     */
    private  String groupId;


    public Consumer(ConsumerGroup consumerGroup) {
        super("",false);
        this.consumer = consumerGroup.getConsumer();
        this.topic = consumerGroup.getTopic();
        this.groupId = consumerGroup.getA_groupId();
    }

    /**
     *  * 監聽主題,有消息就讀取
     * 從kafka裏面獲得數據後,具體怎麼去處理. 若是須要開啓kafka處理消息的廣播模式,多個監聽要監聽不一樣的group,
     * 即方法上的註解@KafkaListener裏的group必定要不同.若是多個監聽裏的group寫的同樣,就會形成只有一個監聽能處理其中的消息,
     * 另外監聽就不能處理消息了.也便是kafka的分佈式消息處理方式.
     * 在同一個group裏的監聽,共同處理接收到的消息,會根據必定的算法來處理.若是不在一個組,可是監聽的是同一個topic的話,就會造成廣播模式
     */
    @Override
    public void doWork() {
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("Thread: "+Thread.currentThread().getName()
                    +"Received message: (" + this.groupId + ", " + record.value() + ") at offset "
                    + record.offset()+" partition : "+records.partitions());
        }
    }
}
ConsumerGroup 設置消費組
 1 public class ConsumerGroup  {
 2 
 3     /**
 4      *  日誌處理
 5      */
 6     private static final Log log = LogFactory.getLog(ConsumerGroup.class);
 7 
 8     /**
 9      *  topic
10      */
11     private final String topic;
12 
13     /**
14      *  公共鏈接屬性
15      */
16     private  Properties props ;
17 
18     /**
19      * 消費者組
20      */
21     private final String groupId;
22 
23 
24     public ConsumerGroup(String groupId, String topic, KafkaConsumerConfig consumerConfig) {
25         createConsumerConfig(groupId,consumerConfig);
26         this.topic = topic;
27         this.groupId = groupId;
28     }
29 
30 
31     private Properties createConsumerConfig(String groupId, KafkaConsumerConfig consumerConfig) {
32         props = new Properties();
33         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,consumerConfig.servers);
34         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
35         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.enableAutoCommit);
36         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerConfig.autoCommitInterval);
37         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerConfig.sessionTimeout);
38         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
39         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
40         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.autoOffsetReset);
41         // 其餘配置再配置
42         return props;
43     }
44 
45     public KafkaConsumer getConsumer() {
46         return new KafkaConsumer(props);
47     }
48 
49     /**
50      *  其餘類獲取topic
51      * @return
52      */
53     public String getTopic() {
54         return topic;
55     }
56 
57     public String getA_groupId() {
58         return groupId;
59     }
60 }
 1 @Component
 2 public class KafkaConsumerPool {
 3 
 4     /**
 5      * 日誌處理
 6      */
 7     private static final Log log = LogFactory.getLog(KafkaConsumerPool.class);
 8 
 9     /**
10      *  線程池
11      */
12     private ExecutorService executor;
13 
14     /**
15      * 初始化10個線程
16      */
17     @PostConstruct
18     void init(){
19         executor = Executors.newFixedThreadPool(10);
20     }
21 
22     /**
23      * 提交新的消費者
24      *
25      * @param shutdownableThread
26      */
27     public void SubmitConsumerPool(ShutdownableThread shutdownableThread) {
28         executor.execute(shutdownableThread);
29     }
30 
31     /**
32      * 程序關閉,關閉線程池
33      *
34      */
35     @PreDestroy
36     void fin(){
37         shutdown();
38     }
39 
40     public void shutdown() {
41         if (executor != null) executor.shutdown();
42         try {
43             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
44                 log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
45             }
46         } catch (InterruptedException e) {
47             log.info("Interrupted during shutdown, exiting uncleanly");
48         }
49     }
50 }
相關文章
相關標籤/搜索