Original work by Jusfr. Please credit 博客园 (cnblogs) when reposting.
Both Request and Response are described as length + content; see A Guide To The Kafka Protocol.

Besides the fixed fields Size + ApiKey + ApiVersion + CorrelationId + ClientId, a Request carries a RequestMessage with the request-specific payload:
```
Request => Size ApiKey ApiVersion CorrelationId ClientId RequestMessage
  Size => int32
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
```
Besides the fixed fields Size + CorrelationId, a Response carries a ResponseMessage with the response-specific payload:
```
Response => Size CorrelationId ResponseMessage
  Size => int32
  CorrelationId => int32
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
```
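To make the layout concrete, here is a hedged sketch of writing the fixed header fields; the helper names are mine, not from Chuye.Kafka. The protocol encodes integers big-endian, a protocol string is an int16 length followed by the bytes, and Size is typically back-filled once the full request length is known.

```csharp
// Illustrative sketch of serializing the fixed Request header fields.
// The Kafka protocol is big-endian, so integers are written high byte first.
using System;
using System.IO;

static class RequestHeaderWriter {
    public static void WriteHeader(Stream stream, Int16 apiKey, Int16 apiVersion,
                                   Int32 correlationId, String clientId) {
        WriteInt32(stream, 0);                      // Size: placeholder, back-filled later
        WriteInt16(stream, apiKey);                 // ApiKey
        WriteInt16(stream, apiVersion);             // ApiVersion
        WriteInt32(stream, correlationId);          // CorrelationId
        var bytes = System.Text.Encoding.UTF8.GetBytes(clientId);
        WriteInt16(stream, (Int16)bytes.Length);    // ClientId: int16 length prefix
        stream.Write(bytes, 0, bytes.Length);       // ClientId: content bytes
    }

    static void WriteInt16(Stream stream, Int16 value) {
        stream.WriteByte((Byte)(value >> 8));
        stream.WriteByte((Byte)value);
    }

    static void WriteInt32(Stream stream, Int32 value) {
        stream.WriteByte((Byte)(value >> 24));
        stream.WriteByte((Byte)(value >> 16));
        stream.WriteByte((Byte)(value >> 8));
        stream.WriteByte((Byte)value);
    }
}
```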
Serializing a Request requires allocating memory, and the same goes for reading a Response back out of the buffer.

MemoryStream is a dependable option: it grows automatically, but every growth step involves copying bytes, and frequently allocating sizeable buffers hurts performance. The growth logic looks roughly like this:
```csharp
// init
Byte[] buffer = new Byte[4096];
Int32 offset = 0;

// write bytes
Byte[] bytePrepareCopy = …; // supplied from outside
if (bytePrepareCopy.Length > buffer.Length - offset) {
    Byte[] newBuffer = new Byte[buffer.Length * 2];
    Array.Copy(buffer, 0, newBuffer, 0, offset);
    buffer = newBuffer;
}
Array.Copy(bytePrepareCopy, 0, buffer, offset, bytePrepareCopy.Length);
```
For real array-growth logic see the implementation of List<T>; the snippet above is only an illustration and does not handle the case where (buffer.Length * 2 - offset) < bytePrepareCopy.Length.
Once the array grows past 4 KB, each expansion becomes very expensive. If we stipulate that "a request or response must not exceed 4 KB", then a recyclable (see below) fixed-length array that mimics MemoryStream's read and write behaviour delivers a large performance win.
KafkaStreamBinary (see GitHub) uses a MemoryStream internally, while KafkaFixedBinary (see GitHub) is the array-based implementation.
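A minimal sketch of the fixed-buffer idea, assuming a pre-allocated (possibly pooled) array and the "no request exceeds 4 KB" contract; the class and member names are illustrative, not the actual KafkaFixedBinary source:

```csharp
// Illustrative sketch: a fixed-length, poolable write buffer that mimics
// MemoryStream's Write but never grows.
using System;

class FixedBinaryWriter {
    private readonly Byte[] _buffer;   // fixed-length array, e.g. rented from a pool
    private Int32 _position;

    public FixedBinaryWriter(Byte[] buffer) {
        _buffer = buffer;
        _position = 0;
    }

    public void Write(Byte[] source, Int32 offset, Int32 count) {
        if (count > _buffer.Length - _position) {
            // no growth by design: the "requests must not exceed 4 KB" contract
            throw new InvalidOperationException("Buffer capacity exceeded");
        }
        Array.Copy(source, offset, _buffer, _position, count);
        _position += count;
    }
}
```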
Anyone who has used Memcached will readily understand the idea behind BufferManager: to reduce the overhead of frequent allocation, memory is first carved into blocks; a requester receives whole blocks, and each block handed out is marked as allocated. Unlike Memcached, BufferManager expects the requester to return a block once finished with it, so it can be reallocated to other requests.

System.ServiceModel.Channels.BufferManager provides a solid implementation; typical usage looks like this:
```csharp
const Int32 size = 4096;
BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 32, maxBufferSize: size);
Byte[] buffer = bm.TakeBuffer(1024);
bm.ReturnBuffer(buffer);
```
A performance comparison against allocating memory by hand:
```csharp
const Int32 size = 4096;
BufferManager bm = BufferManager.CreateBufferManager(maxBufferPoolSize: size * 10, maxBufferSize: size);
var timer = new FunctionTimer();
timer.Push("BufferManager", () => {
    Byte[] buffer = bm.TakeBuffer(size);
    bm.ReturnBuffer(buffer);
});
timer.Push("new Byte[]", () => {
    Byte[] buffer = new Byte[size];
});
timer.Initialize();
timer.Execute(100000).Print();
```
Test results:
```
BufferManager
  Time Elapsed : 7ms
  CPU Cycles   : 17,055,523
  Memory cost  : 3,388
  Gen 0        : 2
  Gen 1        : 2
  Gen 2        : 2

new Byte[]
  Time Elapsed : 42ms
  CPU Cycles   : 113,437,539
  Memory cost  : 24
  Gen 0        : 263
  Gen 1        : 2
  Gen 2        : 2
```
Mandating that business requests stay under 4 KB looks feasible, but scenarios needing more memory always exist, such as merging messages or batch consumption, and Chuye.Kafka as a library has to support them.

KafkaScalableBinary invents nothing new: internally it maintains a Dictionary<Int32, Byte[]> that holds a series of byte arrays.

Construction does not actually allocate any memory; that only happens once writing begins:
```csharp
public KafkaScalableBinary()
    : this(4096) {
}

public KafkaScalableBinary(Int32 size) {
    if (size <= 0) {
        throw new ArgumentOutOfRangeException("size");
    }
    _lengthPerArray = size;
    _buffers = new Dictionary<Int32, Byte[]>(16);
}
```
When writing, the target array is first located by integer-dividing the current position by the per-array length (_position / _lengthPerArray); if that array does not exist yet, a new one is allocated:
```csharp
private Byte[] GetBufferForWrite() {
    var index = (Int32)(_position / _lengthPerArray);
    Byte[] buffer;
    if (!_buffers.TryGetValue(index, out buffer)) {
        if (_lengthPerArray >= 128) {
            buffer = ServiceProvider.BufferManager.TakeBuffer(_lengthPerArray);
        }
        else {
            buffer = new Byte[_lengthPerArray];
        }
        _buffers.Add(index, buffer);
    }
    return buffer;
}
```
The offset inside that array is then found by taking the current position modulo the per-array length (_position % _lengthPerArray). Since the number of bytes to write may exceed the space still available, a while loop alternates between fetching (or allocating) the target array and copying the remaining bytes into it, until everything is written:
```csharp
public override void WriteByte(Byte[] buffer, int offset, int count) {
    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (buffer.Length == 0) {
        return;
    }
    if (buffer.Length < offset + count) {
        throw new ArgumentOutOfRangeException();
    }
    checked {
        var left = count;                                             // bytes still to write
        while (left > 0) {
            var targetBuffer = GetBufferForWrite();                   // locate (or allocate) the target array
            var targetOffset = (Int32)(_position % _lengthPerArray);  // locate the offset inside it
            var prepareCopy = left;                                   // bytes we would like to write,
            if (prepareCopy > _lengthPerArray - targetOffset) {       // but the array may lack the room,
                prepareCopy = _lengthPerArray - targetOffset;         // so write the smaller amount
            }
            Array.Copy(buffer, offset + (count - left), targetBuffer, targetOffset, prepareCopy); // copy bytes
            _position += prepareCopy;                                 // advance the position
            left -= prepareCopy;                                      // shrink the remainder
            if (_position > _length) {                                // grow the total length
                _length = _position;
            }
        }
    }
}
```
Reading works much the same way, looping over the target arrays and copying bytes until done; the difference is that the allocation logic is replaced by an exception:
```csharp
public override Int32 ReadBytes(Byte[] buffer, int offset, int count) {
    if (buffer == null) {
        throw new ArgumentNullException("buffer");
    }
    if (buffer.Length == 0) {
        return 0;
    }
    if (buffer.Length < offset + count) {
        throw new ArgumentOutOfRangeException();
    }
    checked {
        var prepareRead = (Int32)(Math.Min(count, _length - _position)); // bytes actually readable
        var left = prepareRead;                                       // bytes still to read
        while (left > 0) {
            var targetBuffer = GetBufferForRead();                    // locate the target array
            var targetOffset = (Int32)(_position % _lengthPerArray);  // locate the offset inside it
            var prepareCopy = left;                                   // bytes we would like to read,
            if (prepareCopy > _lengthPerArray - targetOffset) {       // but the array may lack the bytes,
                prepareCopy = _lengthPerArray - targetOffset;         // so read the smaller amount
            }
            Array.Copy(targetBuffer, targetOffset, buffer, offset + (prepareRead - left), prepareCopy); // copy bytes
            _position += prepareCopy;                                 // advance the position
            left -= prepareCopy;                                      // shrink the remainder
        }
        return prepareRead;
    }
}

private Byte[] GetBufferForRead() {
    var index = (Int32)(_position / _lengthPerArray);
    Byte[] buffer;
    if (!_buffers.TryGetValue(index, out buffer)) {
        throw new IndexOutOfRangeException();
    }
    return buffer;
}
```
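To make the chunking concrete, here is a hedged usage sketch. The 4-byte chunk size is chosen purely for demonstration, and Seek(0) is a hypothetical rewind; the real class may expose repositioning differently.

```csharp
// Hypothetical usage sketch: with 4-byte chunks, a 10-byte write spans
// three internal arrays. Seek(0) is assumed for illustration only.
var binary = new KafkaScalableBinary(4);
var payload = new Byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
binary.WriteByte(payload, 0, payload.Length);   // fills chunk 0, chunk 1, half of chunk 2

binary.Seek(0);                                 // hypothetical: rewind the position to 0
var readBack = new Byte[10];
var read = binary.ReadBytes(readBack, 0, readBack.Length);
// read == 10 and readBack now holds the same bytes as payload
```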
Dispose releases all internally held arrays; only those that came from the BufferManager (the same >= 128 threshold as in GetBufferForWrite) are returned to it:
```csharp
public override void Dispose() {
    foreach (var item in _buffers) {
        if (_lengthPerArray >= 128) {
            ServiceProvider.BufferManager.ReturnBuffer(item.Value);
        }
    }
    _buffers.Clear();
}
```
Copying out to a stream operates directly on the internally maintained array list, which keeps it efficient:
```csharp
public override void CopyTo(Stream destination) {
    foreach (var item in GetBufferAndSize()) {
        destination.Write(item.Key, 0, item.Value);
    }
}
```
Reading in from a stream behaves much like writing:
```csharp
public override void ReadFrom(Stream source, int count) {
    var left = count;
    do {
        var targetBuffer = GetBufferForWrite();                   // locate (or allocate) the target array
        var targetOffset = (Int32)(_position % _lengthPerArray);  // locate the offset inside it
        var prepareCopy = left;
        if (prepareCopy > _lengthPerArray - targetOffset) {
            prepareCopy = _lengthPerArray - targetOffset;
        }
        var readed = source.Read(targetBuffer, targetOffset, prepareCopy);
        if (readed == 0) {                                        // source exhausted; avoid looping forever
            break;
        }
        _position += readed;
        left -= readed;
        if (_position > _length) {
            _length = _position;
        }
    } while (left > 0);
}
```
In fact one could derive a ScalableMemoryStream from MemoryStream and override its behaviour, so that KafkaScalableBinary depends on MemoryStream rather than on a concrete implementation; the design would then be more "design-pattern friendly". The underlying logic has already been described above.
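A minimal sketch of that idea, under the assumption that the chunked state lives in the subclass while the MemoryStream base serves purely as the abstraction consumers depend on (the base class's own buffer goes unused; the names are illustrative, not Chuye.Kafka code):

```csharp
// Illustrative sketch: re-route MemoryStream reads/writes into fixed-size
// chunks, so callers that depend on MemoryStream need not know about chunking.
using System;
using System.Collections.Generic;
using System.IO;

class ScalableMemoryStream : MemoryStream {
    private readonly Int32 _chunkSize;
    private readonly Dictionary<Int32, Byte[]> _chunks = new Dictionary<Int32, Byte[]>(16);
    private Int64 _position;
    private Int64 _length;

    public ScalableMemoryStream(Int32 chunkSize) { _chunkSize = chunkSize; }

    public override Int64 Position { get { return _position; } set { _position = value; } }
    public override Int64 Length { get { return _length; } }

    private Byte[] Chunk(Boolean forWrite) {
        var index = (Int32)(_position / _chunkSize);
        Byte[] chunk;
        if (!_chunks.TryGetValue(index, out chunk)) {
            if (!forWrite) throw new IndexOutOfRangeException();
            _chunks.Add(index, chunk = new Byte[_chunkSize]);
        }
        return chunk;
    }

    public override void Write(Byte[] buffer, Int32 offset, Int32 count) {
        var left = count;
        while (left > 0) {
            var chunk = Chunk(forWrite: true);
            var chunkOffset = (Int32)(_position % _chunkSize);
            var copy = Math.Min(left, _chunkSize - chunkOffset);
            Array.Copy(buffer, offset + (count - left), chunk, chunkOffset, copy);
            _position += copy;
            left -= copy;
            if (_position > _length) _length = _position;
        }
    }

    public override Int32 Read(Byte[] buffer, Int32 offset, Int32 count) {
        var total = (Int32)Math.Min(count, _length - _position);
        var left = total;
        while (left > 0) {
            var chunk = Chunk(forWrite: false);
            var chunkOffset = (Int32)(_position % _chunkSize);
            var copy = Math.Min(left, _chunkSize - chunkOffset);
            Array.Copy(chunk, chunkOffset, buffer, offset + (total - left), copy);
            _position += copy;
            left -= copy;
        }
        return total;
    }
}
```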
Two issues surfaced during testing. First, **Mono's BufferManager implementation has thread-safety problems**, so Chuye.Kafka provides an ObjectPool-style BufferManager as a replacement. Second, the benchmark comparison between KafkaScalableBinary and ScalableStreamBinary is very unstable, although the former's frequent division and modulo operations plus the dictionary overhead are certainly a drag; I will keep tracking and optimizing this.
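The replacement can be as simple as a lock-free pool of arrays of one fixed size; a hedged sketch of the ObjectPool idea (illustrative only, not the actual Chuye.Kafka code):

```csharp
// Illustrative sketch of an ObjectPool-style buffer manager: a thread-safe
// stack of reusable arrays of one fixed size.
using System;
using System.Collections.Concurrent;

class PooledBufferManager {
    private readonly Int32 _bufferSize;
    private readonly ConcurrentStack<Byte[]> _pool = new ConcurrentStack<Byte[]>();

    public PooledBufferManager(Int32 bufferSize) {
        _bufferSize = bufferSize;
    }

    public Byte[] TakeBuffer(Int32 size) {
        if (size > _bufferSize) {
            return new Byte[size];      // oversized requests bypass the pool
        }
        Byte[] buffer;
        return _pool.TryPop(out buffer) ? buffer : new Byte[_bufferSize];
    }

    public void ReturnBuffer(Byte[] buffer) {
        if (buffer.Length == _bufferSize) {
            _pool.Push(buffer);         // only pooled-size buffers are recycled
        }
    }
}
```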
KafkaScalableBinary (see GitHub); a sketch of the serialization design: