使用高性能Pipelines構建.NET通信程序

原文: 使用高性能Pipelines構建.NET通信程序

.NET Standard支持一組新的API,System.Span , System.Memory ,還有System.IO.Pipelines。這幾個新的API極大了提高了.NET程序的效能,未來.NET不少基礎API都會使用它們進行重寫。 html

Pipelines旨在解決.NET編寫Socket通訊程序時的不少困難,相信讀者也對此不勝其煩,使用stream模型進行編程,就算可以解決,也是實在麻煩。shell

System.IO.Pipelines使用簡單的內存片斷來管理數據,能夠極大的簡化編寫程序的過程。關於Pipelines的詳細介紹,能夠看看這裏。如今ASP.NET Core中使用的Kestrel已經在使用這個API。(話說這個東西貌似就是Kestrel團隊搞出來的。)編程

多是直接須要用Socket場景有限(物聯網用的還挺多的),Pipelines相關的資料感受不是不少。官方給出的示例是基於ASCII協議的,有固定結尾的協議,這裏我以物聯網設備經常使用的BINARY二進制自定義協議爲例,講解基於Pipelines的程序套路。c#

System.IO.Pipelines

與基於Stream的方式不一樣,pipelines提供一個pipe,用於存儲數據,pipe中間存儲的數據有點鏈表的感受,能夠基於SequencePosition進行slice操做,這樣就能獲得一個ReadOnlySequence<T>對象。reader能夠進行自定義操做,並在操做完成以後告訴pipe已經處理了多少數據,整個過程是不須要進行內存複製操做的,所以性能獲得了提高,還少了不少麻煩。能夠簡單理解做爲服務器端,流程:數組

接受數據循環:接到數據->放pipe裏面->告訴pipe放了多少數據
處理數據循環:在pipe裏面找一條完整數據->交給處理流程->告訴pipe處理了多少數據服務器

協議

有一款設備,binary協議,數據包開頭0x75, 0xbd, 0x7e, 0x97一共4個字節,隨後跟數據包長度2個字節(固定2400字節,不固定長度也能夠參照),隨後是數據區。在設備鏈接成功以後,數據主動從設備發送到PC。socket

關鍵代碼

雖然是.NET Core平臺的,可是.NET FRAMEWORK 4.6.1上面也能夠nuget安裝,直接async

install-package system.io.pipelines

進行安裝就能夠了。Socket相關處理的代碼再也不寫了,只列關鍵的。函數

代碼第一步是聲明pipe。性能

private async void InitPipe(Socket socket)
{
    Pipe pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(socket, pipe.Reader);

    await Task.WhenAll(reading, writing);
}

pipe有reader還有一個writer,reader負責讀取pipe數據,主要用在數據處理循環,writer負責將數據寫入pipe,主要用在數據接受循環。

//寫入循環
private async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    //數據流量比較大,用1M字節做爲buffer
    const int minimumBufferSize = 1024 * 1024;

    while (running)
    {
        try
        {
            //從writer中,得到一段很多於指定大小的內存空間
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);

            //將內存空間變成ArraySegment,提供給socket使用
            if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment))
            {
                throw new InvalidOperationException("Buffer backed by array was expected");
            }
            //接受數據
            int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }

            //一次接受完畢,數據已經在pipe中,告訴pipe已經給它寫了多少數據。
            writer.Advance(bytesRead);
        }
        catch
        {
            break;
        }

        // 提示reader能夠進行讀取數據,reader能夠繼續執行readAsync()方法
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // 告訴pipe完事了
    writer.Complete();
}

//讀取循環
private async Task ReadPipeAsync(Socket socket, PipeReader reader)
{
    while (running)
    {
        //等待writer寫數據
        ReadResult result = await reader.ReadAsync();
        //得到內存區域
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition? position = null;

        do
        {
            //尋找head的第一個字節所在的位置
            position = buffer.PositionOf((byte)0x75);
            if (position != null)
            {
                //因爲是連續四個字節做爲head,須要進行比對,我這裏直接使用了ToArray方法,仍是有了內存拷貝動做,不是很理想,可是寫起來很方便。
                //對性能有更高要求的場景,能夠進行slice操做後的單獨比對,這樣不須要內存拷貝動做
                var headtoCheck = buffer.Slice(position.Value, 4).ToArray();
                //SequenceEqual須要引用System.Linq
                if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))
                {
                    //到這裏,認爲找到包開頭了(從position.value開始),接下來須要從開頭處截取整包的長度,須要先判斷長度是否足夠
                    if (buffer.Slice(position.Value).Length >= 2400)
                    {
                        //長度足夠,那麼取出ReadOnlySequence,進行操做
                        var mes = buffer.Slice(position.Value, 2400);
                        //這裏是數據處理的函數,能夠參考官方文檔對ReadOnlySequence進行操做,文檔裏面使用了span,那樣性能會好一些。我這裏簡單實用ToArray()操做,這樣也有了內存拷貝的問題,可是處理的直接是byte數組了。
                        await ProcessMessage(mes.ToArray());
                        //這一段就算是完成了,從開頭位置,一整個包的長度就算完成了
                        var next = buffer.GetPosition(2400, position.Value);
                        //將buffer處理過的捨棄,替換爲剩餘的buffer引用
                        buffer = buffer.Slice(next);
                    }
                    else
                    {
                        //長度不夠,說明數據包不完整,等下一波數據進來再拼接,跳出循環。
                        break;
                    }
                }
                else
                {
                    //第一個是0x75可是後面不匹配,可能有數據傳輸問題,那麼須要捨棄第一個,0x75後面的字節開始再從新找0x75
                    var next = buffer.GetPosition(1, position.Value);
                    buffer = buffer.Slice(next);
                }
            }
        }
        while (position != null);

        //數據處理完畢,告訴pipe還剩下多少數據沒有處理(數據包不完整的數據,找不到head)
        reader.AdvanceTo(buffer.Start, buffer.End);

        if (result.IsCompleted)
        {
            break;
        }
    }

    reader.Complete();
}

以上代碼基本解決了如下問題:

  • 數據接收不完整,找不到開頭結尾,致使數據大量丟棄,或者本身維護一個queue的代碼複雜性
  • 數據接收與處理的同步問題
  • 一次性收到多條數據的狀況

後記

本文只是解釋了pipeline處理的模式,對於茫茫多的ToArray方法,能夠使用基於Span的操做進行優化(有時間就來填坑)。另外,若是在await ProcessMessage(mes.ToArray());這裏,直接使用Task.Run(()=>ProcessMessage(mes);代替的話,實測會出現莫名其妙的問題,頗有多是pipe運行快,在系統調度Task以前,已經將內存釋放致使的,若是須要優化這一塊的話,須要格外注意。

相關文章
相關標籤/搜索