把書讀薄(Netty in Action第二章) + Netty啓動源碼java
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[]args)throws Exception{ new EchoServer(8888).start(); } public void start() throws Exception{ final EchoServerHandler handler = new EchoServerHandler(); EventLoopGroup group = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(handler); } }); ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); }finally { group.shutdownGracefully().sync(); } } }
1: 初始化EventLoopGroupsegmentfault
所謂的EventLoopGroup,組(group)的概念表如今它自身維護了一個數組children,默認維護邏輯處理核數2倍的NioEventLoop線程,並經過chooser來方便的獲取下一個要執行的線程。實際處理的是NioEventLoop,它的部分類結構以下:數組
實質上的線程執行,就是啓動一個java Thread,而後從taskQuene中獲取要執行的任務,在run方法中執行。ide
2:配置引導類ServerBootstrap做爲工具來引導channel創建工具
Hello word版代碼中用的是同一個NioEventLoop,實際中通常各自分配
3 :建立並初始化channel oop
在管道的最後添加ChannelInitializer的方式則會在管道註冊完成以後,往管道中 添加一個ServerBootstrapAcceptor(它是InboundHandler),它持有對childGroup(client)和childHandler的引用,而ChannelInitializer這個InboundHandler在完成它的使命以後,就會從管道中被移除, 至此完成channel的初始化。ui
ServerBootstrapAcceptor 最開始在客戶端創建鏈接的時候執行調用(後續讀消息調用),入口是 doReadMessages,讀到消息以後,從Head沿着InBoundHandler到ServerBootstrapAcceptor,觸發讀事件,此時執行註冊childGroup到這個channel,也就是每次都用childGroup來處理讀到的消息
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { //管道註冊完成以後觸發 ChannelPipeline pipeline = ctx.pipeline(); boolean success = false; try { initChannel((C) ctx.channel()); //執行註冊過程當中的方法,在這裏就是往管道中添加ServerBootstrapAcceptor pipeline.remove(this); //刪除ChannelInitializer自己 ctx.fireChannelRegistered(); //繼續沿着管道傳遞channel註冊完成事件 success = true; } catch (Throwable t) { logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); } finally { if (pipeline.context(this) != null) { pipeline.remove(this); } if (!success) { ctx.close(); } } }
新建的NioServerSocketChannel的部分類結構以下:this
對於Netty來說channel有"兩個"spa
凡是經過 channel()方法獲取的則是Netty自身的channel
public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; //Netty自身的channel tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
凡是經過javachannel()調用的獲取到的值便是jdk的channel
,而unsafe自己真正意義上執行的register、bind、connect、write、read操做均經過ServerSocketChannel實現4: 執行channel的註冊線程
能夠看到註冊過程當中實際的註冊操做經理了從channel->unsafe->ch的一個過程,實際的註冊操做就是使用jdk完成的。
5:執行channel的綁定
可以看到的是,綁定操做也是經過jdk來實現綁定的。另外同步阻塞住server,使之不關閉,實際上也就是隻要CloseFuture不完成,那麼server主線程永遠阻塞住,由剛開始分配的NioEventLoop一直在運行各自的task
netty nio底層的註冊channel、綁定監聽端口都是經過jdk自身的nio完成的。java nio中的select和channel是怎麼使用的?
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; System.out.printf("Server get:"+in.toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //將目前暫存於ChannelOutboundBuffer中的消息在下一次flush或者writeAndFlush的時候沖刷到遠程並關閉這個channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
public class EchoClient { private final String host; private final int port; public EchoClient(String host,int port){ this.host=host; this.port=port; } public void start() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class)//指定NIO的傳輸方式 .remoteAddress(new InetSocketAddress(host,port))//指定遠程地址 .handler(new ChannelInitializer<SocketChannel>() {//向channel的pipeline添加handler @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler());//channelHander交給pipeline } }); ChannelFuture f = b.connect().sync();//鏈接到遠程節點,阻塞直到鏈接完成 System.out.println("wait"); f.channel().closeFuture().sync();//阻塞直到鏈接關閉 System.out.println("over"); }finally { System.out.println("shutdown"); group.shutdownGracefully().sync();//關閉線程池而且釋放資源 } } public static void main(String[]args) throws Exception{ new EchoClient("localhost",8888).start(); } }
從代碼自己能夠看到與 server的差別化在於如下兩個部分:
注意這裏的其實是沒有指定本地的地址的
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world",CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Client get:"+msg.toString(CharsetUtil.UTF_8)); } }