學習 CLR 源碼:連續內存塊數據操做的性能優化

本文主要介紹 C# 命名空間 System.Buffers.Binary 中的一些二進制處理類和 Span 的簡單使用方法,這些二進制處理類型是上層應用處理二進制數據的基礎,掌握這些類型後,咱們能夠很容易地處理類型和二進制數據之間的轉換以及提升程序性能。html

C# 原語類型

按照內存分配來區分,C# 有值類型、引用類型;程序員

按照基礎類型類型來分,C# 有 內置類型、通用類型、自定義類型、匿名類型、元組類型、CTS類型(通用類型系統);數組

C# 的基礎類型包括:安全

  1. 整型: sbyte, byte, short, ushort, int, uint, long, ulong
  2. 實數類型: float, double, decimal
  3. 字符類型: char
  4. 布爾類型: bool
  5. 字符串類型: string

C# 中的原語類型,是基礎類型中的值類型,不包括 string。原語類型可使用 sizeof() 來獲取字節大小,除 bool 外,都有 MaxValueMinValue 兩個字段。架構

sizeof(uint);
uint.MaxValue
uint.MinValue

咱們也能夠在泛型上進行區分,上面的教程類型,除了 string,其餘類型都是 struct。socket

<T>() where T : struct
{
}

更多說明,能夠戳這裏瞭解:https://www.programiz.com/csharp-programming/variables-primitive-data-typeside

1,利用 Buffer 優化數組性能

Buffer 能夠操做基元類型(int、byte等)的數組,利用.NET 中的 Buffer 類,經過更快地訪問內存中的數據來提升應用程序的性能。
Buffer 能夠直接從基元類型的數組中,直接取出指定數量的字節,或者給其某個字節設置值。函數

Buffer 主要在直接操做內存數據、操做非託管內存時,使用 Buffer 能夠帶來安全且高性能的體驗。工具

方法 說明
BlockCopy(Array, Int32, Array, Int32, Int32) 將指定數目的字節從起始於特定偏移量的源數組複製到起始於特定偏移量的目標數組。
ByteLength(Array) 返回指定數組中的字節數。
GetByte(Array, Int32) 檢索指定數組中指定位置的字節。
MemoryCopy(Void, Void, Int64, Int64) 將指定爲長整型值的一些字節從內存中的一個地址複製到另外一個地址。此 API 不符合 CLS。
MemoryCopy(Void, Void, UInt64, UInt64) 將指定爲無符號長整型值的一些字節從內存中的一個地址複製到另外一個地址。此 API 不符合 CLS。
SetByte(Array, Int32, Byte) 將指定的值分配給指定數組中特定位置處的字節。

CLS 指公共語言標準,請參考 https://www.cnblogs.com/whuanle/p/14141213.html#5,clscompliantattribute性能

下面來介紹一下 Buffer 的一些使用方法。

BlockCopy 能夠複製數組的一部分到另外一個數組,其使用方法以下:

int[] arr1 = new int[] { 1, 2, 3, 4, 5 };
        int[] arr2 = new int[10] { 0, 0, 0, 0, 0, 6, 7, 8, 9, 10 };

        // int = 4 byte
        // index:       0  1  2  3  4  5  6  7  8  9  10 11 12 13 14 15 16 17 18 19 ... ...
        // arr1:        01 00 00 00 02 00 00 00 03 00 00 00 04 00 00 00 05 00 00 00
        // arr2:        00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 06 00 00 00 07 00 00 00 08 00 00 00 09 00 00 00 0A 00 00 00

        // Buffer.ByteLength(arr1) == 20 ,
        // Buffer.ByteLength(arr2) == 40


        Buffer.BlockCopy(arr1, 0, arr2, 0, 19);

        for (int i = 0; i < arr2.Length; i++)
        {
            Console.Write(arr2[i] + ",");
        }

.SetByte() 則可細粒度地設置數組的值,便可以直接設置數組中任意一位的值,其使用方法以下:

