Netty整理(三)

Netty整理(二)web

ByteBuf:是數據容器(字節容器)算法

        JDK ByteBuffer
            共用讀寫索引,每次讀寫操做都須要Flip()
            擴容麻煩,並且擴容後容易形成浪費 關於ByteBuffer的使用方法能夠參考序列化和反序列化的三種方法 ,裏面有Netty 3的ChannelBuffer,由於如今Netty 3用的比較少,看成參考就好。緩存

        Netty ByteBuf
            讀寫使用不一樣的索引,因此操做便捷
            自動擴容,使用便捷服務器

咱們如今來看一下ByteBuf的源碼。websocket

public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>

咱們能夠看到ByteBuf是一個抽象類,它裏面幾乎大部分都是抽象方法。socket

繼承ByteBuf的是一個AbstractByteBuf的抽象類,而讀寫索引的屬性就在該類中。ide

public abstract class AbstractByteBuf extends ByteBuf
int readerIndex;  //讀索引
int writerIndex;  //寫索引
private int markedReaderIndex;  //標記當前讀索引的位置
private int markedWriterIndex;  //標記當前寫索引的位置
private int maxCapacity;  //最大容量

而支持自動擴容的部分能夠由ensureWritable追蹤看到oop

@Override
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }
    ensureWritable0(minWritableBytes);
    return this;
}

final void ensureWritable0(int minWritableBytes) {
    ensureAccessible();
    if (minWritableBytes <= writableBytes()) {
        return;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

    // Adjust to the new capacity.
    capacity(newCapacity);
}

在AbstractByteBufAllocator中性能

static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
/**
 * 計算新容量
 */
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
    if (minNewCapacity < 0) {
        throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)");
    }
    if (minNewCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                minNewCapacity, maxCapacity));
    }
    final int threshold = CALCULATE_THRESHOLD; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}

這個newCapacity就是擴容的新容量,因此咱們通常不須要擔憂ByteBuf容量不夠的問題。this

ByteBuf的建立方法
                1)ByteBufAllocator
                     池化(Netty4.x版本後默認使用 PooledByteBufAllocator
                         提升性能而且最大程度減小內存碎片

                    可使用Channel處理器上下文來建立

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf headBuffer = ctx.alloc().buffer();
    ByteBuf directBuffer = ctx.alloc().directBuffer();
    CompositeByteBuf byteBufs = ctx.alloc().compositeBuffer();
    byteBufs.addComponents(headBuffer,directBuffer);
}

                     非池化UnpooledByteBufAllocator: 每次返回新的實例

                2)Unpooled: 提供靜態方法建立未池化的ByteBuf,能夠建立堆內存和直接內存緩衝區

    
            ByteBuf使用模式
                堆緩存區HEAP BUFFER:  ByteBuf headBuffer = Unpooled.buffer(6);
                    優勢:存儲在JVM的堆空間中,能夠快速的分配和釋放
                    缺點:每次使用前會拷貝到直接緩存區(也叫堆外內存)   

                直接緩存區DIRECR BUFFER:  ByteBuf directBuffer = Unpooled.directBuffer();
                    優勢:存儲在堆外內存上,堆外分配的直接內存,不會佔用堆空間
                    缺點:內存的分配和釋放,比在堆緩衝區更復雜   

                複合緩衝區COMPOSITE BUFFER:   CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();

                                                                 compositeByteBuf.addComponents(headBuffer,directBuffer);
                    能夠建立多個不一樣的ByteBuf,而後放在一塊兒,可是隻是一個視圖
                選擇:大量IO數據讀寫,用「直接緩存區」; 業務消息編解碼用「堆緩存區」

心跳檢測

以前有說過Netty Channel處理器的生命週期,如今咱們在此基礎上增長心跳檢測的部分

咱們須要修改一下EchoServer

@Slf4j
@AllArgsConstructor
public class EchoServer {
    private int port;

