The Kafka producer calls RecordAccumulator#append to buffer messages in local memory. Messages are grouped by TopicPartition: each TopicPartition maps to its own Deque<RecordBatch>.
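For orientation, here is a minimal producer sketch (illustrative only; the broker address, topic name, and serializers are placeholders, not anything mandated by the accumulator):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() does not write to the network directly: it appends the record
            // to the RecordAccumulator, and the Sender thread flushes batches later
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```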
When the producer calls send(), it is RecordAccumulator#append that stores the message in memory. Note that append acquires the deque's lock twice; this is done to keep the lock scope as small as possible.
```java
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    try {
        Deque<RecordBatch> dq = getOrCreateDeque(tp); // get the Deque<RecordBatch> for this TopicPartition
        synchronized (dq) { // key point: only touch the deque while holding its lock
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast(); // append to the last RecordBatch
            if (last != null) {
                // try the append; tryAppend is covered in detail below
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null) // appended successfully, so return
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
            }
        }

        // the append failed: the last RecordBatch is out of space, or last == null.
        // key point: if the message is larger than batch.size, a RecordBatch of the
        // message's size is created, i.e. a RecordBatch is not necessarily batch.size bytes
        int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
        // allocate memory from the BufferPool; covered in detail below
        ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) { // re-acquire the lock: allocate() does not need the dq lock, another example of minimizing lock granularity
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordBatch last = dq.peekLast();
            if (last != null) {
                // another thread may have created a new batch before we re-acquired the lock, so try again
                FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
                if (future != null) {
                    free.deallocate(buffer);
                    return new RecordAppendResult(future, dq.size() > 1 || last.records.isFull(), false);
                }
            }
            // still no batch to append to, so build a new RecordBatch from the allocated buffer
            MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
            RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
            dq.addLast(batch);
            incomplete.add(batch);
            return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
        }
    } finally {
        appendsInProgress.decrementAndGet();
    }
}
```
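To see the two-phase locking idea in isolation, here is a self-contained sketch of the same pattern (my own illustration, not Kafka source; TwoPhaseAppend, BATCH_CAPACITY, and the StringBuilder "batches" are all made up):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Phase 1: try the cheap path under the lock. Then drop the lock for the
// potentially blocking allocation. Phase 2: re-check under the lock, because
// another thread may have created a batch in the meantime.
public class TwoPhaseAppend {
    private static final int BATCH_CAPACITY = 16;
    private final Deque<StringBuilder> dq = new ArrayDeque<>();

    public void append(String msg) {
        synchronized (dq) {                       // phase 1: fast path
            StringBuilder last = dq.peekLast();
            if (last != null && last.length() + msg.length() <= BATCH_CAPACITY) {
                last.append(msg);
                return;
            }
        }
        // lock released: the "allocation" may block, so don't hold the deque lock
        StringBuilder fresh = new StringBuilder(BATCH_CAPACITY);
        synchronized (dq) {                       // phase 2: re-check
            StringBuilder last = dq.peekLast();
            if (last != null && last.length() + msg.length() <= BATCH_CAPACITY) {
                last.append(msg);                 // another thread made room
                return;                           // the fresh buffer is discarded
            }
            fresh.append(msg);
            dq.addLast(fresh);
        }
    }
}
```

The point of releasing the lock around the allocation is that free.allocate can block until memory is available, and holding the deque lock across that wait would stall every thread producing to the same partition.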
The code above calls RecordBatch#tryAppend to try to put the message into a RecordBatch, and RecordBatch#tryAppend in turn calls MemoryRecords#append.
```java
public long append(long offset, long timestamp, byte[] key, byte[] value) {
    if (!writable)
        throw new IllegalStateException("Memory records is not writable");

    int size = Record.recordSize(key, value);
    compressor.putLong(offset);          // 8-byte offset
    compressor.putInt(size);             // 4-byte record size; together these are Records.LOG_OVERHEAD
    long crc = compressor.putRecord(timestamp, key, value); // the record body itself
    compressor.recordWritten(size + Records.LOG_OVERHEAD);
    return crc;
}
```
The key piece here is the compressor. Let's look at Compressor, taking putInt as an example: it actually delegates to DataOutputStream#writeInt.
```java
public void putInt(final int value) {
    try {
        appendStream.writeInt(value); // appendStream is a DataOutputStream
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);
    }
}
```
Now let's see how the Compressor is initialized:
```java
public Compressor(ByteBuffer buffer, CompressionType type) {
    // ...
    // create the stream
    bufferStream = new ByteBufferOutputStream(buffer);
    appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

public static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
    try {
        switch (type) {
            case NONE:
                return new DataOutputStream(buffer); // wraps the ByteBufferOutputStream directly
            case GZIP:
                return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
            case SNAPPY:
                try {
                    OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            case LZ4:
                try {
                    OutputStream stream = (OutputStream) lz4OutputStreamSupplier.get().newInstance(buffer);
                    return new DataOutputStream(stream);
                } catch (Exception e) {
                    throw new KafkaException(e);
                }
            default:
                throw new IllegalArgumentException("Unknown compression type: " + type);
        }
    } catch (IOException e) {
        throw new KafkaException(e);
    }
}
```
As wrapForOutput shows, ByteBufferOutputStream is the layer that writes into the ByteBuffer directly, while DataOutputStream wraps it and handles the (possibly compressed) output. The layering is: DataOutputStream → compression stream (GZIP/Snappy/LZ4, if any) → ByteBufferOutputStream → ByteBuffer.
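The same layering can be reproduced with plain JDK streams. In this standalone sketch, ByteArrayOutputStream stands in for Kafka's ByteBufferOutputStream (an assumption made to keep the example dependency-free):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

// Writes flow through DataOutputStream -> GZIPOutputStream -> the
// buffer-backed stream, mirroring the GZIP branch of wrapForOutput.
public class StreamLayering {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bufferStream = new ByteArrayOutputStream();
        try (DataOutputStream appendStream =
                 new DataOutputStream(new GZIPOutputStream(bufferStream))) {
            appendStream.writeLong(0L); // offset, as in MemoryRecords#append
            appendStream.writeInt(42);  // record size
        }
        System.out.println("compressed bytes: " + bufferStream.size());
    }
}
```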
BufferPool manages the producer's buffer pool. Its total size is set by the buffer.memory configuration, which defaults to 32 MB.
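The sizes involved here are ordinary producer configs. An illustrative setup (the class name PoolConfig is made up; the values shown are the documented defaults):

```java
import java.util.Properties;

public class PoolConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("buffer.memory", "33554432"); // total BufferPool size, 32 MB (the default)
        props.put("batch.size", "16384");       // poolableSize, 16 KB (the default)
        props.put("max.block.ms", "60000");     // how long allocate() may block (the default)
        // props would then be passed to new KafkaProducer<>(...)
    }
}
```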
BufferPool#allocate requests memory from the pool. BufferPool maintains free, a deque of idle ByteBuffers; only requests for exactly batch.size bytes are served from free, which means every ByteBuffer held in free is batch.size bytes.
Key fields of BufferPool:
```java
private final long totalMemory;
private final int poolableSize;       // size of one pooled buffer, equal to batch.size
private final ReentrantLock lock;
private final Deque<ByteBuffer> free; // idle ByteBuffers, each exactly batch.size bytes; only
                                      // requests of exactly batch.size are served from here
private final Deque<Condition> waiters;
private long availableMemory;         // memory still available, i.e. buffer.memory minus memory in use
// ...
```
```java
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
    if (size > this.totalMemory)
        throw new IllegalArgumentException("Attempt to allocate " + size
                                           + " bytes, but there is a hard limit of "
                                           + this.totalMemory
                                           + " on memory allocations.");
    this.lock.lock();
    try {
        // key point: only requests of exactly batch.size are served from the free list
        if (size == poolableSize && !this.free.isEmpty())
            return this.free.pollFirst();

        int freeListSize = this.free.size() * this.poolableSize;
        if (this.availableMemory + freeListSize >= size) {
            // total remaining memory is enough, but it cannot come straight from the free
            // list: release some buffers from free, then allocate the requested size
            freeUp(size);
            this.availableMemory -= size;
            lock.unlock();
            return ByteBuffer.allocate(size);
        } else {
            // key point: not enough memory left, so block until enough is freed
            int accumulated = 0;
            ByteBuffer buffer = null;
            Condition moreMemory = this.lock.newCondition();
            long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            this.waiters.addLast(moreMemory); // join the tail of the wait queue
            while (accumulated < size) {
                long startWaitNs = time.nanoseconds();
                long timeNs;
                boolean waitingTimeElapsed;
                try {
                    waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); // block
                } catch (InterruptedException e) {
                    this.waiters.remove(moreMemory);
                    throw e;
                } finally {
                    long endWaitNs = time.nanoseconds();
                    timeNs = Math.max(0L, endWaitNs - startWaitNs);
                    this.waitTime.record(timeNs, time.milliseconds());
                }

                if (waitingTimeElapsed) {
                    this.waiters.remove(moreMemory);
                    throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
                }

                remainingTimeToBlockNs -= timeNs;
                // check if we can satisfy this request from the free list,
                // otherwise allocate memory
                if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
                    // just grab a buffer from the free list
                    buffer = this.free.pollFirst();
                    accumulated = size;
                } else {
                    freeUp(size - accumulated);
                    int got = (int) Math.min(size - accumulated, this.availableMemory);
                    this.availableMemory -= got;
                    accumulated += got;
                }
            }

            Condition removed = this.waiters.removeFirst(); // take from the head; see the notes below
            if (removed != moreMemory)
                throw new IllegalStateException("Wrong condition: this shouldn't happen.");

            // if there is memory left over after this allocation, wake the next waiting thread
            if (this.availableMemory > 0 || !this.free.isEmpty()) {
                if (!this.waiters.isEmpty())
                    this.waiters.peekFirst().signal(); // wake the head of the queue
            }

            // unlock and return the buffer
            lock.unlock();
            if (buffer == null)
                return ByteBuffer.allocate(size);
            else
                return buffer;
        }
    } finally {
        if (lock.isHeldByCurrentThread())
            lock.unlock();
    }
}
```
A few points about allocate are worth noting. Threads join the waiters queue at the tail and only the head is ever signalled, so memory is granted in FIFO order; when a blocked thread finally accumulates enough memory, its Condition must therefore be at the head of the queue, which is why removeFirst() is expected to return moreMemory:

```java
Condition removed = this.waiters.removeFirst(); // must be our own Condition, taken from the head
if (removed != moreMemory)
    throw new IllegalStateException("Wrong condition: this shouldn't happen.");
```

Also note that a large request may be satisfied incrementally: the thread keeps whatever memory it has accumulated so far and continues waiting until the full amount is available.
BufferPool#deallocate releases memory back to the pool. As with allocate, only blocks of exactly batch.size bytes are put back into free.
```java
public void deallocate(ByteBuffer buffer) {
    deallocate(buffer, buffer.capacity());
}

public void deallocate(ByteBuffer buffer, int size) {
    lock.lock();
    try {
        if (size == this.poolableSize && size == buffer.capacity()) {
            buffer.clear();
            this.free.add(buffer); // only blocks of exactly batch.size are recycled into free
        } else {
            // otherwise only availableMemory changes; the discarded ByteBuffer is left for GC
            this.availableMemory += size;
        }
        Condition moreMem = this.waiters.peekFirst(); // wake the head of the waiter queue
        if (moreMem != null)
            moreMem.signal();
    } finally {
        lock.unlock();
    }
}
```
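To make the recycling policy concrete, here is a stripped-down, single-threaded sketch (my own illustration, not Kafka source; all names are made up) that keeps only the free-list logic and drops the locking and blocking:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Only poolableSize buffers are recycled; any other size is allocated
// fresh on demand and simply returned to availableMemory on release.
public class SimpleBufferPool {
    private final int poolableSize;
    private long availableMemory;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    public SimpleBufferPool(long totalMemory, int poolableSize) {
        this.availableMemory = totalMemory;
        this.poolableSize = poolableSize;
    }

    public ByteBuffer allocate(int size) {
        if (size == poolableSize && !free.isEmpty())
            return free.pollFirst();              // reuse a cached batch.size buffer
        availableMemory -= size;                  // the real pool frees/blocks as needed here
        return ByteBuffer.allocate(size);
    }

    public void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.add(buffer);                     // recycle for the next batch.size request
        } else {
            availableMemory += buffer.capacity(); // let GC reclaim the buffer itself
        }
    }
}
```

The design rationale is that batch.size allocations dominate in practice, so caching exactly that size avoids both repeated allocation and the fragmentation that a general-purpose free list would have to manage.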