SuperSocket與Netty之實現protobuf協議,包括服務端和客戶端

今天準備給你們介紹一個c#服務器框架(SuperSocket)和一個c#客戶端框架(SuperSocket.ClientEngine)。這兩個框架的做者是園區裏面的江大漁。 首先感謝他的無私開源貢獻。之因此要寫這個文章是由於羣裏常常有人問這個客戶端框架要如何使用。緣由在於服務端框架的文檔比較多,客戶端的文檔比較少,因此不少c#基礎比較差的人就不懂怎麼玩起來。今天就這裏寫一個例子但願能給部分人拋磚引玉吧。html

參考資料:java

SuperSocket文檔 http://docs.supersocket.net/git

我之前在開源中國的一部分文章:http://my.oschina.net/caipeiyu/bloggithub

這篇文章選擇 protobuf 來實現,選擇protobuf是由於服務器有可能用的是java的netty,客戶端想用SuperSocket.ClientEngine,而netty我看不少人常常用protobuf。算法


1、SuperSocket服務器

新建一個項目 ProtobufServer 而後添加 SuperSocket 和 protobuf 的依賴包。bootstrap

添加protobuf依賴包 輸入的搜索詞是 Google.ProtocolBuffers
添加SuperSocket依賴包 輸入搜索詞是 SuperSocket,要添加兩個SuperSocket.Engine 和 SuperSocketc#

上面的工做完成後,咱們就應該來實現咱們的傳輸協議了。傳輸協議打算參考netty的ProtobufVarint32FrameDecoder.java數組

* BEFORE DECODE (302 bytes)       AFTER DECODE (300 bytes)
* +--------+---------------+      +---------------+
* | Length | Protobuf Data |----->| Protobuf Data |
* | 0xAC02 |  (300 bytes)  |      |  (300 bytes)  |
* +--------+---------------+      +---------------+

Protobuf Data是protobuf的序列化結果。Length(Base 128 Varints)是表示Protobuf Data的長度。protobuf自己的序列號協議能夠參考:https://developers.google.com/protocol-buffers/docs/encoding緩存

咱們先看一下SuperSocket的內置的經常使用協議實現模版看看有沒有合適咱們能夠直接拿來用的。由於Length使用的是Base 128 Varints一種處理整數的變長二進制編碼算法,因此呢內置的協議實現模板並不能直接拿來使用,因此咱們只能本身來實現接口IRequestInfo和IReceiveFilter了,參考:使用 IRequestInfo 和 IReceiveFilter 等等其餘對象來實現自定義協議服務器

這裏說明一下:爲何protobuf明明序列化成Protobuf Data 了爲何還要再加一個Length來打包,由於tcp這個流發送會參數粘包、分包,若是不加個協議來解析會讀取錯誤的數據而致使沒法反序列化 Protobuf Data (自行谷歌 tcp 粘包、分包)

ProtobufRequestInfo的實現

在實現ProtobufRequestInfo以前要先來考慮一個問題,那就是咱們的傳輸協議是長度+protobuf數據,那麼咱們根本就沒法知道獲取到的protobuf數據該如何反序列化。在官方網站提供了一種解決思路:https://developers.google.com/protocol-buffers/docs/techniques#union
就是咱們能夠弄惟一個數據包,而後這個數據包裏面必須包含一個枚舉值,而後還包含了其餘類型的數據包,每個枚舉值對應一個數據包,而後傳送過來後,能夠用分支判斷來獲取值。

那咱們先設計一個 DefeatMessage.proto包含內容:

import "BackMessage.proto";
import "CallMessage.proto";

message DefeatMessage {
    enum Type { CallMessage = 1; BackMessage = 2; }

    required Type type = 1;

    optional CallMessage callMessage = 2;
    optional BackMessage backMessage = 3;
}

而後再把CallMessage和BackMessage補全

message BackMessage {

    optional string content = 1;

}

message CallMessage {

    optional string content = 1;

}

而後在咱們的路徑packages\Google.ProtocolBuffers.2.4.1.555\tools裏面有兩個工具protoc.exe 和 protogen.exe,咱們能夠執行下面的命令來生成咱們的c#代碼

