借鑑以前的Pipeline的操做方式,如今目標是給串口讀取操做也使用上Pipeline。稍微改造一下,如下代碼能夠直接運行。html
協議爲使用連續的4個0XFF做爲結尾,沒有頭標誌。數據總長爲68位定長。c#
我須要判斷從開始到選定位置是否長度足夠,這裏面用來判斷segment
長度我用了這個方式。async
buffer.Slice(0, start.Value).Length >= 64
其實最先不是使用這個東西的,而是使用的SequencePosition的GetInteger()方法,獲取到了位置的index,天然就知道了長度等信息,並且很是方便進行截取操做。但是在使用的時候,發現一個很是詭異的問題:經過這個方法獲取到的index值要大於Buffer的總長度。Slice直接彈出ArgumentOutOfRangeException
,可是不彈出錯誤,調試的時候很是麻煩。spa
查看這個方法定義的時候,發現簽名是這樣的:調試
[EditorBrowsable(EditorBrowsableState.Never)] public int GetInteger();
這個東西第一次見到,VS並不會提示,可是你強行寫的話,可以正常編譯。看來微軟並非很像讓咱們看到這個玩意。仔細挖掘一下,發現經過PositionOf
方法得到的SequencePosition
內部引用了一段長度爲4096的內存。這個GetInteger()有時候返回的是在這段Memory上面的Index值。
看來這個東西是內部使用的,不太推薦咱們使用。code
咱們只能使用GetPosition
方法來得到相對的位置。不過我用的這個設備,協議是尾部標誌,若是使用PostionOf
的話,偏移量得是負數。在嘗試了不少次不通以後,發現微軟文檔中有說到這個潛在問題:blog
buffer.Slice(0, pos)
得到最長的片斷,並將內容傳輸給處理程序進行。總之,不要使用操做ReadOnlySpan
爲了解決操做的複雜性,.NET Core 3.0引入了一個
SequenceReader<T>
簡化了操做,之後有機會使用的時候在寫吧。ip
最後程序以下:
private async void Sp_DataReceived(object sender, SerialDataReceivedEventArgs e) { SerialPort p = sender as SerialPort; byte[] bytes = new byte[1024 * 4]; var dataCount = p.Read(bytes, 0, p.BytesToRead); var span = bytes.AsMemory().Slice(0, dataCount); await FillPipeAsync(span); } private void InitPipe() { Pipe pipe = new Pipe(); writer = pipe.Writer; //Task writing = FillPipeAsync(port, pipe.Writer); Task.Run(() => ReadPipeAsync(pipe.Reader)); //await Task.WhenAll(reading, writing); } private PipeWriter writer; private async Task FillPipeAsync(ReadOnlyMemory<byte> memory) { await writer.WriteAsync(memory); //writer.Advance(memory.Length); // Make the data available to the PipeReader FlushResult result = await writer.FlushAsync(); if (result.IsCompleted) writer.Complete(); } private async Task ReadPipeAsync(PipeReader reader) { while (true) { try { ReadResult result = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer; SequencePosition? start = null; var headBytes = new byte[] { 0xff, 0xff, 0xff, 0xff }; do { // Find the EOL start = buffer.PositionOf(headBytes[0]); if (start != null) { if (buffer.Slice(start.Value).Length >= 4) { var headtoCheck = buffer.Slice(start.Value, 4).ToArray(); if (headtoCheck.SequenceEqual(headBytes)) { if(buffer.Slice(0, start.Value).Length >= 64) { var pos = buffer.GetPosition(4, start.Value); var mes = buffer.Slice(0, pos); DataProcess.Process(mes.ToArray()); var next = buffer.GetPosition(4, start.Value); buffer = buffer.Slice(next); } else { var next = buffer.GetPosition(4, start.Value); buffer = buffer.Slice(next); } } else { var next = buffer.GetPosition(1, start.Value); buffer = buffer.Slice(next); } } else { var next = buffer.GetPosition(1, start.Value); buffer = buffer.Slice(next); } } } while (start != null); // We sliced the buffer until no more data could be processed // Tell the PipeReader how much we consumed and how much we left to process if (result.IsCompleted) { continue; } reader.AdvanceTo(buffer.Start, buffer.End); } catch (ArgumentOutOfRangeException e) { //throw e; } } reader.Complete(); }