【HAVENT原創】Spring Boot + Spring-Kafka 異步配置

近日咱們項目組採用 Kafka 來作系統日誌統一管理,可是天降橫禍的讓 Kafka 集羣(3臺服務器)都掛了,堪比中大獎的節奏,隨之而來的是使用 Kafka 發送消息日誌的服務所有卡死,通過排查發現竟然是 Kafka 當機致使了調用 Kafka 發送日誌服務一直處於阻塞狀態。java

最後咱們在檢查代碼的時候發現,若是沒法鏈接 Kafka 服務,則會出現一分鐘的阻塞。以上問題有兩種解決方案:算法

1、開啓異步模式 ( @EnableAsync )spring

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}
public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

 

2、若是使用同步模式,能夠經過修改配置參數 MAX_BLOCK_MS_CONFIG ( max.block.ms / 默認 60s ) 來縮短阻塞時間apache

package com.havent.demo.logger.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.HashMap;
import java.util.Map;

@EnableAsync
@Configuration
@EnableKafka
public class KafkaConfiguration {
    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String serverAddress;
    
    public Map<String, Object> producerConfigs() {
        System.out.println("HH > serverAddress: " + serverAddress);

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 若是請求失敗,生產者會自動重試,咱們指定是0次,若是啓用重試,則會有重複消息的可能性
        props.put(ProducerConfig.RETRIES_CONFIG, 0);

        // Request發送請求,即Batch批處理,以減小請求次數,該值即爲每次批處理的大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);

        /**
         * 這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。這相似於TCP的算法,例如上面的代碼段,
         * 可能100條消息在一個請求發送,由於咱們設置了linger(逗留)時間爲1毫秒,而後,若是咱們沒有填滿緩衝區,
         * 這個設置將增長1毫秒的延遲請求以等待更多的消息。 須要注意的是,在高負載下,相近的時間通常也會組成批,即便是
         * linger.ms=0。在不處於高負載的狀況下,若是設置比0大,以少許的延遲代價換取更少的,更有效的請求。
         */
        props.put(ProducerConfig.LINGER_MS_CONFIG, 2000);

        /**
         * 控制生產者可用的緩存總量,若是消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。
         * 當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值經過max.block.ms設定, 以後它將拋出一個TimeoutException。
         */
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);

        // 用於配置send數據或partitionFor函數獲得對應的leader時,最大的等待時間,默認值爲60秒
        // HH 警告:如沒法鏈接 kafka 會致使程序卡住,儘可能不要設置等待過久
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100);


        // 消息發送的最長等待時間
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);

        // 0:不保證消息的到達確認,只管發送,低延遲可是會出現消息的丟失,在某個server失敗的狀況下,有點像TCP
        // 1:發送消息,並會等待leader 收到確認後,必定的可靠性
        // -1:發送消息,等待leader收到確認,並進行復制操做後,才返回,最高的可靠性
        props.put(ProducerConfig.ACKS_CONFIG, "0");

        System.out.println(props);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        return kafkaTemplate;
    }
    
}

 

謹以此獻給那些被 Spring Kafka 同步模式坑害又苦無出路的同胞。。。bootstrap

相關文章
相關標籤/搜索