kafka 系列 -- 3.二、生產者客戶端原理分析

生產者發送消息的總體流程

消息追加器 RecordAccumulator

前面幾個組件,在 3.1 的文章中,已經說清楚。如今來看 RecordAccumulator 組件java

RecordAccumulator 主要用於緩存消息,以便 Sender 線程可以批量發送消息。RecordAccumulator 會將消息放入緩存 BufferPool(實際上就是 ByteBuffer) 中。BufferPool 默認最大爲 33554432B,即 32MB, 可經過 buffer.memory 進行配置。
當生產者生產消息的速度大於 sender 線程的發送速度,那麼 send 方法就會阻塞。默認阻塞 60000ms,可經過 max.block.ms 配置。node

RecordAccumulator 類的幾個重要屬性api

public final class RecordAccumulator {
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    // 緩存空間,默認 32MB,可經過上面說的 buffer.memory 參數進行配置
    private final BufferPool free;
}

TopicPartition 爲分區的抽象。定義以下所示數組

public final class TopicPartition implements Serializable {
    private int hash = 0;
    private final int partition;
    private final String topic;
}

主線程發送的消息,都會被放入batcher 中, batches 將發往不一樣 TopicPartition 的消息,存放到各自的 ArrayDeque<ProducerBatch> 中。
主線程 append 時,往隊尾插入,sender 線程取出時,則往隊頭取出。緩存

ProducerBatch 批量消息

ProducerBatch 爲批量消息的抽象。
在編寫客戶端發送消息時,客戶端面向的類則是 ProducerRecordkafka 客戶端,在發送消息時,會將 ProducerRecord 放入 ProducerBatch,使消息更加緊湊。
若是爲每一個消息都獨自建立內存空間,那麼內存空間的開闢和釋放,則將會比較耗時。所以 ProducerBatch 內部有一個 ByteBufferOutputStream bufferStream(實則爲 ByteBuffer), 使用 ByteBuffer 重複利用內存空間。網絡

bufferStream 值的大小爲數據結構

public final class RecordAccumulator {
    
    // 該值大小,可經過 buffer.memory 配置
    private final BufferPool free;
    
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
       
    }
}

其中,batchSize 默認 16384B,即 16kb,可經過 batch.size 配置。第2個入參的值則爲消息的大小。app

須要注意的是,bufferStream 的內存空間是從 free 內存空間中劃出的。 異步

上面有說到,ProducerBatch 會使用 ByteBuffer 追加消息。可是,若是你看代碼,你會發現 ProducerBatch 在作消息的追加時,會將消息放入 DataOutputStream appendStream。好像跟咱們說的 不同! 可是實際上,就是利用 ByteBuffer,這裏還須要看 appendStream 是如何初始化的!ui

注:MemoryRecordsBuilder 爲 ProducerBatch 中的一個屬性

public class MemoryRecordsBuilder {
    private final ByteBufferOutputStream bufferStream;
    private DataOutputStream appendStream;
    
    private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                     Header[] headers) throws IOException {
        ensureOpenForRecordAppend();
        int offsetDelta = (int) (offset - baseOffset);
        long timestampDelta = timestamp - firstTimestamp;
        // 往 appendStream 中追加消息
        int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
        recordWritten(offset, timestamp, sizeInBytes);
    }
}

MemoryRecordsBuilder 初始化

public class MemoryRecordsBuilder {
    private final ByteBufferOutputStream bufferStream;
    private DataOutputStream appendStream;
    
    public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
                                byte magic,
                                CompressionType compressionType,
                                TimestampType timestampType,
                                long baseOffset,
                                long logAppendTime,
                                long producerId,
                                short producerEpoch,
                                int baseSequence,
                                boolean isTransactional,
                                boolean isControlBatch,
                                int partitionLeaderEpoch,
                                int writeLimit) {
           
        // ..省略部分代碼
        bufferStream.position(initialPosition + batchHeaderSizeInBytes);
        this.bufferStream = bufferStream;
        
        // 使用 bufferStream 包裝
        this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
    }
}

能夠看到實際上使用的仍是 ByteBufferOutputStream bufferStream

Sender 線程

Sender 線程在發送消息時,會從 RecordAccumulator 中取出消息,並將放在 RecordAccumulator 中的 Deque<ProducerBatch> 轉換成 Map<nodeId, List<ProducerBatch>>,這裏的 nodeIdkafka 節點的 id。再發送給 kafka 以前,又會將消息封裝成 Map<nodeId, ClientRequest>

請求在從 Sender 發往 kafka 時,還會被存入 InFlightRequests

