結構設計html
Channel的NIO實現位於io.netty.channel.nio包和io.netty.channel.socket.nio包中,其中io.netty.channel.nio是抽象實現,io.netty.channel.socket.nio最終實現。下面是Channel NIO相關類的派生圖:java
NIO實現最終派生出3個類型NioServerSocketChannel實現了tcp server, NioSocketChannel實現了tcp client, NioDatagramChannel實現了udp socket。api
整個NIO實現分爲三個層次:服務器
AbstractNioChannel抽象層框架
對channel進行基本的初始化工做,把channel設置成非阻塞模式。socket
實現Channel.Unsafe的connect方法框架,提供給了doConnection, doFinishConnect兩個抽象方法,把真正的鏈接操做交給子類實現。tcp
覆蓋了AbstractChannel的doRegister,doDeregister方法,正兩個方法實現了Channel的selectionKey的註冊和註銷。ide
實現AbstractChannel的doClose, 這個方法並無真正關閉channel動做。oop
形如doXXX的方法是,AbstractChannel提供的擴展點,在<<netty源碼解解析(4.0)-3 Channel的抽象實現>>的末尾,給出了這些擴展點的詳細列表。post
AbstractNioByteChannel, AbstractNioMessageChannel抽象層
這兩個類主要實現read和write的框架,它們的實現大體相同AbstractNioByteChannel讀寫的是byte array,而AbstractNioMessageChannel讀的時候會把byte array轉換成結構化的對象,寫的時候把結構化對象序列化成byte array。
AbstractNioByteChannel定義了3個抽象方法用來實現真正的讀寫操做: doReadBytes, doWriteBytes, doWriteFileRegion。
AbstractNioMessageChannel第了兩個2個抽象方法用來實現真正的結構化數據類型的讀寫: doReadMessages, doWriteMessage。
NioServerSocketChannel, NioSocketChannel, NioDatagramChannel最終實現
封裝NIO API調用,真正的I/O操操做和socket相關的api調用都在這一層實現。
使用方式
使用過netty的人都知道,netty提供了ServerBootstrap和Bootstrap類幫助用戶方便地建立服務器端和客戶端應用,但這不是必須的。僅僅使用NioServerSocketChannel, NioSocketChannel, NioDatagramChannel和NioEventLoopGroup就能夠用開發tcp的server和client, 及udp應用。
爲了能讓讀者可以更清晰地理解NioEventLoopGroup和Channel直接的關係,下面給出了最原始的使用使用netty框架的代碼。
tcp server實現
1 import io.netty.buffer.ByteBuf; 2 import io.netty.channel.*; 3 import io.netty.channel.nio.NioEventLoopGroup; 4 import io.netty.channel.socket.nio.NioServerSocketChannel; 5 6 import java.net.InetSocketAddress; 7 import java.nio.charset.Charset; 8 9 public class TcpServer { 10 private NioEventLoopGroup group = new NioEventLoopGroup(); 11 12 public static void main(String[] argc){ 13 TcpServer server = new TcpServer(); 14 server.start(); 15 16 while(true){ 17 try{ 18 Thread.sleep(1000); 19 }catch (Exception e){ 20 break; 21 } 22 } 23 24 server.stop(); 25 } 26 27 public void start(){ 28 NioServerSocketChannel server = new NioServerSocketChannel(); 29 30 ChannelPipeline pipleline = server.pipeline(); 31 pipleline.addFirst(new ServerHandler()); 32 33 group.register(server).addListener(new ChannelFutureListener() { 34 @Override 35 public void operationComplete(ChannelFuture future) throws Exception { 36 server.bind(new InetSocketAddress(9001)); 37 System.out.println("server listen add:"+9001); 38 } 39 }); 40 } 41 public void stop(){ 42 group.shutdownGracefully(); 43 } 44 45 private class ServerHandler extends ChannelInboundHandlerAdapter{ 46 47 @Override 48 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ 49 Channel child = (Channel)msg; 50 51 child.pipeline().addLast(new ChildHandler()); 52 53 group.register(child); 54 55 } 56 } 57 58 private class ChildHandler extends ChannelInboundHandlerAdapter{ 59 60 @Override 61 public void channelActive(ChannelHandlerContext ctx) throws Exception { 62 System.out.println("connected"); 63 } 64 65 @Override 66 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 67 System.out.println("closed"); 68 } 69 70 @Override 71 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ 72 Channel chnl = ctx.channel(); 73 ByteBuf data = (ByteBuf)msg; 74 75 System.out.println("recv: "+data.toString(Charset.forName("utf-8"))); 76 chnl.write(msg); 77 } 78 79 @Override 80 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 81 ctx.channel().flush(); 82 } 83 84 } 85 }
一個channel建立以後,首先要作的事就是向pipleline中添加handler,而後纔是把它註冊到NioEventLoopGroup中(第31,33行)。這個順序不能錯,不然,handler的handlerAdded,channelRegistered和channelActive將不會被調用。當NioServerSocketChannel收到一個鏈接時,ServerHandler的的channelRead方法將會被調用,的新建好的鏈接當成參數傳遞進來,第49-53行是對新鏈接的初始化代碼。
tcp client實現
import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; import java.nio.charset.Charset; public class TcpClient { public static void main(String[] args){ NioEventLoopGroup group = new NioEventLoopGroup(); NioSocketChannel client = new NioSocketChannel(); client.pipeline().addLast(new ClientInboundHandler()); group.register(client); client.connect(new InetSocketAddress(9001)); try{ Thread.sleep(3000); }catch (Exception e){ } group.shutdownGracefully(); } private static class ClientInboundHandler extends ChannelInboundHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("connected"); Channel chnl = ctx.channel(); ByteBuf buf = chnl.alloc().buffer(); buf.writeBytes( "this is test".getBytes()); chnl.writeAndFlush(buf); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("closed"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ ByteBuf data = (ByteBuf)msg; System.out.println("recv: "+data.toString(Charset.forName("utf-8"))); ctx.channel().close(); } } }
client的實現比server實現相對簡單,添加handler,register順序和server一致。只有把一個channel註冊到gruop中以後才能調用它的方法,應爲channel的大多數方法都須要經過pipleline調用,而pipleline須要在eventLoop中執行。
udp沒有server和client的區別,這裏爲了使代碼更加清晰,把server和client代碼區分開來。
udp server
import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; import java.net.InetSocketAddress; import java.nio.charset.Charset; public class UdpServer { public static void main(String[] args){ NioEventLoopGroup group = new NioEventLoopGroup(); NioDatagramChannel chnl = new NioDatagramChannel(); chnl.pipeline().addLast(new UdpHandler()); group.register(chnl).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { chnl.bind(new InetSocketAddress(9002)); System.out.println("udp bind at:"+9002); } }); while(true){ try{ Thread.sleep(1000); }catch (Exception e){ break; } } group.shutdownGracefully(); } private static class UdpHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.print("udp channel active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ Channel chnl = ctx.channel(); DatagramPacket pkg = (DatagramPacket)msg; ByteBuf content = pkg.content(); InetSocketAddress from = pkg.sender(); System.out.println("recv: "+content.toString(Charset.forName("utf-8"))+" from:"+from.toString()); pkg = new DatagramPacket(content, from); chnl.write(pkg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.channel().flush(); } } }
udp client
import io.netty.buffer.ByteBuf; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.nio.NioDatagramChannel; import java.net.InetSocketAddress; import java.nio.charset.Charset; public class UdpClient { public static void main(String[] args){ NioEventLoopGroup group = new NioEventLoopGroup(); NioDatagramChannel chnl = new NioDatagramChannel(); chnl.pipeline().addLast(new UdpHandler()); group.register(chnl).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { chnl.bind(new InetSocketAddress(0)); } }); try{ Thread.sleep(3000); }catch (Exception e){ } group.shutdownGracefully(); } private static class UdpHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.print("udp channel active"); Channel chnl = ctx.channel(); ByteBuf content = chnl.alloc().buffer(); content.writeBytes("udp message".getBytes()); chnl.writeAndFlush(new DatagramPacket(content, new InetSocketAddress("127.0.0.1", 9002))); System.out.println("send message"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ DatagramPacket pkg = (DatagramPacket)msg; ByteBuf content = pkg.content(); InetSocketAddress from = pkg.sender(); System.out.println("recv: "+content.toString(Charset.forName("utf-8"))+" from:"+from.toString()); } } }
NioDatagramChannel和NioSocketChannel的初始化過程大體相同。它們的不一樣點是,NioSocketChannel在connect以後處於active狀態,NioDatagramChannel是在bind以後處於才處於active狀態。