Netty就是這麼回事(八)

這一章主要介紹如何使用Netty開發自定義通訊協議。咱們知道有的時候可能咱們不想用java的序列化,由於編碼效率低,而且咱們也不想使用protobuf,由於若是說咱們的通訊程序都是來自一個平臺,那麼用protobuf每次都須要從新利用工具生成文件也不是很好,那麼就須要開發本身的通訊協議。java

在開發自定義通訊協議以前,咱們先來明確這麼幾個開發目標:首先,咱們但願仍然通訊在對象和字節之間作轉換,對上層來講無感知;其次,咱們的自定義協議通訊程序也要有半包解碼的能力。明確了目的以後,咱們如何處理呢?咱們須要依次開發本身的編碼器和解碼器,你可能會以爲之前都是用Netty自帶的編解碼器,本身開發是否是很困難啊。編程

固然不是!Netty給咱們提供了MessageToByteEncoder和ByteToMessageDecoder這兩個編解碼器,他們具備半包處理的能力,咱們只須要實現協議就能夠了。bootstrap

咱們來看一下客戶端和服務端的代碼。緩存

客戶端代碼:服務器

package com.dlb.note.client;

import com.dlb.note.constant.ConstantValue;
import com.dlb.note.doj.Request;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;

/**
 * 功能:自定義編碼器客戶端
 * 版本:1.0
 * 日期:2016/12/19 19:55
 * 做者:馟蘇
 */
public class MySelfEncoderClient {
    /**
     * 主函數
     */
    public static void main(String []args) {
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer() {
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline().addLast(new MySelfEncoder());
                            channel.pipeline().addLast(new MySelfEncoderHandler());
                        }
                    });

            // 等待客戶端連接成功
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            System.out.println("客戶端連接成功!");

            // 等待客戶端連接關閉
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}

/**
 * 請求編碼器
 * <pre>
 * 數據包格式
 * +——----——+——-----——+——----——+——----——+——-----——+
 * | 包頭    | 模塊號   | 命令號  |  長度   |   數據   |
 * +——----——+——-----——+——----——+——----——+——-----——+
 * </pre>
 * 包頭4字節
 * 模塊號2字節short
 * 命令號2字節short
 * 長度4字節(描述數據部分字節長度)
 * @author 馟蘇
 */
class MySelfEncoder extends MessageToByteEncoder {
    /**
     * 編碼
     * @param channelHandlerContext
     * @param rs
     * @param byteBuf
     * @throws Exception
     */
    protected void encode(ChannelHandlerContext channelHandlerContext, Object rs, ByteBuf byteBuf) throws Exception {
        Request request = (Request)(rs);

        //包頭
        byteBuf.writeInt(ConstantValue.FLAG);
        //module
        byteBuf.writeShort(request.getModule());
        //cmd
        byteBuf.writeShort(request.getCmd());
        //長度
        byteBuf.writeInt(request.getDataLength());
        //data
        if(request.getData() != null){
            byteBuf.writeBytes(request.getData());
        }
    }
}

/**
 * 自定義處理器
 */
class MySelfEncoderHandler extends ChannelHandlerAdapter {
    // 可讀
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

    }

    // 鏈接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            Request request = new Request();

            request.setCmd((short) 100);
            request.setData("nihao".getBytes());
            request.setModule((short) 9000);

            ctx.writeAndFlush(request);
        }
    }

    // 關閉
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client close,ip:" + ctx.channel().remoteAddress());
        ctx.close();
    }

    // 異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.toString());
        ctx.close();
    }
}

服務端代碼:框架

package com.dlb.note.server;

import com.dlb.note.doj.Request;
import com.dlb.note.server.decode.MySelfDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 功能:自定義解碼器服務端
 * 版本:1.0
 * 日期:2016/12/15 12:47
 * 做者:馟蘇
 */
