Kafka生產者的使用和原理

本文將學習Kafka生產者的使用和原理,文中使用的kafka-clients版本號爲2.6.0。下面進入正文,先經過一個示例看下如何使用生產者API發送消息。node

public class Producer {

    public static void main(String[] args) {
        // 1. 配置參數
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // 2. 根據參數建立KafkaProducer實例(生產者)
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. 建立ProducerRecord實例(消息)
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo", "hello kafka");
        // 4. 發送消息
        producer.send(record);
        // 5. 關閉生產者示例
        producer.close();
    }

}

首先建立一個Properties實例,設置了三個必填參數:apache

  • bootstrap.servers:broker的地址清單;
  • key.serializer:消息的鍵的序列化器;
  • value.serializer:消息的值的序列化器。

因爲broker但願接受的是字節數組,因此須要將消息中的鍵值序列化成字節數組。在設置好參數後,根據參數建立KafkaProducer實例,也就是用於發送消息的生產者,接着再建立準備發送的消息ProducerRecord實例,而後使用KafkaProducer的send方法發送消息,最後再關閉生產者。bootstrap

關於KafkaProducer,咱們先記住兩點:api

  1. 在建立實例的時候,須要指定配置;
  2. send方法可發送消息。

關於配置咱們先只瞭解這三個必填參數,下面咱們看下send方法,關於發送消息的方式有三種:數組

  1. 發送並忘記(fire-and-forget)
    在發送消息給Kafka時,不關心消息是否正常到達,只負責成功發送,存在丟失消息的可能。上面給出的示例就是這種方式。緩存

  2. 同步發送(sync)
    send方法的返回值是一個Future對象,當調用其get方法時將阻塞等待Kafka的響應。以下:
Future<RecordMetadata> recordMetadataFuture = producer.send(record);
RecordMetadata recordMetadata = recordMetadataFuture.get();

RecordMetadata對象中包含有消息的一些元數據,如消息的主題、分區號、分區中的偏移量、時間戳等。markdown

  1. 異步發送(async)
    在調用send方法時,指定回調函數,在Kafka返回響應時,將調用該函數。以下:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(recordMetadata.topic() + "-"
                               + recordMetadata.partition() + ":" + recordMetadata.offset());
        }
    }
});

onCompletion有兩個參數,其類型分別是RecordMetadata和Exception。當消息發送成功時,recordMetadata爲非null,而e將爲null。當消息發送失敗時,則反之。
下面咱們認識下消息對象ProducerRecord,封裝了發送的消息,其定義以下:架構

public class ProducerRecord<K, V> {
    private final String topic;  // 主題
    private final Integer partition;  // 分區號
    private final Headers headers;  // 消息頭部
    private final K key;  // 鍵
    private final V value;  // 值
    private final Long timestamp;  // 時間戳
    // ...其餘構造方法和成員方法
}

其中主題和值爲必填,其他非必填。例如當給出了分區號,則至關於指定了分區,而當未給出分區號時,若給出了鍵,則可用於計算分區號。關於消息頭部和時間戳,暫不講述。app

在對生產者對象KafkaProducer和消息對象ProducerRecord有了認識後,下面咱們看下在使用生產者發送消息時,會使用到的組件有生產者攔截器、序列化器和分區器。其架構(部分)以下:
Kafka生產者的使用和原理異步

  1. 生產者攔截器:ProducerInterceptor接口,主要用於在消息發送前作一些準備工做,好比對消息作過濾,或者修改消息內容,也能夠用於在發送回調邏輯前作一些定製化的需求,例如統計類工做。
  2. 序列化器,Serializer接口,用於將數據轉換爲字節數組。
  3. 分區器,Partitioner接口,若未指定分區號,且提供key。

下面結合代碼來看下處理過程,加深印象。

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // 攔截器,攔截消息進行處理
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

上面是KafkaProducer的send方法,首先會將消息傳給攔截器的onSend方法,而後進入doSend方法。其中doSend方法較長,但內容並不複雜,下面給出了主要步驟的註釋。

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // 1.確認數據發送到的topic的metadata可用
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // 2.序列化器,序列化消息的key和value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                                             " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                                             " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                                             " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                                             " specified in value.serializer", cce);
        }
        // 3.分區器,獲取或計算分區號
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                                                                           compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        // 4.消息累加器,緩存消息
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                                                                         serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                                        serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // 5.若是batch滿了或者消息大小超過了batch的剩餘空間須要建立新的batch
        // 將喚醒sender線程發送消息
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

doSend方法主要分爲5個步驟:

  1. 在發送數據前,先確認數據發送的topic的metadata是可用的(partition的leader存在即爲可用,若是開啓了權限控制,則還要求client具備相應的權限);
  2. 序列化器,序列化消息的key和value;
  3. 分區器,獲取或計算分區號;
  4. 消息累加器,緩存消息;
  5. 在消息累加器中,消息會被放在一個batch中,用於批量發送,當batch滿了或者消息大小超過了batch的剩餘空間須要建立新的batch,則將喚醒sender線程發送消息。

關於meatadata本文將不深究,序列化器、分區器前文也給出了介紹。下面咱們主要看下消息累加器。
消息累加器,其做用是用於緩存消息,以便批量發送消息。在RecordAccumulator中用一個ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches的map變量保存消息。做爲key的TopicPartition封裝了topic和分區號,而對應的value爲ProducerBatch的雙端隊列,也就是將發往同一個分區的消息緩存在ProducerBatch中。在發送消息時,Record會被追加在隊列的尾部,即加入到尾部的ProducerBatch中,若是ProducerBatch的空間不足或隊列爲空,則將建立新的ProducerBatch,而後追加。當ProducerBatch滿了或建立新的ProducerBatch時,將喚醒Sender線程從隊列的頭部獲取ProducerBatch進行發送。
Kafka生產者的使用和原理
RecordAccumulator

在Sender線程中會將待發送的ProducerBatch將轉換成<Integer, List<ProducerBatch>>的形式,按Kafka節點的ID進行分組,而後將同一個node的ProducerBatch放在一個請求中發送。

Kafak生產者的內容就先了解到這,下面經過思惟導圖對本文內容作一個簡單的回顧:
Kafka生產者的使用和原理

參考

  1. 《深刻理解Kafka核心設計與實踐原理》
  2. 《Kafka權威指南》
  3. Kafka 源碼解析之 Producer 發送模型(一): http://matt33.com/2017/06/25/kafka-producer-send-
相關文章
相關標籤/搜索