netty中級篇(2) netty入門篇(1) netty初探(2)

 

上一篇 netty入門篇(1)

1、編碼解碼技術

如何評價一個編解碼技術:html

  • 是否支持跨語言,或者說支持的語言是否豐富
  • 編碼碼流大小,影響傳輸速度
  • 編碼和解碼的性能,即時間
  • 類庫是否精緻,API是否方便
  • 使用難度

1. Java序列化缺點

Java也提供了序列化技術,在工業化工程中有如下缺點:java

  • 沒法跨語言
  • 序列化後的碼流太大
  • 序列化的性能太差

下面咱們來測試如下jdk序列化的問題bootstrap

建立一個測試類UserInfo:api

 1 import java.io.Serializable;
 2 import java.nio.ByteBuffer;
 3 
 4 /**
 5  * @author Administrator
 6  * @version 1.0
 7  * @date 2014年2月23日
 8  */
 9 public class UserInfo implements Serializable {
10 
11     /**
12      * 默認的序列號
13      */
14     private static final long serialVersionUID = 1L;
15 
16     private String userName;
17 
18     private int userID;
19 
20     public UserInfo buildUserName(String userName) {
21         this.userName = userName;
22         return this;
23     }
24 
25     public UserInfo buildUserID(int userID) {
26         this.userID = userID;
27         return this;
28     }
29 
30     /**
31      * @return the userName
32      */
33     public final String getUserName() {
34         return userName;
35     }
36 
37     /**
38      * @param userName the userName to set
39      */
40     public final void setUserName(String userName) {
41         this.userName = userName;
42     }
43 
44     /**
45      * @return the userID
46      */
47     public final int getUserID() {
48         return userID;
49     }
50 
51     /**
52      * @param userID the userID to set
53      */
54     public final void setUserID(int userID) {
55         this.userID = userID;
56     }
57 
58     /**
59      * 將當前對象轉換一個byte[]數組
60      * @return
61      */
62     public byte[] codeC() {
63         ByteBuffer buffer = ByteBuffer.allocate(1024);
64         //寫入userName長度和內容
65         byte[] value = this.userName.getBytes();
66         buffer.putInt(value.length);
67         buffer.put(value);
68         //直接寫入Id
69         buffer.putInt(this.userID);
70         buffer.flip();
71         value = null;
72         byte[] result = new byte[buffer.remaining()];
73         buffer.get(result);
74         return result;
75     }
76 
77     public byte[] codeC(ByteBuffer buffer) {
78         buffer.clear();
79         byte[] value = this.userName.getBytes();
80         buffer.putInt(value.length);
81         buffer.put(value);
82         buffer.putInt(this.userID);
83         buffer.flip();
84         value = null;
85         byte[] result = new byte[buffer.remaining()];
86         buffer.get(result);
87         return result;
88     }
89 }

其中的codeC是最樸素的編碼方法,咱們來和它比較如下數組

比較大小:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月23日
 */
public class TestUserInfo {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        UserInfo info = new UserInfo();
        info.buildUserID(100).buildUserName("Welcome to Netty");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream os = new ObjectOutputStream(bos);
        os.writeObject(info);
        os.flush();
        os.close();
        byte[] b = bos.toByteArray();
        System.out.println("The jdk serializable length is : " + b.length);
        bos.close();
        System.out.println("-------------------------------------");
        System.out.println("The byte array serializable length is : "
                + info.codeC().length);

    }

}

結果有點不能接受,這麼一點就大了6倍緩存

"C:\Program Files (x86)\Java\jdk1.8.0_102\bin\java" -Didea.launcher.port=7537 "-Didea.launcher.bin.path=C:\dev\JetBrains\IntelliJ IDEA 2016.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\charsets.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\deploy.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\access-bridge-32.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\cldrdata.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\dnsns.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\jaccess.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\jfxrt.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\localedata.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\nashorn.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunec.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunjce_provider.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunmscapi.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\sunpkcs11.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\ext\zipfs.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\javaws.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jce.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jfr.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jfxswt.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\jsse.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\management-agent.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\plugin.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\resources.jar;C:\Program Files (x86)\Java\jdk1.8.0_102\jre\lib\rt.jar;G:\projects-helloworld\netty\target\classes;G:\repo\maven\io\netty\netty-all\4.1.5.Final\netty-all-4.1.5.Final.jar;C:\dev\JetBrains\IntelliJ IDEA 2016.2.1\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain demo.codec.serializable.TestUserInfo
The jdk serializable length is : 117
-------------------------------------
The byte array serializable length is : 24

