Netty整理(二)

Netty整理 java

如今咱們來驗證一下channel的生命週期。web

咱們將EchoServerHandler修改以下,增長所有的監聽事件,並打印事件方法名稱。算法

/**
 * 事件處理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 監聽讀取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(data);
    }

    /**
     * 監聽讀取完畢事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 監聽異常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 將channel註冊到EventLoop的Selector多路複用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未註冊到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有鏈接,變爲活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 沒有鏈接,非活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }
}

啓動EchoServer,打開telnet鏈接到端口,咱們能夠看到bootstrap

admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfs
sdfs
^]
telnet> quit
Connection closed.數組

整個過程爲鏈接,發送字符串sdfs,退出鏈接promise

服務端日誌爲緩存

2019-10-01 05:33:36.960  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelRegistered
2019-10-01 05:33:36.960  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelActive
2019-10-01 05:33:54.439  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : sdfs服務器

2019-10-01 05:33:54.442  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-01 05:34:22.527  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-01 05:34:22.529  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelInactive
2019-10-01 05:34:22.529  INFO 543 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelUnregisteredwebsocket

整個生命週期正如前面寫到同樣網絡

Channel的生命週期爲:(1)channelRegistered->(3)channelActive->(4)channelInactive->(2)channelUnregistered

ChannelPipeline:
        比如廠裏的流水線同樣,能夠在上面添加多個ChannelHanler,也可當作是一串 ChannelHandler 實例,攔截穿過 Channel 的輸入輸出 event,  ChannelPipeline 實現了攔截器的一種高級形式,使得用戶能夠對事件的處理以及ChannelHanler之間交互得到徹底的控制權。

咱們來看一下ChannelPipeline的源碼

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    /**
     * 在管道的首位置添加一個channelhandler
     */
    ChannelPipeline addFirst(String name, ChannelHandler handler);

    /**
     * 同上,多了一個線程池參數
     */
    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);

    /**
     * 在管道的最末端添加一個channelhandler
     */
    ChannelPipeline addLast(String name, ChannelHandler handler);

    /**
     * 同上,多了一個線程池參數
     */
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);

    /**
     * 在一個管道中已存在的channelhandler以前插入另一個channelhandler
     */
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

    /**
     * 同上,多了一個線程池參數
     */
    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    /**
     * 在管道已有多一個channelhandler以後插入另一個channelhandler
     */
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

    /**
     * 同上,多了一個線程池參數
     */
    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    /**
     * 在該管道的首位置放入一組channelhandler
     *
     */
    ChannelPipeline addFirst(ChannelHandler... handlers);

    /**
     * 同上,多了一個線程池參數
     *
     */
    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);

    /**
     * 在管道的最末端放入一組channelhandler
     *
     */
    ChannelPipeline addLast(ChannelHandler... handlers);

    /**
     * 同上,多了一個線程池參數
     *
     */
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

    /**
     * 從管道中移除一個channelhandler
     */
    ChannelPipeline remove(ChannelHandler handler);

    /**
     * 根據名字在管道中移除一個channelhandler
     */
    ChannelHandler remove(String name);

    /**
     * 根據類名在管道中移除一個channelhandler
     */
    <T extends ChannelHandler> T remove(Class<T> handlerType);

    /**
     * 移除管道中首個channelhandler
     */
    ChannelHandler removeFirst();

    /**
     * 移除管道中末個channelhandler
     */
    ChannelHandler removeLast();

    /**
     * 在管道中用新的channelhandler替換舊的channelhandler,中間參數都是替換者的名字
     */
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);

    /**
     * 在管道中用新的channelhandler替換舊的channelhandler,中間參數都是替換者的名字
     */
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

    /**
     * 在管道中用新的channelhandler替換舊的channelhandler,中間參數都是替換者的名字
     */
    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);

    /**
     * 返回管道中首個channelhandler
     */
    ChannelHandler first();

    /**
     * 獲取第一個管道處理器上下文
     */
    ChannelHandlerContext firstContext();

    /**
     * 獲取管道中最後一個channelhandler
     */
    ChannelHandler last();

    /**
     * 獲取管道中最後一個管道處理器上下文
     */
    ChannelHandlerContext lastContext();

    /**
     * 根據名字獲取管道中的一個channelhandler
     */
    ChannelHandler get(String name);

    /**
     * 根據類獲取一個channelhandler
     */
    <T extends ChannelHandler> T get(Class<T> handlerType);

    /**
     * 根據channelhandler獲取一個管道處理器上下文
     */
    ChannelHandlerContext context(ChannelHandler handler);

    /**
     * 根據名字獲取一個管道處理器上下文
     */
    ChannelHandlerContext context(String name);

    /**
     * 根據一個channelhandler的類名獲取一個管道處理器上下文
     */
    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);

    /**
     * 返回一個管道
     */
    Channel channel();

    /**
     * 返回管道中的channelhandler的名稱列表
     */
    List<String> names();

    /**
     * Converts this pipeline into an ordered {@link Map} whose keys are
     * handler names and whose values are handlers.
     */
    Map<String, ChannelHandler> toMap();

    @Override
    ChannelPipeline fireChannelRegistered();

     @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();
}

ChannelHandlerContext模塊的做用和分析

