面試官:Kafka 如何優化內存緩衝機制形成的頻繁 GC 問題?

Jusfr 原創,轉載請註明來自博客園php

Request 與 Response 的響應格式

Request 與 Response 都是以 長度+內容 形式描述, 見於 A Guide To The Kafka Protocolgit

Request 除了 Size+ApiKey+ApiVersion+CorrelationId+ClientId 這些固定字段, 額外的 RequestMessage 包含了具體請求數據;github

Request => Size ApiKey ApiVersion CorrelationId ClientId RequestMessage
  Size => int32
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

Response 除了 Size+CorrelationId, 額外的 ResponseMessage 包含了具體響應數據;apache

Response => Size CorrelationId ResponseMessage Size => int32 CorrelationId => int32 ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse

處理序列化與反序列化需求

使用 MemoryStream

序列化 Request 須要分配內存, 從緩衝區讀取 Response 同理.設計模式

MemoryStream 是一個可靠方案, 它實現了自動擴容, 但擴容過程離不開字節拷貝, 而頻繁分配不小的內存將影響性能, 近似的擴容示例代碼以下:數組

// init Byte[] buffer = new Byte[4096]; Int32 offset = 0; //write bytes Byte[] bytePrepareCopy = // from outside if (bytePrepareCopy > buffer.Length - offset) { Byte[] newBuffer = new Byte[buffer.Length * 2]; Array.Copy(buffer, 0, newBuffer, 0, offset); buffer = newBuffer; } Array.Copy(bytePrepareCopy, 0, buffer, offset, bytePrepareCopy.Length);

數組擴容能夠參見 List 的實現, 這裏只是示意, 沒有處理長度爲 (buffer.Length*2 - offset) < bytePrepareCopy.Length 的狀況安全

在數組長度超4k 時,擴容成本很是高。若是約定「請求和響應不得超過4k「, 那麼使用可回收(見下文相關內容)的固定長度的數組模擬 MemoryStream 的讀取和寫入行爲, 可以達到極大的性能收益。markdown

KafkaStreamBinary (見於 github) 內部使用 MemoryStream, KafkaFixedBinary (見於 github) 則是基於數組的實現;ide

使用 BufferManager

使用過 Memcached 的人很容易理解 BufferManager 的思路: 爲了下降頻繁開闢內存帶來的開銷,首先「將內存塊化」, 申請者獲取到「成塊的內存」, 被分配出去的內存塊標記爲「已分配」; 與 Memcached 不一樣的是 BufferManager 指望申請者歸還使用完後的內存塊,以從新分配給其餘申請操做。oop

System.ServiceModel.Channels.BufferManager 提供了一個可靠實現, 大體使用方式以下:

const Int32 size = 4096; BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 32, maxBufferSize: size); Byte[] buffer = bm.TakeBuffer(1024); bm.ReturnBuffer(buffer);

與手動分配內容的性能對比

const Int32 size = 4096; BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 10, maxBufferSize: size); var timer = new FunctionTimer(); timer.Push("BufferManager", () => { Byte[] buffer = bm.TakeBuffer(size); bm.ReturnBuffer(buffer); }); timer.Push("new Byte[]", () => { Byte[] buffer = new Byte[size]; }); timer.Initialize(); timer.Execute(100000).Print();

測試結果:

BufferManager
    Time Elapsed : 7ms CPU Cycles : 17,055,523 Memory cost : 3,388 Gen 0 : 2 Gen 1 : 2 Gen 2 : 2 new Byte[] Time Elapsed : 42ms CPU Cycles : 113,437,539 Memory cost : 24 Gen 0 : 263 Gen 1 : 2 Gen 2 : 2 
  • 太小的內容使用沒有使用 BufferManager 的必要,但BufferManager分配超過 4k 內存時性能降低明顯;
  • 最優狀況是申請人獲取的內存塊大小一致,若是設置maxBufferSize = 4k,但 TakeBuffer(Int32 bufferSize) 方法使用的參數大於 4k,測試代表性能還不如手動建立 Byte 數組;
  • mono 的實現存在線程安全的問題;

強制要求業務使用的請求不超過4k 貌似作獲得,但需求更大內存的場景老是存在,好比合並消息、批量消費等,Chuye.Kafka 做爲類庫須要提供支持。

KafkaScalableBinary = BufferManager + Byte[][]

KafkaScalableBinary 並無發明新東西, 在其內部維護了一個 Dictionary<int32, byte[]=""> 保存一系列 Byte數組;

初始化時並未真正分配內存, 除非開始寫入;

public KafkaScalableBinary() : this(4096) { } public KafkaScalableBinary(Int32 size) { if (size <= 0) { throw new ArgumentOutOfRangeException("size"); } _lengthPerArray = size; _buffers = new Dictionary<Int32, Byte[]>(16); }

寫入時先根據當前位置對數組長度取模 _position / _lengthPerArray 找到待寫入數組,不存在則分配新數組;