protoc --descriptor_set_out=DefeatMessage.protobin --proto_path=pack --include_imports pack\DefeatMessage.proto

protogen DefeatMessage.protobin

注意路徑要本身修改

若是有報Expected top-level statement (e.g. "message").這麼一個錯誤,那就是你cmd的編碼和proto的編碼不一致,要改爲一致。

相關文件:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/tree/master/ProtobufServer/Pack

生成完c#代碼後,咱們就要設計ProtobufRequestInfo了。這個比較簡單,只要實現IRequestInfo接口。咱們這裏在實現接口帶的屬性外另加一個 DefeatMessage 和 DefeatMessage.Types.Type,其中DefeatMessage是爲了存儲咱們解包完數據後反序列化出來的對象,Type是爲了方便區分咱們應該取出DefeatMessage裏面的哪一個值。

using System;
using SuperSocket.SocketBase.Protocol;

namespace ProtobufServer
{
    public class ProtobufRequestInfo : IRequestInfo
    {
            public string Key {
                get;
                private set;
            }

            public DefeatMessage.Types.Type Type
            {
                get;
                private set;
            }

            public DefeatMessage Body { get; private set; }

            public ProtobufRequestInfo (DefeatMessage.Types.Type type, DefeatMessage body)
            {
                Type = type;
                Key = type.ToString();
                Body = body;
            }
    }
}

ProtobufReceiveFilter的實現

代碼比較長,直接看在github上的代碼ProtobufReceiveFilter的實現
實現的注意點參考:使用 IRequestInfo 和 IReceiveFilter 等等其餘對象來實現自定義協議。主要是對ss裏面給咱們緩存的數據流進行協議解析。

  1. readBuffer: 接收緩衝區, 接收到的數據存放在此數組裏
  2. offset: 接收到的數據在接收緩衝區的起始位置
  3. length: 本輪接收到的數據的長度
  4. toBeCopied: 表示當你想緩存接收到的數據時,是否須要爲接收到的數據從新建立一個備份而不是直接使用接收緩衝區
  5. rest: 這是一個輸出參數, 它應該被設置爲當解析到一個爲正的請求後,接收緩衝區還剩餘多少數據未被解析

    public  ProtobufRequestInfo Filter (byte[] readBuffer, int offset, int length, bool toBeCopied, out int rest)
         {
             rest = 0;
             var readOffset = offset - m_OffsetDelta;//咱們從新計算緩存區的起始位置,這裏要說明的是若是前一次解析還有剩下沒有解析到的數據,那麼就須要把起始位置移到以前最後要解析的那個位置
    
         CodedInputStream stream = CodedInputStream.CreateInstance (readBuffer, readOffset, length);//這個類是Google.ProtocolBuffers提供的
         var varint32 = (int)stream.ReadRawVarint32 ();//這裏是計算咱們這個數據包是有多長(不包含length自己)
         if(varint32 <= 0) return null;
    
         var headLen = (int) stream.Position - readOffset;//計算協議裏面length佔用幾位
         rest = length - varint32 - headLen + m_ParsedLength;//本次解析完後緩存區還剩下多少數據沒有解析
    
         if (rest >= 0)//緩存區裏面的數據足夠本次解析
         {   
             byte[] body = stream.ReadRawBytes(varint32);
             DefeatMessage message = DefeatMessage.ParseFrom(body);
             var requestInfo = new ProtobufRequestInfo(message.Type,message);
             InternalReset();
             return requestInfo;
         }
         else//緩存區裏面的數據不夠本次解析,(名詞爲分包)
         {
             m_ParsedLength += length;
             m_OffsetDelta = m_ParsedLength;
             rest = 0;
    
             var expectedOffset = offset + length;
             var newOffset = m_OrigOffset + m_OffsetDelta;
    
             if (newOffset < expectedOffset)
             {
                 Buffer.BlockCopy(readBuffer, offset - m_ParsedLength + length, readBuffer, m_OrigOffset, m_ParsedLength);
             }
    
             return null;
         }
     }

ProtobufAppSession 的實現

using System;
using SuperSocket.SocketBase;

namespace ProtobufServer
{
    public class ProtobufAppSession : AppSession<ProtobufAppSession,ProtobufRequestInfo>
    {
        public ProtobufAppSession ()
        {
        }
    }
}

