其中消息的追加包含如下幾個組件。咱們在KafkaProducer中調用send方法發送一個消息,在消息追加步驟,最終是將消息添加到了ByteBuffer中。java
##1、KafkaProducer #####1.一、攔截器的實現 咱們發如今send的時候,若是存在攔截器,則調用onSend方法。apache
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
onSend方法的實現很是簡單,實際上這就是將註冊到該Producer的攔截器進行輪詢,並進行調用,從源碼中咱們也能夠知道,這個攔截器是有順序要求的,解析配置文件時是依半角逗號來隔開的。安全
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { ProducerRecord<K, V> interceptRecord = record; for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors // be careful not to throw exception from here if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; } // 解析配置時: case LIST: if (value instanceof List) return (List<?>) value; else if (value instanceof String) if (trimmed.isEmpty()) return Collections.emptyList(); else return Arrays.asList(trimmed.split("\\s*,\\s*", -1)); else throw new ConfigException(name, value, "Expected a comma separated list.");
因此咱們的配置文件能夠這麼寫,來註冊攔截器,注意,攔截器必須繼承ProducerInterceptor。咱們在InterceptorPlus 中,把咱們即將發送的value強轉爲了int,而後爲其++;app
Properties props = new Properties(); String classNames = InterceptorPlus.class.getName() + "," + InterceptorMultiply.class.getName(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "ProducerTest"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classNames); // 攔截器: /** * Created by Anur IjuoKaruKas on 2018/9/5 */ public class InterceptorPlus implements ProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord record) { Integer val = Integer.valueOf(record.value() .toString()); String result = String.valueOf(val + 1); return new ProducerRecord(record.topic(), record.key(), result); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
#####1.二、消息分區的實現 首先,先從元數據拿到集羣的信息,集羣信息中的partitions是以 Map<\String, List<\PartitionInfo>>來存儲的。這裏根據咱們指定的topic名字來獲取partitions。這裏經過兩種方式來肯定消息發往哪一個分區:ide
/** * Compute the partition for the given record. * 爲消息計算分區 * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);// 先從集羣中取出全部分區 int numPartitions = partitions.size(); if (keyBytes == null) { // 沒有key的狀況走這個分支 int nextValue = counter.getAndIncrement(); // 計數服務+1 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);// 取出可用的分區 if (availablePartitions.size() > 0) { int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part) .partition(); } else { // no partitions are available, give a non-available partition // 沒有可用的分區,只能返回一個不可用的分區 return DefaultPartitioner.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
##2、RecordAccumulator #####2.一、雙重檢查鎖與分段鎖 ######2.1.一、分段鎖 這是存在RecordAccumulator中的一段邏輯,咱們在追加消息時須要根據Topic來獲取到Deque<\RecordBatch>,這後面的一系列操做,都是非線程安全的,因此在操做dq對象時,這裏採用了分段鎖的概念。咱們每個Topic都維護了一個Deque,它們的append操做並不互相影響,因此沒必要爲整個<\String /* topic */ , Deque<\RecordBatch>>加鎖,只需對某個topic下的dq加鎖便可。oop
######2.1.二、雙重檢查鎖 雙重檢查鎖,適用於先檢查,後執行。咱們發現上下兩段臨界區的代碼有一部分很像,實際上就是使用了雙重檢查鎖,好比像下面這個簡單的單例模式的建立。this
public static Singleton getInstanceDC() { if (_instance == null) { // Single Checked synchronized (Singleton.class) { if (_instance == null) { // Double checked _instance = new Singleton(); } } } return _instance; }
// check if we have an in-progress batch Deque<RecordBatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) { throw new IllegalStateException("Cannot send after the producer is closed."); } RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { return appendResult; } } int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) { throw new IllegalStateException("Cannot send after the producer is closed."); } RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... free.deallocate(buffer); return appendResult; } MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); }
##3、RecordBatch RecordBatch(一個批發送對象,能夠存放不少條消息,它有必定的存儲空間上限,當新的消息放不下時,Kafka就會在RecordAccumulator中申請空間 ByteBuffer buffer = free.allocate(size, maxTimeToBlock) )spa
咱們在調用KafkaProducer的send方法時,能夠指定一個回調。這裏不過多講述RecordBatch。在消息追加時這個回調就會與RecordBatch進行綁定。.net
這個回調就是在RecordBatch層面被調用,會在正常響應、超時、或關閉生產者時調用這個。好比咱們能夠在非正常響應時將消息保存在本地或者將異常日誌打印出來,以便恢復之類的。線程
producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr));
每一條消息追加進來,都會生成一個新的Thunk對象,Kafka中應用了不少這種設計,例如 【仿照Kafka,實現一個簡單的監聽器 + 適配器吧!】。原理都是在對象中維護一個相似List<方法>之類的列表,而後在適當的時候(異常、正常、或者執行到某個步驟時)循環取出列表,並調用裏面的方法。
// FutureRecordMetadata主要包含了 RecordBatch裏的 ProduceRequestResult FutureRecordMetadata future = new FutureRecordMetadata( this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length); if (callback != null) { thunks.add(new Thunk(callback, future)); }
##4、MemoryRecord、Compressor 這兩個對象在消息追加中算是最底層的角色了。咱們在前面申請到的空間ByteBuffer對象,就是由這二者進行維護。Compressor負責消息的寫入,它生成了一個bufferStream用於向ByteBuffer寫入數據。(還有一個appendStream負責壓縮數據,這塊還沒看太明白)
建立 ByteBufferOutputStream bufferStream 十分簡單,實際上就是將ByteBuffer的引用交給 bufferStream。
public class ByteBufferOutputStream extends OutputStream { private ByteBuffer buffer; public ByteBufferOutputStream(ByteBuffer buffer) { this.buffer = buffer; }
ByteBufferOutputStream 中巧妙的即是它的ByteBuffer自動擴容方法。咱們來看看最基礎的一個寫入
public void write(byte[] bytes, int off, int len) { if (buffer.remaining() < len) expandBuffer(buffer.capacity() + len); buffer.put(bytes, off, len); }
若是要寫入的內容過大,會進行一次ByteBuffer的擴容,好比說個人RecordBatch默認都爲10M,如今已經9.9M了,最後一條消息進來,若是正好大了一點,那麼就依靠這個擴容方法,臨時擴充一下ByteBuffer的大小。
由於前面在判斷可否繼續追加消息的時候,只是對消息大小進行了預估,尤爲是指定了壓縮方式後,這個預估可能會沒那麼準確,這個時候,就須要ByteBuffer的擴容機制來進行兜底。
private void expandBuffer(int size) { int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size); ByteBuffer temp = ByteBuffer.allocate(expandSize); temp.put(buffer.array(), buffer.arrayOffset(), buffer.position()); buffer = temp; }
##5、Kafka的ByteBuffer內存管理 第二步中說道的RecordAccumulator中,有一個內存申請的操做,實際上就是Kafka BufferPool來完成的。
先看看BufferPool的構造方法,咱們在建立RecordAccumulator時(KafkaProducer),會傳入一個參數,叫作totalMemorySize。這個totalMemorySize,是在KafkaProducer的構造方法裏面獲取的。
咱們能夠在配置中:
指定ProducerConfig.BUFFER_MEMORY_CONFIG來配置BufferPool的大小, 指定ProducerConfig.BATCH_SIZE_CONFIG 來配置每個ByteBuffer的大小。
從構造方法和上面的分析咱們能夠知道,內存的消耗隨着Producer的增多而增多,ByteBuffer默認大小若是不合理,將可能致使RecordAccumulator常常性的阻塞。
-------------------- KafkaProducer 構造方法: this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); // accumulator相關配置的建立與更新 this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, metrics, time); -------------------- RecordAccumulator 構造方法: this.free = new BufferPool(totalSize, batchSize, metrics, time, metricGrpName); -------------------- BufferPool 構造方法: public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) { this.poolableSize = poolableSize; this.lock = new ReentrantLock(); this.free = new ArrayDeque<ByteBuffer>(); this.waiters = new ArrayDeque<Condition>(); this.totalMemory = memory; this.availableMemory = memory; this.metrics = metrics; this.time = time; this.waitTime = this.metrics.sensor("bufferpool-wait-time"); MetricName metricName = metrics.metricName("bufferpool-wait-ratio", metricGrpName, "The fraction of time an appender waits for space allocation."); this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); }
咱們來看看ByteBuffer和核心方法,allocate。進來的第一個判斷,size > this.totalMemory,若是要申請的大小直接大於ByteBuffer可持全部大小,直接拋出異常。
/** 代碼1.1 */ /** * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool * is configured with blocking mode. * * @param size The buffer size to allocate in bytes * @param maxTimeToBlockMs The maximum time in milliseconds to block for buffer memory to be available * * @return The buffer * @throws InterruptedException If the thread is interrupted while blocked * @throws IllegalArgumentException if size is larger than the total memory controlled by the pool (and hence we would block * forever) */ public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { if (size > this.totalMemory) { throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); } //...... }
這上面的分支,實際上都是【有足夠內存可分配的狀況】
一、若是申請的內存爲 poolableSize(最小分配大小) 且 private final Deque<ByteBuffer> free 列表(重複利用ByteBuffer)不爲空
則直接返回重複利用的ByteBuffer
二、若是申請的大小不是poolableSize,但如今能夠當即知足,則freeUp一下。
/** 代碼1.2 緊接着1.1 */ this.lock.lock(); try { // check if we have a free buffer of the right size pooled // 校驗是否有合適的小的空閒的buffer if (size == poolableSize && !this.free.isEmpty()) { return this.free.pollFirst(); } // now check if the request is immediately satisfiable with the // memory on hand or if we need to block // 校驗如今的內存是否能夠當即知足請求,或者是否須要阻塞 int freeListSize = this.free.size() * this.poolableSize; // 若是可用內存+freeList大小大於申請大小 if (this.availableMemory + freeListSize >= size) { // we have enough unallocated or pooled memory to immediately // satisfy the request // 有足夠的被釋放或存放在池中的內存來當即知足請求 freeUp(size); this.availableMemory -= size; lock.unlock(); return ByteBuffer.allocate(size); } else {// 如今可用的內存大小沒法知足 //.....
freeUp的邏輯十分簡單,實際上就是當申請的內存大於可用內存availableMemory時,從free這個ByteBuffer列表循環 將free:List<ByteBuffer>中的元素poll出來,而且將其容量賦值給availableMemory。
/** * Attempt to ensure we have at least the requested number of bytes of memory for allocation by deallocating pooled * buffers (if needed) */ private void freeUp(int size) { // 當free列表不爲空,而且可用內存小於申請內存時 // :: 仍是從free中取,這個取法,能夠取多個 while (!this.free.isEmpty() && this.availableMemory < size) { this.availableMemory += this.free.pollLast() .capacity(); } }
除了以上兩種狀況(一、夠用,並且有現成的ByteBuffer可供調用。二、夠用,但沒有現成的ByteBuffer可供調用,須要申請),只剩下第三種狀況了,即不夠用。這種狀況可想而知,只能等待更多的,已經使用的ByteBuffer釋放內存。
它能夠複用freeUp嘛?固然能夠。但freeUp是不夠的,由於咱們知道freeUp是釋放掉free:List<ByteBuffer>中的內存。而這第三種狀況,即便把空餘的ByteBuffer都釋放掉,加上如今的可用內存availableMemory,也不夠。
怎麼解決?最簡單的固然是寫個循環,等待其餘的線程把正在使用的ByteBuffer釋放掉,內存就夠用了。Kafka就是這麼作的。
咱們來看看kafka是如何實現的:
聲明一個int accumulated,這個是如今已聲明的內存大小。new一個Condition,進行condition內的阻塞(await),並釋放鎖,且每當已聲明大小還不足以達到要求,就在condition上阻塞。
假如這是咱們waiter裏面的最後一個(也是第一個線程),它難道將永遠阻塞?
並不,實際上咱們在釋放內存時,會peek隊列中第一個等待的線程進行signal,並且會將poolable的ByteBuffer塞進List<ByteBuffer>中,來實現ByteBuffer的重複利用,要知道,這裏是沒有釋放掉ByteBuffer的,只是將ByteBuffer clear掉了。
/** 代碼1.3 緊接着1.2 */ } else {// 如今可用的內存大小沒法知足 // we are out of memory and will have to block int accumulated = 0; ByteBuffer buffer = null; Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { // 當前condition進行等待 remainingTimeToBlockNs 毫秒 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { this.waiters.remove(moreMemory); throw e; } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); // 統計阻塞時間 this.waitTime.record(timeNs, time.milliseconds()); } //.....
// 通知waiter隊列的隊頭 public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { this.availableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) { moreMem.signal(); } } finally { lock.unlock(); } }
在被通知以後,線程又會作什麼呢?
一、首先若是超時了,直接報出異常。
二、再次判斷申請的空間是否是 poolableSize,若是是,則直接從free中取一個(通常也只有一個,由於deallocate是加鎖了的,釋放完內存就會通知waiter),固然也可能free中爲空(由於釋放的ByteBuffer不是Poolable大小,它會被釋放到availableMemory中,進入下一步)
三、走到了這個else分支裏,首先會嘗試從free中釋放所需內存,而後看看剩餘的availableMemory是否比剩餘所要申請的大。若是是,申請的空間就算已經所有知足了。 int got = (int) Math.min(size - accumulated, this.availableMemory)。但若是沒申請完,就會繼續進入while循環,回到await狀態,繼續等有人通知。
四、若是已經申請完了,出隊。此時若是還有空間,會通知waiter的第一個去拿內存,並返回申請的內存。
/** 代碼1.4 緊接着1.3 */ if (waitingTimeElapsed) {// 第一步 this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } // 沒超時,剩餘時間等於本身減去耗時 remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory // 檢查free list是否能夠知足請求,不知足則申請內存 if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// 第二步 // just grab a buffer from the free list // 和前面是同樣的,從free list中取一個出來 buffer = this.free.pollFirst(); accumulated = size; } else {// 第三步 //todo: 走到這裏說明申請的大小要大於poolableSize,或者free爲空 // we'll need to allocate memory, but we may only get // part of what we need on this iteration // 須要申請內存,可是在這個循環可能只能從中獲取須要內存的一部分,也就是說太大了,會再獲取一次 // size:要申請的大小。 freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.availableMemory); this.availableMemory -= got; accumulated += got; } } // remove the condition for this thread to let the next thread // in line start getting memory Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) { throw new IllegalStateException("Wrong condition: this shouldn't happen."); } // signal any additional waiters if there is more memory left // over for them // 第四步、通知其餘waiters去拿內存 if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) { this.waiters.peekFirst() .signal(); } } // unlock and return the buffer lock.unlock(); if (buffer == null) {// buffer = null 表明內存時直接從free中輪詢釋放的 return ByteBuffer.allocate(size); } else {// buffer不爲空,是直接複用free中的內存 return buffer; } }
經過上面的分析咱們知道,一旦有線程想要申請一塊很大的內存的話,並且這個線程到了隊頭,它就成了大哥。全部釋放的內存,都被分配給它,因此咱們要儘可能地去避免這種狀況!
好比說咱們忽然發送一條須要佔用很大內存的消息,那麼對於kafka的效率來講,將是毀滅性的!
另,若是發送的消息比較平均,且ByteBuffer的poolableSize分配合理,則能夠極大地提高kafka的效率!
《Kafka技術內幕》 鄭奇煌著 《Apache Kafka源碼剖析》 徐郡明著