使用JDK NIO類庫時開發NIO的異步服務端時,須要用到:多路複用器Selector,ServerSocketChannel,SocketChanel,ByteBuffer,SelectionKey等。若是用源生的JAVA NIO搭建服務端,無疑是十分複雜的,這不只拖慢了項目的進度,並且開發出來的項目可能還不穩定。那麼Netty是如何下降其複雜度的呢?Netty開發簡單異步服務端不超過9步,下面分步驟來說解用Netty來開發異步服務端:java
第一步:建立EventLoopGroupbootstrap
//第一步 建立EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
講解:EventLoopGroup 是EventLoop的數組,EventLoop的職責是處理全部註冊到本線程多路複用器Selector上的Channel,Selector的輪詢操做由EventLoop線程的run方法驅動。數組
第二步:建立ServerBootstrap安全
ServerBootstrap b = new ServerBootstrap();
講解:ServerBootstrap是Netty服務端啓動的輔助類,從代碼能夠看出,咱們在建立ServerBootstrap時,用的是一個無參構造器建立的,你也會驚訝的發現,ServerBootstrap只有一個無參構造器,由於這個輔助類須要的參數太多了,並且有些參數是用戶能夠選擇性添加的,因此ServerBootstrap引入了Builder模式 讓用戶動態添加參數:服務器
ServerBootstrap b = new ServerBootstrap(); //動態添加參數,支持鏈式添加 b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) .childHandler(new HttpServerHandler());
第三步:設置並綁定服務端Channel(開始綁定ServerSocketChannel)網絡
b.channel(NioServerSocketChannel.class)
講解:這裏只須要傳入對應的.class類便可,ServerBootstrap會經過工廠類,利用反射幫咱們建立對應的Class對象。異步
第四步:建立並初始化處理網絡時間的職責鏈 ChannelPipelinesocket
ch.pipeline().addLast(new LoggingHandler()); //請求 http解碼 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //將多個消息轉換爲單一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //應答http編碼 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //鏈接異步發送請求,防止內存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter());
講解:ChannelPipeline並非NIO服務端必須的,它本質就是一個處理網絡事件的職責鏈,負責管理和執行ChannelHandler,咱們能夠在其中添加服務端接收請求的解碼和發送請求的編碼,以及粘包\拆包的處理等。ide
第五步:添加並設置ChannelHandleroop
ChannelHandler是Netty提供給咱們定製和擴展的關鍵接口,利用ChannelHandler咱們能夠完成大多數功能的定製,例如消息編解碼,心跳安全認證等,下面是我定義的Handler:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //加入自定義的handler .childHandler(new HttpServerHandler()); //我定義的handler public class HttpServerHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); //請求 http解碼 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //將多個消息轉換爲單一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //應答http編碼 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //鏈接異步發送請求,防止內存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter()); } //第二層對消息處理 public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler<FullHttpRequest>{ @Override public boolean acceptInboundMessage(Object msg) throws Exception { System.out.println(msg); return super.acceptInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { System.out.println(msg); } }
第六步:綁定並啓動監聽端口:
ChannelFuture f = b.bind(8080).sync(); System.out.println("Http服務器啓動完成,監聽端口爲:"+port); f.channel().closeFuture().sync();
講解:在監聽端口以前,Netty會作一系列的初始化和檢測工做,而後啓動監聽端口,同時,會將ServerSocketChannel註冊到多路複用器Selector上,監聽客戶端的鏈接。
第七步:Selector輪詢:
講解:Selector的輪詢是由Reactor線程NioEventLoop負責調度的,而後將準備就緒的Channel放到一個集合中;
第八步:Reactor線程NioEventLoop將準備就緒的Channel去執行咱們定義好的ChannelPipeline方法,並執行系統定義的ChannelHandler;
第九步:最終,執行咱們根據具體業務邏輯定義好的的ChannelHandler。
完整源碼以下:
HttpServer:
package kaoqin.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LoggingHandler; public class HttpServer { private void run(int port) throws InterruptedException{ //第一步 建立EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //加入自定義的handler .childHandler(new HttpServerHandler()); //.option(ChannelOption.AUTO_READ,true); ChannelFuture f = b.bind(port).sync(); System.out.println("Http服務器啓動完成,監聽端口爲:"+port); f.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new HttpServer().run(9999); } }
HttpServerHandler:
package kaoqin.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class HttpServerHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); //請求 http解碼 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //將多個消息轉換爲單一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //應答http編碼 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //鏈接異步發送請求,防止內存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter()); } }
HttpServerHandlerAdapter:
package kaoqin.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler<FullHttpRequest>{ @Override public boolean acceptInboundMessage(Object msg) throws Exception { System.out.println(msg); return super.acceptInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { System.out.println(msg); } }