public class MySelfDecoderServer {
    /**
     * 主函數
     */
    public static void main(String []args) {
        // 配置服務端的NIO線程池
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // 當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 爲管道添加處理器
                    .childHandler(new ChannelInitializer() {
                        // 初始化管道
                        protected void initChannel(Channel channel) throws Exception {
                            channel.pipeline()
                                    .addLast(new MySelfDecoder())
                                    .addLast(new MySelfDecoderHandler());
                        }
                    });

            // 綁定端口,同步等待成功
            ChannelFuture future = serverBootstrap.bind(8888).sync();
            System.out.println("服務器在8888端口監聽hello");

            // 等待服務端監聽端口關閉
            future.channel().closeFuture().sync();
            System.out.println("服務器關閉bye");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

/**
 * 自定義處理器
 */
class MySelfDecoderHandler extends ChannelHandlerAdapter {
    // 可讀
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println(request.toString());
    }

    // 鏈接
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client come,ip:" + ctx.channel().remoteAddress());
    }

    // 關閉
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client close,ip:" + ctx.channel().remoteAddress());
        ctx.close();
    }

    // 異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.toString());
        ctx.close();
    }
}

解碼器:socket

package com.dlb.note.server.decode;

import com.dlb.note.constant.ConstantValue;
import com.dlb.note.doj.Request;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**
 * 請求解碼器
 * <pre>
 * 數據包格式
 * +——----——+——-----——+——----——+——----——+——-----——+
 * | 包頭    | 模塊號   | 命令號  |  長度   |   數據   |
 * +——----——+——-----——+——----——+——----——+——-----——+
 * </pre>
 * 包頭4字節
 * 模塊號2字節short
 * 命令號2字節short
 * 長度4字節(描述數據部分字節長度)
 */
/**
 * 功能:自定義消息解碼器
 * 版本:1.0
 * 日期:2016/12/19 19:42
 * 做者:馟蘇
 */
public class MySelfDecoder extends ByteToMessageDecoder {
    /**
     * 數據包基本長度
     */
    public static int BASE_LENTH = 4 + 2 + 2 + 4;

    /**
     * 解碼器
     * @param channelHandlerContext
     * @param byteBuf
     * @param list
     * @throws Exception
     */
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
            throws Exception {
        // 可讀長度必須大於基本長度
        if(byteBuf.readableBytes() >= BASE_LENTH){
            // 防止socket字節流攻擊
            if(byteBuf.readableBytes() > 2048){
                byteBuf.skipBytes(byteBuf.readableBytes());
            }

            // 記錄包頭開始的index
            int beginReader;

            while(true){
                beginReader = byteBuf.readerIndex();
                byteBuf.markReaderIndex();
                if(byteBuf.readInt() == ConstantValue.FLAG){
                    break;
                }

                // 未讀到包頭,略過一個字節
                byteBuf.resetReaderIndex();
                byteBuf.readByte();

                // 長度又變得不知足
                if(byteBuf.readableBytes() < BASE_LENTH){
                    return;
                }
            }

            // 模塊號
            short module = byteBuf.readShort();
            // 命令號
            short cmd = byteBuf.readShort();
            // 長度
            int length = byteBuf.readInt();

            // 判斷請求數據包數據是否到齊
            if(byteBuf.readableBytes() < length){
                // 還原讀指針
                byteBuf.readerIndex(beginReader);
                return;
            }

            // 讀取data數據
            byte[] data = new byte[length];
            byteBuf.readBytes(data);

            Request request = new Request();
            request.setModule(module);
            request.setCmd(cmd);
            request.setData(data);

            // 繼續往下傳遞
            list.add(request);
        }
    }
}

