SuperSocket與Netty之實現protobuf協議,包括服務端和客戶端

  • 參考資料說明

SuperSocket文檔 http://docs.supersocket.net/html

Protobuf語言參考 https://developers.google.com/protocol-buffers/docs/proto

單消息多類型解決方案 https://developers.google.com/protocol-buffers/docs/techniques#app

主要資料(很是感謝) http://www.cnblogs.com/caipeiyu/p/5559112.html

使用的ProtocolBuffers http://code.google.com/p/protobuf-csharp-port

關於MsgPack的協議 https://my.oschina.net/caipeiyu/blog/512437

 

Proto

// Payload carrying a single optional text field.
message CallMessage
{
    // Free-form text of the call; may be absent.
    optional string content = 1;
}

// Payload carrying a single optional text field; same shape as CallMessage.
message BackMessage
{
    // Free-form text of the reply; may be absent.
    optional string content = 1;
}

// Payload describing one person. Every field is declared required, so a
// message missing any of them is not considered initialized.
message PersonMessage
{
    required int32 id = 1;
    required string name = 2;
    // Allowed values for the sex field.
    enum Sex
    {
        Male = 1;
        Female = 2;
    }
    // Declared both required and with a default of Male.
    required Sex sex = 3 [default = Male];
    required uint32 age = 4;
    required string phone = 5;
}

import "BackMessage.proto";
import "CallMessage.proto";
import "PersonMessage.proto";

// Envelope that lets one message type carry several payload kinds:
// a required discriminator plus one optional field per payload type.
message DefeatMessage
{
    // Discriminator naming which payload field is expected to be populated.
    enum Type
    {
        CallMessage = 1;
        BackMessage = 2;
        PersonMessage = 3;
    }
    // Must always be set; selects among the optional payloads below.
    required Type type = 1;
    optional CallMessage callMessage = 2;
    optional BackMessage backMessage = 3;
    optional PersonMessage personMessage = 4;
}
View Code

 

生成C#代碼

protoc --descriptor_set_out=DefeatMessage.protobin --proto_path=./ --include_imports DefeatMessage.proto
 protogen DefeatMessage.protobin
View Code

 

Server

namespace SuperSocketProtoServer.Protocol
{
    /// <summary>
    /// Immutable request object pairing a decoded <c>DefeatMessage</c> envelope
    /// with its type discriminator.
    /// </summary>
    public class ProtobufRequestInfo : IRequestInfo
    {
        /// <summary>
        /// Wraps a decoded envelope.
        /// </summary>
        /// <param name="type">Discriminator of the envelope; also the source of <see cref="Key"/>.</param>
        /// <param name="body">The decoded envelope itself.</param>
        public ProtobufRequestInfo(DefeatMessage.Types.Type type, DefeatMessage body)
        {
            this.Body = body;
            this.Type = type;
            this.Key = type.ToString();
        }

        /// <summary>
        /// Name of the enum member held in <see cref="Type"/>.
        /// Presumably used by the host to pick the command class of the same name (confirm against IRequestInfo).
        /// </summary>
        public string Key { get; }

        /// <summary>Discriminator supplied at construction.</summary>
        public DefeatMessage.Types.Type Type { get; }

        /// <summary>Envelope supplied at construction.</summary>
        public DefeatMessage Body { get; }
    }
}

namespace SuperSocketProtoServer.Protocol
{
    /// <summary>
    /// Server-side receive filter that splits the incoming byte stream into packets of the
    /// form [varint32 body length][DefeatMessage body] and turns each complete packet into
    /// a <see cref="ProtobufRequestInfo"/>.
    /// </summary>
    /// <remarks>
    /// When a packet has only been partially received, the bytes seen so far are kept at the
    /// session's original receive offset and <see cref="OffsetDelta"/> reports how many there
    /// are. Presumably the host then places the next receive directly after them
    /// (confirm against the IOffsetAdapter contract).
    /// </remarks>
    public class ProtobufReceiveFilter: IReceiveFilter<ProtobufRequestInfo>, IOffsetAdapter, IReceiveFilterInitializer
    {
        // Receive offset of the session's buffer as captured in Initialize.
        private int _origOffset;
        // Number of bytes the current read position lies before the offset passed to Filter.
        private int _offsetDelta;
        // Bytes of an incomplete packet carried over from earlier Filter calls.
        private int _leftBufferSize;