//source data:
        // 0000,0001,0002,00003,0004
        // 00 00 00 00 01 00 00 00 02 00 00 00 03 00 00 00 04 00 00 00
        int[] a = new int[] { 0, 1, 2, 3, 4 };
        foreach (var item in a)
        {
            Console.Write(item + ",");
        }

        Console.WriteLine("\n------\n");

        // see : https://stackoverflow.com/questions/26455843/how-are-array-values-stored-in-little-endian-vs-big-endian-architecture
        // memory save that data:
        // 0000    1000    2000    3000    4000
        for (int i = 0; i < Buffer.ByteLength(a); i++)
        {
            Console.Write(Buffer.GetByte(a, i));
            if (i != 0 && (i + 1) % 4 == 0)
                Console.Write("    ");
        }

        // 16 進制
        // 0000    1000    2000    3000    4000

        Console.WriteLine("\n------\n");

        Buffer.SetByte(a, 0, 4);
        Buffer.SetByte(a, 4, 3);
        Buffer.SetByte(a, 8, 2);
        Buffer.SetByte(a, 12, 1);
        Buffer.SetByte(a, 16, 0);

        foreach (var item in a)
        {
            Console.Write(item + ",");
        }

        Console.WriteLine("\n------\n");

建議複製代碼自行測試,斷點調試,觀察過程。

2,BinaryPrimitives 細粒度操做字節數組

System.Buffers.Binary.BinaryPrimitives 用來以精確的方式讀取或者字節數組,只能對 bytebyte 數組使用,其使用場景很是普遍。

BinaryPrimitives 的實現原理是 BitConverter,BinaryPrimitives 對 BitConverter 作了一些封裝。BinaryPrimitives 的主要使用方式是以某種形式從 byte 或 byte 數組中讀取出信息。

例如,BinaryPrimitives 在 byte 數組中,一次性讀取四個字節,其示例代碼以下:

// source data:  00 01 02 03 04
        // binary data:  00000000 00000001 00000010 00000011 000001000
        byte[] arr = new byte[] { 0, 1, 2, 3, 4, };

        // read one int,4 byte
        int head = BinaryPrimitives.ReadInt32BigEndian(arr);


        // 5 byte:             00000000 00000001 00000010 00000011 000001000
        // read 4 byte(int) :  00000000 00000001 00000010 00000011
        //                     = 66051

        Console.WriteLine(head);

在 BinaryPrimitives 中有大端小端之分。在 C# 中,應該都是小端在前大端在後的,具體可能會因處理器架構而不一樣。
你可使用 BitConverter.IsLittleEndian 來判斷在當前處理器上,C# 程序是大端仍是小端在前。


.Read...() 開頭的方法,能夠以字節爲定位訪問 byte 數組上的數據。

.Write...() 開頭的方法,能夠向某個位置寫入數據。

下面舉個例子:

// source data:  00 01 02 03 04
        // binary data:  00000000 00000001 00000010 00000011 000001000
        byte[] arr = new byte[] { 0, 1, 2, 3, 4, };

        // read one int,4 byte
        // 5 byte:             00000000 00000001 00000010 00000011 000001000
        // read 4 byte(int) :  00000000 00000001 00000010 00000011
        //                     = 66051

        int head = BinaryPrimitives.ReadInt32BigEndian(arr);
        Console.WriteLine(head);

        // BinaryPrimitives.WriteInt32LittleEndian(arr, 1);
        BinaryPrimitives.WriteInt32BigEndian(arr.AsSpan().Slice(0, 4), 0b00000000_00000000_00000000_00000001);
        // to : 00000000 00000000 00000000 00000001 |  000001000
        // read 4 byte

        head = BinaryPrimitives.ReadInt32BigEndian(arr);
        Console.WriteLine(head);

建議複製代碼自行測試,斷點調試,觀察過程。


提升代碼安全性

