Kafka源碼系列之0.10版本的Producer源碼解析及性能點講解

一,基礎講解java

本文是基於kafka 0.10講的,kafkaProducer模型和0.8的客戶端模型大體是同樣的,區別是0.8版本的會爲每一個Broker(有給定topic分區leaderBroker)建立一個SyncProducer,而0.10Producer是用一個NioSelector實現實現了多連接的維護的。也是一個後臺線程進行發送。基本步驟,也是按期獲取元數據,將消息按照key進行分區後歸類,每一類發送到正確的Broker上去。node

再寫kafka文章的緣由是0.10版本後跟spark結合有了大的變更,後面會講解多版本的sparkStreamingStructuredStreaming kafka的各類結合。因此在這裏會更新兩篇kafka文章:一篇關於kafka 0.10版本的Producer,另外一篇固然是kafka 0.10版本的Consumer了。爲後面的文章打下基礎。apache

 

二,重要類講解安全

Cluster微信

表明一個當前kafka集羣的nodestopicspartitions子集網絡

Selectorapp

org.apache.kafka.common.network.Selector。一個nioSelector的接口,負責非阻塞多連接網絡I/O操做。該類於NetworkSendNetworkReceive協同工做,傳輸大小限制的網絡請求和應答。一個新的連接能夠被加入到該nioSelector,固然須要配上一個id,經過調用機器學習

connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)異步

內部維護了一個java NIOSelector函數

java.nio.channels.Selector nioSelector;

NetworkClient

一個針對異步請求/應答的網絡IO 的網絡客戶端。這是一個內部類,用來實現用戶層面的生產消費者客戶端。非線程安全的。

Sender

一個後臺線程,主要負責發送生產請求到kafka集羣。該線程會更新kafka集羣的metadata,將produce Request發送到正確的節點。

RecordAccumulator

該類扮演的是一個隊列的角色,將records追加到MemoryRecords實例中,用於發送到server端。

RecordBatch

一批准備發送的消息。該類是線程不安全的,須要加入外部同步加入須要修改的話。

MemoryRecords

用一個byteBuffer支撐的Records的實現。

RecordAccumulator維護了一個ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

RecordBatch維護了一個MemoryRecords

三,源碼過程

1,構建必要對象的過程

用戶代碼裏會構建一個KafkaProducer對象。

producer = new KafkaProducer<>(props);

在構造函數裏活作三個重要的的事情

A),new Selector傳遞給NetworkClient

B),new NetworkClient

C),new Sender

D),new KafkaThread並將構建的send對象,當作該線程的runnable。並啓動該線程。

E),構建了分區器和一個Metatdata

F),構建了一個RecordAccumulator。此時須要關注的兩個配置是

batch.size:批量發送的大小。

linger.ms:超時發送的時間。

合理配置兩個值,有利於咱們提高kafkaProducer的性能。

2,消息加入發送隊列的過程

1),用戶程序裏調用KafkaProducer.send發送消息

producer.send(new ProducerRecord<>(topic,
    messageNo,
    messageStr), new DemoCallBack(startTime, messageNo, messageStr));

 

2),對消息按照partition策略進行分區。

//獲取分區號
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());

 

3),將消息追加到RecordAccumulator

//將消息追加到RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

具體追加的細節,先根據topicpartition信息獲取一個recordBatch,而後在獲取MemoryRecords,將消息加入其中

//根據topic和partition信息獲取該partition的隊列
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {//RecordBatch類非安全,須要加外部同步
    if (closed)
        throw new IllegalStateException("Cannot send after the producer is closed.");

    RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
    if (appendResult != null)
        return appendResult;
}

tryAppend內部

//        獲取最後一個RecordBatch
        RecordBatch last = deque.peekLast();
        if (last != null) {
//            將消息追加到該RecordBatch裏面
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());

Last.TryAppend

//        首先會判斷是否有充足的空間
        if (!this.records.hasRoomFor(key, value)) {
            return null;
        } else {
//            將消息加入memoryRecords
            long checksum = this.records.append(offsetCounter++, timestamp, key, value);
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length);
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }

3,消息發送的過程

1),獲取Cluster

//獲取當前cluster信息,
Cluster cluster = metadata.fetch();

 

2),獲取那些有數據待發送的分區,依據是batch.sizelinger.ms

//獲取當前準備好發送的有數據的分區
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

 

3),更新那些leader未知的分區信息

if (result.unknownLeadersExist)
    this.metadata.requestUpdate();

 

4),移除不能發送Requestnode

Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    //判斷鏈接是否能發消息
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
    }
}

 

5),轉化爲list格式,以node爲基準,清空那些給定的node數據

清空全部給定node的數據,而後將它們放到給定適合大小的list,以單個node爲基準

Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                 result.readyNodes,
                                                                 this.maxRequestSize,
                                                                 now);

 

6),以node爲基準,將batches轉化爲ProducerRequests

//以單個node爲基準,將batches數據轉化爲ProducerRequests
List<ClientRequest> requests = createProduceRequests(batches, now);

 

7),發送數據

for (ClientRequest request : requests)
    client.send(request, now);

 

8),作真正的網絡讀寫的動做,以前會更新元數據
        this.client.poll(pollTimeout, now);

 

四,總結

寫本文的緣由是爲StructuredStreaming的系列文章之kafkaSink作準備

1,具體調優請參考kafka系列文章。

2,性能調優的參數重要的就兩個

batch.size:批量發送的大小。

linger.ms:超時發送的時間。

3,具體跟0.8.2.2區別,請參考:

 Kafka源碼系列之經過源碼分析Producer性能瓶頸

 

假如你對kafkasparkhbase源碼,spark機器學習等感興趣,請關注浪尖公衆號。一塊兒探討進步。

 


 

 

 

 


本文分享自微信公衆號 - 浪尖聊大數據(bigdatatip)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索