Kafka 源碼解析之 Producer 發送模型(一)

歡迎你們關注 github.com/hsfxuebao/j… ,但願對你們有所幫助,要是以爲能夠的話麻煩給點一下Star哈java

前言

Kafka,做爲目前在大數據領域應用最爲普遍的消息隊列,其內部實現和設計有不少值得深刻研究和分析的地方。node

再 0.10.2 的 Kafka 中,其 Client 端是由 Java 實現,Server 端是由 Scala 來實現的,在使用 Kafka 時,Client 是用戶最早接觸到部分,所以,計劃寫的源碼分析也會從 Client 端開始,會先從 Producer 端開始,今天講的是 Producer 端的發送模型的實現。git

Producer 使用

在分析 Producer 發送模型以前,先看一下用戶是如何使用 Producer 向 Kafka 寫數據的,下面是一個關於 Producer 最簡單的應用示例。github

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.Producer;

import java.util.Properties;

/**
 * Created by matt on 16/7/26.
 */
public class ProducerTest {
    private static String topicName;
    private static int msgNum;
    private static int key;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        topicName = "test";
        msgNum = 10; // 發送的消息數

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < msgNum; i++) {
            String msg = i + " This is matt's blog.";
            producer.send(new ProducerRecord<String, String>(topicName, msg));
        }
        producer.close();
    }
}
複製代碼

從上面的代碼能夠看出 Kafka 爲用戶提供了很是簡單的 API,在使用時,只須要以下兩步:算法

  1. 初始化 KafkaProducer 實例;
  2. 調用 send 接口發送數據。

本文主要是圍繞着 Producer 在內部是如何實現 send 接口而展開的。apache

Producer 數據發送流程

下面經過對 send 源碼分析來一步步剖析 Producer 數據的發送流程。bootstrap

Producer 的 send 實現

用戶是直接使用 producer.send() 發送的數據,先看一下 send() 接口的實現緩存

// 異步向一個 topic 發送數據
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

// 向 topic 異步地發送數據,當發送確認後喚起回調函數
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
複製代碼

數據發送的最終實現仍是調用了 Producer 的 doSend() 接口。安全

Producer 的 doSend 實現

下面是 doSend() 的具體實現markdown

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
       TopicPartition tp = null;
       try {
           // 1.確認數據要發送到的 topic 的 metadata 是可用的
           ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
           long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
           Cluster cluster = clusterAndWaitTime.cluster;
           // 2.序列化 record 的 key 和 value
           byte[] serializedKey;
           try {
               serializedKey = keySerializer.serialize(record.topic(), record.key());
           } catch (ClassCastException cce) {
               throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                       " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                       " specified in key.serializer");
           }
           byte[] serializedValue;
           try {
               serializedValue = valueSerializer.serialize(record.topic(), record.value());
           } catch (ClassCastException cce) {
               throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                       " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                       " specified in value.serializer");
           }

           // 3. 獲取該 record 的 partition 的值(能夠指定,也能夠根據算法計算)
           int partition = partition(record, serializedKey, serializedValue, cluster);
           int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
           ensureValidRecordSize(serializedSize); // record 的字節超出限制或大於內存限制時,就會拋出 RecordTooLargeException 異常
           tp = new TopicPartition(record.topic(), partition);
           long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); // 時間戳
           log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
           Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
           // 4. 向 accumulator 中追加數據
           RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
           // 5. 若是 batch 已經滿了,喚醒 sender 線程發送數據
           if (result.batchIsFull || result.newBatchCreated) {
               log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
               this.sender.wakeup();
           }
           return result.future;
       } catch (ApiException e) {
           log.debug("Exception occurred during message send:", e);
           if (callback != null)
               callback.onCompletion(null, e);
           this.errors.record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           return new FutureFailure(e);
       } catch (InterruptedException e) {
           this.errors.record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw new InterruptException(e);
       } catch (BufferExhaustedException e) {
           this.errors.record();
           this.metrics.sensor("buffer-exhausted-records").record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw e;
       } catch (KafkaException e) {
           this.errors.record();
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw e;
       } catch (Exception e) {
           if (this.interceptors != null)
               this.interceptors.onSendError(record, tp, e);
           throw e;
       }
   }
複製代碼

