接Netty整理(二)web
ByteBuf:是數據容器(字節容器)算法
JDK ByteBuffer
共用讀寫索引,每次讀寫操做都須要Flip()
擴容麻煩,並且擴容後容易形成浪費 關於ByteBuffer的使用方法能夠參考序列化和反序列化的三種方法 ,裏面有Netty 3的ChannelBuffer,由於如今Netty 3用的比較少,看成參考就好。緩存
Netty ByteBuf
讀寫使用不一樣的索引,因此操做便捷
自動擴容,使用便捷服務器
咱們如今來看一下ByteBuf的源碼。websocket
public abstract class ByteBuf implements ReferenceCounted, Comparable<ByteBuf>
咱們能夠看到ByteBuf是一個抽象類,它裏面幾乎大部分都是抽象方法。socket
繼承ByteBuf的是一個AbstractByteBuf的抽象類,而讀寫索引的屬性就在該類中。ide
public abstract class AbstractByteBuf extends ByteBuf
int readerIndex; //讀索引 int writerIndex; //寫索引 private int markedReaderIndex; //標記當前讀索引的位置 private int markedWriterIndex; //標記當前寫索引的位置 private int maxCapacity; //最大容量
而支持自動擴容的部分能夠由ensureWritable追蹤看到oop
@Override public ByteBuf ensureWritable(int minWritableBytes) { if (minWritableBytes < 0) { throw new IllegalArgumentException(String.format( "minWritableBytes: %d (expected: >= 0)", minWritableBytes)); } ensureWritable0(minWritableBytes); return this; } final void ensureWritable0(int minWritableBytes) { ensureAccessible(); if (minWritableBytes <= writableBytes()) { return; } if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // Normalize the current capacity to the power of 2. int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); // Adjust to the new capacity. capacity(newCapacity); }
在AbstractByteBufAllocator中性能
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
/** * 計算新容量 */ @Override public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // If over threshold, do not double but just increase by threshold. if (minNewCapacity > threshold) { int newCapacity = minNewCapacity / threshold * threshold; if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // Not over threshold. Double up to 4 MiB, starting from 64. int newCapacity = 64; while (newCapacity < minNewCapacity) { newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
這個newCapacity就是擴容的新容量,因此咱們通常不須要擔憂ByteBuf容量不夠的問題。this
ByteBuf的建立方法
1)ByteBufAllocator
池化(Netty4.x版本後默認使用 PooledByteBufAllocator
提升性能而且最大程度減小內存碎片
可使用Channel處理器上下文來建立
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf headBuffer = ctx.alloc().buffer(); ByteBuf directBuffer = ctx.alloc().directBuffer(); CompositeByteBuf byteBufs = ctx.alloc().compositeBuffer(); byteBufs.addComponents(headBuffer,directBuffer); }
非池化UnpooledByteBufAllocator: 每次返回新的實例
2)Unpooled: 提供靜態方法建立未池化的ByteBuf,能夠建立堆內存和直接內存緩衝區
ByteBuf使用模式
堆緩存區HEAP BUFFER: ByteBuf headBuffer = Unpooled.buffer(6);
優勢:存儲在JVM的堆空間中,能夠快速的分配和釋放
缺點:每次使用前會拷貝到直接緩存區(也叫堆外內存)
直接緩存區DIRECR BUFFER: ByteBuf directBuffer = Unpooled.directBuffer();
優勢:存儲在堆外內存上,堆外分配的直接內存,不會佔用堆空間
缺點:內存的分配和釋放,比在堆緩衝區更復雜
複合緩衝區COMPOSITE BUFFER: CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(headBuffer,directBuffer);
能夠建立多個不一樣的ByteBuf,而後放在一塊兒,可是隻是一個視圖
選擇:大量IO數據讀寫,用「直接緩存區」; 業務消息編解碼用「堆緩存區」
心跳檢測
以前有說過Netty Channel處理器的生命週期,如今咱們在此基礎上增長心跳檢測的部分
咱們須要修改一下EchoServer
@Slf4j @AllArgsConstructor public class EchoServer { private int port; public void run() throws InterruptedException { //歡迎線程組(其實就是一個線程池) EventLoopGroup bossGroup = new NioEventLoopGroup(); //工做線程組(其實就是一個線程池) EventLoopGroup workGroup = new NioEventLoopGroup(); try { //netty啓動對象 ServerBootstrap serverBootstrap = new ServerBootstrap(); //將兩個線程組添加到啓動對象中 serverBootstrap.group(bossGroup,workGroup) //給啓動對象添加Socket管道(相似於NIO中的Channel或者BIO中的Socket,意思差很少) .channel(NioServerSocketChannel.class) //存放已完成三次握手的請求的等待隊列的最大長度; .option(ChannelOption.SO_BACKLOG,1024) //不要延遲(NODELAY),要求高實時性,有數據時立刻發送,就將該選項設置爲true關閉Nagle算法; //若是要減小發送次數,就設置爲false,會累積必定大小後再發送; .childOption(ChannelOption.TCP_NODELAY,true) //添加事件處理器,這裏ChannelInitializer爲一個抽象類,initChannel是一個 //必需要實現的抽象方法 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //將咱們本身編寫的事件處理器添加到客戶端的鏈接管道中 //這裏爲socketChannel(客戶端鏈接管道),有別於NioServerSocketChannel //這裏能夠添加不少的事件處理器,其實Netty有不少內置的事件處理器可使用 //pipeline()能夠理解成一個工廠生產流水線,而把事件處理器理解成在流水線上 //做業的工人,咱們能夠往一條流水線上投放不少的工人 //IdleStateHandler是一個作空閒檢測的ChannelInboundHandler //針對客戶端,若是10秒鐘時沒有向服務端發送讀寫心跳(All),則主動斷開 //若是是讀空閒或者是寫空閒,不處理 socketChannel.pipeline().addLast(new IdleStateHandler(2,4,10)); socketChannel.pipeline().addLast(new EchoServerHandler()); } }); log.info("服務器啓動中"); //綁定端口,同步等待成功 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //阻塞,等待服務端監聽端口關閉 channelFuture.channel().closeFuture().sync(); } finally { //優雅關閉線程池 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
再修改EchoServerHandler以下
/** * 事件處理器 */ @Slf4j public class EchoServerHandler extends ChannelInboundHandlerAdapter { //用於記錄和管理全部客戶端的channel private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 監聽讀取事件 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf data = (ByteBuf) msg; log.info(data.toString(CharsetUtil.UTF_8)); } /** * 監聽讀取完畢事件 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("channelReadComplete"); } /** * 監聽異常事件 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); clients.remove(ctx.channel()); } /** * 將channel註冊到EventLoop的Selector多路複用器中 * @param ctx * @throws Exception */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { log.info("channelRegistered"); } /** * channel未註冊到EventLoop中 * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { log.info("channelUnregistered"); } /** * 有鏈接,變爲活躍狀態 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("channelActive"); } /** * 沒有鏈接,非活躍狀態 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("channelInactive"); } /** * 用戶事件追蹤器 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //IdleStateEvent是一個用戶事件,包含讀空閒/寫空閒/讀寫空閒 if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("進入讀空閒"); }else if (event.state() == IdleState.WRITER_IDLE) { log.info("進入寫空閒"); }else if (event.state() == IdleState.ALL_IDLE) { log.info("channel關閉前,鏈接數爲" + clients.size()); ctx.channel().close(); log.info("channel關閉後,鏈接數爲" + clients.size()); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded"); clients.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("客戶端斷開,channel對應的長id爲:" + ctx.channel().id().asLongText()); log.info("客戶端斷開,channel對應的短id爲:" + ctx.channel().id().asShortText()); } }
在這裏,咱們只是簡單打印一下客戶端鏈接時發送過來的一段字符串
分別啓動服務端和客戶端,服務端的日誌以下
2019-10-22 08:13:31.615 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : handlerAdded
2019-10-22 08:13:31.616 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelRegistered
2019-10-22 08:13:31.616 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelActive
2019-10-22 08:13:31.626 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : Starting EchoclientApplication &_on admindeMBP.lan with PID 741 &_(/Users/admin/Downloads/nettyecho/target/classes started by admin in &_/Users/admin/Downloads/nettyecho)&_
2019-10-22 08:13:31.631 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelReadComplete
2019-10-22 08:13:33.637 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 進入讀空閒
2019-10-22 08:13:35.632 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 進入寫空閒
2019-10-22 08:13:35.635 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 進入讀空閒
2019-10-22 08:13:37.639 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 進入讀空閒
2019-10-22 08:13:39.634 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 進入寫空閒
2019-10-22 08:13:39.641 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 進入讀空閒
2019-10-22 08:13:41.635 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channel關閉前,鏈接數爲1
2019-10-22 08:13:41.638 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channel關閉後,鏈接數爲0
2019-10-22 08:13:41.638 INFO 541 --- [ntLoopGroup-3-1] cd.g.websocket.netty.EchoServerHandler : channelInactive
2019-10-22 08:13:41.638 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : channelUnregistered
2019-10-22 08:13:41.648 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 客戶端斷開,channel對應的長id爲:acde48fffe001122-0000021d-00000001-a2651c27e64fc656-a01ff1fe
2019-10-22 08:13:41.648 INFO 541 --- [ntLoopGroup-3-1] c.g.websocket.netty.EchoServerHandler : 客戶端斷開,channel對應的短id爲:a01ff1fe
從日誌能夠看到,處理的流程爲先將handler添加到管道pipeline中(handlerAdded),而後是註冊到EventLoop的Seletor多路複用器裏(channelRegistered),而後是有鏈接進來,激活狀態(channelActive),讀取客戶端發送過來的消息,而後是讀取完畢,次數客戶端再也不發送消息,服務端每2秒進入一次讀空閒狀態,每4秒進入一次寫空閒狀態,等到第10秒的時候,讀寫都沒有,服務端主動斷開客戶端的鏈接,進入無鏈接,非活躍狀態。channel未註冊到EventLoop線程中。最終removed完成後,打印斷開的channel的id。此處能夠看到,客戶端要想不被斷開鏈接,就必須主動發送心跳鏈接的檢測信號。