前面介紹了基本的網絡模型以及IO與NIO,那麼有了NIO來開發非阻塞服務器,你們就知足了嗎?有了技術支持,就回去追求效率,所以就產生了不少NIO的框架對NIO進行封裝——這就是大名鼎鼎的Netty。html
前幾篇的內容,能夠參考:java
這個問題幾乎能夠當作廢話,框架確定要比一些原生的API封裝了更多地功能,重複造輪子在追求效率的狀況並非明智之舉。那麼先來講說NIO有什麼缺點吧:git
那麼有了這些問題,就急需一些大牛們開發通用框架來方便勞苦大衆了。最致命的NIO框架就是MINA和Netty了,這裏不得不說個小插曲:github
先來看看MINA的主要貢獻者:web
再來看看NETYY的主要貢獻者:編程
總結起來,有這麼幾點:bootstrap
所以,若是讓你選擇你應該知道選擇誰了吧。另外,MINA對底層系統要求功底更深,且國內Netty的氛圍更好,有李林峯等人在大力宣傳(《Netty權威指南》的做者)。promise
講了一大堆的廢話以後,總結來講就是——Netty有前途,學它準沒錯。安全
按照定義來講,Netty是一個異步、事件驅動的用來作高性能、高可靠性的網絡應用框架。主要的優勢有:服務器
主要支持的功能或者特性有:
總之提供了不少現成的功能能夠直接供開發者使用。
基於Netty的服務器編程能夠看作是Reactor模型:
即包含一個接收鏈接的線程池(也有多是單個線程,boss線程池)以及一個處理鏈接的線程池(worker線程池)。boss負責接收鏈接,並進行IO監聽;worker負責後續的處理。爲了便於理解Netty,直接看看代碼:
package cn.xingoo.book.netty.chap04; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; import java.nio.charset.Charset; public class NettyNioServer { public void serve(int port) throws InterruptedException { final ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8"))); // 第一步,建立線程池 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ // 第二步,建立啓動類 ServerBootstrap b = new ServerBootstrap(); // 第三步,配置各組件 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buffer.duplicate()).addListener(ChannelFutureListener.CLOSE); } }); } }); // 第四步,開啓監聽 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { NettyNioServer server = new NettyNioServer(); server.serve(5555); } }
代碼很是少,並且想要換成阻塞IO,只須要替換Channel裏面的工廠類便可:
public class NettyOioServer { public void serve(int port) throws InterruptedException { final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\b", Charset.forName("UTF-8"))); EventLoopGroup bossGroup = new OioEventLoopGroup(1); EventLoopGroup workerGroup = new OioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup)//配置boss和worker .channel(OioServerSocketChannel.class) // 使用阻塞的SocketChannel ....
歸納來講,在Netty中包含下面幾個主要的組件:
瞭解上面的基本組件後,就看一下幾個重要的內容。
在Unix操做系統中,系統底層能夠基於mmap實現內核空間和用戶空間的內存映射。可是在Netty中並非這個意思,它主要來自於下面幾個功能:
另外,Netty本身封裝實現了ByteBuf,相比於Nio原生的ByteBuffer,API上更易用了;同時支持容量的動態擴容;另外還支持Buffer的池化,高效複用Buffer。
public class ByteBufTest { public static void main(String[] args) { //建立bytebuf ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes()); System.out.println(buf); // 讀取一個字節 buf.readByte(); System.out.println(buf); // 讀取一個字節 buf.readByte(); System.out.println(buf); // 丟棄無用數據 buf.discardReadBytes(); System.out.println(buf); // 清空 buf.clear(); System.out.println(buf); // 寫入 buf.writeBytes("123".getBytes()); System.out.println(buf); buf.markReaderIndex(); System.out.println("mark:"+buf); buf.readByte(); buf.readByte(); System.out.println("read:"+buf); buf.resetReaderIndex(); System.out.println("reset:"+buf); } }
輸出爲:
UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 1, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 2, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 0, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) mark:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) read:UnpooledHeapByteBuf(ridx: 2, widx: 3, cap: 5/5) reset:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)
有興趣的能夠看一下上一篇分享的ByteBuffer,對比一下,就能發如今Netty中經過獨立的讀寫索引維護,避免讀寫模式的切換,更加方便了。
前面介紹了Handler包含了Inbound和Outbound兩種,他們統一放在一個雙向鏈表中:
當接收消息的時候,會從鏈表的表頭開始遍歷,若是是inbound就調用對應的方法;若是發送消息則從鏈表的尾巴開始遍歷。那麼上面途中的例子,接收消息就會輸出:
InboundA --> InboundB --> InboundC
輸出消息,則會輸出:
OutboundC --> OutboundB --> OutboundA
這裏有段代碼,能夠直接複製下來,試試看:
package cn.xingoo.book.netty.pipeline; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset; /** * 注意: * * 1 ChannelOutboundHandler要在最後一個Inbound以前 * */ public class NettyNioServerHandlerTest { final static ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8"))); public void serve(int port) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("1",new InboundA()); pipeline.addLast("2",new OutboundA()); pipeline.addLast("3",new InboundB()); pipeline.addLast("4",new OutboundB()); pipeline.addLast("5",new OutboundC()); pipeline.addLast("6",new InboundC()); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully().sync(); workerGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { NettyNioServerHandlerTest server = new NettyNioServerHandlerTest(); server.serve(5555); } private static class InboundA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; System.out.println("InboundA read"+buf.toString(Charset.forName("UTF-8"))); super.channelRead(ctx, msg); } } private static class InboundB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; System.out.println("InboundB read"+buf.toString(Charset.forName("UTF-8"))); super.channelRead(ctx, msg); // 從pipeline的尾巴開始找outbound ctx.channel().writeAndFlush(buffer); } } private static class InboundC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; System.out.println("InboundC read"+buf.toString(Charset.forName("UTF-8"))); super.channelRead(ctx, msg); // 這樣會從當前的handler向前找outbound //ctx.writeAndFlush(buffer); } } private static class OutboundA extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutboundA write"); super.write(ctx, msg, promise); } } private static class OutboundB extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutboundB write"); super.write(ctx, msg, promise); } } private static class OutboundC extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutboundC write"); super.write(ctx, msg, promise); } } }
最後有一個TCP粘包的例子,有興趣的也能夠本身試一下,代碼就不貼上來了,能夠參考最後面的Github鏈接。