ProtobufAppServer 的實現

using System;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Protocol;

namespace ProtobufServer
{
    public class ProtobufAppServer : AppServer<ProtobufAppSession,ProtobufRequestInfo>
    {
        public ProtobufAppServer ()
            :base(new DefaultReceiveFilterFactory< ProtobufReceiveFilter, ProtobufRequestInfo >())
        {
        }
    }
}

服務器的實例啓動實現

參考:http://docs.supersocket.net/v1-6/zh-CN/A-Telnet-Example

代碼:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/ProtobufServer/Program.cs

主要是接收到數據的一個方法實現,固然ss裏面還帶了命令模式的實現,不過這個不在本文章裏面說。這裏的實現了接收到不一樣的數據給打印出來,而後接收到CallMessage數據的話就給客戶端回發一條信息

private static void AppServerOnNewRequestReceived(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
    {
        switch (requestInfo.Type)
        {
            case DefeatMessage.Types.Type.BackMessage:
                Console.WriteLine("BackMessage:{0}", requestInfo.Body.BackMessage.Content);
                break;
            case DefeatMessage.Types.Type.CallMessage:
                Console.WriteLine("CallMessage:{0}", requestInfo.Body.CallMessage.Content);

                var backMessage = BackMessage.CreateBuilder()
                .SetContent("Hello I am form C# server by SuperSocket").Build();
                var message = DefeatMessage.CreateBuilder()
                    .SetType(DefeatMessage.Types.Type.BackMessage)
                    .SetBackMessage(backMessage).Build();

                using (var stream = new MemoryStream())
                {

                    CodedOutputStream os = CodedOutputStream.CreateInstance(stream);

                    os.WriteMessageNoTag(message);

                    os.Flush();

                    byte[] data = stream.ToArray();
                    session.Send(new ArraySegment<byte>(data));

                }


                break;

        }
    }

服務器的代碼就到這裏,能夠編譯運行起來看看有無錯誤。

2、SuperSocket.ClientEngine客戶端

與服務器實現相同,先經過NuGet添加 SuperSocket.ClientEngine 和 protobuf 的依賴包。
有三個實現:

ProtobufPackageInfo的實現

把前面實現服務器時候生成的通信數據包拷貝過來,而後和實現服務器的ProtobufRequestInfo同樣,只不過這裏只是實現接口IPackageInfo而已

using SuperSocket.SocketBase.Protocol;using SuperSocket.ProtoBase;

namespace ProtobufClient
{
    public class ProtobufPackageInfo : IPackageInfo
    {
        public ProtobufPackageInfo(DefeatMessage.Types.Type type, DefeatMessage body)
        {
            Type = type;
            Key = type.ToString();
            Body = body;
        }

        public string Key { get; private set; }

        public DefeatMessage Body { get; private set; }
        public DefeatMessage.Types.Type Type { get; private set; }
    }
}

ProtobufReceiveFilter的實現

代碼:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/ProtobufClient/ProtobufReceiveFilter.cs
這裏的數據解析的實現與服務器的實現有點不一樣,不過下一個版本可能會統一,若是統一塊兒來的話,那麼之後數據解析就能夠作成和插件同樣,同時能夠給服務器和客戶端使用。

  1. data:也是數據緩存區
  2. rest:緩存區還剩下多少

這個實現與服務器的不一樣就在BufferList自己就已經有處理分包,就不須要咱們本身再作處理。

public ProtobufPackageInfo Filter(BufferList data, out int rest)
    {
        rest = 0;
        var buffStream = new BufferStream();
        buffStream.Initialize(data);

        var stream = CodedInputStream.CreateInstance(buffStream);
        var varint32 = (int) stream.ReadRawVarint32();
        if (varint32 <= 0) return default(ProtobufPackageInfo);

        var total = data.Total;
        var packageLen = varint32 + (int) stream.Position;

        if (total >= packageLen)
        {
            rest = total - packageLen;
            var body = stream.ReadRawBytes(varint32);
            var message = DefeatMessage.ParseFrom(body);
            var requestInfo = new ProtobufPackageInfo(message.Type, message);
            return requestInfo;
        }

        return default(ProtobufPackageInfo);
    }

運行主程序實現

具體實現看:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/ProtobufClient/Program.cs

這個真的沒有什麼好說了。運行效果以下:

這裏的打印信息是相對比較簡單,你們能夠本身下載源碼來加些打印數據,讓看起來更好看點。

3、java的Netty實現

既然前面提到了Netty,那就順便實現一個簡單的服務器來通信看看。
使用的是Netty 4.x 參考資料:http://netty.io/wiki/user-guide-for-4.x.html

Netty的實如今網絡上有超級多的例子,這裏就簡單的介紹一下就能夠,首先先生成java的通信包代碼

protoc --proto_path=pack --java_out=./ pack/DefeatMessage.proto

protoc --proto_path=pack --java_out=./ pack/BackMessage.proto

protoc --proto_path=pack --java_out=./ pack/CallMessage.proto

這幾個命令要本身靈活改變們不要死死的硬搬。

生成的代碼:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/tree/master/java/NettyProtobufServer/src/main/java

而後建立一個maven項目,添加Netty 和 protobuf 依賴:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/java/NettyProtobufServer/pom.xml

<dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-microbench</artifactId>
            <version>4.1.0.Final</version>
        </dependency>
    </dependencies>

ProtobufServerHandler的實現

由於Netty裏面已經有幫咱們實現了protobuf的解析,因此咱們不須要本身實現。咱們只要繼承ChannelInboundHandlerAdapter而後經過channelRead就能夠拿到解析好的對象,而後轉換成咱們本身的類型,就能夠直接使用。這裏一樣是實現不一樣類型的消息打印和CallMessage消息就回覆信息給客戶端。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
* Created by caipeiyu on 16/6/4.
*/
public class ProtobufServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)

        try {
            DefeatMessageOuterClass.DefeatMessage in = (DefeatMessageOuterClass.DefeatMessage) msg;
        if(in.getType() == DefeatMessageOuterClass.DefeatMessage.Type.BackMessage){
            System.out.print("BackMessage:");
            System.out.print(in.getBackMessage());
            System.out.flush();
        }else if(in.getType() == DefeatMessageOuterClass.DefeatMessage.Type.CallMessage){
            System.out.print("CallMessage:");
            System.out.print(in.getCallMessage());
            System.out.flush();

            DefeatMessageOuterClass.DefeatMessage out =
                    DefeatMessageOuterClass.DefeatMessage.newBuilder()
                            .setType(DefeatMessageOuterClass.DefeatMessage.Type.BackMessage)
                            .setBackMessage(BackMessageOuterClass.BackMessage
                                    .newBuilder().setContent("Hello I from server by Java Netty").build())
                            .build();

            ctx.write(out);
            ctx.flush();
        }

        } finally {
            ReferenceCountUtil.release(msg); // (2)
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

ProtobufServer的實現

主要是添加已經有的編碼解碼和消息接收的類就能夠了。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
* Created by caipeiyu on 16/6/4.
*/
public class ProtobufServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //解碼
                            p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                            //構造函數傳遞要解碼成的類型
                            p.addLast("protobufDecoder", new ProtobufDecoder(DefeatMessageOuterClass.DefeatMessage.getDefaultInstance()));
                            //編碼
                            p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
                            p.addLast("protobufEncoder", new ProtobufEncoder());
                            //業務邏輯處理
                            p.addLast("handler", new ProtobufServerHandler());

                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(2012).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

整個代碼編寫完成後,直接運行並打開咱們前面的客戶端進通信發數據。運行結果以下:

固然這三個例子直接簡單的說明如何使用框架來解決咱們的問題,實際開發過程當中確定不僅是咱們例子的這麼點東西,須要考慮的東西還不少,這裏只是寫一些能夠運行起來的例子做爲拋磚引玉。但願能給不懂的同窗有點啓發做用。謝謝您百忙中抽出時間來觀看個人分享。


因爲本人水平有限,知識有限,文章不免會有錯誤,歡迎你們指正。若是有什麼問題也歡迎你們回覆交流。要是你以爲本文還能夠,那麼點擊一下推薦。

相關文章
相關標籤/搜索