        /// <summary>
        /// Captures the session's original receive offset.
        /// </summary>
        public void Initialize(IAppServer appServer, IAppSession session)
        {
            _origOffset = session.SocketSession.OrigReceiveOffset;
        }

        /// <summary>
        /// Size of the carried-over partial packet; zero when nothing is pending.
        /// </summary>
        public int OffsetDelta
        {
            get { return _offsetDelta; }
        }

        /// <summary>
        /// Parses one packet out of the receive buffer.
        /// </summary>
        /// <param name="readBuffer">The receive buffer.</param>
        /// <param name="offset">Start of the newly received data inside the buffer.</param>
        /// <param name="length">Number of bytes received in this round.</param>
        /// <param name="toBeCopied">Whether the data should be copied rather than used in place; not used here.</param>
        /// <param name="rest">Number of bytes left in the buffer after the parsed packet.</param>
        /// <returns>
        /// The parsed request, or <c>null</c> when the length prefix is not positive or the
        /// packet is still incomplete.
        /// </returns>
        public ProtobufRequestInfo Filter(byte[] readBuffer, int offset, int length, bool toBeCopied, out int rest)
        {
            rest = 0;
            // Step back over the bytes carried over from the previous round so that parsing
            // starts at the beginning of the pending packet.
            var readOffset = offset - _offsetDelta;
            // Everything parseable in this round: carried-over bytes plus the new ones.
            // The stream must span all of it; limiting it to `length` made ReadRawBytes
            // run out of data whenever a packet was split across receives.
            var available = length + _leftBufferSize;
            // CodedInputStream is provided by Google.ProtocolBuffers.
            // NOTE(review): a length prefix that is itself cut in half is not handled here;
            // ReadRawVarint32 presumably throws in that case - confirm.
            CodedInputStream cis = CodedInputStream.CreateInstance(readBuffer, readOffset, available);
            // Body length, not counting the length prefix itself.
            int varint32 = (int) cis.ReadRawVarint32();
            if (varint32 <= 0) return null;

            // Number of bytes occupied by the length prefix.
            int headLen = (int) (cis.Position - readOffset);
            // Bytes remaining after this packet; negative when the packet is incomplete.
            rest = available - varint32 - headLen;

            // The buffer holds the whole packet.
            if (rest >= 0)
            {
                byte[] body = cis.ReadRawBytes(varint32);
                DefeatMessage message = DefeatMessage.ParseFrom(body);
                ProtobufRequestInfo requestInfo = new ProtobufRequestInfo(message.Type, message);
                _offsetDelta = 0;
                _leftBufferSize = 0;

                return requestInfo;
            }

            // The packet is split across TCP segments: remember what has arrived so far.
            _leftBufferSize += length;
            _offsetDelta = _leftBufferSize;
            rest = 0;

            var expectedOffset = offset + length;
            var newOffset = _origOffset + _offsetDelta;
            if (newOffset < expectedOffset)
            {
                // The partial packet does not start at the original offset; move it there.
                // readOffset equals the original `offset - _leftBufferSize + length` expression.
                Buffer.BlockCopy(readBuffer, readOffset, readBuffer, _origOffset, _leftBufferSize);
            }

            return null;
        }

        /// <summary>
        /// Drops any carried-over partial packet.
        /// </summary>
        public void Reset()
        {
            _offsetDelta = 0;
            _leftBufferSize = 0;
        }

        /// <summary>
        /// Bytes of an incomplete packet currently being held.
        /// </summary>
        public int LeftBufferSize
        {
            get { return _leftBufferSize; }
        }

        /// <summary>Never assigned; always <c>null</c>.</summary>
        public IReceiveFilter<ProtobufRequestInfo> NextReceiveFilter { get; }

        /// <summary>Never assigned; always the default <c>FilterState</c> value.</summary>
        public FilterState State { get; }
    }
}

namespace SuperSocketProtoServer.Protocol
{
    /// <summary>
    /// Session type for the protobuf server; adds nothing to the base session and
    /// relies on the implicit public parameterless constructor.
    /// </summary>
    public class ProtobufAppSession : AppSession<ProtobufAppSession, ProtobufRequestInfo>
    {
    }
}

