netty 的 Google protobuf 開發

根據上一篇博文《Google Protobuf 使用 Java 版》,netty 集成 protobuf 的方法非常簡單。代碼如下:

server

package protobuf.server.impl;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import object.server.impl.SubReqServer;
import object.server.impl.SubScriptReqProto;

public class SubReqProtobufServer {
    public void start(int port) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        // 配置 NioServerSocketChannel 的 tcp 參數, BACKLOG 的大小
        bootstrap.option(ChannelOption.SO_BACKLOG, 100);
        bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel ch) throws Exception {

                /*
                 * 首先添加 ProtobufVarint32FrameDecoder 處理器 , 它主要用於半包處理
                 */
                ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                /*
                 * 而後添加 ProtobufDecoder 解碼器 ,
                 * 他的參數com.google.protobuf.MessageLite 實際上就是要告訴 ProtobufDecoder
                 * 須要解碼的目標類是什麼,不然僅僅從字節數組中是沒法判斷出須要解碼的目標類型信息
                 */
                ProtobufDecoder protobufDecoder = new ProtobufDecoder(
                        SubScriptReqProto.SubScriptReq.getDefaultInstance());
                ch.pipeline().addLast(protobufDecoder);
                ch.pipeline().addLast(
                        new ProtobufVarint32LengthFieldPrepender());
                ch.pipeline().addLast(new ProtobufEncoder());
                ch.pipeline().addLast(new SubReqProtobufHandler());
            }
        });
        // 綁定端口,隨後調用它的同步阻塞方法 sync 等等綁定操做成功,完成以後 Netty 會返回一個 ChannelFuture
        // 它的功能相似於的 Future,主要用於異步操做的通知回調.
        ChannelFuture channelFuture;
        try {
            channelFuture = bootstrap.bind(port).sync();
            // 等待服務端監聽端口關閉,調用 sync 方法進行阻塞,等待服務端鏈路關閉以後 main 函數才退出.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        SubReqProtobufServer server = new SubReqProtobufServer();
        server.start(9091);
    }
}

serverHandler

package protobuf.server.impl;

import object.server.impl.SubScriptReqProto;
import object.server.impl.SubScriptRespProto;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side handler that answers every decoded {@code SubScriptReq} with a
 * {@code SubScriptResp} carrying the same request id.
 */
public class SubReqProtobufHandler extends ChannelHandlerAdapter {

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Logs the incoming request and queues the matching response.
     *
     * <p>Exceptions are deliberately not caught here: Netty forwards anything thrown from
     * this method to {@link #exceptionCaught}, which already prints the stack trace.
     * Catching, printing and rethrowing would report every failure twice.
     *
     * @param ctx context of this handler
     * @param msg message produced by the upstream decoder; cast to {@code SubScriptReq}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        SubScriptReqProto.SubScriptReq req = (SubScriptReqProto.SubScriptReq) msg;
        System.out.println("SubReqProtobufHandler : " + req);
        // Only queue the response; channelReadComplete flushes once per read batch.
        ctx.write(resp(req.getSubReqID()));
    }

    /**
     * Builds the response for the given request id.
     *
     * @param subReqID id copied from the request into the response
     * @return a {@code SubScriptResp} with description {@code "desc"} and response code 2
     */
    private Object resp(int subReqID) {
        SubScriptRespProto.SubScriptResp.Builder builder =
                SubScriptRespProto.SubScriptResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setDesc("desc");
        builder.setRespCode(2);
        return builder.build();
    }

    /** Flushes the responses queued by {@link #channelRead} during the current read batch. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}

 

client

package protobuf.client.impl;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.codec.serialization.ObjectEncoder;
import object.client.impl.SubReqClient;
import object.server.impl.SubScriptReqProto;
import object.server.impl.SubScriptRespProto;

/**
 * Netty client for the protobuf subscription demo.
 *
 * <p>The pipeline mirrors the server's: inbound frames are decoded into
 * {@code SubScriptRespProto.SubScriptResp}, outbound messages are protobuf-encoded and
 * length-prefixed, and {@code SubReqClientHandler} drives the conversation.
 */
public class SubReqProtobufClient {

    /**
     * Connects to {@code host:port} and blocks the calling thread until the connection is
     * closed or the thread is interrupted. The event loop group is shut down before returning.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public void connect(String host, int port) {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Inbound: reassemble length-prefixed frames, then parse each frame as a
                // SubScriptResp (the decoder needs the prototype to know the target type).
                ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                ch.pipeline().addLast(new ProtobufDecoder(
                        SubScriptRespProto.SubScriptResp.getDefaultInstance()));
                // Outbound: prepend the length field, then serialize the message.
                ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                ch.pipeline().addLast(new ProtobufEncoder());
                ch.pipeline().addLast(new SubReqClientHandler());
            }
        });

        try {
            // connect() is asynchronous; sync() waits for the connection to be established.
            ChannelFuture future = bootstrap.connect(host, port).sync();
            // Block until the client channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }

    /** Connects the demo client to localhost:9091. */
    public static void main(String[] args) {
        new SubReqProtobufClient().connect("localhost", 9091);
    }
}

clientHandler

package protobuf.client.impl;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import object.server.impl.SubScriptReqProto;

/**
 * Client-side handler that sends a burst of {@code SubScriptReq} messages as soon as the
 * connection becomes active and prints every message it receives.
 */
public class SubReqClientHandler extends ChannelHandlerAdapter {

    /** Number of requests sent when the channel becomes active. */
    private static final int REQUEST_COUNT = 100;

    /** Request id of the first message; subsequent ids increase by one. */
    private static final int FIRST_SUB_REQ_ID = 999;

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Sends {@code REQUEST_COUNT} requests once the channel is connected.
     *
     * <p>The messages are queued with {@code write} and flushed once after the loop instead
     * of flushing after every message. Exceptions are not caught here so that a failure
     * reaches {@link #exceptionCaught}, which prints it and closes the channel, rather than
     * leaving a half-initialized connection open.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // One builder is reused; every field is overwritten before each build().
        SubScriptReqProto.SubScriptReq.Builder builder =
                SubScriptReqProto.SubScriptReq.newBuilder();
        for (int i = 0; i < REQUEST_COUNT; i++) {
            builder.setSubReqID(FIRST_SUB_REQ_ID + i);
            builder.setAddress("address" + i);
            builder.setProductName("productvalue" + i);
            builder.setUserName("userName" + i);

            ctx.write(builder.build());
        }
        ctx.flush();
    }

    /** Prints each decoded message received from the server. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("SubReqClientHandler : " + msg);
    }

    /** Flushes any pending outbound data at the end of a read batch. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

}

 

使用 ProtoBuf 的注意事項:

  ProtobufDecoder 只負責解碼,它不支持半包讀寫,因此在 ProtobufDecoder 前一定要有一個能處理半包讀寫的處理器。有三種方法可以選擇:

  1. 使用 Netty 提供的 ProtobufVarint32FrameDecoder;
  2. 繼承 Netty 提供的通用半包解碼器 LengthFieldBasedFrameDecoder;
  3. 繼承 ByteToMessageDecoder 類,自己處理半包消息

   

以上內容出自:《Netty 權威指南》

相關文章
相關標籤/搜索