比較下時間

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月23日
 */
public class PerformTestUserInfo {

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        UserInfo info = new UserInfo();
        info.buildUserID(100).buildUserName("Welcome to Netty");
        int loop = 1000000;
        ByteArrayOutputStream bos = null;
        ObjectOutputStream os = null;
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < loop; i++) {
            bos = new ByteArrayOutputStream();
            os = new ObjectOutputStream(bos);
            os.writeObject(info);
            os.flush();
            os.close();
            byte[] b = bos.toByteArray();
            bos.close();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("The jdk serializable cost time is  : "
                + (endTime - startTime) + " ms");

        System.out.println("-------------------------------------");

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        startTime = System.currentTimeMillis();
        for (int i = 0; i < loop; i++) {
            byte[] b = info.codeC(buffer);
        }
        endTime = System.currentTimeMillis();
        System.out.println("The byte array serializable cost time is : "
                + (endTime - startTime) + " ms");

    }

}

運行結果,jdk的慢了10倍都不止服務器

The jdk serializable cost time is  : 1928 ms
-------------------------------------
The byte array serializable cost time is : 164 ms

2. 主流的編解碼框架簡介

  • Google的Protobuf
  • Facebook的Thrift
  • JBoss Marshalling

這裏主要介紹這3種,還有其餘著名好比Hryo等等...數據結構

Google ProtoBuf

google內部久經考驗。它將數據結構以.proto文件進行描述,經過代碼生成工具能夠生成對應數據結構的POJO對象和Protobuf相關方法和屬性。app

特色:框架

  •   結構化數據存儲格式
  •   性能高效
  •   語言無關、平臺無關、擴展性
  •   官方支持Java、C++和Python三種語言

(1) ProtoBuf使用二進制編碼,而不是XML,儘管XML的可讀性和擴展性都不錯,可是XML犧牲的空間和時間開銷太大,不適合高性能框架

(2) ProtoBuf另外一個吸引人的地方是數據描述文件和代碼生成機制

下面的圖頗有說服力,爲何這麼多人選擇Google的Protobuf

性能對比:

 

 碼流對比:

 

 Facebook的Thrift

對當時的Facebook而言,thrift用於解決各系統間大數量的傳輸通訊問題,所以能夠多種語言,C++ C# Cocoa Erlang Haskell Java Perl PHP Python Ruby和Smalltalk

  • Thrift能夠做爲高性能的通訊中間件,支持數據序列化的多種類型的RPC服務。
  • 適用於靜態數據交換,即事先肯定好它的數據結構,當數據結構變化時,必須從新編輯IDL文件,生成代碼和編譯。
  • 相對於XML和Json在性能和傳輸大小上有明顯優點。

Thrift主要由5部分組成:

(1) 語言系統和IDL編譯器:負責由用戶給定的IDL文件生成相應語言接口代碼;

(2) TProtocol: RPC協議層,能夠選擇多種不一樣的序列化方式,例如Binary和Json;

(3) TTransport:RPC傳輸層,一樣能夠選擇不一樣的傳輸層實現,例如socket NIO和MemoryBuffer等;

(4) Tprocessor: 做爲協議層和用戶提供的服務實現的紐帶,負責調用服務實現的接口;

(5) TServer:聚合TProtocol、TTransport和TProcessor等對象。

關注協議的話就是關於於Tprotocol層,其支持3中典型的編解碼方式:

  • 通用二進制
  • 壓縮二進制
  • 優化可選字段的壓縮編解碼

下圖展現同等測試條件下的編解碼耗時信息:

 

JBoss Marshalling

JBoss內部使用,不能跨語言,能夠看作是jdk的進化版... 擁有優勢以下:

  • 可插拔的類解析器、更加便捷的類加載定製策略,經過一個接口實現定製;
  • 可插拔的對象替換方式,不須要繼續的方式;
  • 可插拔的預約義類緩存表,能夠減少序列化的字節數組長度,提高經常使用類型的序列化對象性能;
  • 無須實現java.io.Serializable接口,實現序列化;
  • 利用了緩存技術提高性能

2、MessagePack編解碼技術

2.1 介紹

高效、性能、跨語言、碼流小、支持的語言由Java Python Ruby Hashkell C# OCaml Lua Go C C++等。