dosend() 方法的實現上,一條 Record 數據的發送,能夠分爲如下五步:

  1. 確認數據要發送到的 topic 的 metadata 是可用的(若是該 partition 的 leader 存在則是可用的,若是開啓權限時,client 有相應的權限),若是沒有 topic 的 metadata 信息,就須要獲取相應的 metadata;
  2. 序列化 record 的 key 和 value;
  3. 獲取該 record 要發送到的 partition(能夠指定,也能夠根據算法計算);
  4. 向 accumulator 中追加 record 數據,數據會先進行緩存;
  5. 若是追加完數據後,對應的 RecordBatch 已經達到了 batch.size 的大小(或者batch 的剩餘空間不足以添加下一條 Record),則喚醒 sender 線程發送數據。

數據的發送過程,能夠簡單總結爲以上五點,下面會這幾部分的具體實現進行詳細分析。

發送過程詳解

獲取 topic 的 metadata 信息

Producer 經過 waitOnMetadata() 方法來獲取對應 topic 的 metadata 信息,這部分後面會單獨抽出一篇文章來介紹,這裏就再也不詳述,總結起來就是:在數據發送前,須要先該 topic 是可用的。

key 和 value 的序列化

Producer 端對 record 的 keyvalue 值進行序列化操做,在 Consumer 端再進行相應的反序列化,Kafka 內部提供的序列化和反序列化算法以下圖所示:

Kafka serialize & deserializeKafka serialize & deserialize

固然咱們也是能夠自定義序列化的具體實現,不過通常狀況下,Kafka 內部提供的這些方法已經足夠使用。

獲取 partition 值

關於 partition 值的計算,分爲三種狀況:

  1. 指明 partition 的狀況下,直接將指明的值直接做爲 partiton 值;
  2. 沒有指明 partition 值但有 key 的狀況下,將 key 的 hash 值與 topic 的 partition 數進行取餘獲得 partition 值;
  3. 既沒有 partition 值又沒有 key 值的狀況下,第一次調用時隨機生成一個整數(後面每次調用在這個整數上自增),將這個值與 topic 可用的 partition 總數取餘獲得 partition 值,也就是常說的 round-robin 算法。

具體實現以下:

// 當 record 中有 partition 值時,直接返回,沒有的狀況下調用 partitioner 的類的 partition 方法去計算(KafkaProducer.class)
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
複製代碼

Producer 默認使用的 partitionerorg.apache.kafka.clients.producer.internals.DefaultPartitioner,用戶也能夠自定義 partition 的策略,下面是這個類兩個方法的具體實現:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {// 沒有指定 key 的狀況下
            int nextValue = nextValue(topic); // 第一次的時候產生一個隨機整數,後面每次調用在以前的基礎上自增;
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            // leader 不爲 null,即爲可用的 partition
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {// 有 key 的狀況下,使用 key 的 hash 值進行計算
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; // 選擇 key 的 hash 值
        }
    }

    // 根據 topic 獲取對應的整數變量
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) { // 第一次調用時,隨機產生
            counter = new AtomicInteger(new Random().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement(); // 後面再調用時,根據以前的結果自增
    }
複製代碼

這就是 Producer 中默認的 partitioner 實現。

向 accumulator 寫數據

Producer 會先將 record 寫入到 buffer 中,當達到一個 batch.size 的大小時,再喚起 sender 線程去發送 RecordBatch(第五步),這裏先詳細分析一下 Producer 是如何向 buffer 中寫入數據的。

Producer 是經過 RecordAccumulator 實例追加數據,RecordAccumulator 模型以下圖所示,一個重要的變量就是 ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches,每一個 TopicPartition 都會對應一個 Deque<RecordBatch>,當添加數據時,會向其 topic-partition 對應的這個 queue 最新建立的一個 RecordBatch 中添加 record,而發送數據時,則會先從 queue 中最老的那個 RecordBatch 開始發送。

Producer RecordAccumulator 模型Producer RecordAccumulator 模型

// org.apache.kafka.clients.producer.internals.RecordAccumulator
     // 向 accumulator 添加一條 record,並返回添加後的結果(結果主要包含: future metadata、batch 是否滿的標誌以及新 batch 是否建立)其中, maxTimeToBlock 是 buffer.memory 的 block 的最大時間
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        try {
            Deque<RecordBatch> dq = getOrCreateDeque(tp);// 每一個 topicPartition 對應一個 queue
            synchronized (dq) {// 在對一個 queue 進行操做時,會保證線程安全
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); // 追加數據
                if (appendResult != null)// 這個 topic-partition 已經有記錄了
                    return appendResult;
            }

            // 爲 topic-partition 建立一個新的 RecordBatch, 須要初始化相應的 RecordBatch,要爲其分配的大小是: max(batch.size, 加上頭文件的本條消息的大小)
            int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            ByteBuffer buffer = free.allocate(size, maxTimeToBlock);// 給這個 RecordBatch 初始化一個 buffer
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {// 若是忽然發現這個 queue 已經存在,那麼就釋放這個已經分配的空間
                    free.deallocate(buffer);
                    return appendResult;
                }
                // 給 topic-partition 建立一個 RecordBatch
                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, compression, TimestampType.CREATE_TIME, this.batchSize);
                RecordBatch batch = new RecordBatch(tp, recordsBuilder, time.milliseconds());
                // 向新的 RecordBatch 中追加數據
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));

                dq.addLast(batch);// 將 RecordBatch 添加到對應的 queue 中
                incomplete.add(batch);// 向未 ack 的 batch 集合添加這個 batch
                // 若是 dp.size()>1 就證實這個 queue 有一個 batch 是能夠發送了
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }
複製代碼