namespace SuperSocketProtoServer.Protocol
{
    /// <summary>
    /// App server whose sessions are <see cref="ProtobufAppSession"/> and whose requests
    /// are produced by <see cref="ProtobufReceiveFilter"/>.
    /// </summary>
    public class ProtobufAppServer : AppServer<ProtobufAppSession, ProtobufRequestInfo>
    {
        /// <summary>
        /// Registers the default factory for <see cref="ProtobufReceiveFilter"/> with the base server.
        /// </summary>
        public ProtobufAppServer()
            : base(new DefaultReceiveFilterFactory<ProtobufReceiveFilter, ProtobufRequestInfo>())
        {
        }
    }
}
View Code

 

Server Command

 

namespace SuperSocketProtoServer.Protocol.Command
{
    /// <summary>
    /// Command that prints the text of a received BackMessage payload.
    /// </summary>
    public class BackMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    {
        /// <summary>
        /// Writes the payload's content to the console.
        /// </summary>
        /// <param name="session">Session the request arrived on; not used.</param>
        /// <param name="requestInfo">Request whose envelope carries the BackMessage payload.</param>
        public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
        {
            var text = requestInfo.Body.BackMessage.Content;
            Console.WriteLine("BackMessage:{0}", text);
        }
    }
}

namespace SuperSocketProtoServer.Protocol.Command
{
    /// <summary>
    /// Command that prints a received CallMessage payload and answers with a BackMessage.
    /// </summary>
    public class CallMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    {
        /// <summary>
        /// Logs the incoming text, then serializes a fixed reply and sends it on the same session.
        /// </summary>
        /// <param name="session">Session the request arrived on; the reply is sent through it.</param>
        /// <param name="requestInfo">Request whose envelope carries the CallMessage payload.</param>
        public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
        {
            Console.WriteLine("CallMessage:{0}", requestInfo.Body.CallMessage.Content);

            // global:: is needed because this namespace declares its own BackMessage command class.
            var reply = global::BackMessage.CreateBuilder()
                .SetContent("Hello I am from C# server by SuperSocket")
                .Build();

            var envelope = DefeatMessage.CreateBuilder()
                .SetType(DefeatMessage.Types.Type.BackMessage)
                .SetBackMessage(reply)
                .Build();

            using (var buffer = new MemoryStream())
            {
                CodedOutputStream writer = CodedOutputStream.CreateInstance(buffer);
                writer.WriteMessageNoTag(envelope);
                // Flush so the encoded bytes reach the MemoryStream before it is snapshotted.
                writer.Flush();

                byte[] bytes = buffer.ToArray();
                session.Send(new ArraySegment<byte>(bytes));
            }
        }
    }
}

namespace SuperSocketProtoServer.Protocol.Command
{
    /// <summary>
    /// Command that prints the fields of a received PersonMessage payload.
    /// </summary>
    public class PersonMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    {
        /// <summary>
        /// Writes the person's id, name, sex and phone to the console.
        /// </summary>
        /// <param name="session">Session the request arrived on; not used.</param>
        /// <param name="requestInfo">Request whose envelope carries the PersonMessage payload.</param>
        public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
        {
            var person = requestInfo.Body.PersonMessage;

            Console.WriteLine("Recv Person Message From Client.");
            Console.WriteLine(
                "person's id = {0}, person's name = {1}, person's sex = {2}, person's phone = {3}",
                person.Id,
                person.Name,
                person.Sex,
                person.Phone);
        }
    }
}
View Code

 

Client

namespace SuperSocketProtoClient.Protocol
{
    /// <summary>
    /// Immutable client-side package pairing a decoded <c>DefeatMessage</c> envelope
    /// with its type discriminator.
    /// </summary>
    public class ProtobufPackageInfo : IPackageInfo
    {
        /// <summary>
        /// Wraps a decoded envelope.
        /// </summary>
        /// <param name="type">Discriminator of the envelope; also the source of <see cref="Key"/>.</param>
        /// <param name="body">The decoded envelope itself.</param>
        public ProtobufPackageInfo(DefeatMessage.Types.Type type, DefeatMessage body)
        {
            this.Body = body;
            this.Type = type;
            this.Key = type.ToString();
        }

        /// <summary>Name of the enum member held in <see cref="Type"/>.</summary>
        public string Key { get; }

        /// <summary>Discriminator supplied at construction.</summary>
        public DefeatMessage.Types.Type Type { get; }

