RecordAccumulator
前面幾個組件,在 3.1 的文章中,已經說清楚。如今來看 RecordAccumulator
組件java
RecordAccumulator
主要用於緩存消息,以便 Sender
線程可以批量發送消息。RecordAccumulator
會將消息放入緩存 BufferPool
(實際上就是 ByteBuffer
) 中。BufferPool
默認最大爲 33554432B
,即 32MB
, 可經過 buffer.memory
進行配置。
當生產者生產消息的速度大於 sender
線程的發送速度,那麼 send
方法就會阻塞。默認阻塞 60000ms
,可經過 max.block.ms
配置。node
RecordAccumulator
類的幾個重要屬性api
public final class RecordAccumulator { private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches; // 緩存空間,默認 32MB,可經過上面說的 buffer.memory 參數進行配置 private final BufferPool free; }
TopicPartition
爲分區的抽象。定義以下所示數組
public final class TopicPartition implements Serializable { private int hash = 0; private final int partition; private final String topic; }
主線程發送的消息,都會被放入batcher
中, batches
將發往不一樣 TopicPartition
的消息,存放到各自的 ArrayDeque<ProducerBatch>
中。
主線程 append
時,往隊尾插入,sender
線程取出時,則往隊頭取出。緩存
ProducerBatch
批量消息ProducerBatch
爲批量消息的抽象。
在編寫客戶端發送消息時,客戶端面向的類則是 ProducerRecord
,kafka
客戶端,在發送消息時,會將 ProducerRecord
放入 ProducerBatch
,使消息更加緊湊。
若是爲每一個消息都獨自建立內存空間,那麼內存空間的開闢和釋放,則將會比較耗時。所以 ProducerBatch
內部有一個 ByteBufferOutputStream bufferStream
(實則爲 ByteBuffer
), 使用 ByteBuffer
重複利用內存空間。網絡
bufferStream
值的大小爲:數據結構
public final class RecordAccumulator { // 該值大小,可經過 buffer.memory 配置 private final BufferPool free; public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); } }
其中,batchSize
默認 16384B
,即 16kb
,可經過 batch.size
配置。第2個入參的值則爲消息的大小。app
須要注意的是,bufferStream
的內存空間是從 free
內存空間中劃出的。 異步
上面有說到,ProducerBatch
會使用 ByteBuffer
追加消息。可是,若是你看代碼,你會發現 ProducerBatch
在作消息的追加時,會將消息放入 DataOutputStream appendStream
。好像跟咱們說的 不同! 可是實際上,就是利用 ByteBuffer
,這裏還須要看 appendStream
是如何初始化的!ui
注:MemoryRecordsBuilder 爲 ProducerBatch 中的一個屬性
public class MemoryRecordsBuilder { private final ByteBufferOutputStream bufferStream; private DataOutputStream appendStream; private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { ensureOpenForRecordAppend(); int offsetDelta = (int) (offset - baseOffset); long timestampDelta = timestamp - firstTimestamp; // 往 appendStream 中追加消息 int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); recordWritten(offset, timestamp, sizeInBytes); } }
MemoryRecordsBuilder
初始化
public class MemoryRecordsBuilder { private final ByteBufferOutputStream bufferStream; private DataOutputStream appendStream; public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, byte magic, CompressionType compressionType, TimestampType timestampType, long baseOffset, long logAppendTime, long producerId, short producerEpoch, int baseSequence, boolean isTransactional, boolean isControlBatch, int partitionLeaderEpoch, int writeLimit) { // ..省略部分代碼 bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; // 使用 bufferStream 包裝 this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); } }
能夠看到實際上使用的仍是 ByteBufferOutputStream bufferStream
Sender
線程Sender
線程在發送消息時,會從 RecordAccumulator
中取出消息,並將放在 RecordAccumulator
中的 Deque<ProducerBatch>
轉換成 Map<nodeId, List<ProducerBatch>>
,這裏的 nodeId
是 kafka
節點的 id
。再發送給 kafka
以前,又會將消息封裝成 Map<nodeId, ClientRequest>
。
請求在從 Sender
發往 kafka
時,還會被存入 InFlightRequests
public class NetworkClient implements KafkaClient { /* the set of requests currently being sent or awaiting a response */ private final InFlightRequests inFlightRequests; private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) { String destination = clientRequest.destination(); RequestHeader header = clientRequest.makeHeader(request.version()); if (log.isDebugEnabled()) { int latestClientVersion = clientRequest.apiKey().latestVersion(); if (header.apiVersion() == latestClientVersion) { log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request, clientRequest.correlationId(), destination); } else { log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}", header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination); } } Send send = request.toSend(destination, header); InFlightRequest inFlightRequest = new InFlightRequest( clientRequest, header, isInternalRequest, request, send, now); // 將請求放入 this.inFlightRequests.add(inFlightRequest); selector.send(send); } }
InFlightRequests
/** * The set of requests which have been sent or are being sent but haven't yet received a response */ final class InFlightRequests { private final int maxInFlightRequestsPerConnection; private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>(); }
InFlightRequests
的做用是存儲已經發送的,或者發送了,可是未收到響應的請求。InFlightRequests
類中有一個屬性 maxInFlightRequestsPerConnection
, 標識一個節點最多能夠緩存多少個請求。該默認值爲 5
, 可經過 max.in.flight.requests.per.connection
進行配置, 須要注意的是 InFlightRequests
對象是在建立 KafkaProducer
時就會被建立。
requests
參數的 key
爲 nodeId
,value
則爲緩存的請求。
sender
線程 在發送消息時,會先判斷 InFlightRequests
對應的請求緩存中是否超過了 maxInFlightRequestsPerConnection
的大小
代碼入口:Sender.sendProducerData
public class Sender implements Runnable { private long sendProducerData(long now) { // ... 省略部分代碼 while (iter.hasNext()) { Node node = iter.next(); // todo 這裏爲代碼入口 if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } } // ... 省略部分代碼 } } public class NetworkClient implements KafkaClient { private boolean canSendRequest(String node, long now) { return connectionStates.isReady(node, now) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); } } final class InFlightRequests { public boolean canSendMore(String node) { Deque<NetworkClient.InFlightRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection); } }
從 InFlightRequests
的設計中,能夠看到,咱們能夠很輕鬆的就知道,哪一個 kafka
節點的負載是最低。由於只須要判斷 requests
中對應 node
集合的大小便可。
acks
用於指定分區中須要有多少個副本收到消息,生產者纔會認爲消息是被寫入的acks
= 1。默認爲1, 只要 leader
副本寫入,則被認爲已經寫入。若是消息已經被寫入 leader
副本,且已經返回給生產者 ok
,可是在 follower
拉取 leader
消息以前, leader
副本忽然掛掉,那麼此時消息也會丟失acks
= 0。發送消息後,不須要等待服務端的響應,此配置,吞吐量最高。acks
= -1 或者 all。須要等待全部 ISR
中的全部副本都成功寫入消息以後,纔會收到服務端的成功響應。
須要注意的一點是 acks
入參是 String
,而不是 int
max.request.size
客戶端容許發送的消息最大長度,默認爲 1MB
.
retries
、retry.backoff.ms
retries
配置生產者的重試次數,默認爲 0
. retry.backoff.ms
配置兩次重試的間隔時間
compression.type
指定消息的壓縮方式,默認爲 none
。可選配置gzip
,snappy
,lz4
connection.max.idle.ms
指定在多久以後關閉閒置的鏈接,默認 540000(ms)
,即 9分鐘
linger.ms
指定發送 ProducerBatch
以前等待更多的消息(ProducerRecord
) 加入 ProducerBatch
的時間,默認爲 0
。生產者在 ProducerBatch
填充滿時,或者等待時間超過 linger.ms
發送消息出去。
receive.buffer.bytes
設置 Socket
接收消息緩存區的大小,默認 32678B
, 32KB
。若是設置爲 -1
, 則表示使用 操做系統的默認值。若是 Procuer
和 kafka
處於不一樣的機房,能夠調大此參數。
send.buffer.bytes
設置 Socket
發送消息緩衝區大小。默認 131072B
, 即128KB
。若是設置爲 -1
,則使用操做系統的默認值
request.timeout.ms
Producer
等待響應的最長時間,默認 30000ms
。須要注意的是,該參數須要比 replica.lag.time.max.ms
值更大。能夠減小因客戶端重試,而形成的消息重複
buffer.memory
配置消息追加器,內存大小。默認最大爲 33554432B
,即 32MB
batch.size
ProducerBatch
ByteBuffer
。默認 16384B
,即 16kb
max.block.ms
生產者生成消息過快時,客戶端最多阻塞多少時間。
kafka
將生產者生產消息,消息發送給服務端,拆成了 2 個過程。生產消息交由 主線程, 消息發送給服務端的任務交由 sender
線程。RecordAccumulator
的設計,將生產消息,與發送消息解耦。RecordAccumulator
內部存儲數據的數據結構是 ArrayDeque
. 隊尾追加消息,隊頭取出消息ProducerRecord
,在消息發送以前會被轉爲 ProducetBatch
。爲的是批量發送消息,提升網絡 IO 效率kafka
設計了 InFlightRequests
, 將爲響應的消息放入其中buffer.memory
最好是 buffer.memory
整數倍大小。由於 ProducerBatch
的 ByteBuffer
是從 RecordAccumulator
的 ByteBuffer
中劃出的RocketMQ
區別RocketMQ
沒有將生產消息與發送消息解耦。RocketMQ
的消息發送,分爲 同步,異步、單向。其中單向發送與 kafka
的 acks
= 0 的配置效果同樣。可是實際上,還得看 RocketMQ broker
的刷盤配置!kafka
發送失敗,默認不重試,RocketMQ
默認重試 2 次。不過 RocketMQ
沒法配置 2 次重試的間隔時間. kafka
能夠配置重試的間隔時間。RocketMQ
默認消息最大爲 4MB
, kafka
默認 1MB
RocketMQ
在消息的發送上,是直接使用 Netty
。kafka
則是使用 NIO
本身實現通訊。(雖然說,Netty
也是基於 NIO
)ByteBuffer
ByteBuffer
通常用於網絡傳輸的緩衝區。
先來看下 ByteBuffer
的類繼承體系
ByteBuffer
主要的 2 個父類。 DirectByteBuffer
、HeapByteBuffer
。通常而言,咱們主要的是使用 HeapByteBuffer
。
ByteBuffer
重要屬性position
當前讀取的位置
mark
爲某一讀過的位置作標記,便於某些時候回退到該位置
limit
讀取的結束位置
capacity
buffer
大小
ByteBuffer
基本方法put()
往 buffer
中寫數據,並將 position
往前移動
flip()
將 position
設置爲0,limit
設置爲當前位置
rewind()
將 position
設置爲0, limit
不變
mark()
將 mark
設置爲當前 position
值,調用 reset()
, 會將 mark
賦值給 position
clear()
將 position
設置爲0,limit
設置爲 capacity
ByteBuffer
食用DEMOFileInputStream fis = new FileInputStream("/Users/chenshaoping/text.txt"); FileChannel channel = fis.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = channel.read(buffer); while (read != -1) { System.out.println(new String(buffer.array(), Charset.defaultCharset())); buffer.clear(); read = channel.read(buffer); }
ArrayDeque
ArrayDeque
,是一個雙端隊列。便可以從隊頭插入元素,也能夠從隊尾插入元素
對於雙端隊列,既可使用 鏈表的方式實現,也可使用數組的方式實現。JDK
中 LinkedList
使用鏈表實現,ArrayDeque
則使用數組的方式實現
來看 ArrayDeque
的實現。
ArrayDeque 中,有 head
, tail
分別指向 頭指針,和尾指針。能夠把 ArrayDeque
想象成循環數組
head
會往前走
tail
會日後走
能夠看到,這裏經過移動 head
, tail
指針就能夠刪除元素了。
當 tail
、head
都指向都一個位置時,則須要擴容
擴容會將數組的大小擴充爲原來的 2 倍,而後從新將 head
指向數組 0
下標, tail
指向數組的最後一個元素位置。
上面的數組,在從新擴容後,會變成下面這個樣子
public class ArrayDeque<E> extends AbstractCollection<E> implements Deque<E>, Cloneable, Serializable { private void doubleCapacity() { assert head == tail; int p = head; int n = elements.length; int r = n - p; // number of elements to the right of p int newCapacity = n << 1; if (newCapacity < 0) throw new IllegalStateException("Sorry, deque too big"); Object[] a = new Object[newCapacity]; System.arraycopy(elements, p, a, 0, r); System.arraycopy(elements, 0, a, r, p); elements = a; head = 0; tail = n; } }