    public void run() throws InterruptedException {
        //歡迎線程組(其實就是一個線程池)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        //工做線程組(其實就是一個線程池)
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            //netty啓動對象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //將兩個線程組添加到啓動對象中
            serverBootstrap.group(bossGroup,workGroup)
                    //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少)
                    .channel(NioServerSocketChannel.class)
                    //存放已完成三次握手的請求的等待隊列的最大長度;
                    .option(ChannelOption.SO_BACKLOG,1024)
                    //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法;
                    //若是要減小發送次數,就設置爲false,會累積必定大小後再發送;
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個
                    //必需要實現的抽象方法
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中
                            //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel
                            //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用
                            //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上
                            //做業的工人,咱們能夠往一條流水線上投放不少的工人
                            //IdleStateHandler是一個作空閒檢測的ChannelInboundHandler
                            //針對客戶端,若是10秒鐘時沒有向服務端發送讀寫心跳(All),則主動斷開
                            //若是是讀空閒或者是寫空閒,不處理
                            socketChannel.pipeline().addLast(new IdleStateHandler(2,4,10));
                            socketChannel.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            log.info("服務器啓動中");
            //綁定端口,同步等待成功
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            //阻塞,等待服務端監聽端口關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //優雅關閉線程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

再修改EchoServerHandler以下

/**
 * 事件處理器
 */
@Slf4j
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    //用於記錄和管理全部客戶端的channel
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 監聽讀取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf data = (ByteBuf) msg;
        log.info(data.toString(CharsetUtil.UTF_8));
    }

    /**
     * 監聽讀取完畢事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("channelReadComplete");
    }

    /**
     * 監聽異常事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        clients.remove(ctx.channel());
    }

    /**
     * 將channel註冊到EventLoop的Selector多路複用器中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    /**
     * channel未註冊到EventLoop中
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    /**
     * 有鏈接,變爲活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelActive");
    }

    /**
     * 沒有鏈接,非活躍狀態
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("channelInactive");
    }

    /**
     * 用戶事件追蹤器
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //IdleStateEvent是一個用戶事件,包含讀空閒/寫空閒/讀寫空閒
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("進入讀空閒");
            }else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("進入寫空閒");
            }else if (event.state() == IdleState.ALL_IDLE) {
                log.info("channel關閉前,鏈接數爲" + clients.size());
                ctx.channel().close();
                log.info("channel關閉後,鏈接數爲" + clients.size());
            }
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded");
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("客戶端斷開,channel對應的長id爲:" + ctx.channel().id().asLongText());
        log.info("客戶端斷開,channel對應的短id爲:" + ctx.channel().id().asShortText());
    }
}

在這裏,咱們只是簡單打印一下客戶端鏈接時發送過來的一段字符串

分別啓動服務端和客戶端,服務端的日誌以下

2019-10-22 08:13:31.615  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : handlerAdded
2019-10-22 08:13:31.616  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelRegistered
2019-10-22 08:13:31.616  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelActive
2019-10-22 08:13:31.626  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : Starting EchoclientApplication &_on admindeMBP.lan with PID 741 &_(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_/Users/admin/Downloads/nettyecho)&_
2019-10-22 08:13:31.631  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelReadComplete
2019-10-22 08:13:33.637  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 進入讀空閒
2019-10-22 08:13:35.632  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 進入寫空閒
2019-10-22 08:13:35.635  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 進入讀空閒
2019-10-22 08:13:37.639  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 進入讀空閒
2019-10-22 08:13:39.634  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 進入寫空閒
2019-10-22 08:13:39.641  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 進入讀空閒
2019-10-22 08:13:41.635  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channel關閉前,鏈接數爲1
2019-10-22 08:13:41.638  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channel關閉後,鏈接數爲0
2019-10-22 08:13:41.638  INFO 541 --- [ntLoopGroup-3-1] cd.g.websocket.netty.EchoServerHandler    : channelInactive
2019-10-22 08:13:41.638  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : channelUnregistered
2019-10-22 08:13:41.648  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 客戶端斷開,channel對應的長id爲:acde48fffe001122-0000021d-00000001-a2651c27e64fc656-a01ff1fe
2019-10-22 08:13:41.648  INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler    : 客戶端斷開,channel對應的短id爲:a01ff1fe

從日誌能夠看到,處理的流程爲先將handler添加到管道pipeline中(handlerAdded),而後是註冊到EventLoop的Seletor多路複用器裏(channelRegistered),而後是有鏈接進來,激活狀態(channelActive),讀取客戶端發送過來的消息,而後是讀取完畢,次數客戶端再也不發送消息,服務端每2秒進入一次讀空閒狀態,每4秒進入一次寫空閒狀態,等到第10秒的時候,讀寫都沒有,服務端主動斷開客戶端的鏈接,進入無鏈接,非活躍狀態。channel未註冊到EventLoop線程中。最終removed完成後,打印斷開的channel的id。此處能夠看到,客戶端要想不被斷開鏈接,就必須主動發送心跳鏈接的檢測信號。

相關文章
相關標籤/搜索