Kafka Source Code Analysis: An Overview of the Producer's Message-Sending Pipeline

1. Waking up the Selector

The previous article described how a message gets produced: in short, it is appended to the RecordAccumulator and stored there in the form of a ByteBuffer.

So after a message has been written into that ByteBuffer, what are the next steps?

The previous article also mentioned the locking granularity Kafka uses when appending a message: a Deque<RecordBatch> dq, looked up via the target topic partition (tp). The elements of this queue, RecordBatch, are the carriers of the messages about to be sent to the broker; one RecordBatch can hold multiple messages, and our message ends up appended to the ByteBuffer it ultimately points to.

Deque<RecordBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed) {
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                }
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
                if (appendResult != null) {
                    return appendResult;
                }
            }
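For reference, the per-partition deque lookup used above, getOrCreateDeque(tp), is essentially a get-or-create against a concurrent map keyed by TopicPartition. A minimal sketch of that idea (assuming a field named batches; not a verbatim copy of the source):

private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches = new ConcurrentHashMap<>();

private Deque<RecordBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<RecordBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
    // another thread may have created the deque first; use whichever won the race
    return previous == null ? d : previous;
}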

When the RecordBatch may be full (it is considered full as soon as a new message no longer fits into it):

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<RecordBatch> deque) {
        // get the last RecordBatch in the deque
        RecordBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
            if (future == null) {
                last.records.close();
            } else {
                return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);
            }
        }
        return null;
    }

Once it is full, the reference to the underlying buffer is assigned to the ByteBuffer of the MemoryRecords inside the RecordBatch, which is then flipped into read mode.

public void close() {
        if (writable) {
            // close the compressor to fill-in wrapper message metadata if necessary
            compressor.close();

            // flip the underlying buffer to be ready for reads
            buffer = compressor.buffer();
            buffer.flip();

            // reset the writable flag
            writable = false;
        }
    }
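If flip() is unfamiliar: a ByteBuffer keeps a position and a limit, and flip() turns a buffer that was just written into one that can be read from the start. A tiny standalone illustration (plain JDK, not Kafka code):

import java.nio.ByteBuffer;

public class FlipDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put("hello".getBytes());          // write mode: position advances to 5
        buffer.flip();                           // limit = 5, position = 0: ready for reads
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        System.out.println(new String(out));     // prints "hello"
    }
}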

So when does the actual send happen? When a RecordBatch is full, or a new RecordBatch has been created, the sender is woken up, and the wakeup propagates through sender => NetworkClient => Selector => nioSelector.

// the batch is full or a new batch was created, so wake up the sender
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

What does waking up the nioSelector achieve? If you know Java's Selector at all, you know that it blocks in select() and select(long timeout).
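A minimal standalone illustration of that blocking behaviour (plain java.nio, not Kafka code): select() parks the calling thread until there is I/O activity, the timeout expires, or some other thread calls wakeup().

import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();

        // a second thread wakes the selector up one second later
        Thread waker = new Thread(() -> {
            try {
                Thread.sleep(1000);
                selector.wakeup();               // unblocks the select() below
            } catch (InterruptedException ignored) {
            }
        });
        waker.start();

        long start = System.currentTimeMillis();
        selector.select();                       // blocks until wakeup() is called
        System.out.println("unblocked after " + (System.currentTimeMillis() - start) + " ms");
        selector.close();
    }
}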

When a KafkaProducer is created, a Sender thread is started; this thread loops forever, pulling data and sending it to the brokers, and one step of each iteration performs a select().
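Conceptually, the outer loop of that Sender thread is nothing more than the following (a rough sketch of the idea, not a verbatim excerpt):

// Sender implements Runnable and is started as a daemon I/O thread by the KafkaProducer constructor.
// Its outer loop keeps invoking the single-iteration run(long now) shown below until the producer closes.
public void run() {
    while (running) {
        run(time.milliseconds());
    }
    // on close: try to flush the remaining requests, then shut the network client down
}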

Once we have filled a ByteBuffer, we wake the Selector so it stops blocking and proceeds to the next iteration. Let's look at that loop: it is, in effect, the assembly line on which our messages are shipped and then disposed of.

/**
     * Run a single iteration of sending
     *
     * The core method that performs one round of sending
     *
     * @param now The current POSIX time in milliseconds
     */
    void run(long now) {

        /** 1. Fetch the cluster metadata */
        Cluster cluster = metadata.fetch();

        /** 2. Select from the accumulator the nodes we can send to */
        // get the list of partitions with data ready to send
        // only nodes whose partitions meet the send conditions are returned
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        /** 3. If the ReadyCheckResult contains nodes with unknown leaders, request a metadata update */
        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist) {
            this.metadata.requestUpdate();
        }

        /** 4. Call ready() on the client (NetworkClient) for each node, checking at the I/O level whether it is ready to be sent to */
        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        /** 5. Drain the sendable RecordBatches from the accumulator, grouped by node */
        // create produce requests
        Map<Integer/* nodeId */, List<RecordBatch>> batches = this.accumulator.drain(cluster,
            result.readyNodes,
            this.maxRequestSize,
            now);

        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<RecordBatch> batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        /** 6. Abort the batches in the RecordAccumulator that have expired */
        List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors

        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);

        /** 7. Wrap the batches to be sent into ClientRequest objects */
        List<ClientRequest> requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }

        /** 8. Write each ClientRequest into the send field of its KafkaChannel */
        for (ClientRequest request : requests)
            client.send(request, now);

        /** 9. Actually send the data: handle acks from the broker, expire timed-out requests, invoke user-defined callbacks, etc. */
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
    }

2. Fetching sendable messages from the RecordAccumulator

Before fetching, a few preliminary checks are performed:

Fetch the metadata and check whether each tp has an elected leader; if any tp's leader is still unknown, tell the metadata that it needs to be refreshed shortly.

The nodes that pass the checks (that is, the nodes hosting the leaders) then get picked: their RecordBatches (each wrapping a ByteBuffer) are extracted from the RecordAccumulator.

Here we loop over all these nodes (leader nodes) and, for every TopicPartition hosted on a node, fetch its Deque; as we saw earlier, it can be obtained with Deque<RecordBatch> deque = getDeque(tp).

Ideally we would grab every sendable message, but Kafka caps the size of a single request: the larger a request, the longer one round of I/O takes (macroscopically, sending more per request may well be more efficient), yet we cannot let a single message take several seconds to go out just to raise overall throughput.

the maximum request size to attempt to send to the server

So once the accumulated size of the RecordBatches exceeds that limit, the loop breaks, and the sender finally receives a Map<Integer/* nodeId */, List<RecordBatch>> batches.

for (Node node : nodes){
	//....
	do{
	    // Only drain the batch if it is not during backoff period.
	    if (!backoff) {
	        if (size + first.records.sizeInBytes() > maxSize && !ready.isEmpty()) {
	            // there is a rare case that a single batch size is larger than the request size due
	            // to compression; in this case we will still eventually send this batch in a single
	            // request
	            // the request is already full, stop draining
	            break;
	        } else {
	            // still room left: take the first batch from this deque and close its MemoryRecords (close the Compressor and mark the records read-only)
	            RecordBatch batch = deque.pollFirst();
	            batch.records.close();
	            size += batch.records.sizeInBytes();
	            ready.add(batch);
	            batch.drainedMs = now;
	        }
	    }
	} while ( /* ... */ );
}
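That per-request cap is the producer's max.request.size setting (the quote above is its documentation string). A minimal sketch of configuring it explicitly; the broker address and the chosen value here are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class MaxRequestSizeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);                // "max.request.size", default 1048576
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}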

3. Wrapping RecordBatches into the sendable ClientRequest

The Map obtained from the RecordAccumulator is then wrapped into sendable objects: a List<ClientRequest>.

Inside the loop that builds these ClientRequests, the batches are reorganized into two Maps:

Map<TopicPartition, ByteBuffer> produceRecordsByPartition
Map<TopicPartition, RecordBatch> recordsByPartition

produceRecordsByPartition is simple: a Map whose keys are tps and whose values are ByteBuffers. This Map is packed into a Struct; a Struct can be thought of as a JSON-like data structure, and in Kafka all data sent over the wire is organized as Structs.
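To get a feel for what a Struct is, here is a hedged sketch using the org.apache.kafka.common.protocol.types classes from that code base (the field name "topic" is made up for illustration): you declare fields in a Schema, then set and read them by name, much like a typed JSON object.

Schema schema = new Schema(new Field("topic", Type.STRING));
Struct struct = new Struct(schema);
struct.set("topic", "demo-topic");
String topic = struct.getString("topic");   // "demo-topic"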

private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
        // reorganize the batches into two maps
        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
        for (RecordBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            produceRecordsByPartition.put(tp, batch.records.buffer());
            recordsByPartition.put(tp, batch);
        }

        // assemble the ProduceRequest
        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);

        // build the RequestSend; this is the object that actually gets written to the channel
        RequestSend send = new RequestSend(Integer.toString(destination),
            this.client.nextRequestHeader(ApiKeys.PRODUCE),
            request.toStruct());

        // wrap the completion callback
        RequestCompletionHandler callback = new RequestCompletionHandler() {

            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        return new ClientRequest(now, acks != 0, send, callback);
    }

recordsByPartition, on the other hand, is used when wrapping the response callback: retrying after failures, releasing the ByteBuffers, and so on, as well as invoking the callback that was supplied when the message was sent.

@Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
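For completeness, this is the user-facing end of that callback chain: the Callback passed to send() is the one that handleProduceResponse eventually fires once the batch completes. A minimal usage sketch (the topic name is made up, and producer is assumed to be an existing KafkaProducer<String, String>):

ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // e.g. retries exhausted, record too large, broker unavailable
            exception.printStackTrace();
        } else {
            System.out.printf("sent to %s-%d at offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});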

4. Sending the data

The sending itself was covered in an earlier article: the assembled ClientRequest is handed to the KafkaSelector and finally written out by the nioSelector, reaching the Broker. See the earlier post: "A brief look at KafkaChannel, NetworkReceive, Send, and the excellent underlying implementation: KafkaSelector".


Reference books: 《Kafka技術內幕》 by 鄭奇煌; 《Apache Kafka源碼剖析》 by 徐郡明.
