Kafka producer: asynchronous send can block the main thread in certain situations

I recently ran into a problem where the Kafka producer's asynchronous send blocks the main thread in certain situations. While investigating and fixing it, I came to the conclusion that this is a poorly documented aspect of Kafka.

Problem description

In many scenarios we send Kafka messages asynchronously, using the following KafkaProducer method:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

According to the documentation this is an asynchronous send method, so in principle it should never block the main thread. In practice, however, it does block in certain situations, for example when the broker is not running correctly or the topic has not been created. Sometimes we don't need any guarantee on the send result, but if the call blocks it affects other business logic.
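One way to observe this is to point a producer at an address where no broker is listening and time the supposedly asynchronous call. A minimal sketch (the broker port and topic name here are made up for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object BlockingSendDemo extends App {

  val props = new Properties()
  // hypothetical address with no broker listening behind it
  props.put("bootstrap.servers", "localhost:19092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val start = System.currentTimeMillis()
  // the "asynchronous" call below does not return until the metadata is
  // fetched or max.block.ms (60s by default) expires
  producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
  println(s"send() returned after ${System.currentTimeMillis() - start} ms")
  producer.close()
}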

Where the problem occurs

Nothing looks wrong from the declaration of KafkaProducer's send method, so let's look at its concrete implementation:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
  * Implementation of asynchronously send a record to a topic.
  */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);  // this is where the problem occurs
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        ...
    } catch (ApiException e) {
        ...
    }
}

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
    Cluster cluster = metadata.fetch();

    if (cluster.invalidTopics().contains(topic))
        throw new InvalidTopicException(topic);

    metadata.add(topic);

    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    
    // Keep requesting the topic's metadata until it is available; if the wait exceeds maxWaitMs, throw an exception
    do {
        if (partition != null) {
            log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
        } else {
            log.trace("Requesting metadata update for topic {}.", topic);
        }
        metadata.add(topic);
        int version = metadata.requestUpdate();
        sender.wakeup();
        try {
            metadata.awaitUpdate(version, remainingWaitMs);
        } catch (TimeoutException ex) {
            // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
            throw new TimeoutException(
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs));
        }
        cluster = metadata.fetch();
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs) {  // check whether the elapsed time has exceeded maxWaitMs
            throw new TimeoutException(partitionsCount == null ?
                    String.format("Topic %s not present in metadata after %d ms.",
                            topic, maxWaitMs) :
                    String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                            partition, topic, partitionsCount, maxWaitMs));
        }
        metadata.maybeThrowException();
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

    return new ClusterAndWaitTime(cluster, elapsed);
}

From this implementation we can see that the cause of the blocking lies in the following logic:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException

When KafkaProducer executes send, it first needs to obtain the cluster metadata, and this is a loop that keeps retrying until the metadata is fetched successfully or an exception is thrown.

Kafka's intent in implementing it this way is actually fine, since the precondition for sending a message is being able to obtain the broker and topic information. The problem is that send exposes a Future-returning API while its internal implementation can block. If you have not accounted for this, then as soon as the broker or topic misbehaves your threads will block, the system will respond more and more slowly, and it may eventually crash.
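Note that the length of the block is bounded by the producer's max.block.ms setting (60000 ms by default). Lowering it, as the example later in this post does, limits the damage, but the blocking still happens on the calling thread. A minimal sketch:

import java.util.Properties

val props = new Properties()
// cap how long send() may block waiting for metadata (default: 60000 ms)
props.put("max.block.ms", "3000")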

Fixing the problem

The fix is actually simple: create a few dedicated threads for sending messages. Even if something unexpected happens, only those few threads will block, rather than causing widespread thread blocking and making the system unavailable. A concrete implementation:

import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

class ProducerF[K,V](kafkaProducer: KafkaProducer[K,V]) {

  // A small dedicated pool for sends: if send blocks (e.g. in waitOnMetadata),
  // only this thread is stuck, never the caller's thread.
  val executor: ExecutorService = Executors.newFixedThreadPool(1)

  def sendAsync(producerRecord: ProducerRecord[K,V], callback: Callback) = {
    executor.submit(new Callable[RecordMetadata]() {
      // Both the potentially blocking send and the wait on its result
      // happen entirely on the executor's thread.
      def call: RecordMetadata = kafkaProducer.send(producerRecord, callback).get()
    })
  }
}

This is one way of doing it; you could also maintain your own patched build of Kafka, though that is probably more trouble. Which approach to take depends on your own scenario.

Usage example:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

object FixExample extends App {

  val props = new Properties()
  props.put("max.block.ms", "3000")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("client.id", "ProducerSendFixExample")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  val topic = "topic-trace-one"
  val userId = "godpan"
  val msg = "login wechat"
  val data = new ProducerRecord[String, String](topic, userId, msg)

  val startTime = System.currentTimeMillis()
  val producerF = new ProducerF(producer)
  producerF.sendAsync(data,(metadata: RecordMetadata, exception: Exception) => {
    println(s"[producerF-sendAsync] data producerRecord: ${data}, exception: ${exception}")
  })

  // If you want to see the send result, you can have the thread wait 4s
  // Thread.sleep(4000)
  System.exit(0)

}
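
If the caller does want the send result without risking an unbounded wait, the java.util.concurrent.Future returned by sendAsync can be waited on with a timeout instead of the Thread.sleep above. A sketch of how the tail of the example could look:

import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}

val future = producerF.sendAsync(data, (metadata: RecordMetadata, exception: Exception) => ())
try {
  // block this caller (and only this caller) for at most 4 seconds
  val metadata = future.get(4, TimeUnit.SECONDS)
  println(s"sent to partition ${metadata.partition()} at offset ${metadata.offset()}")
} catch {
  case _: TimeoutException   => println("send did not complete within 4s")
  case e: ExecutionException => println(s"send failed: ${e.getCause}")
}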

The related code has been uploaded to GitHub: kafka-send-async-bug-fix