pom文件,guava是額外能夠不用.

 <!-- https://mvnrepository.com/artifact/org.msgpack/msgpack -->
        <dependency>
            <groupId>org.msgpack</groupId>
            <artifactId>msgpack</artifactId>
            <version>0.6.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>

Java API

import com.google.common.collect.Lists;
import org.msgpack.MessagePack;
import org.msgpack.template.Templates;

import java.util.List;

/**
 * Created by carl.yu on 2016/12/15.
 */
public class ApiDemo {
    public static void main(String[] args) throws Exception {
        //使用了guava
        List<String> src = Lists.newArrayList("msgpack", "kumofs", "viver");
        MessagePack msgpack = new MessagePack();
        //序列化
        byte[] raw = msgpack.write(src);
        //反序列化
        List<String> dst1 = msgpack.read(raw, Templates.tList(Templates.TString));
        System.out.println(dst1);
    }
}

2.2 編寫Encoder和Decoder

注意,要使用Messagepack,須要在實體類前加上註解@Message.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;

import java.util.List;

/**
 * Created by carl.yu on 2016/12/15.
 */
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        //將msg中的字節寫到array中
        System.out.println("開始進行解碼...");
        final byte[] array;
        final int length = msg.readableBytes();
        array = new byte[length];
        msg.getBytes(msg.readerIndex(), array, 0, length);
        MessagePack msgpack = new MessagePack();
        Object result = msgpack.read(array);
        out.add(result);
    }
}
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;

/**
 * Created by carl.yu on 2016/12/15.
 */
public class MsgpackEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        //負責將POJO對象編碼爲byte數組
        MessagePack msgpack = new MessagePack();
        byte[] raw = null;
        try {
            raw = msgpack.write(msg);
        } catch (Exception e) {
            e.printStackTrace();
            Throwables.propagateIfPossible(e);
        }
        out.writeBytes(raw);
    }
}

分別用MessagePack進行編解碼

2.3 編寫Server和ServerHandler

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioServerSocketChannel;
 9 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
10 import io.netty.handler.codec.LengthFieldPrepender;
11 import io.netty.handler.logging.LogLevel;
12 import io.netty.handler.logging.LoggingHandler;
13 
14 /**
15  * Created by carl.yu on 2016/12/15.
16  */
17 public class EchoServer {
18     public void bind(int port) throws Exception {
19         // 配置服務端的NIO線程組
20         EventLoopGroup bossGroup = new NioEventLoopGroup();
21         EventLoopGroup workerGroup = new NioEventLoopGroup();
22         try {
23             ServerBootstrap b = new ServerBootstrap();
24             b.group(bossGroup, workerGroup)
25                     .channel(NioServerSocketChannel.class)
26                     .option(ChannelOption.SO_BACKLOG, 100)
27                     .handler(new LoggingHandler(LogLevel.INFO))
28                     .childHandler(new ChannelInitializer<SocketChannel>() {
29                         @Override
30                         public void initChannel(SocketChannel ch)
31                                 throws Exception {
32                             //讀數據的時候用decoder解碼
33                             ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
34                             ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
35                             //寫數據的時候用encoder編碼
36                             ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
37                             ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());
38                             //
39                             ch.pipeline().addLast(new EchoServerHandler());
40                         }
41                     });
42 
43             // 綁定端口,同步等待成功
44             ChannelFuture f = b.bind(port).sync();
45 
46             // 等待服務端監聽端口關閉
47             f.channel().closeFuture().sync();
48         } finally {
49             // 優雅退出,釋放線程池資源
50             bossGroup.shutdownGracefully();
51             workerGroup.shutdownGracefully();
52         }
53     }
54 
55     public static void main(String[] args) throws Exception {
56         int port = 8080;
57         if (args != null && args.length > 0) {
58             try {
59                 port = Integer.valueOf(args[0]);
60             } catch (NumberFormatException e) {
61                 // 採用默認值
62             }
63         }
64         new EchoServer().bind(port);
65     }
66 }

主要在於2個編解碼器。

在MessagePack編碼器以前增長了LengthFieldPrepender,它將在ByteBuf以前增長字節的消息長度。

  而後使用LengthFieldBasedFrameDecoder根據消息長度進行解碼,工做原理如圖:

這樣獲取到的永遠是整包消息,很是簡單的解決了煩人的半包問題

2.4 編寫Client和ClientHandler

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

/**
 * Created by carl.yu on 2016/12/15.
 */
