BeetleX之TCP服務應用詳解

BeetleX.net core平臺下的一個開源TCP 通信組件,它不只使用簡便還提供了出色性能的支持,能夠輕易讓你實現上百萬級別RPS吞吐的服務應用。組件所提供的基礎功能也很是完善,可讓你輕易擴展本身的服務應用,如下提組件集成的功能:linux

  • 完善的會話管理機制,能夠根據鏈接狀態和相關日誌git

  • 專門針對內存池實現的異步流讀寫,支持標準Stream的同並提供高效的性能github

  • 消息IO合併,廣播序列化合並等性能強化功能web

  • 提供簡潔的協議擴展規範,輕易實現http,websocket,mqtt等應用通信協議json

  • 支持TLS,讓你構建的通信服務更安全可靠安全

擴展的組件

如下是Beetlex擴展的一些功能組件websocket

性能

一開始說組可讓你現上百萬級別RPS吞吐的服務應用其實一點不假,BeetleX的基礎性能有這樣的支撐能力;雖然組件不能說是.net core上性能最好的,但在功能和綜合性能上絕對很是出色(詳細能夠https://tfb-status.techempower.com/ 查看測試結果,惋惜這個網站提交的.net core組件比較少,大部都是基於aspcore的通信模塊擴展).如下是JSON serialization基礎輸出的一個測試結果(Plaintext在官方的測試環境一直沒辦法跑起來....) 網絡

在測試中組件只落後於aspcore-rhtx 這是紅帽專門針對 .net core編寫的linux網絡驅動.session

Single query多線程

構建基礎TCP應用

組件在構建TCP服務的時候很是簡單,主要歸功於它提供了完善的Stream讀寫功能,而這些功能讓你徹底不用關心bytes的讀寫。基於Stream的好處就是能夠輕鬆和第三方序列化的組件進行整合。如下是簡單地構建一個Hello服務。

    class Program : ServerHandlerBase
    {
        private static IServer server;
        public static void Main(string[] args)
        {
            server = SocketFactory.CreateTcpServer<Program>();
            //server.Options.DefaultListen.Port =9090;
            //server.Options.DefaultListen.Host = "127.0.0.1";
            server.Open();
            Console.Read();
        }
        public override void SessionReceive(IServer server, SessionReceiveEventArgs e)
        {
            var pipeStream = e.Stream.ToPipeStream();
            if (pipeStream.TryReadLine(out string name))
            {
                Console.WriteLine(name);
                pipeStream.WriteLine("hello " + name);
                e.Session.Stream.Flush();
            }
            base.SessionReceive(server, e);
        }
    }

以上就是一個簡單的TCP服務,讓以代碼正常運行須要引用Beetlex最新版的組件能夠在Nuget上找到。以上服務的功能很簡單當接收數據後嘗試從流中讀取一行字符,若是讀取成功則把內容寫入到流中提交返回。經過以上代碼是否是感受寫個服務比較簡單(可是PipeStream並非線程安全的,因此不能涉及到多線程讀寫它)

協議處理規則

其實PipeStream處理數據已經很是方便了,那爲何還須要制定一個協議處理規範呢?前面已經說了PipeStream並非線程安全的,很容易帶來使用上的風險,因此引入協議處理規則來進行一個安全約束的同時能夠實現多線程消息處理。組件提供了這樣一個接口來規範消息的處理,接口以下:

    public interface IPacket : IDisposable
    {

        EventHandler<EventArgs.PacketDecodeCompletedEventArgs> Completed
        {
            get; set;
        }

        IPacket Clone();

        void Decode(ISession session, System.IO.Stream stream);

        void Encode(object data, ISession session, System.IO.Stream stream);

        byte[] Encode(object data, IServer server);

        ArraySegment<byte> Encode(object data, IServer server, byte[] buffer);
    }

若是你要處理消息對象,則須要實現以上接口(固然這個接口的實現不是必須的,只要把握好PipeStream安全上的控制就好);但實現這接口來處理消息能夠帶不少好處,能夠多消息合併IO,廣播消息合併序列化等高效的功能。不過在不瞭解組件的狀況實現這個接口的確也是有些難度的,因此組件提供了一個基礎的類FixedHeaderPacket,它是一個抽像類用於描述有個消息頭長的信息流處理。

字符消息分包

接下來經過FixedHeaderPacket來實現一個簡單的字符分包協議消息;主要在發送消息的時候添加一個大小頭用來描述消息的長度(這是在TCP中解決粘包的主要手段)。

    public class StringPacket : BeetleX.Packets.FixedHeaderPacket
    {
        public override IPacket Clone()
        {
            return new StringPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            return stream.ReadString(CurrentSize);
        }
        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            stream.Write((string)data);
        }
    }

