Kafka 2.0 Producer Client Source Code Analysis

1 The KafkaProducer Constructor

  1. Initialize the configuration parameters.
  2. Initialize the record accumulator, RecordAccumulator.
  3. Initialize the network client, KafkaClient, and cache the cluster nodes it discovers.
  4. Initialize the Sender object, which implements Runnable, and start it on the ioThread. (A usage sketch follows this list.)
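
  For orientation, here is a minimal producer built on top of this constructor. This is a hypothetical sketch: the broker address, topic name, and config values are placeholders, not anything taken from the source above.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // placeholder broker address; replace with your cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // the two configs discussed below: total buffer memory and per-batch size
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);

        // the constructor runs the four steps above: config, accumulator, client, Sender thread
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                    });
        }
    }
}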

2 Sending a Message

  1. Run the producer interceptors.
  2. Fetch the Kafka cluster metadata.
  3. Serialize the key and value.
  4. Determine the partition.
  5. Append the message to the record accumulator.
  6. When the batch is full, or a new batch was created, wake up the Sender thread.

  The core source code is as follows:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // run the interceptors
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    // fetch the cluster metadata (blocks up to max.block.ms)
    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
    Cluster cluster = clusterAndWaitTime.cluster;
    // serialize the key and value
    byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
    byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
    // determine the partition:
    // if the record carries no partition, hash the key and take it modulo the topic's
    // partition count to get the partition number;
    // if the key is null as well, the client uses an incrementing counter seeded with a
    // random integer, again modulo the partition count (i.e. round-robin).
    int partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    Header[] headers = record.headers().toArray();
    // validate that the serialized record does not exceed the size limits
    int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
            compressionType, serializedKey, serializedValue, headers);
    ensureValidRecordSize(serializedSize);
    // timestamp: defaults to the current time when the record does not carry one
    long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
    // wrap the user callback together with the interceptors
    Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    // append the message to the record accumulator
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
        // when the batch is full, or a new batch was created, wake up the Sender thread
        this.sender.wakeup();
    }
    return result.future;
}
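
  The partitioning rule described in the comments is what Kafka's DefaultPartitioner implements. Below is a simplified sketch of that logic, assuming murmur2 hashing via Utils.murmur2 and omitting the preference for currently available partitions in the null-key path; the class name SketchPartitioner and the field topicCounters are mine.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Simplified sketch of the hash / round-robin choice in the default partitioner.
public class SketchPartitioner {
    private final Map<String, AtomicInteger> topicCounters = new ConcurrentHashMap<>();

    public int partition(String topic, byte[] keyBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes != null) {
            // keyed record: murmur2-hash the key, force it positive, mod the partition count
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        // null key: round-robin with a counter seeded from a random integer
        AtomicInteger counter = topicCounters.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return Utils.toPositive(counter.getAndIncrement()) % numPartitions;
    }
}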

 2.1 Fetching Metadata

  1. If metadata for the given topic and partition can be found in the cache, return it directly; the flow ends here.
  2. Otherwise, set the update flag needUpdate=true and record the current version.
  3. Wake up the Sender thread. When the Sender thread sees needUpdate=true, it sends a metadata request to a broker; once the response arrives, it sets needUpdate=false and increments version by 1.
  4. The calling thread then checks the version. If it has grown, the metadata has been refreshed, so it exits the wait, reads the new metadata, and checks whether the topic and partition can now be matched; if not, it goes back to step 2.
  5. If the version has not grown, the metadata has not been refreshed yet, so it calls wait(long timeout) and, after waiting at most timeout milliseconds, returns to step 4.
  6. Once step 4 finds matching metadata, it is returned to the doSend method.

  The core source code is as follows:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) {
    // read the cached cluster metadata
    Cluster cluster = metadata.fetch();
    Integer partitionsCount = cluster.partitionCountForTopic(topic);
    // Return cached metadata if we have it, and if the record's partition is either undefined
    // or within the known partition range
    if (partitionsCount != null && (partition == null || partition < partitionsCount))
        return new ClusterAndWaitTime(cluster, 0);

    long begin = time.milliseconds();
    long remainingWaitMs = maxWaitMs;
    long elapsed;
    do {
        // set the update flag needUpdate=true and record the current version
        int version = metadata.requestUpdate();
        sender.wakeup(); // wake up the Sender thread
        try {
            metadata.awaitUpdate(version, remainingWaitMs); // wait for the update
        } catch (TimeoutException ex) {
            // rethrow with the original maxWaitMs so the message reports the full timeout
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        }
        cluster = metadata.fetch(); // re-read the metadata
        elapsed = time.milliseconds() - begin;
        if (elapsed >= maxWaitMs) // the maximum wait time was exceeded: fail
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
        partitionsCount = cluster.partitionCountForTopic(topic);
    } while (partitionsCount == null); // partition count still unknown: keep looping
    if (partition != null && partition >= partitionsCount) { // requested partition number is out of range
        throw new KafkaException(String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
    }
    return new ClusterAndWaitTime(cluster, elapsed);
}
// wait for the metadata update
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    // version still <= the version we started from: not refreshed yet, keep waiting
    while ((this.version <= lastVersion) && !isClosed()) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs); // sleep a while before checking again
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs) // the maximum wait time was exceeded
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
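
  For the other half of this handshake, the Sender thread's metadata response handling ends up in Metadata.update(). A simplified sketch of the relevant effects, matching step 3 above (the real method takes more parameters and also rebuilds the Cluster object), is:

// Sketch of the updater side of the handshake (simplified)
public synchronized void update(Cluster newCluster, long now) {
    this.needUpdate = false;   // the pending update request has been fulfilled
    this.version += 1;         // awaitUpdate() compares against this counter
    this.cluster = newCluster; // publish the fresh metadata
    notifyAll();               // wake every thread blocked in wait(remainingWaitMs)
}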

 2.2 Appending Messages to the RecordAccumulator

  2.2.1 The BufferPool

  Kafka uses a buffer pool to allocate on-heap byte buffers (HeapByteBuffer) for messages. The pool's free deque holds idle buffers, and an allocation first tries to simply poll the first buffer from it. When the pool cannot satisfy a request, a waiter queue built from ReentrantLock + Condition blocks the caller until enough memory has been freed.
  When the producer processes a response, it releases the allocated memory and returns the buffer to the free deque.

// the buffer pool
public class BufferPool {
    // total usable memory, buffer.memory
    private final long totalMemory;
    // the size of one batch of messages, batch.size
    private final int poolableSize;
    private final ReentrantLock lock;
    // deque of idle, pooled buffers
    private final Deque<ByteBuffer> free;
    // queue of waiting allocations
    private final Deque<Condition> waiters;
    // total available (unallocated) memory = nonPooledAvailableMemory + free.size() * poolableSize
    private long nonPooledAvailableMemory;
}
// allocate a byte buffer
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations.");

    ByteBuffer buffer = null;
    this.lock.lock();
    try {
        // fast path: hand out a pooled buffer straight from the free deque
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        // total size of the free deque
        int freeListSize = this.free.size() * this.poolableSize;
        if (this.nonPooledAvailableMemory + freeListSize >= size) {
            // the available memory (unallocated + free deque) covers the request;
            // reserve it now and allocate the buffer below
            freeUp(size);
            this.nonPooledAvailableMemory -= size; // unallocated total minus the request size
        } else { // not enough memory: block until enough is freed
            int accumulated = 0;
            Condition moreMemory = this.lock.newCondition();
            try {
                long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                this.waiters.addLast(moreMemory); // join the waiter queue
                // loop over and over until we have a buffer or have reserved
                // enough memory to allocate one
                while (accumulated < size) {
                    long startWaitNs = time.nanoseconds();
                    long timeNs;
                    boolean waitingTimeElapsed;
                    try { // wait until a deallocation signals us, or the time budget runs out
                        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                    } finally {
                        timeNs = Math.max(0L, time.nanoseconds() - startWaitNs);
                    }
                    if (waitingTimeElapsed)
                        throw new TimeoutException("Failed to allocate memory within the configured max blocking time "
                                + maxTimeToBlockMs + " ms.");

                    remainingTimeToBlockNs -= timeNs;

                    // a pooled buffer became free and we have not accumulated anything yet
                    if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                        buffer = this.free.pollFirst();
                        accumulated = size;
                    } else { // still short: grab whatever is available and keep accumulating
                        freeUp(size - accumulated);
                        int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                        this.nonPooledAvailableMemory -= got;
                        accumulated += got;
                    }
                }
                accumulated = 0; // success: nothing to hand back in the finally block
            } finally {
                // on failure, return the partially reserved memory and leave the waiter queue
                this.nonPooledAvailableMemory += accumulated;
                this.waiters.remove(moreMemory);
            }
        }
    } finally {
        try {
            // if memory is still available, pass the signal on to the next waiter
            if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
                this.waiters.peekFirst().signal();
        } finally {
            lock.unlock();
        }
    }

    if (buffer == null) // no pooled buffer was obtained: allocate a fresh heap buffer
        return ByteBuffer.allocate(size);
    else
        return buffer;
}
private void freeUp(int size) {
    // release pooled buffers back into the unpooled total until the request fits
    while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
        this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
// called when a produce response is handled, to release the allocated memory
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer); // return the buffer to the free deque
        } else {
            this.nonPooledAvailableMemory += size; // grow the unpooled available memory
        }
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal(); // wake the longest-waiting allocation
    } finally {
        lock.unlock();
    }
}
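
  To make the lifecycle concrete, a hypothetical caller pairs the two methods like this; the method name, sizes, and timeout are illustrative, not taken from the source above.

import java.nio.ByteBuffer;

// Hypothetical caller showing the allocate/deallocate pairing.
void writeBatch(BufferPool pool) throws InterruptedException {
    ByteBuffer buf = pool.allocate(16 * 1024, 60_000L); // may block up to 60 s
    try {
        buf.putInt(42); // ... fill the batch ...
    } finally {
        // batch.size-sized buffers go back onto the free deque; any other size
        // simply increases nonPooledAvailableMemory
        pool.deallocate(buf, buf.capacity());
    }
}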

  2.2.2 The CopyOnWriteMap Message Cache

  The accumulator caches messages in a CopyOnWriteMap. The key is the topic-partition, and the value is a double-ended queue whose elements are compressed message batches.

// the accumulator's cache
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();

  CopyOnWriteMap is thread safe. It is a copy-on-write map implemented by Kafka itself, holding an internal volatile Map: reads take no lock and go straight to the current map, while writes take a lock, copy the map, perform the actual write on the copy, and finally point the reference at the modified map (a sketch follows this paragraph).
  The double-ended queue Deque is in fact an ArrayDeque, which is not thread safe and must be synchronized manually. Using a deque means that when a send fails, the batch can be put straight back at the head of the queue for retry.
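
  As a sketch of the copy-on-write idea, simplified from the behavior described above (the class name and the trimmed-down method set are mine; Kafka's real class is org.apache.kafka.common.utils.CopyOnWriteMap):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified copy-on-write map in the style of Kafka's CopyOnWriteMap.
public class SimpleCopyOnWriteMap<K, V> {
    // volatile reference: readers always see the latest published snapshot
    private volatile Map<K, V> map = Collections.emptyMap();

    // lock-free read: just dereference the current snapshot
    public V get(K k) {
        return map.get(k);
    }

    // writes are serialized: copy the snapshot, mutate the copy, publish it
    public synchronized V put(K k, V v) {
        Map<K, V> copy = new HashMap<>(this.map);
        V prev = copy.put(k, v);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

    // putIfAbsent is what the accumulator uses when creating a partition's deque
    public synchronized V putIfAbsent(K k, V v) {
        V cur = map.get(k);
        return cur == null ? put(k, v) : cur;
    }
}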

// append a message to the accumulator's cache
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    ByteBuffer buffer = null;
    try {
        // look up the partition's deque in batches, creating it if absent
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) { // the deque itself is not thread safe, so lock it
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // estimate the batch size
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        buffer = free.allocate(size, maxTimeToBlock); // allocate a byte buffer from the BufferPool
        synchronized (dq) { // lock again: another thread may have created a batch in the meantime
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult; // someone beat us to it; the buffer is released in the finally block

            // build the (possibly compressed) batch object, ProducerBatch
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch); // append the new batch to the deque
            buffer = null; // ownership transferred to the batch: do not deallocate

            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer); // release the buffer if it was never handed to a batch
    }
}