public class EchoClient {

    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            //讀數據的時候用decoder解碼
                            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
                            ch.pipeline().addLast("msgpack decoder", new MsgpackDecoder());
                            //寫數據的時候用encoder編碼
                            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                            ch.pipeline().addLast("msgpack encoder", new MsgpackEncoder());

                            ch.pipeline().addLast(new EchoClientHandler(100));
                        }
                    });

            // 發起異步鏈接操做
            ChannelFuture f = b.connect(host, port).sync();

            // 當代客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        new EchoClient().connect(port, "127.0.0.1");
    }
}
import demo.codec.serializable.UserInfo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by carl.yu on 2016/12/15.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    private final int sendNumber;

    public EchoClientHandler(int sendNumber) {
        this.sendNumber = sendNumber;
    }

    private UserInfo[] userInfo() {
        UserInfo[] userInfos = new UserInfo[sendNumber];
        UserInfo userInfo = null;
        for (int i = 0; i < sendNumber; i++) {
            userInfo = new UserInfo();
            userInfos[i] = userInfo;
            userInfo.setUserID(i);
            userInfo.setUserName("ABDCEFG-->" + i);
        }
        return userInfos;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
       /* UserInfo userInfo = new UserInfo();
        userInfo.setUserID(0);
        userInfo.setUserName("ABDCEFG-->" + 0);*/
        UserInfo[] userInfos = userInfo();
        for (int i = 0; i < userInfos.length; i++) {
            ctx.writeAndFlush(userInfos[i]);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("客戶端收到信息:" + msg);
//        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//        ctx.flush();
//        ctx.close();
    }
}

後面咱們會更加詳細的講解LengthFieldPrepender和LengthFieldBasedFrameDecoder,這裏只須要明白用來解決半包問題便可。

3、Google Protobuf

3.1 測試Google Protobuf

準備環境:

SubscribeReq.proto:

package netty;
option java_package="demo.codec.protobuf";
option java_outer_classname="SubscribeReqProto";

message SubscribeReq{
    required int32 subReqID = 1;
    required string userName = 2;
    required string productName = 3;
    repeated string address = 4;
}

SubscribeResp.proto

package netty;
option java_package="demo.codec.protobuf";
option java_outer_classname="SubscribeRespProto";

message SubscribeResp{
    required int32 subReqID = 1;
    required int32 respCode = 2;
    required string desc = 3;
}

這裏不詳細介紹google protobuf的語法:https://developers.google.com/protocol-buffers/docs/proto?hl=zh-CN

build.bat

protoc ./proto/*.proto --java_out=../main/java

pause

google protobuf依賴maven:

<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.5.0</version>
        </dependency>

運行build.bat,生成:

下面咱們運行如下代碼來了解Protobuf的用法:

import com.google.protobuf.InvalidProtocolBufferException;

import java.util.ArrayList;
import java.util.List;

/**
 * @author Administrator
 * @version 1.0
 * @date 2014年2月23日
 */
public class TestSubscribeReqProto {

    // 編碼方法: Object->byte[]
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray();
    }

    // 解碼方法: bayte[] -> Object
    private static SubscribeReqProto.SubscribeReq decode(byte[] body)
            throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }

    /**
     * 建立實例
     *
     * @return
     */
    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        //(1) Builder模式
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq
                .newBuilder();
        builder.setSubReqID(1);
        builder.setUserName("Lilinfeng");
        builder.setProductName("Netty Book");
        List<String> address = new ArrayList<>();
        address.add("NanJing YuHuaTai");
        address.add("BeiJing LiuLiChang");
        address.add("ShenZhen HongShuLin");
        builder.addAllAddress(address);
        return builder.build();
    }

    /**
     * @param args
     * @throws InvalidProtocolBufferException
     */
    public static void main(String[] args)
            throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq req = createSubscribeReq();
        System.out.println("Before encode : " + req.toString());
        SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
        System.out.println("After decode : " + req.toString());
        System.out.println("Assert equal : --> " + req2.equals(req));

    }

}

3.2 開發圖書訂購服務端

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class SubReqServer {
    public void bind(int port) throws Exception {
        // 配置服務端的NIO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(
                                    new ProtobufVarint32FrameDecoder());
                            ch.pipeline().addLast(
                                    new ProtobufDecoder(
                                            SubscribeReqProto.SubscribeReq
                                                    .getDefaultInstance()));
                            ch.pipeline().addLast(
                                    new ProtobufVarint32LengthFieldPrepender());
                            ch.pipeline().addLast(new ProtobufEncoder());
                            ch.pipeline().addLast(new SubReqServerHandler());
                        }
                    });

            // 綁定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();

            // 等待服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        new SubReqServer().bind(port);
    }
}