public class NetworkClient implements KafkaClient {
    /* the set of requests currently being sent or awaiting a response */
    private final InFlightRequests inFlightRequests;
    
    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (log.isDebugEnabled()) {
            int latestClientVersion = clientRequest.apiKey().latestVersion();
            if (header.apiVersion() == latestClientVersion) {
                log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                        clientRequest.correlationId(), destination);
            } else {
                log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                        header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
            }
        }
        Send send = request.toSend(destination, header);
        InFlightRequest inFlightRequest = new InFlightRequest(
                clientRequest,
                header,
                isInternalRequest,
                request,
                send,
                now);
        
        // 將請求放入
        this.inFlightRequests.add(inFlightRequest);
        selector.send(send);
    }
}

InFlightRequests

/**
 * The set of requests which have been sent or are being sent but haven't yet received a response
 */
final class InFlightRequests {
    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>();
}

InFlightRequests 的做用是存儲已經發送的,或者發送了,可是未收到響應的請求。
InFlightRequests 類中有一個屬性 maxInFlightRequestsPerConnection, 標識一個節點最多能夠緩存多少個請求。該默認值爲 5, 可經過 max.in.flight.requests.per.connection 進行配置, 須要注意的是 InFlightRequests 對象是在建立 KafkaProducer 時就會被建立。

requests 參數的 keynodeIdvalue 則爲緩存的請求。

sender 線程 在發送消息時,會先判斷 InFlightRequests 對應的請求緩存中是否超過了 maxInFlightRequestsPerConnection 的大小

代碼入口:Sender.sendProducerData

public class Sender implements Runnable {
    private long sendProducerData(long now) {
        // ... 省略部分代碼
        while (iter.hasNext()) {
            Node node = iter.next();
            
            // todo 這裏爲代碼入口
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }
        // ... 省略部分代碼
    }
}

public class NetworkClient implements KafkaClient {
    private boolean canSendRequest(String node, long now) {
        return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
            inFlightRequests.canSendMore(node);
    }
}

final class InFlightRequests {
    public boolean canSendMore(String node) {
        Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }
}

InFlightRequests 的設計中,能夠看到,咱們能夠很輕鬆的就知道,哪一個 kafka 節點的負載是最低。由於只須要判斷 requests 中對應 node 集合的大小便可。

重要參數

  1. acks

用於指定分區中須要有多少個副本收到消息,生產者纔會認爲消息是被寫入的
acks = 1。默認爲1, 只要 leader 副本寫入,則被認爲已經寫入。若是消息已經被寫入 leader 副本,且已經返回給生產者 ok,可是在 follower 拉取 leader 消息以前, leader 副本忽然掛掉,那麼此時消息也會丟失
acks = 0。發送消息後,不須要等待服務端的響應,此配置,吞吐量最高。
acks = -1 或者 all。須要等待全部 ISR 中的全部副本都成功寫入消息以後,纔會收到服務端的成功響應。
須要注意的一點是 acks 入參是 String,而不是 int

  1. max.request.size

客戶端容許發送的消息最大長度,默認爲 1MB.

  1. retriesretry.backoff.ms

retries 配置生產者的重試次數,默認爲 0. retry.backoff.ms 配置兩次重試的間隔時間

  1. compression.type

指定消息的壓縮方式,默認爲 none。可選配置gzip,snappy,lz4

  1. connection.max.idle.ms

指定在多久以後關閉閒置的鏈接,默認 540000(ms),即 9分鐘

  1. linger.ms

指定發送 ProducerBatch 以前等待更多的消息(ProducerRecord) 加入 ProducerBatch 的時間,默認爲 0。生產者在 ProducerBatch 填充滿時,或者等待時間超過 linger.ms 發送消息出去。

  1. receive.buffer.bytes

設置 Socket 接收消息緩存區的大小,默認 32678B, 32KB。若是設置爲 -1, 則表示使用 操做系統的默認值。若是 Procuerkafka 處於不一樣的機房,能夠調大此參數。

  1. send.buffer.bytes

設置 Socket 發送消息緩衝區大小。默認 131072B, 即128KB。若是設置爲 -1,則使用操做系統的默認值

  1. request.timeout.ms

Producer 等待響應的最長時間,默認 30000ms。須要注意的是,該參數須要比 replica.lag.time.max.ms 值更大。能夠減小因客戶端重試,而形成的消息重複

  1. buffer.memory

配置消息追加器,內存大小。默認最大爲 33554432B,即 32MB

  1. batch.size

ProducerBatch ByteBuffer 。默認 16384B,即 16kb

  1. max.block.ms

生產者生成消息過快時,客戶端最多阻塞多少時間。