private Byte[] GetBufferForWrite() { var index = (Int32)(_position / _lengthPerArray); Byte[] buffer; if (!_buffers.TryGetValue(index, out buffer)) { if (_lengthPerArray >= 128) { buffer = ServiceProvider.BufferManager.TakeBuffer(_lengthPerArray); } else { buffer = new Byte[_lengthPerArray]; } _buffers.Add(index, buffer); } return buffer; }

而後根據當前位置對數組長度取整 _position % _lengthPerArray 找到目標位置;因爲待寫入長度可能超過可以使用長度,這裏使用了 while 循環,一邊獲取和分配待寫入數組, 一邊將剩餘字節寫入其中,直至完成;

public override void WriteByte(Byte[] buffer, int offset, int count) { if (buffer == null) { throw new ArgumentNullException("buffer"); } if (buffer.Length == 0) { return; } if (buffer.Length < count) { throw new ArgumentOutOfRangeException(); } checked { var left = count; //標記剩餘量 while (left > 0) { var targetBuffer = GetBufferForWrite(); //查找目標數組 var targetOffset = (Int32)(_position % _lengthPerArray); //查找目標位置 if (targetOffset == _lengthPerArray - 1) { //若是位置已經位於數組末尾, 說明位於起始位置; targetOffset = 0; } var prepareCopy = left; //準備寫入剩餘量 if (prepareCopy > _lengthPerArray - targetOffset) { //但數組的剩餘長度可能不夠,寫入較小長度 prepareCopy = _lengthPerArray - targetOffset; } Array.Copy(buffer, count - left, targetBuffer, targetOffset, prepareCopy); //拷貝字節 _position += prepareCopy; //推動位置 left -= prepareCopy; //減少剩餘量 if (_position > _length) { //增大總長度 _length = _position; } } } } 

讀取過程相似,循環查找待讀取數組和拷貝字節直到完成,不一樣的是分配內存的邏輯以一條異常替代;

public override Int32 ReadBytes(Byte[] buffer, int offset, int count) { if (buffer == null) { throw new ArgumentNullException("buffer"); } if (buffer.Length == 0) { return 0; } if (buffer.Length < count) { throw new ArgumentOutOfRangeException(); } checked { var prepareRead = (Int32)(Math.Min(count, _length - _position)); //計算待讀取長度 var left = prepareRead; //標記剩餘量 while (left > 0) { var targetBuffer = GetBufferForRead(); //查找目標數組 var targetOffset = (Int32)(_position % _lengthPerArray); //查找目標位置 var prepareCopy = left; //準備讀取剩餘量 if (prepareCopy > _lengthPerArray - targetOffset) { prepareCopy = _lengthPerArray - targetOffset; } Array.Copy(targetBuffer, targetOffset, buffer, prepareRead - left, prepareCopy); //但數組的剩餘長度可能不夠,讀取較小長度 _position += prepareCopy; //推動位置 left -= prepareCopy; //減少剩餘量 } return prepareRead; } } private Byte[] GetBufferForRead() { var index = (Int32)(_position / _lengthPerArray); Byte[] buffer; if (!_buffers.TryGetValue(index, out buffer)) { throw new IndexOutOfRangeException(); } return buffer; }

釋放時釋放內部維護的的所有字節;

public override void Dispose() { foreach (var item in _buffers) { if (_lengthPerArray >= 128) { ServiceProvider.BufferManager.ReturnBuffer(item.Value); } } _buffers.Clear(); }

寫入緩衝區是對內部維護數組列表的直接操做,高度優化

public override void CopyTo(Stream destination) { foreach (var item in GetBufferAndSize()) { destination.Write(item.Key, 0, item.Value); } }

讀取緩衝區時和寫入行爲相似

public override void ReadFrom(Stream source, int count) { var left = count; var loop = 0; do { var targetBuffer = GetBufferForWrite(); var targetOffset = (Int32)(_position % _lengthPerArray); var prepareCopy = left; if (prepareCopy > _lengthPerArray - targetOffset) { prepareCopy = _lengthPerArray - targetOffset; } var readed = source.Read(targetBuffer, targetOffset, prepareCopy); _position += readed; left -= readed; if (_position > _length) { _length = _position; } loop++; } while (left > 0); }

實際上能夠從 MemoryStream 定義出 ScalableMemoryStream 再改寫其行爲,KafkaScalableBinary 依賴於 MemoryStream 而不是具體實現,總體就更加"設計模式"了 , 基本邏輯前文已陳述。

測試過程當中發現,一來 **mono 的 BufferManager 實現存在線程安全問題*,故 Chuye.Kafka 提供了一個 ObjectPool 模式的 BufferManager 做爲替代方案; 二是 KafkaScalableBinary 與 ScalableStreamBinary 的性能對比測試結果很是不穩定,但前者頻繁的取橫取整及字典開銷必然是拖累,我會繼續追蹤和優化。

KafkaScalableBinary (見於 github), 序列化部分設計示意:

 


Jusfr 原創,轉載請註明來自博客園

相關文章
相關標籤/搜索