Kafka: Producer

1. Basic data structures on the producer side

1. ProducerRecord

A ProducerRecord encapsulates a single message to be sent:

public class ProducerRecord<K, V> {

    private final String topic;       // target topic
    private final Integer partition;  // target partition; null lets the partitioner decide
    private final Headers headers;    // record headers
    private final K key;              // message key
    private final V value;            // message value
    private final Long timestamp;     // message timestamp; null means the producer fills in the current time

ProducerRecord allows the user to specify the target partition directly when creating the message object.
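For illustration, two usage sketches (the topic name test and String key/value types are assumptions, not from the source above):

// let the partitioner pick the partition from the key
ProducerRecord<String, String> byKey =
        new ProducerRecord<>("test", "user-42", "hello");

// pin the record to partition 0 and supply an explicit timestamp
ProducerRecord<String, String> pinned =
        new ProducerRecord<>("test", 0, System.currentTimeMillis(), "user-42", "hello");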

2. RecordMetadata

This structure represents the message metadata that the Kafka broker returns to the client:

public final class RecordMetadata {

    /**
     * Partition value for record without partition assigned
     */
    public static final int UNKNOWN_PARTITION = -1;

    private final long offset;    // offset of the message within the partition
    // The timestamp of the message.
    // If LogAppendTime is used for the topic, the timestamp will be the timestamp returned by the broker.
    // If CreateTime is used for the topic, the timestamp is the timestamp in the corresponding ProducerRecord if the
    // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the
    // producer.
    private final long timestamp;      // message timestamp
    private final int serializedKeySize;   // size in bytes of the serialized key
    private final int serializedValueSize; // size in bytes of the serialized value
    private final TopicPartition topicPartition; // topic partition the message was written to

    private volatile Long checksum;  // CRC32 checksum of the message
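On the client side this metadata comes out of the Future<RecordMetadata> returned by send(). A minimal sketch, assuming an already configured producer and record:

// block until the broker acknowledges the write, then inspect the metadata
RecordMetadata metadata = producer.send(record).get();
System.out.printf("topic=%s partition=%d offset=%d timestamp=%d%n",
        metadata.topic(), metadata.partition(), metadata.offset(), metadata.timestamp());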

2. Workflow

The user first creates the ProducerRecord to be sent and then calls the KafkaProducer#send method. Upon receiving the record, KafkaProducer first serializes it, then hands it together with locally cached cluster metadata to the partitioner to determine the target partition, and finally appends it to the in-memory message buffer pool (the accumulator). At this point send returns successfully.

KafkaProducer has a dedicated Sender I/O thread that drains batches of messages from the buffer pool and sends them to the corresponding brokers, performing the actual send.
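Putting the two threads together, a minimal end-to-end sketch (the broker address localhost:9092 and topic test are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends to the accumulator and returns immediately;
            // the Sender thread performs the network I/O in the background
            producer.send(new ProducerRecord<>("test", "key", "value"), (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
                else
                    System.out.println("acked at offset " + metadata.offset());
            });
        } // close() flushes remaining batches before stopping the Sender thread
    }
}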

Let's look at what happens inside Kafka when KafkaProducer.send(ProducerRecord, callback) is called.

Step 1: serialization + computing the target partition

Code:

byte[] serializedKey;
try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in key.serializer", cce);
}
byte[] serializedValue;
try {
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in value.serializer", cce);
}
int partition = partition(record, serializedKey, serializedValue, cluster);

Step 2: appending to the message buffer (accumulator)

When a producer is created, it allocates an accumulator buffer, 32 MB by default (set via buffer.memory), dedicated to holding messages waiting to be sent.
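The accumulator's behavior is tuned through producer configs; a sketch of the relevant settings (the values shown are the client defaults, except linger.ms, which defaults to 0):

Properties props = new Properties();
// ...bootstrap.servers and serializers as usual...
props.put("buffer.memory", 33554432); // total accumulator size: 32 MB
props.put("batch.size", 16384);       // target size of one batch per partition: 16 KB
props.put("linger.ms", 5);            // wait up to 5 ms for a batch to fill before sending
props.put("max.block.ms", 60000);     // how long send() may block when the buffer is full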

The accumulator is implemented by RecordAccumulator; let's look at its constructor:

public RecordAccumulator(LogContext logContext,
                             int batchSize,
                             long totalSize,
                             CompressionType compression,
                             long lingerMs,
                             long retryBackoffMs,
                             Metrics metrics,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager) {
        this.log = logContext.logger(RecordAccumulator.class);
        this.drainIndex = 0;
        this.closed = false;
        this.flushesInProgress = new AtomicInteger(0);
        this.appendsInProgress = new AtomicInteger(0);
        this.batchSize = batchSize;
        this.compression = compression;
        this.lingerMs = lingerMs;
        this.retryBackoffMs = retryBackoffMs;
        this.batches = new CopyOnWriteMap<>();
        String metricGrpName = "producer-metrics";
        this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName);
        this.incomplete = new IncompleteBatches();
        this.muted = new HashSet<>();
        this.time = time;
        this.apiVersions = apiVersions;
        this.transactionManager = transactionManager;
        registerMetrics(metrics, metricGrpName);
    }

Besides key parameters such as linger.ms and batch.size, there is another important member: the batch map (batches). The collection itself is a concurrent map (Kafka's CopyOnWriteMap):

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

It holds a deque of batches for each topic partition; in other words, batches is grouped by TopicPartition, so messages destined for different partitions are stored in the batch deque of the corresponding partition.

For example, suppose messages M1 and M2 are sent to partition 0 of topic test but belong to different batches, while M3 is sent to partition 1 of test. Then batches contains: {"test-0" -> [batch1, batch2], "test-1" -> [batch3]}. The key of batches is a TopicPartition and the value is the deque of batches to be sent to it.
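A simplified sketch of this grouping (the real getOrCreateDeque() uses putIfAbsent on Kafka's CopyOnWriteMap, but the effect is the same):

// illustrative only: one deque of batches per topic partition
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new ConcurrentHashMap<>();

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    // atomically create the per-partition deque on first use
    return batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
}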

Each partition's deque holds a number of message batches. Each ProducerBatch in the deque has the following structure:

public final class ProducerBatch {
    private enum FinalState { ABORTED, FAILED, SUCCEEDED }

    final long createdMs;
    final TopicPartition topicPartition;
    final ProduceRequestResult produceFuture;

    private final List<Thunk> thunks = new ArrayList<>();    // holds the per-record callbacks and futures
    private final MemoryRecordsBuilder recordsBuilder;       // performs the actual append writes
    private final AtomicInteger attempts = new AtomicInteger(0);
    private final boolean isSplitBatch;
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    int recordCount;           // number of records in this batch
    int maxRecordSize;         // maximum size of a single record
    private long lastAttemptMs;
    private long lastAppendTime;
    private long drainedMs;
    private String expiryErrorMessage;
    private boolean retry;
    private boolean reopened = false;

The commented fields above (thunks and recordsBuilder) are the key components of a ProducerBatch.

The goal of step 2 is to write the pending message into the message buffer pool. The flow is:

(1) Call the RecordAccumulator.append() method

(2) Call the ProducerBatch.tryAppend() method

(3) Inside ProducerBatch, call the MemoryRecordsBuilder.append() method and put the callback's future into thunks.

Once this step completes, send also returns; the main thread can then wait on the returned future or rely on the callback for the result.

Code:

In KafkaProducer's doSend() method, after serialization and partition computation, accumulator.append() is invoked:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);

The RecordAccumulator.append() method:

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    // this buffer holds the messages of one batch
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // if a deque already exists for this topic-partition, try to append to its last batch directly
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // otherwise the deque for this topic-partition has no usable batch; create a new one
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // compute the buffer size
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // allocate a buffer for this topic-partition to temporarily hold the messages to be sent
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            // re-check under the lock: another thread may have created a batch concurrently
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // create a MemoryRecordsBuilder, which performs the actual append writes
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // create a new batch to hold this round of messages
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            // call batch.tryAppend() to add the current message
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            // add the batch to the deque
            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;

            // return the result
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

The ProducerBatch.tryAppend() method:

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        // delegate to MemoryRecordsBuilder.append()
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        // store the callback and its future in thunks
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}

MemoryRecordsBuilder eventually calls one of the two methods below to write the message into the buffer; recordWritten() contains the offset bookkeeping logic.

private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                     Header[] headers) throws IOException {
    ensureOpenForRecordAppend();
    int offsetDelta = (int) (offset - baseOffset);
    long timestampDelta = timestamp - firstTimestamp;
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    recordWritten(offset, timestamp, sizeInBytes);
}

private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
    ensureOpenForRecordAppend();
    if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
        timestamp = logAppendTime;

    int size = LegacyRecord.recordSize(magic, key, value);
    AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);

    if (timestampType == TimestampType.LOG_APPEND_TIME)
        timestamp = logAppendTime;
    long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
    recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
    return crc;
}

Step 3: Sender thread preprocessing and sending

Strictly speaking, the Sender thread has been running ever since the KafkaProducer was created. Its basic workflow is as follows (a simplified sketch of one loop iteration follows the list):

(1) Continuously poll the buffer for partitions whose batches are ready to be sent

(2) Group the polled batches by the leader broker of their target partitions

(3) Send the grouped batches to each broker over the underlying socket connections

(4) Wait for the responses from the brokers
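Conceptually, one iteration of the Sender loop looks roughly like this (a simplified sketch pieced together from the run()/sendProducerData() logic in the Kafka source; not the verbatim code):

void runOnce() {
    Cluster cluster = metadata.fetch();
    long now = time.milliseconds();
    // (1) find the nodes whose partitions have sendable batches (full, or linger.ms expired)
    RecordAccumulator.ReadyCheckResult result = accumulator.ready(cluster, now);
    // (2) drain batches from the accumulator, grouped by the leader broker of each partition
    Map<Integer, List<ProducerBatch>> batches =
            accumulator.drain(cluster, result.readyNodes, maxRequestSize, now);
    // (3) build one ProduceRequest per broker and queue it on the network client
    sendProduceRequests(batches, now);
    // (4) perform the actual socket I/O and handle any responses that arrived
    client.poll(pollTimeout, now);
}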

The actual code:

In the Sender.sendProducerData() method there is this section:

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
        this.maxRequestSize, now);
if (guaranteeMessageOrder) {
    // Mute all the partitions drained
    for (List<ProducerBatch> batchList : batches.values()) {
        for (ProducerBatch batch : batchList)
            this.accumulator.mutePartition(batch.topicPartition);
    }
}

// finally, send the batches to each broker node
sendProduceRequests(batches, now);

 

Via accumulator.drain(), the Sender obtains the batches for every broker as Map<Integer, List<ProducerBatch>> batches, where the key is the broker ID and the value is the list of batches to send to that broker.

Then Sender.sendProduceRequests() iterates over batches and sends one request per <broker, List<Batch>> entry:

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue());
}

The Sender.sendProduceRequest() method:

/**
 * Create a produce request from the given record batches
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    ......
   
    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    // the callback that will process the response
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // wrap everything into a client request
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    // actually send the request; the lower layers are built on Java NIO
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

Step 4: Sender thread processes the response

After the broker finishes processing the messages, it sends back the corresponding PRODUCE response. Once the Sender thread receives the response, it invokes the callbacks recorded in each batch, one by one. With that done, the producer's work of sending the message is complete.
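From the user's perspective, this is the moment the Callback passed to send() fires. Note that it runs on the Sender's I/O thread, so it must not block; a usage sketch:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // invoked on the Sender I/O thread once the response (or a fatal error) arrives
        if (exception != null) {
            System.err.println("send failed: " + exception.getMessage());
        } else {
            System.out.println("acked: partition=" + metadata.partition() + " offset=" + metadata.offset());
        }
    }
});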

 

The sendProduceRequest() method in step 3 constructed a response handler:

RequestCompletionHandler callback = new RequestCompletionHandler() {
    public void onComplete(ClientResponse response) {
        handleProduceResponse(response, recordsByPartition, time.milliseconds());
    }
};

The handleProduceResponse() method encapsulates the response-handling logic.