近日咱們項目組採用 Kafka 來作系統日誌統一管理,可是天降橫禍的讓 Kafka 集羣(3臺服務器)都掛了,堪比中大獎的節奏,隨之而來的是使用 Kafka 發送消息日誌的服務所有卡死,通過排查發現竟然是 Kafka 當機致使了調用 Kafka 發送日誌服務一直處於阻塞狀態。java
最後咱們在檢查代碼的時候發現,若是沒法鏈接 Kafka 服務,則會出現一分鐘的阻塞。以上問題有兩種解決方案:算法
1、開啓異步模式 ( @EnableAsync )spring
@EnableAsync @Configuration public class KafkaProducerConfig { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class); @Value("${kafka.brokers}") private String servers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class); return props; } @Bean public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) { return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper)); } @Bean public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) { return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper)); } @Bean public Producer producer() { return new Producer(); } }
public class Producer { public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); @Autowired private KafkaTemplate<String, GenericMessage> kafkaTemplate; @Async public void send(String topic, GenericMessage message) { ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() { @Override public void onSuccess(final SendResult<String, GenericMessage> message) { LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset()); } @Override public void onFailure(final Throwable throwable) { LOGGER.error("unable to send message= " + message, throwable); } }); } }
2、若是使用同步模式,能夠經過修改配置參數 MAX_BLOCK_MS_CONFIG ( max.block.ms / 默認 60s ) 來縮短阻塞時間apache
package com.havent.demo.logger.config; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.scheduling.annotation.EnableAsync; import java.util.HashMap; import java.util.Map; @EnableAsync @Configuration @EnableKafka public class KafkaConfiguration { @Value("${spring.kafka.producer.bootstrap-servers}") private String serverAddress; public Map<String, Object> producerConfigs() { System.out.println("HH > serverAddress: " + serverAddress); Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 若是請求失敗,生產者會自動重試,咱們指定是0次,若是啓用重試,則會有重複消息的可能性 props.put(ProducerConfig.RETRIES_CONFIG, 0); // Request發送請求,即Batch批處理,以減小請求次數,該值即爲每次批處理的大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); /** * 這將指示生產者發送請求以前等待一段時間,但願更多的消息填補到未滿的批中。這相似於TCP的算法,例如上面的代碼段, * 可能100條消息在一個請求發送,由於咱們設置了linger(逗留)時間爲1毫秒,而後,若是咱們沒有填滿緩衝區, * 這個設置將增長1毫秒的延遲請求以等待更多的消息。 須要注意的是,在高負載下,相近的時間通常也會組成批,即便是 * linger.ms=0。在不處於高負載的狀況下,若是設置比0大,以少許的延遲代價換取更少的,更有效的請求。 */ props.put(ProducerConfig.LINGER_MS_CONFIG, 2000); /** * 控制生產者可用的緩存總量,若是消息發送速度比其傳輸到服務器的快,將會耗盡這個緩存空間。 * 當緩存空間耗盡,其餘發送調用將被阻塞,阻塞時間的閾值經過max.block.ms設定, 以後它將拋出一個TimeoutException。 */ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 用於配置send數據或partitionFor函數獲得對應的leader時,最大的等待時間,默認值爲60秒 // HH 警告:如沒法鏈接 kafka 會致使程序卡住,儘可能不要設置等待過久 props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100); // 消息發送的最長等待時間 props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100); // 0:不保證消息的到達確認,只管發送,低延遲可是會出現消息的丟失,在某個server失敗的狀況下,有點像TCP // 1:發送消息,並會等待leader 收到確認後,必定的可靠性 // -1:發送消息,等待leader收到確認,並進行復制操做後,才返回,最高的可靠性 props.put(ProducerConfig.ACKS_CONFIG, "0"); System.out.println(props); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory()); return kafkaTemplate; } }
謹以此獻給那些被 Spring Kafka 同步模式坑害又苦無出路的同胞。。。bootstrap