        /// <summary>Envelope supplied at construction.</summary>
        public DefeatMessage Body { get; }
    }
}

namespace SuperSocketProtoClient.Protocol
{
    /// <summary>
    /// Client-side receive filter for packets of the form
    /// [varint32 body length][DefeatMessage body].
    /// </summary>
    public class ProtobufReceiveFilter : IReceiveFilter<ProtobufPackageInfo>
    {
        /// <summary>
        /// Tries to decode one packet from the data received so far.
        /// </summary>
        /// <param name="data">Buffers received so far.</param>
        /// <param name="rest">Bytes left over after the decoded packet; zero when nothing was decoded.</param>
        /// <returns>
        /// The decoded package, or <c>null</c> when the length prefix is not positive or
        /// fewer bytes than one whole packet are available.
        /// </returns>
        public ProtobufPackageInfo Filter(BufferList data, out int rest)
        {
            rest = 0;

            var source = new BufferStream();
            source.Initialize(data);
            var input = CodedInputStream.CreateInstance(source);

            // Length prefix: size of the body that follows it.
            int bodyLength = (int)input.ReadRawVarint32();
            if (bodyLength <= 0)
            {
                return null;
            }

            int received = data.Total;
            // Prefix bytes consumed so far plus the announced body size.
            int frameLength = bodyLength + (int)input.Position;
            if (received < frameLength)
            {
                // Not enough data yet for a whole packet.
                return null;
            }

            rest = received - frameLength;
            byte[] payload = input.ReadRawBytes(bodyLength);
            DefeatMessage envelope = DefeatMessage.ParseFrom(payload);
            return new ProtobufPackageInfo(envelope.Type, envelope);
        }

        /// <summary>
        /// Clears the next-filter link and sets the state back to normal.
        /// </summary>
        public void Reset()
        {
            State = FilterState.Normal;
            NextReceiveFilter = null;
        }

        /// <summary>Only ever set to <c>null</c> by this class.</summary>
        public IReceiveFilter<ProtobufPackageInfo> NextReceiveFilter { get; protected set; }

        /// <summary>Only ever set to <c>FilterState.Normal</c> by this class.</summary>
        public FilterState State { get; protected set; }
    }
}
View Code

 

Server Entrance

namespace SuperSocketProtoServer
{
    /// <summary>
    /// Console host for the protobuf server.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Sets up the server on port 2017, starts it, and runs until 'q' is pressed.
        /// </summary>
        static void Main(string[] args)
        {
            Console.WriteLine("Press any key to start the server!");

            Console.ReadKey();
            Console.WriteLine();

            var appServer = new ProtobufAppServer();
            // Alternative to the command classes: handle every request in one place.
            //appServer.NewRequestReceived += AppServerOnNewRequestReceived;

            try
            {
                //Setup the appServer
                if (!appServer.Setup(2017)) //Setup with listening port
                {
                    Console.WriteLine("Failed to setup!");
                    Console.ReadKey();
                    return;
                }
            }
            catch (Exception e)
            {
                // Setup threw, so the server is not configured; stop here instead of
                // falling through to Start() as the code previously did.
                Console.WriteLine(e);
                Console.WriteLine("Failed to setup!");
                Console.ReadKey();
                return;
            }

            Console.WriteLine();

            //Try to start the appServer
            if (!appServer.Start())
            {
                Console.WriteLine("Failed to start!");
                Console.ReadKey();
                return;
            }

            Console.WriteLine("The server started successfully, press key 'q' to stop it!");

            // Echo a newline after every key other than 'q'.
            while (Console.ReadKey().KeyChar != 'q')
            {
                Console.WriteLine();
            }

            //Stop the appServer
            appServer.Stop();

            Console.WriteLine("The server was stopped!");
            Console.ReadKey();
        }