經過FixedHeaderPacket制定一個分包規則是很是簡單的,主要實現讀寫兩個方法。下面便可在服務中引用這個包做爲TCP數據流的分析規則:

    class Program : ServerHandlerBase
    {
        private static IServer server;
        public static void Main(string[] args)
        {
            server = SocketFactory.CreateTcpServer<Program,StringPacket>();
            //server.Options.DefaultListen.Port =9090;
            //server.Options.DefaultListen.Host = "127.0.0.1";
            server.Open();
            Console.Read();
        }
        protected override void OnReceiveMessage(IServer server, ISession session, object message)
        {
            Console.WriteLine(message);
            server.Send($"hello {message}", session);
        }
    }

通過分析器包裝後,就不再用流來處理數據了,能夠直接進行對像的發送處理。

集成Protobuf

處理String並非友好的事情,畢竟沒有對象來得直觀和操做方便;如下是經過FixedHeaderPacket擴展Protobuf對象傳輸,如下是針對Protobuf的規則擴展:

    public class ProtobufPacket : BeetleX.Packets.FixedHeaderPacket
    {
        static ProtobufPacket()
        {
            TypeHeader.Register(typeof(ProtobufClientPacket).Assembly);
        }
        public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT);

        public override IPacket Clone()
        {
            return new ProtobufPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            Type type = TypeHeader.ReadType(stream);
            var size = CurrentSize - 4;
            return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, type, size);
        }

        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            TypeHeader.WriteType(data, stream);
            ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, data);
        }
    }

使用規則分析器

    class Program : ServerHandlerBase
    {
        private static IServer server;
        public static void Main(string[] args)
        {
            server = SocketFactory.CreateTcpServer<Program, Messages.ProtobufPacket>();
            //server.Options.DefaultListen.Port =9090;
            //server.Options.DefaultListen.Host = "127.0.0.1";
            server.Open();
            Console.Read();
        }
        protected override void OnReceiveMessage(IServer server, ISession session, object message)
        {
            ((Messages.Register)message).DateTime = DateTime.Now;
            server.Send(message, session);
        }
    }

不一樣序列化的擴展

既然有了一個Protobuf做爲樣本,那針對其餘序列化的實現就比較簡單了

  • json
    public class JsonPacket : BeetleX.Packets.FixedHeaderPacket
    {
        static JsonPacket()
        {
            TypeHeader.Register(typeof(JsonClientPacket).Assembly);
        }
        public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT);

        public override IPacket Clone()
        {
            return new JsonPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            Type type = TypeHeader.ReadType(stream);
            var size = CurrentSize - 4;
            var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(size);
            stream.Read(buffer, 0, size);
            try
            {
                return SpanJson.JsonSerializer.NonGeneric.Utf8.Deserialize(new ReadOnlySpan<byte>(buffer, 0, size), type);
            }
            finally
            {
                System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
            }
        }

        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            TypeHeader.WriteType(data, stream);
            var buffer = SpanJson.JsonSerializer.NonGeneric.Utf8.SerializeToArrayPool(data);
            try
            {
                stream.Write(buffer.Array, buffer.Offset, buffer.Count);
            }
            finally
            {
                System.Buffers.ArrayPool<byte>.Shared.Return(buffer.Array);
            }
        }
    }
  • messagepack
    public class MsgpackPacket : BeetleX.Packets.FixedHeaderPacket
    {
        static MsgpackPacket()
        {
            TypeHeader.Register(typeof(MsgpackClientPacket).Assembly);
        }
        public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT);

        public override IPacket Clone()
        {
            return new MsgpackPacket();
        }

        protected override object OnReader(ISession session, PipeStream stream)
        {
            Type type = TypeHeader.ReadType(stream);
            var size = CurrentSize - 4;
            return MessagePackSerializer.NonGeneric.Deserialize(type, stream, true);
        }

        protected override void OnWrite(ISession session, object data, PipeStream stream)
        {
            TypeHeader.WriteType(data, stream);
            MessagePackSerializer.NonGeneric.Serialize(data.GetType(), stream, data);
        }
    }

更多示例

https://github.com/IKende/BeetleX-Samples

相關文章
相關標籤/搜索