C#和.NET Core 有的許多面向性能的 API,C# 和 .NET 的一大優勢是能夠在不犧牲內存安全性的狀況下編寫快速出高性能的庫。咱們在避免使用 unsafe 代碼的狀況下,經過二進制處理類,咱們能夠編寫出高性能的代碼和具備安全性的代碼。

在 C# 中,咱們有如下類型能夠高效操做字節/內存:

  • Span 和C#類型能夠快速安全地訪問內存。表示任意內存的連續區域。使用 span 使咱們能夠序列化爲託管.NET數組,堆棧分配的數組或非託管內存,而無需使用指針。.NET能夠防止緩衝區溢出。
  • ref structSpan
  • stackalloc 用於建立基於堆棧的數組。stackalloc 是在須要較小緩衝區時避免分配的有用工具。
  • 低級方法,並在原始類型和字節之間直接轉換。MemoryMarshal.GetReference()Unsafe.ReadUnaligned()Unsafe.WriteUnaligned()
  • BinaryPrimitives具備用於在.NET基本類型和字節之間進行有效轉換的輔助方法。例如,讀取小尾數字節並返回無符號的64位數字。所提供的方法通過了最優化,並使用了向量化。BinaryPrimitives.ReadUInt64LittleEndianBinaryPrimitive

.Reverse...() 開頭的方法,能夠置換基元類型的大小端。

short value = 0b00000000_00000001;
        // to endianness: 0b00000001_00000000 == 256
        BinaryPrimitives.ReverseEndianness(0b00000000_00000000_00000000_00000001);

        Console.WriteLine(BinaryPrimitives.ReverseEndianness(value));

        value = 0b00000001_00000000;
        Console.WriteLine(BinaryPrimitives.ReverseEndianness(value));
        // 1

3,BitConverter、MemoryMarshal

BitConverter 能夠基元類型和 byte 相互轉換,例如 int 和 byte 互轉,或者任意取出、寫入基元類型的任意一個字節。
其示例以下:

// 0b...1_00000100
        int value = 260;
		
        // byte max value:255
        // a = 0b00000100; 丟失 int ... 00000100 以前的位數。
        byte a = (byte)value;

        // a = 4
        Console.WriteLine(a);

        // LittleEndian
        // 0b 00000100 00000001 00000000 00000000
        byte[] b = BitConverter.GetBytes(260);
        Console.WriteLine(Buffer.GetByte(b, 1)); // 4

        if (BitConverter.IsLittleEndian)
            Console.WriteLine(BinaryPrimitives.ReadInt32LittleEndian(b));
        else
            Console.WriteLine(BinaryPrimitives.ReadInt32BigEndian(b));

MemoryMarshal 提供與 Memory<T>ReadOnlyMemory<T>Span<T>ReadOnlySpan<T> 進行交互操做的方法。

MemoryMarshalSystem.Runtime.InteropServices 命名空間中。

咱們先介紹 MemoryMarshal.Cast(),它能夠將一種基元類型的範圍強制轉換爲另外一種基元類型的範圍。

// 1 int  = 4 byte
        // int [] {1,2}
        // 0001     0002
        var byteArray = new byte[] { 1, 0, 0, 0, 2, 0, 0, 0 };
        Span<byte> byteSpan = byteArray.AsSpan();
        // byte to int 
        Span<int> intSpan = MemoryMarshal.Cast<byte, int>(byteSpan);
        foreach (var item in intSpan)
        {
            Console.Write(item + ",");
        }

最簡單的說法是,MemoryMarshal 能夠將一種結構轉換爲另外一種結構

咱們能夠將一個結構轉換爲字節:

public struct Test
{
    public int A;
    public int B;
    public int C;
}

... ...

        Test test = new Test()
        {
            A = 1,
            B = 2,
            C = 3
        };
        var testArray = new Test[] { test };
        ReadOnlySpan<byte> tmp = MemoryMarshal.AsBytes(testArray.AsSpan());

        // socket.Send(tmp); ...

還能夠逆向還原字節爲結構體:

// bytes = socket.Accept(); .. 
        ReadOnlySpan<Test> testSpan = MemoryMarshal.Cast<byte,Test>(tmp);

        // or
        Test testSpan = MemoryMarshal.Read<Test>(tmp);

例如,咱們要對比兩個結構體數組中,每一個結構體是否相等,能夠採用如下代碼:

static void Main(string[] args)
        {
            int[] a = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
            int[] b = new int[] { 1, 2, 3, 4, 5, 6, 7, 0, 9 };
            _ = Compare64(a,b);
        }

        private static bool Compare64<T>(T[] t1, T[] t2)
            where T : struct
        {
            var l1 = MemoryMarshal.Cast<T, long>(t1);
            var l2 = MemoryMarshal.Cast<T, long>(t2);

            for (int i = 0; i < l1.Length; i++)
            {
                if (l1[i] != l2[i]) return false;
            }
            return true;
        }

後面有個更好的性能提高方案。

程序員基本都學習過 C 語言,應該瞭解 C 語言中的結構體字節對齊,在 C# 中也是同樣,兩種類型相互轉換,除了 C# 結構體轉 C# 結構體,也能夠 C 語言結構體轉 C# 結構體,可是要考慮好字節對齊,若是兩個結構體所佔用的內存大小不同,則可能在轉換時出現數據丟失或出現錯誤。

4,Marshal

Marshal 提供了用於分配非託管內存,複製非託管內存塊以及將託管類型轉換爲非託管類型的方法的集合,以及與非託管代碼進行交互時使用的其餘方法,或者用來肯定對象的大小。

例如,來肯定 C# 中的一些類型大小:

Console.WriteLine("SystemDefaultCharSize={0}, SystemMaxDBCSCharSize={1}",
         Marshal.SystemDefaultCharSize, Marshal.SystemMaxDBCSCharSize);

輸出 char 佔用的字節數。

例如,在調用非託管代碼時,須要傳遞函數指針,C# 通常使用委託傳遞,不少時候爲了不各類內存問題異常問題,須要轉換爲指針傳遞。

IntPtr p = Marshal.GetFunctionPointerForDelegate(_overrideCompileMethod)

Marshal 也能夠很方便地得到一個結構體的字節大小:

public struct Point
{
    public Int32 x, y;
}

Marshal.SizeOf(typeof(Point));

從非託管內存中分配一塊內存和釋放內存,咱們能夠避免 usafe 代碼的使用,代碼示例:

IntPtr hglobal = Marshal.AllocHGlobal(100);
        Marshal.FreeHGlobal(hglobal);

實踐

合理利用前面提到的二進制處理類,能夠在不少方面提高代碼性能,在前面的學習中,咱們大概瞭解這些對象,可是有什麼應用場景?真的可以提高性能?有沒有練習代碼?

這裏筆者舉個例子,如何比較兩個 byte[] 數組是否相等?
最簡單的代碼示例以下:

public bool ForBytes(byte[] a,byte[] b)
        {
            if (a.Length != b.Length)
                return false;
				
            for (int i = 0; i < a.Length; i++)
            {
                if (a[i] != b[i]) return false;
            }
            return true;
        }

這個代碼很簡單,循環遍歷字節數組,一個個判斷是否相等。

若是用上前面的二進制處理對象類,則能夠這樣寫代碼:

private static bool EqualsBytes(byte[] b1, byte[] b2)
        {
            var a = b1.AsSpan();
            var b = b2.AsSpan();
            Span<byte> copy1 = default;
            Span<byte> copy2 = default;

            if (a.Length != b.Length)
                return false;

            for (int i = 0; i < a.Length;)
            {
                if (a.Length - 8 > i)
                {
                    copy1 = a.Slice(i, 8);
                    copy2 = b.Slice(i, 8);
                    if (BinaryPrimitives.ReadUInt64BigEndian(copy1) != BinaryPrimitives.ReadUInt64BigEndian(copy2))
                        return false;
                    i += 8;
                    continue;
                }

                if (a[i] != b[i])
                    return false;
                i++;
            }
            return true;
        }