        /// <summary>
        /// Single-handler alternative to the command classes; only active when subscribed
        /// to NewRequestReceived in <see cref="Main"/>.
        /// </summary>
        /// <param name="session">Session the request arrived on.</param>
        /// <param name="requestinfo">Decoded request.</param>
        private static void AppServerOnNewRequestReceived(ProtobufAppSession session, ProtobufRequestInfo requestinfo)
        {
            switch (requestinfo.Type)
            {
                case DefeatMessage.Types.Type.BackMessage:
                    Console.WriteLine("BackMessage:{0}", requestinfo.Body.BackMessage.Content);
                    break;
                case DefeatMessage.Types.Type.CallMessage:
                    Console.WriteLine("CallMessage:{0}", requestinfo.Body.CallMessage.Content);
                    var backMessage = BackMessage.CreateBuilder().SetContent("Hello I am from C# server by SuperSocket")
                        .Build();
                    var message = DefeatMessage.CreateBuilder().SetType(DefeatMessage.Types.Type.BackMessage)
                        .SetBackMessage(backMessage).Build();
                    using (var stream = new MemoryStream())
                    {
                        CodedOutputStream cos = CodedOutputStream.CreateInstance(stream);
                        cos.WriteMessageNoTag(message);
                        // Flush so the encoded bytes reach the MemoryStream before it is snapshotted.
                        cos.Flush();
                        byte[] data = stream.ToArray();
                        session.Send(new ArraySegment<byte>(data));
                    }
                    break;
            }
        }
    }
}
View Code

 

Client Entrance

namespace SuperSocketProtoClient
{
    /// <summary>
    /// Console client that connects to the server on 127.0.0.1:2017 and sends a
    /// CallMessage followed by a PersonMessage.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects, sends the two demo messages two seconds apart, and prints any
        /// BackMessage or CallMessage received in the meantime.
        /// </summary>
        static void Main(string[] args)
        {
            EasyClient client = new EasyClient();
            client.Initialize(new ProtobufReceiveFilter(), packageInfo =>
            {
                switch (packageInfo.Type)
                {
                    case DefeatMessage.Types.Type.BackMessage:
                        Console.WriteLine("BackMessage:{0}", packageInfo.Body.BackMessage.Content);
                        break;
                    case DefeatMessage.Types.Type.CallMessage:
                        Console.WriteLine("CallMessage:{0}", packageInfo.Body.CallMessage.Content);
                        break;
                }
            });

            var flag = client.ConnectAsync(new DnsEndPoint("127.0.0.1", 2017));
            // Main is synchronous, so block here until the connection attempt finishes.
            if (!flag.Result)
            {
                // Previously a failed connection was ignored without any output.
                Console.WriteLine("Failed to connect to 127.0.0.1:2017!");
                Console.ReadKey();
                return;
            }

            var callMessage = CallMessage.CreateBuilder()
                .SetContent("Hello I am form C# client by SuperSocket ClientEngine").Build();
            var message = DefeatMessage.CreateBuilder()
                .SetType(DefeatMessage.Types.Type.CallMessage)
                .SetCallMessage(callMessage).Build();
            SendMessage(client, message);

            // Pause between the two messages, as in the original demo.
            Thread.Sleep(2000);

            // Send the PersonMessage.
            var personMessage = PersonMessage.CreateBuilder()
                .SetId(123).SetAge(33).SetSex(PersonMessage.Types.Sex.Male).SetName("zstudio").SetPhone("110").Build();
            message = DefeatMessage.CreateBuilder().SetType(DefeatMessage.Types.Type.PersonMessage)
                .SetPersonMessage(personMessage).Build();
            SendMessage(client, message);

            Console.ReadKey();
        }

        /// <summary>
        /// Serializes an envelope with WriteMessageNoTag and sends the resulting bytes.
        /// </summary>
        /// <param name="client">Connected client to send through.</param>
        /// <param name="message">Envelope to serialize.</param>
        private static void SendMessage(EasyClient client, DefeatMessage message)
        {
            using (var stream = new MemoryStream())
            {
                CodedOutputStream output = CodedOutputStream.CreateInstance(stream);
                // Presumably emits the varint length prefix expected by the receive filters - confirm.
                output.WriteMessageNoTag(message);
                // Flush so the encoded bytes reach the MemoryStream before it is snapshotted.
                output.Flush();

                byte[] data = stream.ToArray();
                client.Send(new ArraySegment<byte>(data));
            }
        }
    }
}
View Code
  • 執行結果

  • https://www.cnblogs.com/linxmouse/p/7905575.html
  • 工程、資源、資料打包:http://pan.baidu.com/s/1qXB9aEg
  • 更多項目相關細節和詳情參考博客:http://www.cnblogs.com/caipeiyu/p/5559112.html, 在此也表示對博主由衷的感謝!!!
相關文章
相關標籤/搜索