kafka生產者的蓄水池機制

總體架構圖

01

Kafka還有蓄水池?你們先別急,咱們先上一張架構圖。
kafka生產者的蓄水池機制
從上面的架構圖能夠看出,生產的流程主要就是一個producer線程和一個sender線程,它們之間經過BatchQueue來獲取數據,它們的關係是一一對應的,因此kafka的生產過程都是異步過程,它的同步和異步指的是接收響應結果的模式是同步阻塞仍是異步回調。同步和異步的生產者調用示例以下:node

異步生產模式:react

producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr), new DemoCallBack(startTime, messageNo, messageStr));

同步生產模式:算法

producer.send(new ProducerRecord<>(topic,
                        messageNo,
                        messageStr)).get();

同步接收是依據send以後返回Future,再調用Future的get方法進行阻塞等待。下面咱們就從producer和sender兩個類所對應的流程來進行分析,他們分別是消息收集過程和消息發送過程。本文先介紹消息的收集過程,從上面的架構圖咱們能夠看到這個過程的數據最終是放在BatchQueue,像是將水流入了一個蓄水池的場景,這就是本文稱其爲」蓄水池」的含義了。bootstrap

消息收集過程

02

消息的收集過程主要涉及到的類以下:api

kafka生產者的蓄水池機制
咱們接下來也主要是從這幾個類的功能來闡述消息收集的過程。緩存

kafkaProducer字段含義及構造

kafkaProducer類包含的字段含義詳見以下注釋:markdown

public class KafkaProducer<K, V> implements Producer<K, V> {

    /** clientId 生成器,若是沒有明確指定客戶端 ID,則使用該字段順序生成一個 */
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    /** 生產者惟一標識(對應 client.id 屬性配置 ) */
    private String clientId;
    /** 分區選擇器(對應 partitioner.class 屬性配置),若是未明確指定分區,則基於默認的策略RR爲消息選擇合適的分區 */
    private final Partitioner partitioner;
    /** 消息的最大長度(對應 max.request.size 配置,包含消息頭、序列化以後的 key 和 value) */
    private final int maxRequestSize;
    /** 發送單條消息的緩衝區大小(對應 buffer.memory 配置) */
    private final long totalMemorySize;
    /** kafka 集羣元數據 */
    private final Metadata metadata;
    /** 消息收集器,用於收集並緩存消息,等待 Sender 線程的發送 */
    private final RecordAccumulator accumulator;
    /** 消息發送線程對象 */
    private final Sender sender;
    /** 消息發送線程,Sender由此線程啓動 */
    private final Thread ioThread;
    /** 壓縮算法(對應 compression.type 配置) */
    private final CompressionType compressionType;
    /** 時間戳工具 */
    private final Time time;
    /** key 序列化器(對應 key.serializer 配置) */
    private final Serializer<K> keySerializer;
    /** value 序列化器(對應 value.serializer 配置) */
    private final Serializer<V> valueSerializer;
    /** 封裝配置信息 */
    private final ProducerConfig producerConfig;
    /** 等待更新 kafka 集羣元數據的最大時長 */
    private final long maxBlockTimeMs;
    /** 消息發送的超時時間(從發送到收到 ACK 響應) */
    private final int requestTimeoutMs;
    /** 發送攔截器(對應 interceptor.classes 配置),用於待發送的消息進行攔截並修改,也能夠對 ACK 響應進行攔截處理 */
    private final ProducerInterceptors<K, V> interceptors;
  /** kafka定義的版本編號,如今爲止有3個,分別爲v0: kafka<0.10.0  v1:0.10.0<=kakfa<0.11.0 v2:kafka >=0.11.0 **/
  private final ApiVersions apiVersions;
  /** 生產者的事務管理器 **/
    private final TransactionManager transactionManager;

    // ... 省略方法定義

}

瞭解完kafkaProducer的字段含義,咱們接下來看下kafkaProducer的構造過程:網絡

KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient) {
        try {
      //獲取用戶配置信息
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = Time.SYSTEM;
      //生產者id的生成,優先使用用戶配置的id,若是沒有則使用PRODUCER_CLIENT_ID_SEQUENCE遞增生成一個序列號
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;

            //省略度量打點及日誌相關信息

      //獲取用戶配置的分區、序列化的自定義類,並實例化
            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                         Serializer.class));
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = ensureExtended(keySerializer);
            }
            if (valueSerializer == null) {
                this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                           Serializer.class));
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = ensureExtended(valueSerializer);
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
      //獲取用戶自定義的攔截器列表
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
      //獲取用戶配置的消息壓縮類型,默認是不作壓縮
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

      //省略用戶的一些配置信息

      //當前kafka的版本號
            this.apiVersions = new ApiVersions();
      //建立消息收集器,它會將爲消息申請內存、消息壓縮(若是須要)並壓如到待發送消息緩存隊列中
            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time,
                    apiVersions,
                    transactionManager);
      // 獲取 kafka 集羣主機列表
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
      // 建立kafka元數據信息,並對它進行更新
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    true, true, clusterResourceListeners);
                this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
            }
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
      // 建立 NetworkClient 對象,NetworkClient 是 後面Sender線程和服務端進行網絡I/O的核心類
            KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                            this.metrics, time, "producer", channelBuilder, logContext),
                    this.metadata,
                    clientId,
                    maxInflightRequests,
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs,
                    time,
                    true,
                    apiVersions,
                    throttleTimeSensor,
                    logContext);
      //建立Sender發送對象
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
      //新建發送線程,並將sender類加入啓動
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            this.errors = this.metrics.sensor("errors");
      //打印用戶配置了但未使用的信息
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
          //省略異常處理
        }
}

從它的構造過程來看,它的核心流程主要是以下幾點:架構

一、分區對象的建立及集羣元信息的獲取和更新併發

二、消息收集器RecordAccumulator的建立

三、網絡I/O核心類NetworkClient 的建立

四、Sender線程的建立及啓動

前面兩個就是對應着消息收集的最核心過程,後面兩個是消息發送的核心過程,可是咱們在介紹前面兩個步驟以前還須要回到kafkaProducer來,一個消息的發送首先是kafkaProducer的建立,另一個就是消息發送send方法了,接下來咱們先介紹kafkaProducer的消息發送過程再介紹上面的兩個核心流程。

kafkaProducer消息收集過程

kafkaProducer的send方法邏輯以下:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
}

最終是調用了doSend方法,咱們來看下這個方法的主要邏輯實現:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            // 獲取當前的集羣元數據信息,若是緩存有,而且分區沒有超過指定分區範圍則緩存返回,不然觸發更新,等待新的元數據信息
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
      //對消息key進行序列化
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
      //對消息value進行序列化
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
      //依據分區算法進行分區,若是用戶指定了則使用指定分區
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();

            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
      //獲取消息時間戳,若是未明確指定則使用當前時間戳
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // 生產者回調函數封裝,當消息從服務端有返回響應,最後會被觸發
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
      // 將消息追加到收集器中
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
      //當隊列中的RecordBatch超過了1個,或者最後一個RecordBatch已經滿了(總體都是batchIsFull ),或者新建立了一個RecordBatch則都觸發喚醒sender線程
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (ApiException e) {
       //省略異常處理
         }
    }

整個流程能夠概括爲以下:

一、對kafka集羣元素信息的獲取及更新

二、Key和value的序列化

三、若是有指定分區則採用指定分區,不然計算目標分區

四、緩存消息壓入到RecordAccumulator 中

五、有條件的喚醒發送線程

這些流程裏面第2步驟很簡單,不作專項講解,咱們把1和3步驟放在一塊兒做爲集羣信息獲取及分區計算來說解,4和5單獨講解。

分區計算及集羣信息獲取

分區計算

咱們再回想下kafkaProducer的doSend過程,在消息發送前是須要計算分區信息的,咱們就先介紹一下分區算法的流程。

kafkaProducer的partition方法最終會調用 partitioner.partition方法,咱們來看下這個方法的實現邏輯:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //獲取改topic下的分區信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
      //依據可獲取的分區大小進行roud-robin運算
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 沒有可用的分區信息,則返回一個無效的分區序號
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 經過key的hash運算值再作round-robin
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

集羣信息獲取

集羣信息的更新從上面介紹咱們知道它是在消息發送的時候實施的並且是阻塞等待更新,由於信息隨時可能會發生變化,咱們得到的集羣信息必定要是最新的,因此異步更新沒有任何意義,只能採起主動等待更新。那咱們先看下消息更新的一個流程圖:

kafka生產者的蓄水池機制
消息更新是一個標準的I/O通訊過程,分爲兩個線程,metadata主線程等待信息獲取,Sender線程進行網絡I/O通訊獲取,並更新到metadata當中,下面咱們會重點介紹metada的主線程觸發更新邏輯和部分的Sender線程和metada相關的邏輯,其它Sender邏輯咱們放到消息發送過程當中講解。

在講解集羣信息獲取以前,咱們先了解下集羣對象都有些什麼信息包含在裏面:

public final class Cluster {
    /** kafka 集羣中的節點信息列表(包括 id、host、port 等信息) */
    private final List<Node> nodes;
    /** 未受權的 topic 集合 */
    private final Set<String> unauthorizedTopics;
    /** 內部 topic 集合 */
    private final Set<String> internalTopics;
    /** 記錄 topic 分區與分區詳細信息的映射關係 */
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    /** 記錄 topic 及其分區信息的映射關係 */
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    /** 記錄 topic 及其分區信息的映射關係(必須包含 leader 副本) */
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    /** 記錄節點 ID 與分區信息的映射關係 */
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    /** key 是 brokerId,value 是 broker 節點信息,方便基於 brokerId 獲取對應的節點信息 */
    private final Map<Integer, Node> nodesById;
    // ... 省略方法定義
}

Metadata主線程這邊的入口在kafkaProducer的waitOnMetadata方法中,具體邏輯以下:

private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        //添加topic到集合中,若是是新的則會設置更新集羣元素標記
        metadata.add(topic);
    //獲取緩存集羣信息
        Cluster cluster = metadata.fetch();
        Integer partitionsCount = cluster.partitionCountForTopic(topic);
    //若是分區在指定分區範圍內則直接返回緩存集羣信息
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;

    //集羣信息緩存沒有,須要等待直到能獲取到最新集羣信息
        do {
            log.trace("Requesting metadata update for topic {}.", topic);
            metadata.add(topic);
      //觸發更新標記needUpdate,並將當前版本信息獲取,方便下面等待時候和最新的版本信息進行對比
            int version = metadata.requestUpdate();
      //喚醒Sender線程
            sender.wakeup();
            try {
      //等待更新,直到version信息大於當前版本值
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
      //獲取最新的集羣信息
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (cluster.unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null);
    //在最新的分區信息裏面,若是指定分區仍然無效,那麼報異常
        if (partition != null && partition >= partitionsCount) {
            throw new KafkaException(
                    String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
        }
    //返回集羣信息和本次等待的時間
        return new ClusterAndWaitTime(cluster, elapsed);
    }

Sender線程主要看NetWorkClient的poll方法,它會調用metadataUpdater的maybeUpdate來發送metadataRequest請求,它的邏輯以下:

private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();

if (canSendRequest(nodeConnectionId)) {
this.metadataFetchInProgress = true;
//構建metadataRequest,它是客戶端request的一種類型
MetadataRequest.Builder metadataRequest;
if (metadata.needMetadataForAllTopics())
metadataRequest = MetadataRequest.Builder.allTopics();
else
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
metadata.allowAutoTopicCreation());

log.debug("Sending metadata request {} to node {}", metadataRequest, node);
//調用實際的MetadataRequest發送請求
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
return requestTimeoutMs;
}
//省略一些鏈接等待及初始化的操做

    }

其中sendInternalMetadataRequest的邏輯以下:

private void sendInternalMetadataRequest(MetadataRequest.Builder builder,
                                             String nodeConnectionId, long now) {
    //將MetadataRequest包裝成clientRequest
        ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
    //最終調用selector的send
        doSend(clientRequest, true, now);
    }

而響應回調主要是在NetworkClient調用poll的時候最後會handleCompletedReceives來處理接收到的信息,裏面有一部分邏輯是處理MetadataResponse的,咱們只貼出和它相關的邏輯以下:

if (req.isInternalRequest && body instanceof MetadataResponse)
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
metadataUpdater的handleCompletedMetadataResponse方法實現邏輯以下:
   public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
            this.metadataFetchInProgress = false;
      //獲取響應中的集羣對象信息
            Cluster cluster = response.cluster();
            // 錯誤響應碼處理
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty())
                log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
      //啓動metadata的更新
            if (cluster.nodes().size() > 0) {
                this.metadata.update(cluster, response.unavailableTopics(), now);
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
                this.metadata.failedUpdate(now, null);
            }
        }

而最終調用的metadata更新信息以下:

public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
        Objects.requireNonNull(newCluster, "cluster should not be null");
    //設置更新後的指標參數,其中version遞增
        this.needUpdate = false;
        this.lastRefreshMs = now;
        this.lastSuccessfulRefreshMs = now;
        this.version += 1;

        if (topicExpiryEnabled) {
            // 若是須要就設置topic的失效時間,默認本地緩存topic失效時間是5分鐘
            for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<String, Long> entry = it.next();
                long expireMs = entry.getValue();
                if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
                    entry.setValue(now + TOPIC_EXPIRY_MS);
                else if (expireMs <= now) {
                    it.remove();
                    log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
                }
            }
        }
    //集羣信息更新後的監聽器觸發回調
        for (Listener listener: listeners)
            listener.onMetadataUpdate(newCluster, unavailableTopics);

        String previousClusterId = cluster.clusterResource().clusterId();

        //設置新的集羣信息
        if (this.needMetadataForAllTopics) {
            this.needUpdate = false;
            this.cluster = getClusterForCurrentTopics(newCluster);
        } else {
            this.cluster = newCluster;
        }

        // 省略部分集羣資源監聽信息
}

