Netty就是這麼回事(六)

趁着今天有時間,多發一點。這一章主要介紹Protobuf編解碼器,首先先說protobuf是個什麼東西。它實際上是Google開發的一個數據交換的格式,它獨立於語言,獨立於平臺。用過它的都會以爲它太好用了,記得當初在iesLab開發電力系統高級應用軟件的服務端用到的通訊協議就是它,除了代碼風格不是很好以外,其餘的真是太方便了,膜拜Google的技術啊。java

在詳細介紹以前,我先來問你們一個問題,若是讓你開發一款通訊協議,你但願如何設計協議?我相信,你們既然都已經習慣了面向對象編程,都是選擇將對象轉換爲協議發送,固然接受的時候是將協議轉換爲對象。先拋開半包問題不說,先談數據幀的傳輸,咱們是否是但願:1. 通訊的時候最好能用最少的字節來傳遞更多的消息,這樣就能夠加快消息的傳輸,產生很小的延遲。2. 咱們的消息能夠跨平臺傳輸,無論對方用的什麼語言?C仍是C++仍是java,對一樣的通訊協議能夠通用,不會由於不一樣的平臺由於字節大小而痛苦。那麼java給我帶來的序列化能夠用麼?固然能夠可是有個問題,首先java的序列化的對象生成的字節數目太過龐大,幾乎是正常二進制編碼的5倍左右,固然是protobuf的更度倍。第二,你用java平臺序列化的對象,在C++的平臺是無法解析的。因此,基於這個protobuf給咱們解決了,他的編碼字節足夠小,不一樣平臺直接能夠通用。編程

在Netty中天生給咱們集成了protobuf的編解碼器,不須要咱們本身去實現,主要有解碼器:ProtobufVarint32FrameDecoder(負責半包解碼),ProtobufDecoder(負責protobuf的解碼)bootstrap

編碼器:ProtobufVarint32LengthFieldPrepender(負責半包編碼),ProtobufEncoder(負責協議編碼)服務器

這個其實很好理解,一個是爲了解決半包問題,另外一個解決協議解析問題。socket

咱們看一下服務端的代碼:ide

SubscribeProto.Subscribe.getDefaultInstance()這個是protobuf協議工具給咱們生成的,具體的百度一下就知道了,它是什麼意思呢?你想啊,Netty的解碼器又不是萬能的,他怎麼知道你的幀格式是什麼樣子的?因此這個就是告訴他你給我照着這個樣式解碼,是否是很方便啊!函數

package com.dlb.note.server;

import com.dlb.note.doj.SubscribeProto;
import com.google.protobuf.ProtocolStringList;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * 功能:protobuf時間服務器
 * 版本:1.0
 * 日期:2016/12/9 18:33
 * 做者:馟蘇
 */
public class ProtobufTimeServer {
    /**
     * main函數
     * @param args
     */
    public static void main(String []args) {
        // 構造nio線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer() {
                        protected void initChannel(Channel channel) throws Exception {
                            /**
                             * 如下是支持protobuf的解碼器
                             */
                            channel.pipeline()
                                    .addLast(new ProtobufVarint32FrameDecoder()) // 半包解碼
                                    .addLast(new ProtobufDecoder(SubscribeProto.Subscribe.getDefaultInstance())) // protobuf解碼
                                    .addLast(new ProtobufVarint32LengthFieldPrepender()) // 半包編碼
                                    .addLast(new ProtobufEncoder()) // protobuf編碼
                                    .addLast(new MyProtoBufHandler());
                        }
                    });
            // 綁定端口,同步等待成功
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("----服務端在8888端口監聽----");

            // 等待服務端監聽端口關閉
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅的退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

class MyProtoBufHandler extends ChannelHandlerAdapter {
    // 客戶端連接異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exception,ip=" + ctx.channel().remoteAddress());
        ctx.close();
        super.exceptionCaught(ctx, cause);
    }

    // 客戶端連接到來
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client come,ip=" + ctx.channel().remoteAddress());
        super.channelActive(ctx);
    }

    // 客戶端連接關閉
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client close,ip=" + ctx.channel().remoteAddress());
        ctx.close();
        super.channelInactive(ctx);
    }

    // 可讀
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 讀數據
        SubscribeProto.Subscribe subscribe = (SubscribeProto.Subscribe) msg;
        ProtocolStringList addressList = subscribe.getAddressList();

        System.out.println(subscribe.getName());
        for (String str : addressList) {
            System.out.println(str);
        }
        System.out.println("---------------------------------------------------");

        super.channelRead(ctx, msg);
    }
}
相關文章
相關標籤/搜索