System.IO.Pipelines
是一個新的庫,旨在簡化在.NET中執行高性能IO的過程。它是一個依賴.NET Standard的庫,適用於全部.NET實現。html
Pipelines誕生於.NET Core團隊,爲使Kestrel成爲業界最快的Web服務器之一。最初從做爲Kestrel內部的實現細節發展成爲可重用的API,它在.Net Core 2.1中做爲可用於全部.NET開發人員的最高級BCL API(System.IO.Pipelines)提供。git
爲了正確解析Stream或Socket中的數據,代碼有固定的樣板,而且有許多極端狀況,爲了處理他們,不得不編寫難以維護的複雜代碼。
實現高性能和正確性,同時也難以處理這種複雜性。Pipelines旨在解決這種複雜性。github
讓咱們從一個簡單的問題開始吧。咱們想編寫一個TCP服務器,它接收來自客戶端的用行分隔的消息(由\n
分隔)。(譯者注:即一行爲一條消息)數組
聲明:與全部對性能敏感的工做同樣,應在應用程序中測量每一個方案的實際狀況。根據您的網絡應用程序須要處理的規模,可能不須要在意的各類技術的開銷。緩存
在Pipelines以前用.NET編寫的典型代碼以下所示:安全
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; await stream.ReadAsync(buffer, 0, buffer.Length); // 在buffer中處理一行消息 ProcessLine(buffer); }
此代碼可能在本地測試時正確工做,但它有幾個潛在錯誤:服務器
ReadAsync
調用可能沒有收到整個消息(行尾)。stream.ReadAsync()
返回值中實際填充到buffer
中的數據量。(譯者注:即不必定將buffer
填充滿)ReadAsync
調用不能處理多條消息。這些是讀取流數據時常見的一些缺陷。爲了解決這個問題,咱們須要作一些改變:網絡
async Task ProcessLinesAsync(NetworkStream stream) { var buffer = new byte[1024]; var bytesBuffered = 0; var bytesConsumed = 0; while (true) { var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered); if (bytesRead == 0) { // EOF 已經到末尾 break; } // 跟蹤已緩衝的字節數 bytesBuffered += bytesRead; var linePosition = -1; do { // 在緩衝數據中查找找一個行末尾 linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // 根據偏移量計算一行的長度 var lineLength = linePosition - bytesConsumed; // 處理這一行 ProcessLine(buffer, bytesConsumed, lineLength); // 移動bytesConsumed爲了跳過咱們已經處理掉的行 (包括\n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
這一次,這可能適用於本地開發,但一行可能大於1KiB(1024字節)。咱們須要調整輸入緩衝區的大小,直到找到新行。數據結構
所以,咱們能夠在堆上分配緩衝區去處理更長的一行。咱們從客戶端解析較長的一行時,能夠經過使用ArrayPool<byte>
避免重複分配緩衝區來改進這一點。併發
async Task ProcessLinesAsync(NetworkStream stream) { byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); var bytesBuffered = 0; var bytesConsumed = 0; while (true) { // 在buffer中計算中剩餘的字節數 var bytesRemaining = buffer.Length - bytesBuffered; if (bytesRemaining == 0) { // 將buffer size翻倍 而且將以前緩衝的數據複製到新的緩衝區 var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2); Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length); // 將舊的buffer丟回池中 ArrayPool<byte>.Shared.Return(buffer); buffer = newBuffer; bytesRemaining = buffer.Length - bytesBuffered; } var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining); if (bytesRead == 0) { // EOF 末尾 break; } // 跟蹤已緩衝的字節數 bytesBuffered += bytesRead; do { // 在緩衝數據中查找找一個行末尾 linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed); if (linePosition >= 0) { // 根據偏移量計算一行的長度 var lineLength = linePosition - bytesConsumed; // 處理這一行 ProcessLine(buffer, bytesConsumed, lineLength); // 移動bytesConsumed爲了跳過咱們已經處理掉的行 (包括\n) bytesConsumed += lineLength + 1; } } while (linePosition >= 0); } }
這段代碼有效,但如今咱們正在從新調整緩衝區大小,從而產生更多緩衝區副本。它將使用更多內存,由於根據代碼在處理一行行後不會縮緩衝區的大小。爲避免這種狀況,咱們能夠存儲緩衝區序列,而不是每次超過1KiB大小時調整大小。
此外,咱們不會增加1KiB的 緩衝區,直到它徹底爲空。這意味着咱們最終傳遞給ReadAsync
愈來愈小的緩衝區,這將致使對操做系統的更多調用。
爲了緩解這種狀況,咱們將在現有緩衝區中剩餘少於512個字節時分配一個新緩衝區:
譯者注:這段代碼太複雜了,懶得翻譯註釋了,你們將就看吧
public class BufferSegment { public byte[] Buffer { get; set; } public int Count { get; set; } public int Remaining => Buffer.Length - Count; } async Task ProcessLinesAsync(NetworkStream stream) { const int minimumBufferSize = 512; var segments = new List<BufferSegment>(); var bytesConsumed = 0; var bytesConsumedBufferIndex = 0; var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) }; segments.Add(segment); while (true) { // Calculate the amount of bytes remaining in the buffer if (segment.Remaining < minimumBufferSize) { // Allocate a new segment segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) }; segments.Add(segment); } var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining); if (bytesRead == 0) { break; } // Keep track of the amount of buffered bytes segment.Count += bytesRead; while (true) { // Look for a EOL in the list of segments var (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'\n', bytesConsumedBufferIndex, bytesConsumed); if (segmentIndex >= 0) { // Process the line ProcessLine(segments, segmentIndex, segmentOffset); bytesConsumedBufferIndex = segmentOffset; bytesConsumed = segmentOffset + 1; } else { break; } } // Drop fully consumed segments from the list so we don't look at them again for (var i = bytesConsumedBufferIndex; i >= 0; --i) { var consumedSegment = segments[i]; // Return all segments unless this is the current segment if (consumedSegment != segment) { ArrayPool<byte>.Shared.Return(consumedSegment.Buffer); segments.RemoveAt(i); } } } } (int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, byte value, int startBufferIndex, int startSegmentOffset) { var first = true; for (var i = startBufferIndex; i < segments.Count; ++i) { var segment = segments[i]; // Start from the correct offset var offset = first ? startSegmentOffset : 0; var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset); if (index >= 0) { // Return the buffer index and the index within that segment where EOL was found return (i, index); } first = false; } return (-1, -1); }
此代碼只是獲得不少更加複雜。當咱們正在尋找分隔符時,咱們同時跟蹤已填充的緩衝區序列。爲此,咱們此處使用List<BufferSegment>
查找新行分隔符時表示緩衝數據。其結果是,ProcessLine
和IndexOf
如今接受List<BufferSegment>
做爲參數,而不是一個byte[],offset和count
。咱們的解析邏輯如今須要處理一個或多個緩衝區序列。
咱們的服務器如今處理部分消息,它使用池化內存來減小整體內存消耗,但咱們還須要進行更多更改:
byte[]
和ArrayPool<byte>
的只是普通的託管數組。這意味着不管什麼時候咱們執行ReadAsync
或WriteAsync
,這些緩衝區都會在異步操做的生命週期內被固定(以便與操做系統上的本機IO API互操做)。這對GC有性能影響,由於沒法移動固定內存,這可能致使堆碎片。根據異步操做掛起的時間長短,池的實現可能須要更改。複雜性已經到了極端(咱們甚至沒有涵蓋全部案例)。高性能網絡應用一般意味着編寫很是複雜的代碼,以便從系統中得到更高的性能。
System.IO.Pipelines的目標是使這種類型的代碼更容易編寫。
讓咱們來看看這個例子的樣子System.IO.Pipelines:
async Task ProcessLinesAsync(Socket socket) { var pipe = new Pipe(); Task writing = FillPipeAsync(socket, pipe.Writer); Task reading = ReadPipeAsync(pipe.Reader); return Task.WhenAll(reading, writing); } async Task FillPipeAsync(Socket socket, PipeWriter writer) { const int minimumBufferSize = 512; while (true) { // 從PipeWriter至少分配512字節 Memory<byte> memory = writer.GetMemory(minimumBufferSize); try { int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None); if (bytesRead == 0) { break; } // 告訴PipeWriter從套接字讀取了多少 writer.Advance(bytesRead); } catch (Exception ex) { LogError(ex); break; } // 標記數據可用,讓PipeReader讀取 FlushResult result = await writer.FlushAsync(); if (result.IsCompleted) { break; } } // 告訴PipeReader沒有更多的數據 writer.Complete(); } async Task ReadPipeAsync(PipeReader reader) { while (true) { ReadResult result = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; SequencePosition? position = null; do { // 在緩衝數據中查找找一個行末尾 position = buffer.PositionOf((byte)'\n'); if (position != null) { // 處理這一行 ProcessLine(buffer.Slice(0, position.Value)); // 跳過 這一行+\n (basically position 主要位置?) buffer = buffer.Slice(buffer.GetPosition(1, position.Value)); } } while (position != null); // 告訴PipeReader咱們以及處理多少緩衝 reader.AdvanceTo(buffer.Start, buffer.End); // 若是沒有更多的數據,中止都去 if (result.IsCompleted) { break; } } // 將PipeReader標記爲完成 reader.Complete(); }
咱們的行讀取器的pipelines版本有2個循環:
FillPipeAsync
從Socket讀取並寫入PipeWriter。ReadPipeAsync
從PipeReader中讀取並解析傳入的行。與原始示例不一樣,在任何地方都沒有分配顯式緩衝區。這是管道的核心功能之一。全部緩衝區管理都委託給PipeReader/PipeWriter實現。
這使得使用代碼更容易專一於業務邏輯而不是複雜的緩衝區管理。
在第一個循環中,咱們首先調用PipeWriter.GetMemory(int)
從底層編寫器獲取一些內存; 而後咱們調用PipeWriter.Advance(int)
告訴PipeWriter咱們實際寫入緩衝區的數據量。而後咱們調用PipeWriter.FlushAsync()
來提供數據給PipeReader。
在第二個循環中,咱們正在使用PipeWriter最終來自的緩衝區Socket。當調用PipeReader.ReadAsync()
返回時,咱們獲得一個ReadResult包含2條重要信息,包括以ReadOnlySequence<byte>
形式讀取的數據和bool IsCompleted
,讓reader知道writer是否寫完(EOF)。在找到行尾(EOL)分隔符並解析該行以後,咱們將緩衝區切片以跳過咱們已經處理過的內容,而後咱們調用PipeReader.AdvanceTo
告訴PipeReader咱們消耗了多少數據。
在每一個循環結束時,咱們完成了reader和writer。這容許底層Pipe釋放它分配的全部內存。
除了處理內存管理以外,其餘核心管道功能還包括可以在Pipe不實際消耗數據的狀況下查看數據。
PipeReader有兩個核心API ReadAsync
和AdvanceTo
。ReadAsync
獲取Pipe數據,AdvanceTo
告訴PipeReader再也不須要這些緩衝區,以即可以丟棄它們(例如返回到底層緩衝池)。
這是一個http解析器的示例,它在接收Pipe到有效起始行以前讀取部分數據緩衝區數據。
該Pipe實現存儲了在PipeWriter和PipeReader之間傳遞的緩衝區的連接列表。PipeReader.ReadAsync暴露一個ReadOnlySequence<T>新的BCL類型,它表示一個或多個ReadOnlyMemory<T>段的視圖,相似於Span<T>和Memory<T>提供數組和字符串的視圖。
該Pipe內部維護指向reader和writer能夠分配或更新它們的數據集合,。SequencePosition表示緩衝區鏈表中的單個點,可用於有效地對ReadOnlySequence<T>進行切片。
這段實在翻譯困難,給出原文
The Pipe internally maintains pointers to where the reader and writer are in the overall set of allocated data and updates them as data is written or read. The SequencePosition represents a single point in the linked list of buffers and can be used to efficiently slice the ReadOnlySequence.
因爲ReadOnlySequence<T>能夠支持一個或多個段,所以高性能處理邏輯一般基於單個或多個段來分割快速和慢速路徑(fast and slow paths?)。
例如,這是一個將ASCII ReadOnlySequence<byte>轉換爲string如下內容的例程:
string GetAsciiString(ReadOnlySequence<byte> buffer) { if (buffer.IsSingleSegment) { return Encoding.ASCII.GetString(buffer.First.Span); } return string.Create((int)buffer.Length, buffer, (span, sequence) => { foreach (var segment in sequence) { Encoding.ASCII.GetChars(segment.Span, span); span = span.Slice(segment.Length); } }); }
在一個完美的世界中,讀取和解析工做是一個團隊:讀取線程消耗來自網絡的數據並將其放入緩衝區,而解析線程負責構建適當的數據結構。一般,解析將比僅從網絡複製數據塊花費更多時間。結果,讀取線程能夠輕易地壓倒解析線程。結果是讀取線程必須減慢或分配更多內存來存儲解析線程的數據。爲得到最佳性能,在頻繁暫停和分配更多內存之間存在平衡。
爲了解決這個問題,管道有兩個設置來控制數據的流量,PauseWriterThreshold和ResumeWriterThreshold。PauseWriterThreshold決定有多少數據應該在調用PipeWriter.FlushAsync
以前進行緩衝停頓。ResumeWriterThreshold控制reader消耗多少後寫入能夠恢復。
當Pipe的數據量超過PauseWriterThreshold,PipeWriter.FlushAsync
會異步阻塞。數據量變得低於ResumeWriterThreshold,它會解鎖時。兩個值用於防止在極限附近發生反覆阻塞和解鎖。
一般在使用async / await時,會在線程池線程或當前線程上調用continuation SynchronizationContext。
在執行IO時,對執行IO的位置進行細粒度控制很是重要,這樣能夠更有效地利用CPU緩存,這對於Web服務器等高性能應用程序相當重要。Pipelines公開了一個PipeScheduler肯定異步回調運行位置的方法。這使得調用者能夠精確控制用於IO的線程。
實踐中的一個示例是在Kestrel Libuv傳輸中,其中IO回調在專用事件循環線程上運行。
做爲製做System.IO.Pipelines的一部分,咱們還添加了許多新的原始BCL類型:
MemoryPool<T>
,IMemoryOwner<T>
,MemoryManager<T>
- .NET Core 1.0添加了ArrayPool<T>
,在.NET Core 2.1中,咱們如今有一個更通用的抽象,適用於任何工做的池Memory<T>
。這提供了一個可擴展點,容許您插入更高級的分配策略以及控制緩衝區的管理方式(例如,提供預先固定的緩衝區而不是純託管的陣列)。IBufferWriter<T>
- 表示用於寫入同步緩衝數據的接收器。(PipeWriter實現這個)ValueTask<T>
自.NET Core 1.1以來就已存在,但在.NET Core 2.1中得到了一些超級權限,容許無分配的等待異步操做。有關詳細信息,請參閱https://github.com/dotnet/corefx/issues/27445。API存在於System.IO.Pipelines
nuget包中。
如下是使用管道處理基於行的消息的.NET Core 2.1服務器應用程序的示例(上面的示例)https://github.com/davidfowl/TcpEcho。它應該運行`dotnet run`(或經過在Visual Studio中運行)。它偵聽端口8087上的套接字並將收到的消息寫入控制檯。您可使用netcat或putty等客戶端創建與8087的鏈接,併發送基於行的消息以使其正常工做。
今天Pipelines爲Kestrel和SignalR提供支持,咱們但願看見它做爲.NET社區中許多網絡庫和組件的核心。
PS: 首次翻譯英文文章,不足錯漏請指出,多謝支持