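I have been reading the Kafka source code lately and was genuinely struck by how elegant its client-side buffer pool is. I could not resist writing a post in praise of it.
Note: the source code quoted in this post comes from Kafka 2.2.2.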
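When an application calls the Kafka client producer to send a message, the client first groups the messages that belong to the same topic partition into a batch. Messages are actually sent to the Kafka broker one batch at a time, as shown in the figure below:
The benefit is obvious. The client and the broker talk over the network, so sending in batches reduces the per-request network overhead and raises throughput.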
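The batching described above can be tuned from the producer side. The following is a minimal sketch, assuming the standard producer settings `batch.size`, `linger.ms`, `buffer.memory` and `max.block.ms`. The class name `BatchingConfigSketch`, the broker address and the chosen values are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

class BatchingConfigSketch {
    // Builds producer properties that influence batching and buffering.
    // All values below are illustrative assumptions.
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("batch.size", "16384");        // target size in bytes of one batch buffer
        props.put("linger.ms", "5");             // how long to wait for more records before sending
        props.put("buffer.memory", "33554432");  // total bytes the producer may use for buffering
        props.put("max.block.ms", "60000");      // upper bound on how long send() may block
        return props;
    }

    public static void main(String[] args) {
        batchingProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would normally be passed to the `KafkaProducer` constructor together with the serializer settings, which are left out here to keep the sketch free of external dependencies.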
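How these batches are managed is well worth a closer look. Some might say: isn't that simple? Allocate a block of memory when you need it and release it once the batch has been sent.
Kafka is written in Java (most of the newer versions are implemented in Java). With the approach above, you would new up some space and assign it to a reference when you need it, then set the reference to null when you are done and leave the rest to the JVM garbage collector.
That looks fine at first glance, but under high concurrency it triggers frequent GC. As we all know, GC involves stop-the-world pauses. Even though recent collectors have made these pauses very short, they can still become a performance bottleneck in production.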
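The designers of Kafka naturally thought of this. Let's look at how Kafka manages its batches.
The Kafka client uses a buffer pool: blocks of real memory are kept in a pool and reused instead of being allocated afresh for every batch.
Each batch is backed by one block of memory from the buffer pool. Once the messages have been sent and the batch is no longer needed, the block is returned to the pool.
Sounds familiar? Indeed, database connection pools, thread pools and other pooling techniques all work on much the same principle: pooling cuts the cost of creating and destroying objects and improves efficiency.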
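The pooling idea can be sketched in a few lines. This is a simplified illustration and not Kafka's implementation. The class `SimpleBufferPool` and its methods `acquire`, `release` and `available` are hypothetical names introduced here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class SimpleBufferPool {
    private final int bufferSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    SimpleBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Reuse a pooled buffer when one is available, otherwise allocate a new one.
    synchronized ByteBuffer acquire() {
        ByteBuffer pooled = free.pollFirst();
        return pooled != null ? pooled : ByteBuffer.allocate(bufferSize);
    }

    // Return a buffer to the pool; buffers of any other size are left to the GC.
    synchronized void release(ByteBuffer buffer) {
        if (buffer.capacity() == bufferSize) {
            buffer.clear(); // reset position and limit so the next user starts clean
            free.addLast(buffer);
        }
    }

    synchronized int available() {
        return free.size();
    }
}
```

A buffer that has been released is handed out again by the next `acquire()` call, so in steady state no new allocation is needed.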
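Code is the best documentation, so let's dig into the source.
We will read it top-down: first see where the buffer pool is used, then go inside the buffer pool for a detailed look.
The code below has been trimmed; only the parts relevant to this post are kept, to make the analysis easier.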
public class KafkaProducer<K, V> implements Producer<K, V> {
    private final Logger log;
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.producer";
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        ...
    }
When we call the client's send method, it calls doSend underneath, which uses a record accumulator, RecordAccumulator, to append the message. Let's keep reading.
public final class RecordAccumulator {
    private final Logger log;
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    private final int batchSize;
    private final CompressionType compression;
    private final int lingerMs;
    private final long retryBackoffMs;
    private final int deliveryTimeoutMs;
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    private final Map<TopicPartition, Long> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;
    private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));
            dq.addLast(batch);
            ...
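RecordAccumulator essentially manages a queue of batches per partition. As the code shows, append calls the allocate method on its BufferPool field (named free) to obtain a block of memory (a ByteBuffer), wraps that memory in a batch, and adds the batch to the tail of the queue.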
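The append flow can be modelled with a much smaller class. This is a minimal sketch under stated assumptions: `MiniAccumulator` and its nested `Batch` are hypothetical names, partitions are plain strings, and `ByteBuffer.allocate` stands in for the buffer pool that the real accumulator uses.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class MiniAccumulator {
    // A batch is a buffer that accepts records until it runs out of room.
    static final class Batch {
        final ByteBuffer buffer;

        Batch(ByteBuffer buffer) {
            this.buffer = buffer;
        }

        boolean tryAppend(byte[] value) {
            if (buffer.remaining() < value.length) return false;
            buffer.put(value);
            return true;
        }
    }

    private final int batchSize;
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    void append(String partition, byte[] value) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            // First try to fit the record into the newest batch of this partition.
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(value)) return;
            // Otherwise start a new batch; oversized records get a buffer of their own size.
            // Assumption: plain allocation here, where the real code asks the buffer pool.
            Batch batch = new Batch(ByteBuffer.allocate(Math.max(batchSize, value.length)));
            batch.tryAppend(value);
            dq.addLast(batch);
        }
    }

    int batchCount(String partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null) return 0;
        synchronized (dq) {
            return dq.size();
        }
    }
}
```

With a batch size of 8 bytes, two 4-byte records share one batch and a third record opens a second batch.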
When the messages have been sent and the batch is no longer in use, RecordAccumulator calls its deallocate method to return the memory, which in turn calls the deallocate method of BufferPool.
public void deallocate(ProducerBatch batch) {
    incomplete.remove(batch);
    // Only deallocate the batch if it is not a split batch because split batch are allocated outside the
    // buffer pool.
    if (!batch.isSplitBatch())
        free.deallocate(batch.buffer(), batch.initialCapacity());
}
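Clearly, BufferPool is the class that manages the buffer pool, and it is the focus of today's discussion. Let's first look at the method that allocates a block of memory.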
public class BufferPool {
    static final String WAIT_TIME_SENSOR_NAME = "bufferpool-wait-time";
    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock;
    private final Deque<ByteBuffer> free;
    private final Deque<Condition> waiters;
    /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize. */
    private long nonPooledAvailableMemory;
    private final Metrics metrics;
    private final Time time;
    private final Sensor waitTime;

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                    + " bytes, but there is a hard limit of "
                    + this.totalMemory
                    + " on memory allocations.");

        ByteBuffer buffer = null;
        this.lock.lock();
        try {
            // check if we have a free buffer of the right size pooled
            if (size == poolableSize && !this.free.isEmpty())
                return this.free.pollFirst();

            // now check if the request is immediately satisfiable with the
            // memory on hand or if we need to block
            int freeListSize = freeSize() * this.poolableSize;
            if (this.nonPooledAvailableMemory + freeListSize >= size) {
                // we have enough unallocated or pooled memory to immediately
                // satisfy the request, but need to allocate the buffer
                freeUp(size);
                this.nonPooledAvailableMemory -= size;
            } else {
                // we are out of memory and will have to block
                int accumulated = 0;
                Condition moreMemory = this.lock.newCondition();
                try {
                    long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
                    this.waiters.addLast(moreMemory);
                    // loop over and over until we have a buffer or have reserved
                    // enough memory to allocate one
                    while (accumulated < size) {
                        long startWaitNs = time.nanoseconds();
                        long timeNs;
                        boolean waitingTimeElapsed;
                        try {
                            waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
                        } finally {
                            long endWaitNs = time.nanoseconds();
                            timeNs = Math.max(0L, endWaitNs - startWaitNs);
                            recordWaitTime(timeNs);
                        }

                        if (waitingTimeElapsed) {
                            throw new TimeoutException("Failed to allocate memory within the configured max blocking time "
                                    + maxTimeToBlockMs + " ms.");
                        }

                        remainingTimeToBlockNs -= timeNs;

                        // check if we can satisfy this request from the free list,
                        // otherwise allocate memory
                        if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                            // just grab a buffer from the free list
                            buffer = this.free.pollFirst();
                            accumulated = size;
                        } else {
                            // we'll need to allocate memory, but we may only get
                            // part of what we need on this iteration
                            freeUp(size - accumulated);
                            int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
                            this.nonPooledAvailableMemory -= got;
                            accumulated += got;
                        }
                        ...
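First of all, the bookkeeping in this method runs under a lock, so memory can be allocated safely from multiple threads at once.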
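The logic goes like this. When the requested size equals poolableSize and the free list is not empty, the buffer is taken from the pool. You can think of poolableSize as the page size of the buffer pool, the basic unit in which the pool hands out memory. Taking a buffer from the pool simply means removing one element from the queue of ByteBuffers and returning it.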
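Otherwise (the requested size differs from poolableSize, or the free list is empty), the request is served from the non-pooled memory. If that is not enough on its own, freeUp moves some memory out of the pool into the non-pooled part. nonPooledAvailableMemory is the amount of non-pooled memory still available. Allocating from the non-pooled part means calling ByteBuffer.allocate to obtain real JVM memory.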
Memory in the pool is rarely reclaimed. Non-pooled memory, in contrast, is discarded after use and then waits for the GC to collect it.
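The accounting between pooled and non-pooled memory can be modelled without the blocking path. This is a simplified sketch, not the Kafka class: `PoolAccountingSketch` is a hypothetical name, `tryAllocate` returns null where the real pool would block and wait, and a deallocate counterpart is included only so that the free list can be filled.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

class PoolAccountingSketch {
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long nonPooledAvailableMemory;

    PoolAccountingSketch(long totalMemory, int poolableSize) {
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = totalMemory; // nothing is pooled at the start
    }

    // Returns a buffer, or null in the case where the real pool would block.
    synchronized ByteBuffer tryAllocate(int size) {
        // Fast path: a pooled buffer of exactly the right size is available.
        if (size == poolableSize && !free.isEmpty()) return free.pollFirst();

        long freeListBytes = (long) free.size() * poolableSize;
        if (nonPooledAvailableMemory + freeListBytes < size) return null;

        // Drop pooled buffers until the non-pooled budget covers the request.
        while (!free.isEmpty() && nonPooledAvailableMemory < size) {
            free.pollLast();
            nonPooledAvailableMemory += poolableSize;
        }
        nonPooledAvailableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer, int size) {
        if (size == poolableSize && size == buffer.capacity()) {
            buffer.clear();
            free.addLast(buffer);             // page-sized buffers go back into the pool
        } else {
            nonPooledAvailableMemory += size; // other sizes only return their byte budget
        }
    }

    synchronized long availableMemory() {
        return nonPooledAvailableMemory + (long) free.size() * poolableSize;
    }

    synchronized int pooledBuffers() {
        return free.size();
    }
}
```

With a total of 100 bytes and a page size of 40, two returned 40-byte buffers sit in the free list. A later 50-byte request then has to drop one of them to free enough non-pooled budget.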
Next, let's look at the code that releases a batch.
public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer);
        } else {
            this.nonPooledAvailableMemory += size;
        }
        Condition moreMem = this.waiters.peekFirst();
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
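It is simple and again comes down to two cases: either the buffer goes straight back into the pool, or the available non-pooled memory is increased. After that, the first waiter in the wait queue is signalled.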
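The hand-off between a releasing thread and a blocked allocator can be sketched with a ReentrantLock and one Condition per waiter. This is a simplified model and not Kafka's code: `BlockingBudgetSketch` is a hypothetical name, it tracks only a byte budget instead of real buffers, waiters do not accumulate memory piece by piece, and a timeout is reported by returning false instead of throwing.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BlockingBudgetSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Condition> waiters = new ArrayDeque<>();
    private long available;

    BlockingBudgetSketch(long total) {
        this.available = total;
    }

    // Returns true once `size` bytes are reserved, false if maxWaitMs elapses first.
    boolean acquire(int size, long maxWaitMs) throws InterruptedException {
        lock.lock();
        try {
            if (waiters.isEmpty() && available >= size) {
                available -= size;
                return true;
            }
            Condition moreMemory = lock.newCondition();
            waiters.addLast(moreMemory);
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
            try {
                // Wait until this thread is first in line and enough memory is free.
                while (waiters.peekFirst() != moreMemory || available < size) {
                    if (remainingNs <= 0) return false;
                    remainingNs = moreMemory.awaitNanos(remainingNs);
                }
                available -= size;
                return true;
            } finally {
                waiters.remove(moreMemory);
                // Pass the wake-up on if memory is left over for the next waiter.
                Condition next = waiters.peekFirst();
                if (next != null && available > 0) next.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // Return memory and wake the first waiter, as deallocate does above.
    void release(int size) {
        lock.lock();
        try {
            available += size;
            Condition first = waiters.peekFirst();
            if (first != null) first.signal();
        } finally {
            lock.unlock();
        }
    }

    // Demo: the second acquire blocks until another thread releases; the third times out.
    static boolean demo() {
        BlockingBudgetSketch budget = new BlockingBudgetSketch(10);
        try {
            boolean first = budget.acquire(10, 100);
            Thread releaser = new Thread(() -> {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                budget.release(10);
            });
            releaser.start();
            boolean second = budget.acquire(10, 5000);
            releaser.join();
            boolean third = budget.acquire(1, 20);
            return first && second && !third;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Signalling only the head of the queue keeps the waiters in first-come, first-served order, which is the same reason the deallocate method above peeks at the first waiter.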