咱們來注意如下編解碼器的順序:

(1) ProtobufVarint32FrameDecoder : 半包問題

(2) ProtobufDecoder:解碼

(3) ProtobufVarint32LenghtFiedldPrepender:半包問題

(4) ProtobufEncoder:編碼

因而邏輯處理部分能夠直接使用類:

 1 import io.netty.channel.ChannelHandler.Sharable;
 2 import io.netty.channel.ChannelHandlerAdapter;
 3 import io.netty.channel.ChannelHandlerContext;
 4 import io.netty.channel.ChannelInboundHandlerAdapter;
 5 
 6 /**
 7  * @author lilinfeng
 8  * @version 1.0
 9  * @date 2014年2月14日
10  */
11 @Sharable
12 public class SubReqServerHandler extends ChannelInboundHandlerAdapter {
13 
14     @Override
15     public void channelRead(ChannelHandlerContext ctx, Object msg)
16             throws Exception {
17         SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
18         if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
19             System.out.println("Service accept client subscribe req : ["
20                     + req.toString() + "]");
21             ctx.writeAndFlush(resp(req.getSubReqID()));
22         }
23     }
24 
25     private SubscribeRespProto.SubscribeResp resp(int subReqID) {
26         SubscribeRespProto.SubscribeResp.Builder builder = SubscribeRespProto.SubscribeResp
27                 .newBuilder();
28         builder.setSubReqID(subReqID);
29         builder.setRespCode(0);
30         builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
31         return builder.build();
32     }
33 
34     @Override
35     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
36         cause.printStackTrace();
37         ctx.close();// 發生異常,關閉鏈路
38     }
39 }

3.3 圖書訂購客戶端開發

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class SubReqClient {

    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO線程組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(
                                    new ProtobufVarint32FrameDecoder());
                            ch.pipeline().addLast(
                                    new ProtobufDecoder(
                                            SubscribeRespProto.SubscribeResp
                                                    .getDefaultInstance()));
                            ch.pipeline().addLast(
                                    new ProtobufVarint32LengthFieldPrepender());
                            ch.pipeline().addLast(new ProtobufEncoder());
                            ch.pipeline().addLast(new SubReqClientHandler());
                        }
                    });

            // 發起異步鏈接操做
            ChannelFuture f = b.connect(host, port).sync();

            // 當代客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO線程組
            group.shutdownGracefully();
        }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 採用默認值
            }
        }
        new SubReqClient().connect(port, "127.0.0.1");
    }
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.ArrayList;
import java.util.List;

/**
 * @author lilinfeng
 * @version 1.0
 * @date 2014年2月14日
 */
public class SubReqClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Creates a client-side handler.
     */
    public SubReqClientHandler() {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        for (int i = 0; i < 10; i++) {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }

    private SubscribeReqProto.SubscribeReq subReq(int i) {
        SubscribeReqProto.SubscribeReq.Builder builder = SubscribeReqProto.SubscribeReq
                .newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("Lilinfeng");
        builder.setProductName("Netty Book For Protobuf");
        List<String> address = new ArrayList<>();
        address.add("NanJing YuHuaTai");
        address.add("BeiJing LiuLiChang");
        address.add("ShenZhen HongShuLin");
        builder.addAllAddress(address);
        return builder.build();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("Receive server response : [" + msg + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

在不怎麼了解Protobuf實現和使用細節的狀況 下,咱們就能夠輕鬆支持Google Protobuf編碼

3.4 注意事項

ProtobufDecoder僅僅負責解碼,所以在ProtobufDecoder前面,必定要可以處理半包的解碼器,有如下3種方式:

(1) 使用Netty提供的ProtobufVarint32FrameDecoder,它能夠處理半包消息;

(2) 繼承Netty提供的通用半包解碼器LengthFieldBasedFrameDecoder;

(3) 繼承ByteToMessageDecoder,本身處理..

半包問題必須解決,不然服務器沒法正常工做。

4、JBoss Marshalling編解碼

暫時略。能夠參考netty初探(2)

相關文章
相關標籤/搜索