BeetleX
是.net core
平臺下的一個開源TCP
通信組件,它不只使用簡便還提供了出色性能的支持,能夠輕易讓你實現上百萬級別RPS吞吐的服務應用。組件所提供的基礎功能也很是完善,可讓你輕易擴展本身的服務應用,如下提組件集成的功能:linux
完善的會話管理機制,能夠根據鏈接狀態和相關日誌git
專門針對內存池實現的異步流讀寫,支持標準Stream的同並提供高效的性能github
消息IO合併,廣播序列化合並等性能強化功能web
提供簡潔的協議擴展規範,輕易實現http,websocket,mqtt等應用通信協議json
支持TLS,讓你構建的通信服務更安全可靠安全
如下是Beetlex
擴展的一些功能組件websocket
一開始說組可讓你現上百萬級別RPS吞吐的服務應用其實一點不假,BeetleX
的基礎性能有這樣的支撐能力;雖然組件不能說是.net core
上性能最好的,但在功能和綜合性能上絕對很是出色(詳細能夠https://tfb-status.techempower.com/ 查看測試結果,惋惜這個網站提交的.net core
組件比較少,大部都是基於aspcore的通信模塊擴展).如下是JSON serialization
基礎輸出的一個測試結果(Plaintext
在官方的測試環境一直沒辦法跑起來....) 網絡
在測試中組件只落後於aspcore-rhtx
這是紅帽專門針對 .net core編寫的linux網絡驅動.session
Single query多線程
組件在構建TCP服務的時候很是簡單,主要歸功於它提供了完善的Stream
讀寫功能,而這些功能讓你徹底不用關心bytes的讀寫。基於Stream
的好處就是能夠輕鬆和第三方序列化的組件進行整合。如下是簡單地構建一個Hello
服務。
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } public override void SessionReceive(IServer server, SessionReceiveEventArgs e) { var pipeStream = e.Stream.ToPipeStream(); if (pipeStream.TryReadLine(out string name)) { Console.WriteLine(name); pipeStream.WriteLine("hello " + name); e.Session.Stream.Flush(); } base.SessionReceive(server, e); } }
以上就是一個簡單的TCP
服務,讓以代碼正常運行須要引用Beetlex
最新版的組件能夠在Nuget上找到。以上服務的功能很簡單當接收數據後嘗試從流中讀取一行字符,若是讀取成功則把內容寫入到流中提交返回。經過以上代碼是否是感受寫個服務比較簡單(可是PipeStream
並非線程安全的,因此不能涉及到多線程讀寫它)
其實PipeStream
處理數據已經很是方便了,那爲何還須要制定一個協議處理規範呢?前面已經說了PipeStream
並非線程安全的,很容易帶來使用上的風險,因此引入協議處理規則來進行一個安全約束的同時能夠實現多線程消息處理。組件提供了這樣一個接口來規範消息的處理,接口以下:
public interface IPacket : IDisposable { EventHandler<EventArgs.PacketDecodeCompletedEventArgs> Completed { get; set; } IPacket Clone(); void Decode(ISession session, System.IO.Stream stream); void Encode(object data, ISession session, System.IO.Stream stream); byte[] Encode(object data, IServer server); ArraySegment<byte> Encode(object data, IServer server, byte[] buffer); }
若是你要處理消息對象,則須要實現以上接口(固然這個接口的實現不是必須的,只要把握好PipeStream
安全上的控制就好);但實現這接口來處理消息能夠帶不少好處,能夠多消息合併IO,廣播消息合併序列化等高效的功能。不過在不瞭解組件的狀況實現這個接口的確也是有些難度的,因此組件提供了一個基礎的類FixedHeaderPacket
,它是一個抽像類用於描述有個消息頭長的信息流處理。
接下來經過FixedHeaderPacket
來實現一個簡單的字符分包協議消息;主要在發送消息的時候添加一個大小頭用來描述消息的長度(這是在TCP中解決粘包的主要手段)。
public class StringPacket : BeetleX.Packets.FixedHeaderPacket { public override IPacket Clone() { return new StringPacket(); } protected override object OnReader(ISession session, PipeStream stream) { return stream.ReadString(CurrentSize); } protected override void OnWrite(ISession session, object data, PipeStream stream) { stream.Write((string)data); } }
經過FixedHeaderPacket
制定一個分包規則是很是簡單的,主要實現讀寫兩個方法。下面便可在服務中引用這個包做爲TCP
數據流的分析規則:
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program,StringPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { Console.WriteLine(message); server.Send($"hello {message}", session); } }
通過分析器包裝後,就不再用流來處理數據了,能夠直接進行對像的發送處理。
處理String
並非友好的事情,畢竟沒有對象來得直觀和操做方便;如下是經過FixedHeaderPacket
擴展Protobuf
對象傳輸,如下是針對Protobuf
的規則擴展:
public class ProtobufPacket : BeetleX.Packets.FixedHeaderPacket { static ProtobufPacket() { TypeHeader.Register(typeof(ProtobufClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new ProtobufPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, type, size); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, data); } }
使用規則分析器
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program, Messages.ProtobufPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { ((Messages.Register)message).DateTime = DateTime.Now; server.Send(message, session); } }
既然有了一個Protobuf
做爲樣本,那針對其餘序列化的實現就比較簡單了
public class JsonPacket : BeetleX.Packets.FixedHeaderPacket { static JsonPacket() { TypeHeader.Register(typeof(JsonClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new JsonPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(size); stream.Read(buffer, 0, size); try { return SpanJson.JsonSerializer.NonGeneric.Utf8.Deserialize(new ReadOnlySpan<byte>(buffer, 0, size), type); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer); } } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); var buffer = SpanJson.JsonSerializer.NonGeneric.Utf8.SerializeToArrayPool(data); try { stream.Write(buffer.Array, buffer.Offset, buffer.Count); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer.Array); } } }
public class MsgpackPacket : BeetleX.Packets.FixedHeaderPacket { static MsgpackPacket() { TypeHeader.Register(typeof(MsgpackClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new MsgpackPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return MessagePackSerializer.NonGeneric.Deserialize(type, stream, true); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); MessagePackSerializer.NonGeneric.Serialize(data.GetType(), stream, data); } }