總結一下其 record 寫入的具體流程以下圖所示:

Producer RecordAccumulator record 寫入流程Producer RecordAccumulator record 寫入流程

  1. 獲取該 topic-partition 對應的 queue,沒有的話會建立一個空的 queue;
  2. 向 queue 中追加數據,先獲取 queue 中最新加入的那個 RecordBatch,若是不存在或者存在但剩餘空餘不足以添加本條 record 則返回 null,成功寫入的話直接返回結果,寫入成功;
  3. 建立一個新的 RecordBatch,初始化內存大小根據 max(batch.size, Records.LOG_OVERHEAD + Record.recordSize(key, value)) 來肯定(防止單條 record 過大的狀況);
  4. 向新建的 RecordBatch 寫入 record,並將 RecordBatch 添加到 queue 中,返回結果,寫入成功。

發送 RecordBatch

當 record 寫入成功後,若是發現 RecordBatch 已知足發送的條件(一般是 queue 中有多個 batch,那麼最早添加的那些 batch 確定是能夠發送了),那麼就會喚醒 sender 線程,發送 RecordBatch

sender 線程對 RecordBatch 的處理是在 run() 方法中進行的,該方法具體實現以下:

void run(long now) {
        Cluster cluster = metadata.fetch();
        // 獲取那些已經能夠發送的 RecordBatch 對應的 nodes
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // 若是有 topic-partition 的 leader 是未知的,就強制 metadata 更新
        if (!result.unknownLeaderTopics.isEmpty()) {
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // 若是與node 沒有鏈接(若是能夠鏈接,同時初始化該鏈接),就證實該 node 暫時不能發送數據,暫時移除該 node
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // 返回該 node 對應的全部能夠發送的 RecordBatch 組成的 batches(key 是 node.id),並將 RecordBatch 從對應的 queue 中移除
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            //記錄將要發送的 RecordBatch
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // 將因爲元數據不可用而致使發送超時的 RecordBatch 移除
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);

        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0;
        }
        // 發送 RecordBatch
        sendProduceRequests(batches, now);

        this.client.poll(pollTimeout, now); // 關於 socket 的一些實際的讀寫操做(其中包括 meta 信息的更新)
    }
複製代碼

這段代碼前面有不少是其餘的邏輯處理,如:移除暫時不可用的 node、處理因爲元數據不可用致使的超時 RecordBatch,真正進行發送發送 RecordBatch 的是 sendProduceRequests(batches, now) 這個方法,具體是:

/**
 * Transfer the record batches into a list of produce requests on a per-node basis
 */
private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}

/**
 * Create a produce request from the given record batches
 */
// 發送 produce 請求
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size());
    for (RecordBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        produceRecordsByPartition.put(tp, batch.records());
        recordsByPartition.put(tp, batch);
    }

    ProduceRequest.Builder requestBuilder =
            new ProduceRequest.Builder(acks, timeout, produceRecordsByPartition);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
複製代碼

這段代碼就簡單不少,總來起來就是,將 batches 中 leader 爲同一個 node 的全部 RecordBatch 放在一個請求中進行發送。

最後

本文是對 Kafka Producer 端發送模型的一個簡單分析,下一篇文章將會詳細介紹 metadata 相關的內容,包括 metadata 的內容以及在 Producer 端 metadata 的更新機制。

kafka源碼註釋分析

轉自:Kafka 源碼分析系列

相關文章
相關標籤/搜索