Netty is a well-known open-source networking library in the Java world, noted for its high performance and extensibility. Many popular frameworks are built on top of it, such as Dubbo, RocketMQ, and Hadoop, and high-performance RPC frameworks in particular are generally Netty-based, SOFABolt for example. In short, Java developers need to learn how to use Netty and understand how it works.
Netty aims to provide an asynchronous, event-driven network application framework and tooling for the rapid development of maintainable, high-performance, highly scalable protocol servers and clients. In other words, Netty is an NIO client/server framework that makes it quick and easy to develop network applications such as protocol servers and clients, greatly simplifying network programming tasks such as TCP and UDP socket server development.
To learn the details of how Netty works, reading its source code is essential, so first let's see how to build the source:
- Download the Netty 4.x source from GitHub
- If the XxxObjectHashMap classes are missing, note that they are generated at build time; running mvn clean install, or cd common && mvn clean install, will produce them
- Open the project in IDEA and start your source-reading journey :)
Besides reading the source, pairing it with a book makes the learning more effective. Among Netty books, I recommend 《Netty權威指南》 (Netty: The Definitive Guide) by Li Linfeng. Its treatment of Netty fundamentals and the NIO material is quite good, though some parts feel padded with pasted code; overall it is still worth reading.
Netty is an event-driven, high-performance Java network library: a client/server framework that hides the underlying complexity behind an easy-to-use API. With its performance and scalability, Netty lets developers concentrate on what they actually care about. One of its main goals is to promote "separation of concerns": decoupling the business logic from the network-infrastructure code.
This is not unique to Netty; most frameworks are designed to decouple business code from the underlying plumbing, so that programmers can focus on business logic and improve development quality and efficiency. Netty's high performance comes largely from its internal Reactor model.
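To make the Reactor model concrete, here is a toy single-threaded reactor built only on the JDK's `Selector`: one thread dispatches ACCEPT and READ events and echoes whatever a connection sends. This is a minimal sketch of the pattern for illustration, not how Netty itself is implemented; the class and method names are invented for this example.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// A toy single-threaded Reactor: one Selector dispatches ACCEPT and READ
// events, echoing back whatever each connection sends.
public class MiniReactor {

    // Runs the event loop until one client message has been echoed.
    static void serveOnce(Selector selector) throws IOException {
        boolean echoed = false;
        while (!echoed) {
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {          // new-connection event
                    SocketChannel ch = ((ServerSocketChannel) key.channel()).accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {     // data event: echo it back
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(256);
                    if (ch.read(buf) > 0) {
                        buf.flip();
                        ch.write(buf);
                        echoed = true;
                    }
                }
            }
        }
    }

    // Starts the reactor on an ephemeral port, sends "ping" from a plain
    // blocking client socket, and returns what the client receives back.
    public static String echoRoundTrip() throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

        Thread reactor = new Thread(() -> {
            try { serveOnce(selector); } catch (IOException ignored) { }
        });
        reactor.start();

        try (Socket client = new Socket("127.0.0.1", port)) {
            client.getOutputStream().write("ping".getBytes(StandardCharsets.UTF_8));
            byte[] reply = new byte[4];
            int read = 0;
            while (read < 4) read += client.getInputStream().read(reply, read, 4 - read);
            reactor.join();
            return new String(reply, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(echoRoundTrip());
    }
}
```

The key idea is that one thread multiplexes many connections: no thread-per-connection, no blocking reads; Netty's EventLoop generalizes exactly this loop.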
Netty is a non-blocking, event-driven network framework. Netty does use threads to handle I/O events, so readers familiar with multithreaded programming might expect to see synchronized code. That would hurt performance, however, so Netty's design guarantees that event handling needs no synchronization: a Channel is registered with exactly one EventLoop, and from then on all of that Channel's events are handled by that EventLoop, which is backed by a single thread. As a result, Netty needs no synchronization around a channel's I/O operations. The relationship between EventLoop and EventLoopGroup can be understood as that of a thread to a thread pool.
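The "one channel, one thread" rule above can be sketched with plain JDK executors: pin every channel to a single-threaded executor (its "event loop"), so all of a channel's events run on the same thread and handlers never need locks. The class and method names here are invented for illustration; they are not Netty APIs.

```java
import java.util.Map;
import java.util.concurrent.*;

// Sketch of Netty's threading rule using only the JDK: each "channel" is
// pinned to one single-threaded executor (its "EventLoop"), so all of its
// events execute on the same thread and handlers need no synchronization.
public class EventLoopSketch {
    private final ExecutorService[] loops; // plays the role of EventLoopGroup
    private final Map<String, ExecutorService> channelToLoop = new ConcurrentHashMap<>();

    public EventLoopSketch(int nThreads) {
        loops = new ExecutorService[nThreads];
        for (int i = 0; i < nThreads; i++) {
            loops[i] = Executors.newSingleThreadExecutor();
        }
    }

    // The same channel id always maps to the same loop.
    public ExecutorService loopFor(String channelId) {
        return channelToLoop.computeIfAbsent(
                channelId, id -> loops[Math.abs(id.hashCode() % loops.length)]);
    }

    // All events for a channel are serialized on that channel's loop thread.
    public Future<String> submitEvent(String channelId, Callable<String> event) {
        return loopFor(channelId).submit(event);
    }

    public void shutdown() {
        for (ExecutorService l : loops) l.shutdownNow();
    }
}
```

Because events for one channel are always executed by the same thread, they are naturally serialized; this is the design that lets Netty handlers stay lock-free.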
ByteBuf is a container for byte data. All network communication ultimately comes down to transferring bytes over the wire, and ByteBuf is a well-optimized data container: we can efficiently append byte data to a ByteBuf or fetch data from it. For convenience, ByteBuf maintains two indexes, one for reading and one for writing. We can read data sequentially, re-read it by resetting the reader index, or read at an arbitrary position by passing the index directly to a get method.
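The dual-index idea can be shown with a few lines of plain Java. The `SimpleByteBuf` class below is a hypothetical, stripped-down sketch of the concept, not Netty's actual implementation:

```java
// Hypothetical SimpleByteBuf: a minimal sketch of ByteBuf's dual-index design.
// writerIndex advances on writes, readerIndex advances on sequential reads,
// and get-style access leaves both indexes untouched.
public class SimpleByteBuf {
    private final byte[] data;
    private int readerIndex;
    private int writerIndex;

    public SimpleByteBuf(int capacity) {
        data = new byte[capacity];
    }

    public void writeByte(byte b) {
        data[writerIndex++] = b;   // writes advance the writer index
    }

    // Sequential read: advances readerIndex.
    public byte readByte() {
        return data[readerIndex++];
    }

    // Random-access read: moves no index (like ByteBuf.getByte).
    public byte getByte(int index) {
        return data[index];
    }

    // Unread bytes are exactly the gap between the two indexes.
    public int readableBytes() {
        return writerIndex - readerIndex;
    }

    // Rewind the reader to re-read data (like ByteBuf.readerIndex(int)).
    public void readerIndex(int index) {
        readerIndex = index;
    }
}
```

With separate read and write indexes, there is no need for the `flip()` dance required by `java.nio.ByteBuffer`, which keeps only a single position.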
ByteBuf usage patterns
Heap buffers: this ByteBuf pattern stores the data in the JVM heap, implemented by storing the bytes in an array. Heap buffers are fast to allocate and fast to release when no longer in use, and they provide direct access to the backing byte[] via ByteBuf.array().
Example of using a heap-buffer ByteBuf:
```java
ByteBuf heapBuf = ...;
if (heapBuf.hasArray()) {
    byte[] array = heapBuf.array();
    int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
    int length = heapBuf.readableBytes();
    handleArray(array, offset, length);
}
```
Direct buffers: the ByteBuffer class introduced with NIO in JDK 1.4 allows the JVM to allocate memory through native calls, the goal being to skip an intermediate copy and speed up I/O. A direct buffer's contents can reside outside the heap scanned by the garbage collector. DirectBuffer memory is allocated off-heap, bounded by -XX:MaxDirectMemorySize=xxM; the GC "cannot touch" it, which means it avoids the interruptions that frequent GC cycles under heavy load would otherwise inflict on application threads.
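The off-heap behavior is easy to observe with the plain JDK API. A buffer from `ByteBuffer.allocateDirect` reports itself as direct and has no backing heap array:

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    // Allocates a direct buffer: the memory lives outside the Java heap,
    // so there is no backing byte[] array to expose.
    static ByteBuffer allocate(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }

    public static void main(String[] args) {
        ByteBuffer direct = allocate(16);
        ByteBuffer heap = ByteBuffer.allocate(16);
        System.out.println(direct.isDirect()); // true
        System.out.println(direct.hasArray()); // false: no heap array backs it
        System.out.println(heap.hasArray());   // true: heap buffers wrap a byte[]
    }
}
```

The trade-off is that direct buffers are more expensive to allocate and release than heap buffers, which is one reason Netty pools them.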
瞭解了Netty基礎概念以後,一塊兒看下Netty的使用示例,下面以TCP server、TCP client、http server爲例,因爲示例代碼不難,因此再也不贅述,直接上代碼。
```java
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap boot = new ServerBootstrap();
        boot.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(8080)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new EchoHandler());
                    }
                });
        // start
        ChannelFuture future = boot.bind().sync();
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // shutdown
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

public class EchoHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(in.toString(CharsetUtil.UTF_8));
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
```java
public static void main(String[] args) {
    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        //p.addLast(new LoggingHandler(LogLevel.INFO));
                        p.addLast(new EchoClientHandler());
                    }
                });
        // Start the client.
        ChannelFuture f = b.connect("localhost", 8081).sync();
        f.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}

public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    private final ByteBuf message;

    public EchoClientHandler() {
        message = Unpooled.buffer(256);
        message.writeBytes("hello netty".getBytes(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(message);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(((ByteBuf) msg).toString(CharsetUtil.UTF_8));
        ctx.write(msg);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
```
When does the Netty client register its channel with the selector? It happens right after the channel is created; the relevant code is in the initAndRegister method:
```java
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // Create and initialize the (Netty-defined) Channel instance.
        // For a ServerBootstrap this is a NioServerSocketChannel, whose parent
        // class AbstractNioChannel holds the underlying NIO ServerSocketChannel.
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // Register the channel with the Selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
```
After initAndRegister, the connect step runs. Note that the actual channel.connect is performed by a NioEventLoop thread; once the TCP three-way handshake completes, the channel's CONNECT (OP_CONNECT) event fires, which is then handled by the NioEventLoop event-processing flow.
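The register-then-connect flow described above can be reproduced with raw NIO: register the channel with a Selector first, then issue a non-blocking connect(); when the handshake completes, the selector reports an OP_CONNECT event and finishConnect() finalizes the connection. This is a self-contained sketch of the mechanism, not Netty code; it connects to a throwaway local server socket just to have a handshake to complete.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Raw-NIO version of the client flow: register with a Selector, connect
// non-blockingly, then wait for the OP_CONNECT event to finish the connect.
public class ConnectEventDemo {

    static boolean connectViaSelector() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_CONNECT); // register first
            client.connect(server.getLocalAddress());           // connect in progress
            while (true) {
                selector.select();                              // wait for the CONNECT event
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable()) {
                        // Fires once the three-way handshake has completed.
                        return ((SocketChannel) key.channel()).finishConnect();
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(connectViaSelector());
    }
}
```

In Netty the same sequence is split across threads: the caller's thread schedules the register and connect, while the channel's NioEventLoop thread runs the select loop and handles the OP_CONNECT readiness.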
```java
public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap boot = new ServerBootstrap();
        boot.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .localAddress(8080)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                                .addLast("decoder", new HttpRequestDecoder())
                                .addLast("encoder", new HttpResponseEncoder())
                                .addLast("aggregator", new HttpObjectAggregator(512 * 1024))
                                .addLast("handler", new HttpHandler());
                    }
                });
        // start
        ChannelFuture future = boot.bind().sync();
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // shutdown
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

public class HttpHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK, Unpooled.wrappedBuffer("hello netty".getBytes()));
        HttpHeaders heads = response.headers();
        heads.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN + "; charset=UTF-8");
        heads.add(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        heads.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        ctx.writeAndFlush(response);
    }
}
```
Follow 【TopCoder】 for more articles like this one.