總結

  1. kafka 將生產者生產消息,消息發送給服務端,拆成了 2 個過程。生產消息交由 主線程, 消息發送給服務端的任務交由 sender 線程。
  2. 經過 RecordAccumulator 的設計,將生產消息,與發送消息解耦。
  3. RecordAccumulator 內部存儲數據的數據結構是 ArrayDeque. 隊尾追加消息,隊頭取出消息
  4. 開發人員編寫的 ProducerRecord,在消息發送以前會被轉爲 ProducetBatch。爲的是批量發送消息,提升網絡 IO 效率
  5. 爲了不,每一個節點負載太高,kafka 設計了 InFlightRequests, 將爲響應的消息放入其中
  6. 從源碼角度,buffer.memory 最好是 buffer.memory 整數倍大小。由於 ProducerBatchByteBuffer 是從 RecordAccumulatorByteBuffer 中劃出的

RocketMQ 區別

  1. RocketMQ 沒有將生產消息與發送消息解耦。
  2. RocketMQ 的消息發送,分爲 同步,異步、單向。其中單向發送與 kafkaacks = 0 的配置效果同樣。可是實際上,還得看 RocketMQ broker刷盤配置
  3. kafka 發送失敗,默認不重試,RocketMQ 默認重試 2 次。不過 RocketMQ 沒法配置 2 次重試的間隔時間. kafka 能夠配置重試的間隔時間。
  4. RocketMQ 默認消息最大爲 4MB, kafka 默認 1MB
  5. RocketMQ 在消息的發送上,是直接使用 Nettykafka 則是使用 NIO 本身實現通訊。(雖然說,Netty 也是基於 NIO
  6. 固然還有不少咯....., 由於設計徹底不同!,實際解決場景也不同

知識補充

ByteBuffer

ByteBuffer 通常用於網絡傳輸的緩衝區。

先來看下 ByteBuffer 的類繼承體系

ByteBuffer 主要的 2 個父類。 DirectByteBufferHeapByteBuffer。通常而言,咱們主要的是使用 HeapByteBuffer

ByteBuffer 重要屬性
  1. position

當前讀取的位置

  1. mark

爲某一讀過的位置作標記,便於某些時候回退到該位置

  1. limit

讀取的結束位置

  1. capacity

buffer 大小

ByteBuffer 基本方法
  1. put()

buffer 中寫數據,並將 position 往前移動

  1. flip()

position 設置爲0,limit 設置爲當前位置

  1. rewind()

position 設置爲0, limit 不變

  1. mark()

mark 設置爲當前 position 值,調用 reset(), 會將 mark 賦值給 position

  1. clear()

position 設置爲0,limit 設置爲 capacity

ByteBuffer 食用DEMO
FileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt");
FileChannel channel = fis.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);

int read = channel.read(buffer);
while (read != -1) {
    System.out.println(new String(buffer.array(), Charset.defaultCharset()));
    buffer.clear();
    read = channel.read(buffer);
}

ArrayDeque

ArrayDeque,是一個雙端隊列。便可以從隊頭插入元素,也能夠從隊尾插入元素
對於雙端隊列,既可使用 鏈表的方式實現,也可使用數組的方式實現。
JDKLinkedList 使用鏈表實現,ArrayDeque 則使用數組的方式實現

來看 ArrayDeque 的實現。
ArrayDeque 中,有 head, tail 分別指向 頭指針,和尾指針。能夠把 ArrayDeque 想象成循環數組

插入
  1. 當往隊尾插入元素時,tail 指針會往前走

  1. 當往隊前插入元素時, head 指針會向後走

刪除
  1. 從隊頭刪除元素 4 , head 會往前走

  1. 從隊尾刪除元素 3, tail 會日後走

能夠看到,這裏經過移動 head, tail 指針就能夠刪除元素了。

擴容

tailhead 都指向都一個位置時,則須要擴容

擴容會將數組的大小擴充爲原來的 2 倍,而後從新將 head 指向數組 0 下標, tail 指向數組的最後一個元素位置。

上面的數組,在從新擴容後,會變成下面這個樣子

public class ArrayDeque<E> extends AbstractCollection<E>
                           implements Deque<E>, Cloneable, Serializable
{
    private void doubleCapacity() {
            assert head == tail;
            int p = head;
            int n = elements.length;
            int r = n - p; // number of elements to the right of p
            int newCapacity = n << 1;
            if (newCapacity < 0)
                throw new IllegalStateException("Sorry, deque too big");
            Object[] a = new Object[newCapacity];
            System.arraycopy(elements, p, a, 0, r);
            System.arraycopy(elements, 0, a, r, p);
            elements = a;
            head = 0;
            tail = n;
    }
}
相關文章
相關標籤/搜索