你可能會在想,第二種方法,這麼多代碼,這麼多判斷,還有各類函數調用,還多建立了一些對象,這特麼可以提高速度?這樣會不會消耗更多內存??? 別急,你可使用如下完整代碼測試:
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Running;
using System;
using System.Buffers.Binary;
using System.Runtime.InteropServices;
using System.Text;

namespace BenTest
{
    [SimpleJob(RuntimeMoniker.NetCoreApp31)]
    [SimpleJob(RuntimeMoniker.CoreRt31)]
    [RPlotExporter]
    public class Test
    {
        private byte[] _a = Encoding.UTF8.GetBytes("5456456456444444444444156456454564444444444444444444444444444444444444444777777777777777777777711111111111116666666666666");
        private byte[] _b = Encoding.UTF8.GetBytes("5456456456444444444444156456454564444444444444444444444444444444444444444777777777777777777777711111111111116666666666666");

        private int[] A1 = new int[] { 41544444, 4487, 841, 8787, 4415, 7, 458, 4897, 87897, 815, 485, 4848, 787, 41, 5489, 74878, 84, 89787, 8456, 4857489, 784, 85489, 47 };
        private int[] B2 = new int[] { 41544444, 4487, 841, 8787, 4415, 7, 458, 4897, 87897, 815, 485, 4848, 787, 41, 5489, 74878, 84, 89787, 8456, 4857489, 784, 85489, 47 };

        [Benchmark]
        public bool ForBytes()
        {
            for (int i = 0; i < _a.Length; i++)
            {
                if (_a[i] != _b[i]) return false;
            }
            return true;
        }

        [Benchmark]
        public bool ForArray()
        {
            return ForArray(A1, B2);
        }

        private bool ForArray<T>(T[] b1, T[] b2) where T : struct
        {
            for (int i = 0; i < b1.Length; i++)
            {
                if (!b1[i].Equals(b2[i])) return false;
            }
            return true;
        }

        [Benchmark]
        public bool EqualsArray()
        {
            return EqualArray(A1, B2);
        }

        [Benchmark]
        public bool EqualsBytes()
        {
            var a = _a.AsSpan();
            var b = _b.AsSpan();
            Span<byte> copy1 = default;
            Span<byte> copy2 = default;

            if (a.Length != b.Length)
                return false;

            for (int i = 0; i < a.Length;)
            {
                if (a.Length - 8 > i)
                {
                    copy1 = a.Slice(i, 8);
                    copy2 = b.Slice(i, 8);
                    if (BinaryPrimitives.ReadUInt64BigEndian(copy1) != BinaryPrimitives.ReadUInt64BigEndian(copy2))
                        return false;
                    i += 8;
                    continue;
                }

                if (a[i] != b[i])
                    return false;
                i++;
            }
            return true;
        }

        private bool EqualArray<T>(T[] t1, T[] t2) where T : struct
        {
            Span<byte> b1 = MemoryMarshal.AsBytes<T>(t1.AsSpan());
            Span<byte> b2 = MemoryMarshal.AsBytes<T>(t2.AsSpan());

            Span<byte> copy1 = default;
            Span<byte> copy2 = default;

            if (b1.Length != b2.Length)
                return false;

            for (int i = 0; i < b1.Length;)
            {
                if (b1.Length - 8 > i)
                {
                    copy1 = b1.Slice(i, 8);
                    copy2 = b2.Slice(i, 8);
                    if (BinaryPrimitives.ReadUInt64BigEndian(copy1) != BinaryPrimitives.ReadUInt64BigEndian(copy2))
                        return false;
                    i += 8;
                    continue;
                }

                if (b1[i] != b2[i])
                    return false;
                i++;
            }
            return true;
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var summary = BenchmarkRunner.Run<Test>();
            Console.ReadKey();
        }
    }
}