緩存消息收集器(RecordAccumulator )

RecordAccumulator 在消息發送中的一個重要做用能夠認爲是個蓄水池,咱們先看一張消息緩存收集的架構圖:

kafka生產者的蓄水池機制
全部消息的收集過程從這個圖能夠很明顯的看出,每條消息先從MetaData裏面獲取分區信息,再申請一段buffer空間造成一個批接收空間,RecordAccumulator 會將收到的每條消息append到這個buffer中,最後將每一個批次壓入到隊列當中,等待Sender線程來獲取發送。

咱們回到源碼層面來分析,kafkaProducer在doSend的最後階段會調用以下代碼:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);

咱們先來分析一下accumulator.append這個方法:

public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // 記錄下全部正在向收集器添加信息的線程,以便後續處理未完成的批次信息的時候不至於會遺漏
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            //獲取當前topic分區所對應的dqueue,若是不存在則建立一個
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
           // producer 已經關閉,拋出異常
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
        //首先嚐試直接向dqueue裏面的最後一個batch添加消息,並返回對應的添加結果信息
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            // 沒有可以使用的batch,則新申請一塊buffer
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
      //從bufferPool裏面申請一塊buffer
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // 再次檢查producer是否關閉,關閉了拋異常
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
        //再次嘗試向dqueue裏面追加消息
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    return appendResult;
                }
        //追加仍然失敗,那麼就建立一個新的ProducerBatch進行追加
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
        //對新建立的ProducerBatch進行消息追加
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
        //新建立的batch添加到dqueue
                dq.addLast(batch);
                incomplete.add(batch);

                // 這個很重要,避免釋放正在使用的內存空間,這裏只是將對象指針指爲null,實際上以前的內存空間已經被ProducerBatch接管
                buffer = null;
//返回RecordAppendResult對象
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
      //釋放沒必要要的內存,例如第二次向dqueue裏面追加消息成功後,正式return以前就會先執行這段程序來釋放空間
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
}

在這個過程當中咱們看到消息的append是這樣的,兩次向dqueue的最後一個batch來append,即tryAppend方法以及一次向新申請的batch追加消息的tryAppend方法,咱們逐個分析:

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
    //獲取dqueue裏面的最後一個batch
        ProducerBatch last = deque.peekLast();
        if (last != null) {
        //若是batch不爲空,則向它裏面append消息,即調用batch.tryAppend
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
          //返回消息追加結果
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        return null;
}

實際上上面的代碼只是從dqueue獲取最後一個ProducerBatch並調用它的tryAppend方法來追加消息,因此最終都會走到ProducerBatch的tryAppend

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    //判斷是否還有可用空間
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            return null;
        } else {
      //調用recordsBuilder來追加消息,實際上V1版本之前的是調用了LegacyRecord來寫入,後續新版本都是採用DefaultRecord的writeTo來寫入,它們都是經過DataOutputStream寫入,寫入消息後返回其校驗碼
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                    recordsBuilder.compressionType(), key, value, headers));
            this.lastAppendTime = now;
      //這個就是返回的可阻塞同步等待返回響應的對象
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            // 這裏會記錄下每一個消息返回的future,以防batch會被拆分來發送
            thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
}

優化思考

03

上期講解了reactor的模式架構分析,能夠點擊《kafka如何作到百萬級高併發低遲延的》瞭解詳情,並在最後提出了一個發散的問題,reactor機制裏面能夠優化的地方,如今我將本身在這一點上的思考:

requestChannel裏面所帶的隊列requestQueue是全局共享,而且是加鎖處理,是否會影響網絡IO的處理性能呢?若是換爲無鎖處理是否可行?答案是能夠的,可是最終的優化效果你們能夠下載源碼來具體修改編譯試一下看,咱們先給出一個結果,總體有必定的提高,可是不明顯,緣由是因爲消息是批量發送,不是逐個發送,大大減小了網絡請求的頻次,因此這個的瓶頸就會顯得比較弱。

Note:本公衆號全部kafka系列的架構及源碼分析文章都是基於1.1.2版本,若有特殊會進行額外聲明。

推薦閱讀

kafka系列:kafka是如何作到百萬級高併發低遲延的?
kafka生產者的蓄水池機制
kafka生產者的蓄水池機制
掃碼關注咱們
互聯網架構師之路

過濾技術雜質,只爲精品呈現

若是喜歡,請關注加星喔

相關文章
相關標籤/搜索