Kafka Source Code Analysis 3: Producer

Producer

 

Producer is the interface that defines a producer.

Its most commonly used methods are:

 

public Future<RecordMetadata> send(ProducerRecord<K, V> record);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

public void flush();

public void close();

 

KafkaProducer is asynchronous: calling send() does not immediately transmit the record to the broker. The record is first placed in a buffer pool and send() returns right away; a background I/O thread is responsible for turning the buffered records into requests and sending them to the Kafka cluster.
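
From the caller's side, the asynchronous behaviour looks like the following minimal sketch (the broker address localhost:9092 and the topic name demo-topic are placeholders, not taken from this analysis):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AsyncSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only buffers the record and returns a Future; the background
            // I/O thread delivers it later and then invokes the callback
            producer.send(new ProducerRecord<>("demo-topic", "key-1", "value-1"),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("written to %s-%d@%d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                    });
            producer.flush();   // block until everything buffered so far has been sent
        }
    }
}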

 

The per-partition buffer size is specified by the batch.size config; the producer maintains one buffer of unsent records for each partition.

By default a buffer can be sent even before it is full; linger.ms adds a wait time so that batches fill up and the number of requests goes down, the same idea as Nagle's algorithm in TCP.

The producer's total buffer memory is controlled by buffer.memory. If records are produced faster than they can be sent and this limit is exceeded, send() blocks; the maximum blocking time is set by max.block.ms, after which a TimeoutException is thrown.

key.serializer and value.serializer control how Java objects are converted into byte arrays for transmission to the Kafka cluster.

acks controls when the producer considers a write successful, i.e. how many acknowledgements the leader must collect. With acks=0 the producer treats the record as sent as soon as it is placed in the socket buffer; with acks=1 the leader only has to write the record to its local log before responding, without waiting for acks from the followers; with acks=all the send only succeeds after every in-sync replica has acknowledged it, so the record is not lost as long as at least one in-sync replica stays alive.
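
Putting the configs above together, an illustrative snippet (the values are examples rather than recommendations; the bootstrap.servers and serializer settings from the earlier example are assumed):

props.put("batch.size", 16384);        // per-partition batch buffer, in bytes
props.put("linger.ms", 5);             // wait up to 5 ms for a batch to fill before sending
props.put("buffer.memory", 33554432);  // total memory for unsent records; send() blocks once it is exhausted
props.put("max.block.ms", 60000);      // maximum time send() may block before a TimeoutException
props.put("acks", "all");              // wait for all in-sync replicas to acknowledge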

 

The Partitioner decides which partition each record is written to. In scenarios where specific keys should go to specific partitions, you can plug in your own Partitioner implementation (a sketch of such a custom partitioner follows the default implementation below).

The default Partitioner balances the load: records without a key are spread round-robin (starting from a random offset) over the available partitions, while keyed records are hashed with murmur2 to pick a partition.

 

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

       List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

       int numPartitions = partitions.size();

       if (keyBytes == null) {

           int nextValue = nextValue(topic);

           List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

           if (!availablePartitions.isEmpty()) {

               int part = Utils.toPositive(nextValue) % availablePartitions.size();

               return availablePartitions.get(part).partition();

           } else {

               // no partitions are available, give a non-available partition

               return Utils.toPositive(nextValue) % numPartitions;

           }

       } else {

           // hash the keyBytes to choose a partition

           return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

       }

   }

   private int nextValue(String topic) {

       AtomicInteger counter = topicCounterMap.get(topic);

       if (null == counter) {

           counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());

           AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);

           if (currentCounter != null) {

               counter = currentCounter;

           }

       }

       return counter.getAndIncrement();

   }
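
For the scenario mentioned earlier where particular keys must land on particular partitions, a custom Partitioner can be registered via partitioner.class. The class below is a hypothetical sketch (the class name HotKeyPartitioner, the hot key vip-user and the routing rule are assumptions for illustration, not code from Kafka); it reuses the same Utils helpers as the default partitioner:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class HotKeyPartitioner implements Partitioner {
    private static final String HOT_KEY = "vip-user";   // hypothetical hot key

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (HOT_KEY.equals(key))
            return numPartitions - 1;                    // reserve the last partition for the hot key
        if (keyBytes == null)
            return 0;                                    // keyless records all go to partition 0 in this sketch
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() { }
}

It is configured the same way as the default implementation, e.g. props.put("partitioner.class", HotKeyPartitioner.class.getName());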

 

ProducerRecord

 

ProducerRecord contains everything that needs to be sent to the broker:

 

class ProducerRecord<K, V> {

    private final String topic;

    private final Integer partition;

    private final Headers headers;

    private final K key;

    private final V value;

    private final Long timestamp;

}
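
Only the topic and value are mandatory; partition and timestamp may be left null, in which case the Partitioner picks the partition and the current time is used. A small sketch (the topic name and header values are placeholders):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;

ProducerRecord<String, String> record =
        new ProducerRecord<>("demo-topic", "key-1", "value-1");  // partition and timestamp left for Kafka to fill in
record.headers().add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8));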

 

Constructing a KafkaProducer

 

// create the partitioner

this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

// configure the serializers

if (keySerializer == null) {

    this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

                                                                             Serializer.class));

    this.keySerializer.configure(config.originals(), true);

} else {

    config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);

    this.keySerializer = ensureExtended(keySerializer);

}

if (valueSerializer == null) {

    this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,

                                                                               Serializer.class));

    this.valueSerializer.configure(config.originals(), false);

} else {

    config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);

    this.valueSerializer = ensureExtended(valueSerializer);

}

// load interceptors and make sure they get clientId

userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);

List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,

        ProducerInterceptor.class);

this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);

this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),

        true, true, clusterResourceListeners);

this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);

this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);

this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);

this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);

this.transactionManager = configureTransactionState(config);

int retries = configureRetries(config, transactionManager != null);

int maxInflightRequests = configureInflightRequests(config, transactionManager != null);

short acks = configureAcks(config, transactionManager != null);

this.apiVersions = new ApiVersions();

// RecordAccumulator implements the batching and waiting logic

this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),

        this.totalMemorySize,

        this.compressionType,

        config.getLong(ProducerConfig.LINGER_MS_CONFIG),

        retryBackoffMs,

        metrics,

        time,

        apiVersions,

        transactionManager);

List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());

ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);

Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);

// high-level network handling; wraps send, poll and related operations

NetworkClient client = new NetworkClient(

        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),

                this.metrics, time, "producer", channelBuilder),

        this.metadata,

        clientId,

        maxInflightRequests,

        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),

        config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),

        config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),

        config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),

        this.requestTimeoutMs,

        time,

        true,

        apiVersions,

        throttleTimeSensor);

// the background thread that actually sends requests to the Kafka cluster

this.sender = new Sender(client,

        this.metadata,

        this.accumulator,

        maxInflightRequests == 1,

        config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),

        acks,

        retries,

        this.metrics,

        Time.SYSTEM,

        this.requestTimeoutMs,

        config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),

        this.transactionManager,

        apiVersions);

String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : "");

this.ioThread = new KafkaThread(ioThreadName, this.sender, true);

this.ioThread.start();

this.errors = this.metrics.sensor("errors");

config.logUnused();

AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);

log.debug("Kafka producer started");

 

KafkaProducer#send

 

The entry point is doSend(interceptedRecord, callback):

 

// fetch cluster metadata to learn which nodes host the topic's partitions

ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);

long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);

   Cluster cluster = clusterAndWaitTime.cluster;

   byte[] serializedKey;

   try {

       serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());

   } catch (ClassCastException cce) {

       throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +

               " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +

               " specified in key.serializer");

   }

   byte[] serializedValue;

   try {

       serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());

   } catch (ClassCastException cce) {

       throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +

               " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +

               " specified in value.serializer");

   }

// determine the target partition

   int partition = partition(record, serializedKey, serializedValue, cluster);

   tp = new TopicPartition(record.topic(), partition);

   setReadOnly(record.headers());

   Header[] headers = record.headers().toArray();

   int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),

           compressionType, serializedKey, serializedValue, headers);

   ensureValidRecordSize(serializedSize);

   long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

   log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

   // producer callback will make sure to call both 'callback' and interceptor callback

   Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

   if (transactionManager != null && transactionManager.isTransactional())

       transactionManager.maybeAddPartitionToTransaction(tp);

// append the record to the RecordAccumulator

   RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,

           serializedValue, headers, interceptCallback, remainingWaitMs);

   if (result.batchIsFull || result.newBatchCreated) {

       log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

       this.sender.wakeup();

   }

   return result.future;
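
doSend() returns the future produced by the accumulator, and that same Future is what send() hands back to the caller. A caller that wants synchronous semantics simply blocks on it, for example (topic and record contents are placeholders; Future.get declares InterruptedException, ExecutionException and TimeoutException):

RecordMetadata metadata =
        producer.send(new ProducerRecord<>("demo-topic", "key-1", "value-1"))
                .get(10, TimeUnit.SECONDS);   // throws ExecutionException if the broker rejected the record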

 

RecordAccumulator

 

It stores ProducerBatch objects in a double-ended queue (Deque), one per partition; a simplified sketch of this structure follows the append code below.

 

// We keep track of the number of appending thread to make sure we do not miss batches in

   // abortIncompleteBatches().

   appendsInProgress.incrementAndGet();

   ByteBuffer buffer = null;

   if (headers == null) headers = Record.EMPTY_HEADERS;

   try {

       // check if we have an in-progress batch

// get or create the deque for this TopicPartition

       Deque<ProducerBatch> dq = getOrCreateDeque(tp);

       synchronized (dq) {

           if (closed)

               throw new IllegalStateException("Cannot send after the producer is closed.");

// if the record fits into the last batch, append it and return

           RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);

           if (appendResult != null)

               return appendResult;

       }

// otherwise a new batch has to be allocated

       // we don't have an in-progress record batch try to allocate a new batch

       byte maxUsableMagic = apiVersions.maxUsableProduceMagic();

       int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

       log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());

       buffer = free.allocate(size, maxTimeToBlock);

       synchronized (dq) {

           // Need to check if producer is closed again after grabbing the dequeue lock.

           if (closed)

               throw new IllegalStateException("Cannot send after the producer is closed.");

// between the two synchronized blocks another thread may already have created the next batch

           RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);

           if (appendResult != null) {

               // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...

               return appendResult;

           }

           MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);

           ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());

           FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

           dq.addLast(batch);

           incomplete.add(batch);

           // Don't deallocate this buffer in the finally block as it's being used in the record batch

           buffer = null;

           return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);

       }

   } finally {

       if (buffer != null)

           free.deallocate(buffer);

       appendsInProgress.decrementAndGet();

   }
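
To make the shape of the structure concrete, here is a heavily simplified, hypothetical stand-in (not the real Kafka classes): one deque of batches per partition, protected by synchronizing on the deque itself.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MiniAccumulator {
    // stand-in for ProducerBatch: pretends each batch holds 16 records
    static class Batch {
        private int remaining = 16;
        boolean tryAppend(byte[] key, byte[] value) { return remaining-- > 0; }
    }

    // one deque of in-flight batches per topic-partition, as in RecordAccumulator
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    private Deque<Batch> getOrCreateDeque(String tp) {
        return batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
    }

    /** Returns true if a new batch had to be created (mirrors RecordAppendResult.newBatchCreated). */
    boolean append(String tp, byte[] key, byte[] value) {
        Deque<Batch> dq = getOrCreateDeque(tp);
        synchronized (dq) {                      // per-partition lock, as in the real code
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(key, value))
                return false;                    // fitted into the batch that is still open
            Batch fresh = new Batch();           // otherwise open a new batch for this partition
            fresh.tryAppend(key, value);
            dq.addLast(fresh);
            return true;
        }
    }
}

The real append() additionally releases the lock while allocating the batch's ByteBuffer from the BufferPool and re-checks the deque afterwards, which is why tryAppend appears twice in the source above.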

 

Sender

 

Sender is a background thread. Ignoring transactions, each iteration boils down to sendProducerData and poll; poll waits for and processes the responses.

 

void run(long now) {

       if (transactionManager != null) {

           if (!transactionManager.isTransactional()) {

               // this is an idempotent producer, so make sure we have a producer id

               maybeWaitForProducerId();

           } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {

               // as long as there are outstanding transactional requests, we simply wait for them to return

               client.poll(retryBackoffMs, now);

               return;

           }

           // do not continue sending if the transaction manager is in a failed state or if there

           // is no producer id (for the idempotent case).

           if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {

               RuntimeException lastError = transactionManager.lastError();

               if (lastError != null)

                   maybeAbortBatches(lastError);

               client.poll(retryBackoffMs, now);

               return;

           } else if (transactionManager.hasAbortableError()) {

               accumulator.abortUndrainedBatches(transactionManager.lastError());

           }

       }

       long pollTimeout = sendProducerData(now);

       client.poll(pollTimeout, now);

   }
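
The run(long now) above is a single iteration. The outer Runnable loop looks roughly like the following (a sketch of the structure only; the shutdown and draining logic is omitted):

public void run() {
    // main loop: keep sending and polling until close() clears the running flag
    while (running) {
        try {
            run(time.milliseconds());   // one iteration: sendProducerData(now) + client.poll(...)
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // on shutdown the sender drains outstanding requests before exiting (omitted here)
}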
