# Kafka發送者源碼解析

緊接着Kafka Producer發送消息這一篇文章繼續往深刻探索。java

消息體

這裏介紹一下ProducerRecord,他不是單純的消息,它包含了多個屬性。類定義以下:apache

public class ProducerRecord<K, V> {

    private final String topic; //主題
    private final Integer partition;//分區號
    private final Headers headers;//消息頭部
    private final K key;//鍵
    private final V value;//值
    private final Long timestamp;//消息時間戳
    //省略其餘
    }
複製代碼

key用來指定消息的鍵,它不只是消息的附加信息,還能夠用來計算分區號進而讓消息發往特定的分區。後面再說 對於的構造函數有不少種,咱們用的是最簡單的一種。參考以下:api

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) public ProducerRecord(String topic, V value) //示例代碼中用的這個,至關於將其餘屬性所有設置爲null 複製代碼

消息發送

再來看看send(ProducerRecord,Callback)方法緩存

public class KafkaProducer<K, V> implements Producer<K, V> {
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        1.1
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
    ···
}
複製代碼

1.1 生產者攔截器

消息在發送前會先調用org.apache.kafka.clients.producer.ProducerInterceptor#onSend方法來對消息作定製化處理。 doSend(ProducerRecord, Callback)方法bash

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // 確認生產者實例沒有被關閉
            throwIfProducerClosed();
            // 首先確認主題的元數據是可用的
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // 2.1
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;
            try {
                // 2.2
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                // 2.3
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
            // 2.4 
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            // 2.5
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
            // 2.6
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
            // 2.7
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            ···
    }
複製代碼

2.1 獲取集羣信息

KafkaProducer#waitOnMetadata,負責觸發Kafka集羣元數據的更新,並阻塞主線程等待更新完畢,底層會喚醒sender線程更新metadata保存的Kafka集羣元信。網絡

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // 獲取Cluster信息
        Cluster cluster = metadata.fetch();

        // 若是cluster中的不合法主題列表包含指定主題,則拋出異常InvalidTopicException
        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        // 添加主題,下面會介紹
        metadata.add(topic);

        // 獲取Topic中分區數量
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // 若是分區數不爲空,而且partition不爲空或者partition小於分區數,則返回一個ClusterAndWaitTime
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            // 獲取當前元數據版本號
            int version = metadata.requestUpdate();
            // 喚醒sender線程
            sender.wakeup();
            try {
                // 阻塞等到元數據更新
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            // 再次獲取cluster
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            // 若是更新元數據的時間超過了最大等待時間,則跑出TimeoutException異常
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            // 這一步其實就是校驗元數據更新後,topic是否合法,若是不合法,則拋出異常
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            // 獲取獲取Topic中分區數量
            partitionsCount = cluster.partitionCountForTopic(topic);
            // 循環條件:分區數等於空或者傳進來的partition不等於空而且partition小於分區數
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }
複製代碼
// 元數據添加主題
metadata.add(topic);

org.apache.kafka.clients.producer.internals.ProducerMetadata#add(String)
    public synchronized void add(String topic) {
        Objects.requireNonNull(topic, "topic cannot be null");
        // 若是topics裏面不包含傳入的topic,則更新topic列表
        if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
            requestUpdateForNewTopics();
        }
    }

    public synchronized void requestUpdateForNewTopics() {
        // Override the timestamp of last refresh to let immediate update.
        this.lastRefreshMs = 0; // 將最近一次刷新時間置爲0
        this.requestVersion++;
        requestUpdate();
    }

    public synchronized int requestUpdate() {
        this.needUpdate = true; // 將更新字段needUpdate設置true,表示須要強制更新
        return this.updateVersion; // 返回更新後版本值
    }
複製代碼

2.2 序列化key

序列化record.key()併發

2.3序列化value

序列化record.valueapp

2.4 分區器

消息發送的過程當中嗎,若是ProducerRecord中指定了partition字段,那麼就不須要分區器的做用,由於該字段表明的就是所要發往的分區號。 若是沒有指定partition字段,那麼就須要依賴分區器,根據key這個字段來計算partition的值。分區器的做用就是爲消息分配分區。 Kafka中默認的分區器是org.apache.kafka.clients.producer.internals.DefaultPartitioner#partitionide

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        // 若是key不爲空,則會根據key進行哈希算出分區號,具備相同key的消息會被寫入同一個分區
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
複製代碼

2.5 校驗記錄大小

private void ensureValidRecordSize(int size) {
        if (size > this.maxRequestSize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
                    " configuration.");
        if (size > this.totalMemorySize)
            throw new RecordTooLargeException("The message is " + size +
                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                    ProducerConfig.BUFFER_MEMORY_CONFIG +
                    " configuration.");
    }
複製代碼

2.6 將消息添加到RecordAccumulator

RecordAccumulator主要用來緩存消息以便Sender線程能夠批量發送,進而減小網絡傳輸的資源消耗。函數

public final class RecordAccumulator {

    // 指定每一個ProducerBatch底層ByteBuffer的大小
    private final int batchSize;
    // 壓縮類型
    private final CompressionType compression;

    // BufferPool對象
    private final BufferPool free;

    // TopicPartition與ProducerBatch集合的映射關係
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    // 沒有應答的ProducerBatch集合,包括髮送與沒發送的,底層是一個Set<ProducerBatch>
    private final IncompleteBatches incomplete;
    private int drainIndex;

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // 2.6.1
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // 2.6.2
            buffer = free.allocate(size, maxTimeToBlock);
            // 2.6.3
            synchronized (dq) {
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    return appendResult;
                }
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
                incomplete.add(batch);
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
複製代碼

2.6.1

查找TopicPartition對應的Deque,如查不到,則建立新Deque,並添加到batchs集合,加鎖調用tryAppend()試圖添加消息,若添加成功則返回RecordAppendResult。

2.6.2

2.6.1 追加消息失敗後,嘗試從BufferPool中申請新的ByteBuffer,可能會致使阻塞,因此有這裏沒加鎖,可能多個線程同時申請ByteBuffer。

2.6.3

從新加鎖重試,調用tryAppend方法,是爲了防止多個線程併發申請空間後,形成內部碎片。追加成功,則返回,若失敗,則使用2.6.2 申請的ByteBuffer建立ProducerBatch,而後將消息添加到新建立的ProducerBatch中,將ProducerBatch添加到Deque,添加到incomplete集合中,返回RecordAppendResult。

// 查找Deque裏面最後一個ProducerBatch對象,並將消息追加到ProducerBatch
    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        return null;
    }
複製代碼

2.7 是否喚醒sender線程

喚醒線程的條件是消息所在隊列的最後一個ProducerBatch滿了或此隊列中不止一個ProducerBatch或者這是一個新建立的ProducerBatch Sender線程將在下一篇文章詳細介紹

總結

整個生產者客戶端有兩個線程協調進行,分別是主線程與sender線程(也就是實際的發送線程)。主線程首先將業務數據封裝成ProductRecord對象,,而後經過攔截器、序列化器與分區器的做用之後將消息放入RecordAccumulator(也被稱爲消息收集器)。Sender線程負責將消息信息構成請求,並最終執行網絡I/O的線程,從RecordAccumulator中獲取消息並批量發送到Kafka中。

生產者客戶端的發送消息流程

若是有地方有疑惑或者寫的有很差,能夠評論或者經過郵箱聯繫我creazycoder@sina.com

相關參考:

圖片來自《深刻理解Kafka核心設計與實踐原理》

《Apache Kafka 源碼剖析》

《深刻理解Kafka核心設計與實踐原理》

相關文章
相關標籤/搜索