使用 BenchmarkDotNet 的測試結果以下:

BenchmarkDotNet=v0.13.0, OS=Windows 10.0.19043.1052 (21H1/May2021Update)
Intel Core i7-10700 CPU 2.90GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=5.0.301
  [Host]        : .NET Core 3.1.16 (CoreCLR 4.700.21.26205, CoreFX 4.700.21.26205), X64 RyuJIT
  .NET Core 3.1 : .NET Core 3.1.16 (CoreCLR 4.700.21.26205, CoreFX 4.700.21.26205), X64 RyuJIT


|      Method |           Job |       Runtime |     Mean |    Error |   StdDev |
|------------ |-------------- |-------------- |---------:|---------:|---------:|
|    ForBytes | .NET Core 3.1 | .NET Core 3.1 | 76.95 ns | 0.064 ns | 0.053 ns |
|    ForArray | .NET Core 3.1 | .NET Core 3.1 | 66.37 ns | 1.258 ns | 1.177 ns |
| EqualsArray | .NET Core 3.1 | .NET Core 3.1 | 17.91 ns | 0.027 ns | 0.024 ns |
| EqualsBytes | .NET Core 3.1 | .NET Core 3.1 | 26.26 ns | 0.432 ns | 0.383 ns |

能夠看到,byte[] 比較中,使用了二進制對象的方式,耗時降低了近 60ns,而在 struct 的比較中,耗時也降低了 40ns。

在第二種代碼中,咱們使用了 Span、切片、 MemoryMarshal、BinaryPrimitives,這些用法均可以給咱們的程序性能帶來很大的提高。

這裏示例雖然使用了 Span 等,其最主要是利用了 64位 CPU ,64位 CPU 可以一次性讀取 8個字節(64位),所以咱們使用 ReadUInt64BigEndian 一次讀取從字節數組中讀取 8 個字節去進行比較。若是字節數組長度爲 1024 ,那麼第二種方法只須要 比較 128次。

固然,這裏並非這種代碼性能是最強的,由於 CLR 有不少底層方法具備更猛的性能。不過,咱們也看到了,合理使用這些類型,可以很大程度上提升代碼性能。上面的數組對比只是一個簡單的例子,在實際項目中,咱們也能夠挖掘更多使用場景。

更高性能

雖然第二種方法,快了幾倍,可是性能還不夠強勁,咱們能夠利用 Span 中的 API,來實現更快的比較。

[Benchmark]
        public bool SpanEqual()
        {
            return SpanEqual(_a,_b);
        }
        private bool SpanEqual(byte[] a, byte[] b)
        {
            return a.AsSpan().SequenceEqual(b);
        }

能夠試試

StructuralComparisons.StructuralEqualityComparer.Equals(a, b);

性能測試結果:

|      Method |           Job |       Runtime |      Mean |     Error |    StdDev |
|------------ |-------------- |-------------- |----------:|----------:|----------:|
|    ForBytes | .NET Core 3.1 | .NET Core 3.1 | 77.025 ns | 0.0502 ns | 0.0419 ns |
|    ForArray | .NET Core 3.1 | .NET Core 3.1 | 66.192 ns | 0.6127 ns | 0.5117 ns |
| EqualsArray | .NET Core 3.1 | .NET Core 3.1 | 17.897 ns | 0.0122 ns | 0.0108 ns |
| EqualsBytes | .NET Core 3.1 | .NET Core 3.1 | 25.722 ns | 0.4584 ns | 0.4287 ns |
|   SpanEqual | .NET Core 3.1 | .NET Core 3.1 |  4.736 ns | 0.0099 ns | 0.0093 ns |

能夠看到,Span.SequenceEqual() 的速度簡直是碾壓。 對於 C# 中的二進制處理技巧就介紹到這裏,閱讀 CLR 源碼 時,咱們能夠學習到不少騷操做,讀者能夠多閱讀 CLR 源碼,對技術提高有很大的幫助。

相關文章
相關標籤/搜索