可能你們看完之後會問這麼一個問題,Netty到底是怎麼給咱們解決了半包讀寫問題,還有關於編碼器的編寫時什麼意思?這其實就須要咱們看一下Netty的源碼,拿解碼器作例子,下面是byteToMessageDecoder的關鍵代碼。ide

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if(msg instanceof ByteBuf) {
        RecyclableArrayList out = RecyclableArrayList.newInstance();
        boolean var12 = false;

        try {
            var12 = true;
            ByteBuf t = (ByteBuf)msg;
            this.first = this.cumulation == null;
            if(this.first) {
                this.cumulation = t;
            } else {
                if(this.cumulation.writerIndex() > this.cumulation.maxCapacity() - t.readableBytes()) {
                    this.expandCumulation(ctx, t.readableBytes());
                }

                this.cumulation.writeBytes(t);
                t.release();
            }

            this.callDecode(ctx, this.cumulation, out);
            var12 = false;
        } catch (DecoderException var13) {
            throw var13;
        } catch (Throwable var14) {
            throw new DecoderException(var14);
        } finally {
            if(var12) {
                if(this.cumulation != null && !this.cumulation.isReadable()) {
                    this.cumulation.release();
                    this.cumulation = null;
                }

                int size = out.size();
                this.decodeWasNull = size == 0;

                for(int i1 = 0; i1 < size; ++i1) {
                    ctx.fireChannelRead(out.get(i1));
                }

                out.recycle();
            }
        }

        if(this.cumulation != null && !this.cumulation.isReadable()) {
            this.cumulation.release();
            this.cumulation = null;
        }

        int var16 = out.size();
        this.decodeWasNull = var16 == 0;

        for(int i = 0; i < var16; ++i) {
            ctx.fireChannelRead(out.get(i));
        }

        out.recycle();
    } else {
        ctx.fireChannelRead(msg);
    }

}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    this.replayable.setCumulation(in);

    try {
        while(in.isReadable()) {
            int cause = this.checkpoint = in.readerIndex();
            int outSize = out.size();
            Object oldState = this.state;
            int oldInputLength = in.readableBytes();

            try {
                this.decode(ctx, this.replayable, out);
                if(ctx.isRemoved()) {
                    break;
                }

                if(outSize == out.size()) {
                    if(oldInputLength == in.readableBytes() && oldState == this.state) {
                        throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() must consume the inbound " + "data or change its state if it did not decode anything.");
                    }
                    continue;
                }
            } catch (Signal var10) {
                var10.expect(REPLAY);
                if(!ctx.isRemoved()) {
                    int checkpoint = this.checkpoint;
                    if(checkpoint >= 0) {
                        in.readerIndex(checkpoint);
                    }
                }
                break;
            }

            if(cause == in.readerIndex() && oldState == this.state) {
                throw new DecoderException(StringUtil.simpleClassName(this.getClass()) + ".decode() method must consume the inbound data " + "or change its state if it decoded something.");
            }

            if(this.isSingleDecode()) {
                break;
            }
        }

    } catch (DecoderException var11) {
        throw var11;
    } catch (Throwable var12) {
        throw new DecoderException(var12);
    }
}

咱們能夠看到實際上decode方法是在callDecoder中調用的,調用完了之後若是list不爲空,那麼就會往上傳,這樣看來要是想讓咱們的上層handler接受對象,就必需要把咱們從字節解析的對象放入list集合中。函數

比較關鍵的是工具

首先呢,查看是否是第一次接受,若是是那麼把接受到的字節賦值給cumulation,若是不是那麼把接收到的字節拼接到cumulation的後面。這是什麼意思,咱們能夠這麼考慮,把cumulation想象成一個緩存,假如你是第一次接受字節,那麼你的緩存就是本次接受到的字節;若是說你上次的沒有處理完,那麼我就把上次沒處理完的字節加上本次接受的一塊兒放入緩存,而後再交給本次解碼器處理。

還有一個問題是怎麼保證在處理的時候準備返回還未處理完的字節呢,好比說我發現這一幀不夠,我但願等幀接受足夠大了再進行相應處理?其實關鍵在這個地方,一切都是以讀指針來標識的,咱們來看一個bytebuf其實有兩個指針:一個讀指針readIndex和一個寫指針writeIndex,這就解決了NIO編程中常常要flip的問題,那麼讀指針和寫指針直接的字節就是還未處理的。而咱們每讀一個字節,讀指針都會自增,只要咱們保證在咱們的處理程序中返回讀指針的正確位置就能保證Netty框架的緩存幫咱們緩存字節信息。所以,Netty就幫咱們解決了半包的解碼問題,是否是很方便!

相關文章
相關標籤/搜索