咱們在ChannelHandler的方法中會看到有一個參數。如

/**
 * 監聽讀取事件
 * @param ctx
 * @param msg
 * @throws Exception
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf data = (ByteBuf) msg;
    log.info(data.toString(CharsetUtil.UTF_8));
    ctx.writeAndFlush(data);
}

這個ChannelHandlerContext就是一個處理器的上下文。

一、ChannelHandlerContext是鏈接ChannelHandler和ChannelPipeline的橋樑


            ChannelHandlerContext部分方法和Channel及ChannelPipeline重合,比如調用write方法,
            Channel、ChannelPipeline、ChannelHandlerContext 均可以調用此方法,前二者都會在整個管道流裏傳播,而ChannelHandlerContext就只會在後續的Handler裏面傳播

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //第一種,會在整個管道中傳播
        Channel channel = ctx.channel();
        channel.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8));
        //第二種,會在整個管道中傳播
        ChannelPipeline pipeline = ctx.pipeline();
        pipeline.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8));
        //第三種,只會在後續的channelhandler中傳播
        ctx.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8));
//        ByteBuf data = (ByteBuf) msg;
//        log.info(data.toString(CharsetUtil.UTF_8));
//        ctx.writeAndFlush(data);
    }

二、AbstractChannelHandlerContext類
                    雙向鏈表結構,next/prev分別是後繼節點,和前驅節點        

volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

            三、DefaultChannelHandlerContext 是實現類,可是大部分都是父類那邊完成,這個只是簡單的實現一些方法
                主要就是判斷Handler的類型

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

                ChannelInboundHandler之間的傳遞,主要經過調用ctx裏面的FireXXX()方法來實現下個handler的調用

咱們修改一下EchoServerHandler,再增長一個EchoServerHandler2

/**
 * 事件處理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 監聽讀取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        //第一種,會在整個管道中傳播
//        Channel channel = ctx.channel();
//        channel.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8));
//        //第二種,會在整個管道中傳播
//        ChannelPipeline pipeline = ctx.pipeline();
//        pipeline.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8));
//        //第三種,只會在後續的channelhandler中傳播
//        ctx.writeAndFlush(Unpooled.copiedBuffer("帥呆了",CharsetUtil.UTF_8));
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(Unpooled.copiedBuffer("我是第一個handler", CharsetUtil.UTF_8));
        ctx.fireChannelRead(msg);
    }

    /**
     * 監聽讀取完畢事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 監聽異常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 將channel註冊到EventLoop的Selector多路複用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未註冊到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有鏈接,變爲活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 沒有鏈接,非活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }
}
@Slf4j
public class EchoServerHandler2 extends SimpleChannelInboundHandler {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
        ctx.writeAndFlush(Unpooled.copiedBuffer("我是第二個handler", CharsetUtil.UTF_8));
    }

    /**
     * 監聽讀取完畢事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 監聽異常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 將channel註冊到EventLoop的Selector多路複用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未註冊到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有鏈接,變爲活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 沒有鏈接,非活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }
}

將EchoServerHandler2添加到管道中

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

咱們能夠看到EchoServerHandler2是添加到EchoServerHandler後面的。

啓動EchoServer,打開telnet

admindeMacBook-Pro:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
kdfaf
我是第一個handler我是第二個handler^]
telnet> quit
Connection closed.

服務端的日誌爲

2019-09-29 22:42:42.626  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelRegistered
2019-09-29 22:42:42.627  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelActive
2019-09-29 22:42:48.108  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : kdfaf

2019-09-29 22:42:48.114  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler2   : kdfaf

2019-09-29 22:42:48.114  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-09-29 22:44:09.799  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-09-29 22:44:09.802  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelInactive
2019-09-29 22:44:09.802  INFO 589 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelUnregistered

多個入站出站ChannelHandler的執行順序

一、通常的項目中,inboundHandler和outboundHandler有多個,在Pipeline中的執行順序?
                InboundHandler順序執行,OutboundHandler逆序執行
                問題:ch.pipeline().addLast(new InboundHandler1());
                     ch.pipeline().addLast(new OutboundHandler1());
                     ch.pipeline().addLast(new OutboundHandler2());
                     ch.pipeline().addLast(new InboundHandler2());
                或者:
                     ch.pipeline().addLast(new OutboundHandler1());
                     ch.pipeline().addLast(new OutboundHandler2());
                     ch.pipeline().addLast(new InboundHandler1());
                     ch.pipeline().addLast(new InboundHandler2());

如今咱們以實際代碼來驗證一下

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8));
        //會傳遞到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
@Slf4j
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler2 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8));
        //在此結束調用,不會在管道中繼續傳遞
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
@Slf4j
public class OutboundHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("OutBoundHandler1 write : " + data.toString(CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer("OutBoundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        ctx.flush();
    }
}
@Slf4j
public class OutboundHandler2 extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("OutboundHaneler2 write : " + data.toString(CharsetUtil.UTF_8));
        ctx.write(Unpooled.copiedBuffer("OutboundHandler2 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        ctx.flush();
    }
}
@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
//                            socketChannel.pipeline().addLast(new EchoServerHandler());
//                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler1());
                            socketChannel.pipeline().addLast(new InboundHandler2());
                            socketChannel.pipeline().addLast(new OutboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler2());
                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

啓動EchoServer,打開telnet

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
dsfs
InboundHandler2 InboundHandler1 dsfs


服務端日誌

2019-10-02 09:51:44.865  INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服務端收到數據:dsfs

2019-10-02 09:51:44.869  INFO 716 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 dsfs

咱們能夠看到此時,事件在InboundHandler中就結束了,並無傳遞到OutboundHandler中。這是由於OutboundHandler在InboundHandler以後纔開始監聽,InboundHandler在處理中,並無監聽寫出站事件,因此不會執行到OutboundHandler之中的代碼。

可是若是把監聽事件的順序調整一下,在入站以前就開始監聽出站事件。

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
//                            socketChannel.pipeline().addLast(new EchoServerHandler());
//                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                            socketChannel.pipeline().addLast(new OutboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler1());
                            socketChannel.pipeline().addLast(new InboundHandler2());
                            
                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

運行EchoServer,打開telnet

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sfdfg
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg
^]
telnet> quit
Connection closed.

從右到左爲依次執行順序,先是InboundHandler1->InboundHandler2->OutboundHandler2->OutboundHandler1

服務端日誌爲

2019-10-02 10:27:03.683  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服務端收到數據:sfdfg

2019-10-02 10:27:03.686  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 sfdfg

2019-10-02 10:27:03.686  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sfdfg

2019-10-02 10:27:03.686  INFO 770 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sfdfg

由於沒有監聽活躍,斷開事件,因此不會打印相關日誌。這裏之因此OutboundHandler會執行是由於在入站以前就開始監聽寫出事件,而InboundHandler會先執行是由於只有接收到客戶端的消息的時候纔會進行寫出操做,這個時候纔會被OutboundHandler監聽到,進行相關操做,可是因爲OutboundHandler是從後往前執行,因此會先執行OutboundHandler2,再執行OutboundHandler1。而整個數據傳輸是貫穿管道始終的。

執行順序是:
                    InboundHandler1 channelRead
                    InboundHandler2 channelRead
                    OutboundHandler2 write
                    OutboundHandler1 write

 

如今把InboundHandler1提到最前又是什麼狀況呢?

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
//                            socketChannel.pipeline().addLast(new EchoServerHandler());
//                            socketChannel.pipeline().addLast(new EchoServerHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler1());
                            socketChannel.pipeline().addLast(new OutboundHandler2());
                            socketChannel.pipeline().addLast(new InboundHandler2());

                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

由分析可知,OutboundHandler出站是不會監聽InboudHandler1的寫出事件的,但能夠監聽到InboundHandler2的寫出事件。

咱們修改一下兩個InboundHandler來加以驗證。

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8));
        
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        //會傳遞到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
@Slf4j
public class InboundHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler2 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8));
        //在此結束調用,不會在管道中繼續傳遞
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 alone \n".getBytes()));
        ctx.writeAndFlush(Unpooled.copiedBuffer("InboundHandler2 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

重啓EchoServer,打開telnet

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
sdfljioldf
InboundHandler1 sdfljioldf    //此處InboundHandler1只是簡單的輸出了,沒有被OutboundHandler監聽。
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone  //此處InboundHandler2被OutboundHandler監聽,因此會追加輸出OutBoundHandler1 OutboundHandler2
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf   //此處也是InboundHandler2被OutboundHandler監聽輸出的。

服務端日誌

2019-10-02 11:04:09.345  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服務端收到數據:sdfljioldf

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 sdfljioldf

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 alone

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 InboundHandler1 sdfljioldf

2019-10-02 11:04:09.351  INFO 827 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 sdfljioldf

跟咱們分析的同樣。

但若是咱們修改一下InboundHandler1的代碼以下

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8));

        ctx.channel().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        //會傳遞到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

或者

@Slf4j
public class InboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info("InboundHandler1 channelRead 服務端收到數據:" + data.toString(CharsetUtil.UTF_8));

        ctx.pipeline().writeAndFlush(Unpooled.copiedBuffer("InboundHandler1 alone " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
        //會傳遞到inbounhandler2中
        ctx.fireChannelRead(Unpooled.copiedBuffer("InboundHandler1 " + data.toString(CharsetUtil.UTF_8),
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

如今不管InboundHandler1的位置放前、放後,是否被OutboundHandler監聽,它都會流通整個管道。

admindeMBP:~ admin$ telnet 127.0.0.1 10101
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler1 alone ljlfrg
OutBoundHandler1 OutboundHandler2 InboundHandler2 alone
OutBoundHandler1 OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg

服務端日誌爲

2019-10-02 11:18:10.761  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler1      : InboundHandler1 channelRead 服務端收到數據:ljlfrg

2019-10-02 11:18:10.764  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler1 alone ljlfrg

2019-10-02 11:18:10.764  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler1 alone ljlfrg

2019-10-02 11:18:10.767  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.InboundHandler2      : InboundHandler2 channelRead 服務端收到數據:InboundHandler1 ljlfrg

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 alone

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 alone

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler2     : OutboundHaneler2 write : InboundHandler2 InboundHandler1 ljlfrg

2019-10-02 11:18:10.768  INFO 837 --- [ntLoopGroup-3-1] c.g.websocket.netty.OutboundHandler1     : OutBoundHandler1 write : OutboundHandler2 InboundHandler2 InboundHandler1 ljlfrg

 結論:
                1)InboundHandler順序執行,OutboundHandler逆序執行
                
                2)InboundHandler之間傳遞數據,經過ctx.fireChannelRead(msg)
                
                3)InboundHandler經過ctx.write(msg),則會傳遞到outboundHandler
                
                4) 使用ctx.write(msg)傳遞消息,Inbound須要放在結尾,在Outbound以後,否則outboundhandler會不執行;
                    可是使用channel.write(msg)、pipline.write(msg)狀況會不一致,都會執行

                5) outBound和Inbound誰先執行,針對客戶端和服務端而言,客戶端是發起請求再接受數據,先outbound再inbound,服務端則相反

七、Netty異步操做模塊ChannelFuture

Netty中的全部I/O操做都是異步的,這意味着任何I/O調用都會當即返回,而ChannelFuture會提供有關的信息I/O操做的結果或狀態。

ChannelFuture的使用主要是在EchoServer中

//綁定端口,同步等待成功
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//阻塞,等待服務端監聽端口關閉
channelFuture.channel().closeFuture().sync();

  
        1)ChannelFuture狀態:
            未完成:當I/O操做開始時,將建立一個新的對象,新的最初是未完成的 - 它既沒有成功,也沒有被取消,由於I/O操做還沒有完成。 

            已完成:當I/O操做完成,無論是成功、失敗仍是取消,Future都是標記爲已完成的, 失敗的時候也有具體的信息,例如緣由失敗,但請注意,即便失敗和取消屬於完成狀態。

        ChannelFuture的四種狀態

*                                      +---------------------------+
*                                      | Completed successfully    |
*                                      +---------------------------+
*                                 +---->      isDone() = true      |
* +--------------------------+    |    |   isSuccess() = true      |
* |        Uncompleted       |    |    +===========================+
* +--------------------------+    |    | Completed with failure    |
* |      isDone() = false    |    |    +---------------------------+
* |   isSuccess() = false    |----+---->      isDone() = true      |
* | isCancelled() = false    |    |    |       cause() = non-null  |
* |       cause() = null     |    |    +===========================+
* +--------------------------+    |    | Completed by cancellation |
*                                 |    +---------------------------+
*                                 +---->      isDone() = true      |
*                                      | isCancelled() = true      |
*                                      +---------------------------+

            注意:不要在IO線程內調用future對象的sync或者await方法
                不能在channelHandler中調用sync或者await方法

        2)ChannelPromise:繼承於ChannelFuture,進一步拓展用於設置IO操做的結果

其中的繼承關係圖如上所示,第一個Future是JDK自帶的,第二個Future是netty中增長的。關於future通常的使用方法能夠參考Fork/Join框架原理和使用探祕

Netty編寫的網絡數據傳輸中的編碼和解碼

前面說的:高性能RPC框架的3個要素:IO模型、數據協議、線程模型

            最開始接觸的編碼碼:java序列化/反序列化(就是編解碼)、url編碼、base64編解碼

            爲啥jdk有編解碼,還要netty本身開發編解碼?
                java自帶序列化的缺點
                    1)沒法跨語言
                    2) 序列化後的碼流太大,也就是數據包太大
                    3) 序列化和反序列化性能比較差
            
            業界裏面也有其餘編碼框架: google的 protobuf(PB)、Facebook的Trift、Jboss的Marshalling、Kyro等,關於Kyro的使用方法能夠參考淺析克隆

 Netty裏面的編解碼:
                解碼器:負責處理「入站 InboundHandler」數據
                編碼器:負責「出站 OutboundHandler」 數據

                Netty裏面提供默認的編解碼器,也支持自定義編解碼器
                Encoder:編碼器
                Decoder:解碼器
                Codec:編解碼器

Netty的解碼器Decoder和使用場景

        Decoder對應的就是ChannelInboundHandler,主要就是字節數組轉換爲消息對象,在咱們以前的樣例中,都是處理一些簡單的字符串來進行消息傳遞,但若是咱們須要轉換的是Java的對象的話,則須要使用到Decoder。

        主要是兩個方法
            decode (經常使用)
            decodeLast (用於最後的幾個字節處理,也就是channel關閉的時候,產生的最後一個消息)
        抽象解碼器
            1)ByteToMessageDecoder
                用於將字節轉爲消息,須要檢查緩衝區是否有足夠的字節,經過源碼可知,ByteToMessageDecoder實際上就是一個ChannelInboundHandler。

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter

            2)ReplayingDecoder
                繼承ByteToMessageDecoder,不須要檢查緩衝區是否有足夠的字節,可是ReplayingDecoder速度略慢於ByteToMessageDecoder,不是全部的ByteBuf都支持


            選擇:項目複雜性高則使用ReplayingDecoder,不然使用 ByteToMessageDecoder

解碼器具體的實現,用的比較多的是(更可能是爲了解決TCP底層的粘包和拆包問題)

            DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器
            LineBasedFrameDecoder: 以換行符爲結束標誌的解碼器
            FixedLengthFrameDecoder:固定長度解碼器
            LengthFieldBasedFrameDecoder:message = header+body, 基於長度解碼的通用解碼器
            StringDecoder:文本解碼器,將接收到的對象轉化爲字符串,通常會與上面的進行配合,而後在後面添加業務handle

Netty編碼器Encoder

        Encoder對應的就是ChannelOutboundHandler,消息對象轉換爲字節數組

        Netty自己未提供和解碼同樣的編碼器,是由於場景不一樣,二者非對等的


        1)MessageToByteEncoder
            消息轉爲字節數組,調用write方法,會先判斷當前編碼器是否支持須要發送的消息類型,若是不支持,則透傳;其自己其實就是一個ChannelOutboundHandler。

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter

        2)MessageToMessageEncoder
            用於從一種消息編碼爲另一種消息(例如POJO到POJO)

Netty編解碼器類Codec

組合解碼器和編碼器,以此提供對於字節和消息都相同的操做(通常不經常使用)
        
        優勢:成對出現,編解碼都是在一個類裏面完成

        缺點:耦合在一塊兒,拓展性不佳


        Codec:組合編解碼
            1)ByteToMessageCodec

            2)MessageToMessageCodec

        decoder:解碼
             1)ByteToMessageDecoder

             2)MessageToMessageDecoder
        
        encoder:編碼
             1)ByteToMessageEncoder

            2)MessageToMessageEncoder

什麼是TCP粘包拆包

        1)TCP拆包: 一個完整的包可能會被TCP拆分爲多個包進行發送
        2)TCP粘包: 把多個小的包封裝成一個大的數據包發送, client發送的若干數據包 Server接收時粘成一包

       發送方和接收方均可能出現這個緣由
        
        發送方的緣由:TCP默認會使用Nagle算法
        
        接收方的緣由: TCP接收到數據放置緩存中,應用程序從緩存中讀取
       
       UDP: 是沒有粘包和拆包的問題,有邊界協議

二、TCP半包讀寫常看法決方案
        簡介:講解TCP半包讀寫常見的解決辦法

             發送方:能夠關閉Nagle算法
             接受方: TCP是無界的數據流,並無處理粘包現象的機制, 且協議自己沒法避免粘包,半包讀寫的發生須要在應用層進行處理
                    應用層解決半包讀寫的辦法
                        1)設置定長消息 (10字符)
                            xdclass000xdclass000xdclass000xdclass000
                        2)設置消息的邊界 ($$ 切割)
                            sdfafwefqwefwe$$dsafadfadsfwqehidwuehfiw$$879329832r89qweew$$

                        3)使用帶消息頭的協議,消息頭存儲消息開始標識及消息的長度信息
                                Header+Body

 三、Netty自帶解決TCP半包讀寫方案

            DelimiterBasedFrameDecoder: 指定消息分隔符的解碼器
            LineBasedFrameDecoder: 以換行符爲結束標誌的解碼器
            FixedLengthFrameDecoder:固定長度解碼器
            LengthFieldBasedFrameDecoder:message = header+body, 基於長度解碼的通用解碼器

如今咱們來作一個實驗,若是不使用解碼器,會產生什麼樣的效果。

服務端的入站事件處理器

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        byte[] bytes = new byte[data.readableBytes()];
        //將收到到字節流放入內存字節數組
        data.readBytes(bytes);
        //將字節數組轉成字符串,並截取該字符串從0到換行符(不包含換行符)到子串
        //System.getProperty("line.separator")爲獲取操做系統的換行符,每種操做系統可能各不相同
        String body = new String(bytes, CharsetUtil.UTF_8)
                .substring(0,bytes.length - System.getProperty("line.separator").length());
        log.info("服務端收到消息內容爲:" + body + "收到消息次數:" + ++counter);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端入站事件處理器

public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf msg = null;
        //定義一段包含換行符的字符串,並轉化爲字節數組
        byte[] req = ("guanjian.net" + System.getProperty("line.separator")).getBytes();
        //連續發送10條該字符串到服務端
        for (int i = 0;i < 10;i++) {
            //申請一段內存緩衝區
            msg = Unpooled.buffer(req.length);
            //將字節數組寫入緩衝區
            msg.writeBytes(req);
            //發送該字符串到服務端
            ctx.writeAndFlush(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

分別啓動服務端和客戶端

咱們能夠看到服務端日誌爲

2019-10-05 08:41:11.733  INFO 614 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net
guanjian.net收到消息次數:1

雖然客戶端是分了10次發送到數據,可是服務端倒是隻接收了一次,收到消息次數1,這明顯是一次粘包。

如今咱們給客戶端加上.option(ChannelOption.TCP_NODELAY,true)來看一下是什麼狀況

@AllArgsConstructor
public class EchoClient {
    private String host;
    private int port;

    public void run() throws InterruptedException {
        //客戶端處理線程組(其實就是一個線程池)
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //客戶端netty啓動對象
            Bootstrap bootstrap = new Bootstrap();
            //將客戶端線程組添加到啓動對象中
            bootstrap.group(group)
                    //給啓動對象添加Socket管道
                    .channel(NioSocketChannel.class)
                    //主動鏈接到遠程服務器IP端口
                    .remoteAddress(new InetSocketAddress(host,port))
                    .option(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });
            //鏈接到服務端,connect是異步鏈接,在調用同步等待sync,等待鏈接成功
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞直到客戶端通道關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            group.shutdownGracefully();
        }
    }
}

服務端日誌爲

2019-10-05 08:45:52.654  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:1
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:2
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:3
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:4
2019-10-05 08:45:52.655  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:5
2019-10-05 08:45:52.656  INFO 614 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net
guanjian.net收到消息次數:6

咱們能夠看到,雖然加了.option(ChannelOption.TCP_NODELAY,true),但並不能保證它不產生粘包,有些包包含了兩條字符串,收到的消息也就不是10次了。

如今咱們把換行解碼器LineBasedFrameDecoder放入

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
                            //這個1024是在1024個字節內去尋找換行符,若是在1024個字節內沒有找到換行符,就會報錯
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

再次運行,服務端日誌

2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:1
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:2
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:3
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:4
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:5
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:6
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:7
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:8
2019-10-05 09:04:15.693  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:9
2019-10-05 09:04:15.694  INFO 696 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.ne收到消息次數:10

LineBasedFrameDecoder是繼承於ByteToMessageDecoder,是一個ChannelInboundHandler

public class LineBasedFrameDecoder extends ByteToMessageDecoder

由上面可見,粘包通過LineBasedFrameDecoder處理,再逐條發往下一個ChannelInboundHandler,而不是從客戶端逐條發送過來的。

若是咱們以爲ServerHandler的channelRead方法太麻煩了,還要由字節數組轉成字符串,那咱們直接將收到的信息強制轉成字符串會怎麼樣呢?

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    log.info("服務端收到消息內容爲:" + body + "收到消息次數:" + ++counter);
}

啓動後服務端報錯以下(部分)

java.lang.ClassCastException: io.netty.buffer.PooledSlicedByteBuf cannot be cast to java.lang.String
    at com.guanjian.websocket.netty.packet.ServerHandler.channelRead(ServerHandler.java:29)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)

說明此處沒法將收到的信息直接強制轉換成字符串,這個時候咱們能夠加入字符串解碼器StringDecoder.

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
                            //這個1024是在1024個字節內去尋找換行符,若是在1024個字節內沒有找到換行符,就會報錯
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            //字符串解碼器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

啓動運行,服務端日誌爲

2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:1
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:2
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:3
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:4
2019-10-05 10:32:13.774  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:5
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:6
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:7
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:8
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:9
2019-10-05 10:32:13.775  INFO 739 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:guanjian.net收到消息次數:10

使用解碼器LineBasedFrameDecoder解決半包讀寫問題
            
            1)LineBaseFrameDecoder 以換行符爲結束標誌的解碼器 ,構造函數裏面的數字表示最長遍歷的幀數

            2)StringDecoder解碼器將對象轉成字符串

如今咱們修改一下客戶端處理器ClientHandler。

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String message = "Starting EchoclientApplication &_" +
            "on admindeMBP.lan with PID 741 &_" +
            "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" +
            "/Users/admin/Downloads/nettyecho)";
    ByteBuf buf = null;
    //申請一段內存緩衝區
    buf = Unpooled.buffer(message.getBytes().length);
    //將字節數組寫入緩衝區
    buf.writeBytes(message.getBytes());
    //發送字節數組到服務端
    ctx.writeAndFlush(buf);
}

咱們能夠看到message字符串中都帶有&_字符,咱們將在服務端以該字符爲分隔符進行解碼。

修改EchoServer到代碼以下

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
                            //使用指定消息分隔符解碼器進行解碼
                            //1024的意思是在1024個字節內查找&_,若是找不到就會拋出異常
                            ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            //字符串解碼器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

分別啓動服務端,客戶端。服務端到日誌以下

2019-10-06 12:55:36.078  INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:Starting EchoclientApplication 收到消息次數:1
2019-10-06 12:55:36.078  INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:on admindeMBP.lan with PID 741 收到消息次數:2
2019-10-06 12:55:36.079  INFO 894 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次數:3

對比message

"Starting EchoclientApplication &_" +
        "on admindeMBP.lan with PID 741 &_" +
        "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" +
        "/Users/admin/Downloads/nettyecho)"

咱們會發現最後一句/Users/admin/Downloads/nettyecho)被丟棄了,因此要所有拿到咱們須要的消息,咱們須要在最後一段字符串中加入&_

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String message = "Starting EchoclientApplication &_" +
            "on admindeMBP.lan with PID 741 &_" +
            "(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_" +
            "/Users/admin/Downloads/nettyecho)&_";
    ByteBuf buf = null;
    //申請一段內存緩衝區
    buf = Unpooled.buffer(message.getBytes().length);
    //將字節數組寫入緩衝區
    buf.writeBytes(message.getBytes());
    //發送字節數組到服務端
    ctx.writeAndFlush(buf);
}

再次啓動客戶端,服務端日誌爲

2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:Starting EchoclientApplication 收到消息次數:1
2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:on admindeMBP.lan with PID 741 收到消息次數:2
2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:(/Users/admin/Downloads/nettyecho/target/classes started by admin in 收到消息次數:3
2019-10-06 13:18:51.705  INFO 894 --- [ntLoopGroup-3-2] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:/Users/admin/Downloads/nettyecho)收到消息次數:4

其實DelimiterBasedFrameDecoder有不少個構造器。咱們這裏使用的兩參構造器其實調用到是一個三參構造器。

public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
    this(maxFrameLength, true, delimiter);
}
public DelimiterBasedFrameDecoder(
        int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
    this(maxFrameLength, stripDelimiter, true, delimiter);
}

而三參構造器實際上是調用了一個四參構造器

public DelimiterBasedFrameDecoder(
        int maxFrameLength, boolean stripDelimiter, boolean failFast,
        ByteBuf delimiter) {
    this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] {
            delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
}

咱們來講明一下這個四參構造器每一個參數到含義。

             maxFrameLength:
                表示一行最大的長度,若是超過這個長度依然沒有檢測自定義分隔符,將會拋出TooLongFrameException

            stripDelimiter:
                解碼後的消息是否去除掉分隔符(true去掉分隔符,flase保留分隔符)

            failFast:
                若是爲true,則超出maxLength後當即拋出TooLongFrameException,不進行繼續解碼
                若是爲false,則等到完整的消息被解碼後,再拋出TooLongFrameException異常

            delimiters:
                分隔符,ByteBuf類型

如今咱們來看一下第二個參數到含義。

修改EchoServer以下

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
                            //使用指定消息分隔符解碼器進行解碼,1024爲在1024個字節內查找分隔符(能夠本身任意定義),若是
                            //找不到會拋出異常
                            ByteBuf delimiter = Unpooled.copiedBuffer("&_".getBytes());
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,false,true,delimiter));
                            //字符串解碼器
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

從新啓動服務端,客戶端。服務端到日誌以下

2019-10-06 13:33:24.806  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:Starting EchoclientApplication &_收到消息次數:1
2019-10-06 13:33:24.807  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:on admindeMBP.lan with PID 741 &_收到消息次數:2
2019-10-06 13:33:24.807  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_收到消息次數:3
2019-10-06 13:33:24.808  INFO 911 --- [ntLoopGroup-3-1] c.g.w.netty.packet.ServerHandler         : 服務端收到消息內容爲:/Users/admin/Downloads/nettyecho)&_收到消息次數:4

自定義長度半包讀寫器LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder也有不少構造器,通常咱們使用的是一個五參構造器

public LengthFieldBasedFrameDecoder(
        int maxFrameLength,
        int lengthFieldOffset, int lengthFieldLength,
        int lengthAdjustment, int initialBytesToStrip) {
    this(
            maxFrameLength,
            lengthFieldOffset, lengthFieldLength, lengthAdjustment,
            initialBytesToStrip, true);
}

固然它調用的是一個六參構造器。

public LengthFieldBasedFrameDecoder(
        int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
        int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
    this(
            ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,
            lengthAdjustment, initialBytesToStrip, failFast);
}

這六個參數的含義以下

        maxFrameLength 數據包的最大長度
        lengthFieldOffset 長度字段的偏移位,長度字段開始的地方,意思是跳過指定長度個字節以後的纔是消息體字段

        lengthFieldLength 長度字段佔的字節數, 幀數據長度的字段自己的長度

        lengthAdjustment
        通常 Header + Body,添加到長度字段的補償值,若是爲負數,開發人員認爲這個 Header的長度字段是整個消息包的長度,則Netty應該減去對應的數字
        initialBytesToStrip 從解碼幀中第一次去除的字節數, 獲取完一個完整的數據包以後,忽略前面的指定位數的長度字節,應用解碼器拿到的就是不帶長度域的數據包
        failFast 是否快速失敗

六參構造器調用到是一個七參構造器(不討論)。

如今咱們來設置一個存儲消息內容到消息類。(服務端和客戶端相同)

public class CCMessageHeader {
    @Getter
    @Setter
    private byte[] messageFlag = new byte[2]; //消息長度偏移量
    @Getter
    @Setter
    private int length; //消息長度
    @Getter
    @Setter
    private int type; //消息類型
    @Getter
    private String data; //消息內容

    public CCMessageHeader() {
        //170,這兩個數沒有實際意義,只用來描述消息長度偏移量而填充進去的
        messageFlag[0] = (byte) 0xaa;
        //187
        messageFlag[1] = (byte) 0xbb;
    }

    public void setData(String data) {
        this.data = data;
        //消息體的長度爲字符串data的長度加4,4爲type整形的長度
        this.length = data.length() + 4;
    }
}

服務端入站處理器

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf body = (ByteBuf) msg;
        //若是字節流到可讀字節數小於等於0,轉給下一個InboundHandler處理
        if (body.readableBytes() <= 0) {
            ctx.fireChannelRead(msg);
        }
        //初始化一個接收消息對象
        CCMessageHeader recHd = new CCMessageHeader();
        //字節流讀取到的第一個整形賦給nLength
        int nLength = body.readInt();
        //字節流讀取到到第二個整形賦給nType
        int nType = body.readInt();
        //獲取字節流的可讀字節數,並分配一個該大小的字節數組
        int nDataSize = body.readableBytes();
        byte[] aa = new byte[nDataSize];
        //將字節流讀入字節數組
        body.readBytes(aa);
        //將該字節數組轉成字符串
        String myMsg = new String(aa, CharsetUtil.UTF_8);
        log.info("收到 " + ++counter + "次消息:[" + myMsg + "],類型爲[" + nType + "]");

        //初始化一個發送消息對象
        CCMessageHeader hd = new CCMessageHeader();
        hd.setType(2);
        hd.setData("server data...");
        //申請一段直接緩衝空間
        ByteBuf echo = Unpooled.directBuffer();
        //將該發送消息對象的各屬性寫入到緩衝空間中
        echo.writeBytes(hd.getMessageFlag());
        echo.writeInt(hd.getLength());
        echo.writeInt(hd.getType());
        echo.writeCharSequence(hd.getData(),CharsetUtil.UTF_8);
        //將該發送消息對象以字節流到形式發送到客戶端
        ctx.writeAndFlush(echo);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoServer以下

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
                            //65535表示對自定義長度解碼器對最大處理長度,
                            // 第二個2表示長度信息從第二個字節後開始獲取,
                            //其實這個2是messageFlag的長度,即獲取長度字段的偏移量。
                            //4表示長度字段佔4個字節,即private int length的字節數,一個整數佔4個字節
                            //0表示添加到長度字段的補償值,這裏不須要補償
                            //最後一個2表示獲取咱們的消息體,要去掉2個長度字段的字節數,由於除了length字段還有一個
                            //type的整形字段,因此是2才能拿到data字段,即顯示消息體字段,但其實消息體包含了type字段的
                            socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2));
                            socketChannel.pipeline().addLast(new ServerHandler());

                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

客戶端入站處理器

@Slf4j
public class ClientHandler2 extends ChannelInboundHandlerAdapter {
    private int counter;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //初始化一個發送消息對象
        CCMessageHeader hd = new CCMessageHeader();
        //設置類型爲1
        hd.setType(1);
        for (int i = 0;i < 10;i++) {
            String strData = String.format("client data %d...",i + 1);
            //設置消息體
            hd.setData(strData);
            //申請一段直接緩衝區
            ByteBuf echo = Unpooled.directBuffer();
            //將消息對象的各個屬性寫入該緩衝區
            echo.writeBytes(hd.getMessageFlag());
            echo.writeInt(hd.getLength());
            echo.writeInt(hd.getType());
            echo.writeCharSequence(hd.getData(), CharsetUtil.UTF_8);
            //將該緩衝區的字節流發送到服務端
            ctx.writeAndFlush(echo);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf body = (ByteBuf) msg;
        //初始化接收消息對象
        CCMessageHeader recHd = new CCMessageHeader();
        //獲取第一個整形賦給nLength,即消息長度
        int nLength = body.readInt();
        //獲取第二個整形賦給nType
        int nType = body.readInt();
        //獲取整個字節流的可讀字節數,並以此創建一個字節數組
        int nDataSize = body.readableBytes();
        byte[] aa = new byte[nDataSize];
        //將字節流讀入該字節數組
        body.readBytes(aa);
        //將該字節數組轉換爲字符串
        String myMsg = new String(aa,CharsetUtil.UTF_8);
        log.info("收到" + ++counter + "次消息[" + myMsg + "],類型爲:[" + nType + "]");
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

EchoClient代碼以下

@AllArgsConstructor
public class EchoClient {
    private String host;
    private int port;

    public void run() throws InterruptedException {
        //客戶端處理線程組(其實就是一個線程池)
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //客戶端netty啓動對象
            Bootstrap bootstrap = new Bootstrap();
            //將客戶端線程組添加到啓動對象中
            bootstrap.group(group)
                    //給啓動對象添加Socket管道
                    .channel(NioSocketChannel.class)
                    //主動鏈接到遠程服務器IP端口
                    .remoteAddress(new InetSocketAddress(host,port))
                    .option(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,2,4,0,2));
                            socketChannel.pipeline().addLast(new ClientHandler2());
                        }
                    });
            //鏈接到服務端,connect是異步鏈接,在調用同步等待sync,等待鏈接成功
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞直到客戶端通道關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            group.shutdownGracefully();
        }
    }
}

運行服務端,客戶端。服務端日誌以下

2019-10-06 15:27:54.601  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 1次消息:[client data 1...],類型爲[1]
2019-10-06 15:27:54.604  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 2次消息:[client data 2...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 3次消息:[client data 3...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 4次消息:[client data 4...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 5次消息:[client data 5...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 6次消息:[client data 6...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 7次消息:[client data 7...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 8次消息:[client data 8...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 9次消息:[client data 9...],類型爲[1]
2019-10-06 15:27:54.605  INFO 1160 --- [ntLoopGroup-3-1] c.g.w.n.lengfieldpacket.ServerHandler    : 收到 10次消息:[client data 10...],類型爲[1]

客戶端日誌以下

2019-10-06 15:27:54.611  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到1次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.611  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到2次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.611  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到3次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到4次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到5次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到6次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到7次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到8次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到9次消息[server data...],類型爲:[2]
2019-10-06 15:27:54.612  INFO 1162 --- [ntLoopGroup-2-1] c.g.echoclient.netty.ClientHandler2      : 收到10次消息[server data...],類型爲:[2]

這樣就分別完成了自定義長度的半包解碼

相關文章
相關標籤/搜索