1、首先來看一段服務端的示例代碼:
1 public class NettyTestServer { 2 public void bind(int port) throws Exception{ 3 EventLoopGroup bossgroup = new NioEventLoopGroup();//建立BOSS線程組 4 EventLoopGroup workgroup = new NioEventLoopGroup();//建立WORK線程組 5 try{ 6 ServerBootstrap b = new ServerBootstrap(); 7 b.group(bossgroup,workgroup)//綁定BOSS和WORK線程組 8 .channel(NioServerSocketChannel.class)//設置channel類型,服務端用的是NioServerSocketChannel 9 .option(ChannelOption.SO_BACKLOG,100) //設置channel的配置選項 10 .handler(new LoggingHandler(LogLevel.INFO))//設置NioServerSocketChannel的Handler 11 .childHandler(new ChannelInitializer<SocketChannel>() {//設置childHandler,做爲新建的NioSocketChannel的初始化Handler 12 @Override//當新建的與客戶端通訊的NioSocketChannel被註冊到EventLoop成功時,該方法會被調用,用於添加業務Handler 13 protected void initChannel(SocketChannel ch) throws Exception { 14 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); 15 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter)); 16 ch.pipeline().addLast(new StringDecoder()); 17 ch.pipeline().addLast(new EchoServerHandler()); 18 } 19 }); 20 ChannelFuture f = b.bind(port).sync();//同步等待綁定結束 21 f.channel().closeFuture().sync();//同步等待關閉 22 }finally { 23 bossgroup.shutdownGracefully(); 24 workgroup.shutdownGracefully(); 25 } 26 } 27 public static void main(String[] args) throws Exception{ 28 int port = 8082; 29 new NettyTestServer().bind(port); 30 } 31 } 32 @ChannelHandler.Sharable 33 class EchoServerHandler extends ChannelInboundHandlerAdapter{ 34 int count = 0; 35 36 @Override 37 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 38 String body = (String)msg; 39 System.out.println("This is" + ++count + "times receive client:[" + body + "]"); 40 body += "$_"; 41 ByteBuf echo = Unpooled.copiedBuffer(body.getBytes()); 42 ctx.writeAndFlush(echo); 43 ctx.fireChannelRead("my name is chenyang"); 44 } 45 46 @Override 47 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 48 cause.printStackTrace(); 49 ctx.close(); 50 } 51 }
2、首先來看一下ServerBootstrap類,顧名思義,它是一個服務端啓動類,用於幫助用戶快速配置、啓動服務端服務。先來看一下該類的主要成員定義:
1 /** 2 * {@link Bootstrap} sub-class which allows easy bootstrap of {@link ServerChannel} 3 * 4 */ 5 public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { 6 7 private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); 8 //如下都是針對NioSocketChannel的 9 private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>(); 10 private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>(); 11 private volatile EventLoopGroup childGroup; 12 private volatile ChannelHandler childHandler;
可見,ServerBootstrap是AbstractBootstrap的子類,AbstractBootstrap的成員主要有:
1 /** 2 * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support 3 * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}. 4 * 5 * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless 6 * transports such as datagram (UDP).</p> 7 */ 8 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { 9 //如下都是針對服務端NioServerSocketChannel的 10 volatile EventLoopGroup group; 11 private volatile ChannelFactory<? extends C> channelFactory; 12 private volatile SocketAddress localAddress; 13 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); 14 private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>(); 15 private volatile ChannelHandler handler;
用一張圖說明兩個類之間的關係以下(原圖出自:http://blog.csdn.net/zxhoo/article/details/17532857)。
總結以下: ServerBootstrap比AbstractBootstrap多了4個Part,其中AbstractBootstrap的成員用於設置服務端NioServerSocketChannel(包括所使用的線程組、使用的channel工廠類、使用的Handler以及地址和選項信息等), ServerBootstrap的4個成員用於設置有新鏈接時新建的NioSocketChannel。
3、ServerBootstrap配置源碼解釋
1)b.group(bossgroup,workgroup)
1 /** 2 * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These 3 * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and 4 * {@link Channel}'s. 5 */ 6 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { 7 super.group(parentGroup);//設置BOSS線程組(在AbstractBootstrap中) 8 if (childGroup == null) { 9 throw new NullPointerException("childGroup"); 10 } 11 if (this.childGroup != null) { 12 throw new IllegalStateException("childGroup set already"); 13 } 14 this.childGroup = childGroup;//設置WORK線程組 15 return this; 16 }
2) .channel(NioServerSocketChannel.class)
1 /** 2 * The {@link Class} which is used to create {@link Channel} instances from. 3 * You either use this or {@link #channelFactory(ChannelFactory)} if your 4 * {@link Channel} implementation has no no-args constructor. 5 */ 6 public B channel(Class<? extends C> channelClass) { 7 if (channelClass == null) { 8 throw new NullPointerException("channelClass"); 9 } 10 return channelFactory(new BootstrapChannelFactory<C>(channelClass));//設置channel工廠 11 }
channelFactory方法就是用來設置channel工廠的,這裏的工廠就是BootstrapChannelFactory(是一個泛型類)。
1 /** 2 * {@link ChannelFactory} which is used to create {@link Channel} instances from 3 * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} 4 * is not working for you because of some more complex needs. If your {@link Channel} implementation 5 * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for 6 * simplify your code. 7 */ 8 @SuppressWarnings("unchecked") 9 public B channelFactory(ChannelFactory<? extends C> channelFactory) { 10 if (channelFactory == null) { 11 throw new NullPointerException("channelFactory"); 12 } 13 if (this.channelFactory != null) { 14 throw new IllegalStateException("channelFactory set already"); 15 } 16 17 this.channelFactory = channelFactory;//設置channel工廠 18 return (B) this; 19 }
下面就是channel工廠類的實現,構造函數傳入一個channel類型(針對服務端也就是NioServerSocketChannel.class),BootstrapChannelFactory工廠類提供的newChannel方法將使用反射建立對應的channel。由於channel的建立通常只在啓動的時候進行,所以使用反射不會形成性能的問題。
1 private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> { 2 private final Class<? extends T> clazz; 3 4 BootstrapChannelFactory(Class<? extends T> clazz) { 5 this.clazz = clazz; 6 } 7 8 @Override 9 public T newChannel() {//須要建立channel的時候,次方法將被調用 10 try { 11 return clazz.newInstance();//反射建立對應channel 12 } catch (Throwable t) { 13 throw new ChannelException("Unable to create Channel from class " + clazz, t); 14 } 15 } 16 17 @Override 18 public String toString() { 19 return StringUtil.simpleClassName(clazz) + ".class"; 20 } 21 }
3) .option(ChannelOption.SO_BACKLOG,100)
用來設置channel的選項,好比設置BackLog的大小等。
1 /** 2 * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got 3 * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}. 4 */ 5 @SuppressWarnings("unchecked") 6 public <T> B option(ChannelOption<T> option, T value) { 7 if (option == null) { 8 throw new NullPointerException("option"); 9 } 10 if (value == null) { 11 synchronized (options) { 12 options.remove(option); 13 } 14 } else { 15 synchronized (options) { 16 options.put(option, value); 17 } 18 } 19 return (B) this; 20 }
4) .handler(new LoggingHandler(LogLevel.INFO))
用於設置服務端NioServerSocketChannel的Handler。
1 /** 2 * the {@link ChannelHandler} to use for serving the requests. 3 */ 4 @SuppressWarnings("unchecked") 5 public B handler(ChannelHandler handler) { 6 if (handler == null) { 7 throw new NullPointerException("handler"); 8 } 9 this.handler = handler;//設置的是父類AbstractBootstrap裏的成員,也就是該handler是被NioServerSocketChannel使用 10 return (B) this; 11 }
5) .childHandler(new ChannelInitializer<SocketChannel>() {
必定要分清.handler和.childHandler的區別,首先,二者都是設置一個Handler,可是,前者設置的Handler是屬於服務端NioServerSocketChannel的,然後者設置的Handler是屬於每個新建的NioSocketChannel的(每當有一個來自客戶端的鏈接時,都會建立一個新的NioSocketChannel)。
1 /** 2 * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s. 3 */ 4 public ServerBootstrap childHandler(ChannelHandler childHandler) { 5 if (childHandler == null) { 6 throw new NullPointerException("childHandler"); 7 } 8 this.childHandler = childHandler; 9 return this; 10 }
至此,ServerBootstrap的配置完成,其實有人可能會很好奇,爲何不直接在ServerBootstrap的構造函數中一步完成這些初始化配置操做,這樣作雖然能夠,可是這會致使ServerBootstrap構造函數的參數過多,而使用Builder模式(也就是ServerBootstrap目前採用的模式,能夠參見《Effective Java》)則能夠有效的解決構造方法參數過多的問題。
4、bind流程
1)一切從bind開始 ChannelFuture f = b.bind(port).sync();
1 /** 2 * Create a new {@link Channel} and bind it. 3 */ 4 public ChannelFuture bind(int inetPort) { 5 return bind(new InetSocketAddress(inetPort)); 6 }
繼續深刻bind
1 /** 2 * Create a new {@link Channel} and bind it. 3 */ 4 public ChannelFuture bind(SocketAddress localAddress) { 5 validate(); 6 if (localAddress == null) { 7 throw new NullPointerException("localAddress"); 8 } 9 return doBind(localAddress); 10 }
繼續深入doBind
/**
 * Initializes and registers a channel, then binds it to {@code localAddress}.
 *
 * <p>If registration has already failed, the registration future is returned as is. If it has
 * already completed, the bind is issued immediately; otherwise the bind is deferred to a
 * listener on the registration future.</p>
 *
 * @param localAddress address to bind to
 * @return the failed registration future, or the promise passed to {@code doBind0}
 */
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister(); // initialize and register a channel
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // Registration may or may not have completed yet; handle both cases.
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise); // performs channel.bind()
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}
doBind中最重要的一步就是調用initAndRegister方法了,它會初始化並註冊一個channel,直接看源碼吧。
/**
 * Creates a channel through the configured channel factory, initializes it with
 * {@code init(channel)} and registers it with the bootstrap's {@code group()}.
 *
 * @return a failed promise if {@code init} threw, otherwise the registration future
 */
final ChannelFuture initAndRegister() {
    // The channel factory configured earlier is used here.
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel); // initialize the channel
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = group().register(channel); // register the channel with the EventLoopGroup
    if (regFuture.cause() != null) {
        // Registration failed: close normally if the channel got registered, forcibly otherwise.
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}
先來看一下init方法
/**
 * Applies the configured options and attributes to {@code channel}, adds the configured
 * handler (if any) to its pipeline, and appends a {@link ChannelInitializer} whose
 * {@code initChannel} adds a {@code ServerBootstrapAcceptor} built from snapshots of the
 * child group, child handler, child options and child attributes.
 */
@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options); // apply the previously configured channel options
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue()); // apply the previously configured attributes
        }
    }

    ChannelPipeline p = channel.pipeline(); // pipeline of the channel being initialized
    if (handler() != null) { // only if the user configured a handler
        p.addLast(handler()); // add it to this channel's pipeline
    }
    // Snapshot the four child-related parts; they are handed to the acceptor below.
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    // Add an initializer handler to the pipeline; its initChannel installs the acceptor.
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            // Add the ServerBootstrapAcceptor, which receives the child group, handler,
            // options and attributes captured above.
            ch.pipeline().addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}
init執行以後,接下來看一下註冊過程(ChannelFuture regFuture = group().register(channel); 注意,這裏的group是以前設置的BOSS EventLoopGroup)
/**
 * Registers {@code channel} with whatever {@code next()} returns.
 *
 * @param channel channel to register
 * @return the future returned by the delegate's {@code register(channel)}
 */
@Override
public ChannelFuture register(Channel channel) {
    // next() picks the target from this group; the registration is delegated to it.
    return next().register(channel);
}
1 @Override 2 public ChannelFuture register(Channel channel) { 3 return register(channel, new DefaultChannelPromise(channel, this)); 4 }
1 @Override 2 public ChannelFuture register(final Channel channel, final ChannelPromise promise) { 3 if (channel == null) { 4 throw new NullPointerException("channel"); 5 } 6 if (promise == null) { 7 throw new NullPointerException("promise"); 8 } 9 10 channel.unsafe().register(this, promise);//unsafe執行的都是實際的操做 11 return promise; 12 }
/**
 * Binds this channel to {@code eventLoop} and runs {@code register0(promise)} on it.
 *
 * <p>The promise is failed if the channel is already registered or the event loop is not
 * compatible. If the caller is not on the event loop, the registration is submitted to it as
 * a task; if the task is rejected the channel is force-closed and the promise failed.</p>
 *
 * @throws NullPointerException if {@code eventLoop} is {@code null}
 */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop; // remember the EventLoop chosen for this channel
    // register0 must run on that EventLoop; otherwise it is wrapped in a task executed by it.
    if (eventLoop.inEventLoop()) {
        register0(promise); // register directly
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
/**
 * Performs the registration: calls {@code doRegister()}, marks the channel registered,
 * completes the promise and fires {@code channelRegistered} (plus {@code channelActive} on a
 * first registration of an already active channel). Any failure force-closes the channel and
 * fails the promise.
 */
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister(); // the lowest-level registration call
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise); // mark the registration as successful
        pipeline.fireChannelRegistered(); // propagate channelRegistered through the pipeline
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (firstRegistration && isActive()) { // first registration and the channel is already active
            pipeline.fireChannelActive(); // propagate channelActive through the pipeline
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
doRegister會完成在EventLoop的Selector上的註冊任務。
/**
 * Registers the underlying Java channel with the event loop's selector, using interest ops
 * {@code 0} and this channel as the attachment.
 *
 * <p>On a {@link CancelledKeyException} it forces one {@code selectNow()} and retries; a
 * second failure is rethrown.</p>
 */
@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // Note: the interest ops are 0 here, so no read/write events are selected yet.
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}
由上可知,註冊成功後,NioServerSocketChannel還不能監聽讀寫事件,那麼何時會開始監聽呢?因爲註冊成功以後,會進行pipeline.fireChannelRegistered()調用,該事件會在NioServerSocketChannel的pipeline中傳播(從head開始,逐步findContextInbound),這會致使Inbound類型的Handler的channelRegistered方法被調用。還記得在init方法中爲NioServerSocketChannel添加的ChannelInitializer的Handler嗎,它也是一個InboundHandler,看一下它的實現:
/**
 * Inbound handler that runs {@link #initChannel(Channel)} when the channel is registered and
 * then removes itself from the pipeline.
 */
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);

    /**
     * This method will be called once the {@link Channel} was registered. After the method returns this instance
     * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
     *
     * @param ch            the {@link Channel} which was registered.
     * @throws Exception    is thrown if an error occurs. In that case the {@link Channel} will be closed.
     */
    protected abstract void initChannel(C ch) throws Exception; // implemented by subclasses

    /**
     * Invokes {@code initChannel}, removes this handler and forwards the
     * {@code channelRegistered} event. On failure the error is logged and the context closed.
     */
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ChannelPipeline pipeline = ctx.pipeline();
        boolean success = false;
        try {
            initChannel((C) ctx.channel()); // run the user-supplied initialization
            pipeline.remove(this); // the initializer only initializes; afterwards it removes itself
            ctx.fireChannelRegistered(); // keep propagating the channelRegistered event
            success = true;
        } catch (Throwable t) {
            logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
        } finally {
            // Make sure the initializer is gone even when initChannel threw before remove().
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
            if (!success) {
                ctx.close();
            }
        }
    }
}
再重複貼一次代碼,看一下initChannel裏面是什麼
1 p.addLast(new ChannelInitializer<Channel>() { 2 @Override 3 public void initChannel(Channel ch) throws Exception {//被channelRegistered調用 4 ch.pipeline().addLast(new ServerBootstrapAcceptor( 5 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 6 } 7 }
能夠看到,initChannel只是向pipeline中添加了ServerBootstrapAcceptor類型的Handler。
可是這仍是沒有看到給NioServerSocketChannel註冊讀寫事件的地方,繼續看以前的register0代碼,它還會調用pipeline的fireChannelActive方法,看一下該方法的代碼:
1 @Override 2 public ChannelPipeline fireChannelActive() { 3 head.fireChannelActive();//將ChannelActive事件在pipeline中傳播 4 //若是channel被配置成自動可讀的,那麼久發起讀事件 5 if (channel.config().isAutoRead()) { 6 channel.read();//pipeline.read()-->tail.read()-->*****-->head.read()-->unsafe.beginRead() 7 } 8 9 return this; 10 }
1 @Override 2 public ChannelHandlerContext fireChannelActive() {//head的fireChannelActive() 3 final AbstractChannelHandlerContext next = findContextInbound();//尋找下一個Inbound類型的Context 4 EventExecutor executor = next.executor(); 5 if (executor.inEventLoop()) { 6 next.invokeChannelActive();//調用Context中的Handler的channelActive方法 7 } else { 8 executor.execute(new OneTimeTask() { 9 @Override 10 public void run() { 11 next.invokeChannelActive(); 12 } 13 }); 14 } 15 return this; 16 }
看一下beginRead實現:
1 @Override 2 public final void beginRead() { 3 if (!isActive()) { 4 return; 5 } 6 7 try { 8 doBeginRead();//真正的註冊讀事件 9 } catch (final Exception e) { 10 invokeLater(new OneTimeTask() { 11 @Override 12 public void run() { 13 pipeline.fireExceptionCaught(e); 14 } 15 }); 16 close(voidPromise()); 17 } 18 }
1 @Override 2 protected void doBeginRead() throws Exception { 3 // Channel.read() or ChannelHandlerContext.read() was called 4 if (inputShutdown) { 5 return; 6 } 7 8 final SelectionKey selectionKey = this.selectionKey; 9 if (!selectionKey.isValid()) { 10 return; 11 } 12 13 readPending = true; 14 15 final int interestOps = selectionKey.interestOps(); 16 if ((interestOps & readInterestOp) == 0) { 17 selectionKey.interestOps(interestOps | readInterestOp);//真正的註冊讀事件 18 } 19 }
5、客戶端接入過程
接下來看看,當一個客戶端鏈接進來時,都發生了什麼。
1)首先從事件的源頭看起,下面是EventLoop的事件循環
/**
 * Event loop body: repeatedly selects, processes the selected keys and runs queued tasks until
 * shutdown is confirmed. Any throwable inside an iteration is logged and followed by a
 * one-second sleep.
 */
@Override
protected void run() {
    for (;;) {
        boolean oldWakenUp = wakenUp.getAndSet(false);
        try {
            if (hasTasks()) {
                selectNow();
            } else {
                select(oldWakenUp); // select via the selector

                // 'wakenUp.compareAndSet(false, true)' is always evaluated
                // before calling 'selector.wakeup()' to reduce the wake-up
                // overhead. (Selector.wakeup() is an expensive operation.)
                //
                // However, there is a race condition in this approach.
                // The race condition is triggered when 'wakenUp' is set to
                // true too early.
                //
                // 'wakenUp' is set to true too early if:
                // 1) Selector is waken up between 'wakenUp.set(false)' and
                //    'selector.select(...)'. (BAD)
                // 2) Selector is waken up between 'selector.select(...)' and
                //    'if (wakenUp.get()) { ... }'. (OK)
                //
                // In the first case, 'wakenUp' is set to true and the
                // following 'selector.select(...)' will wake up immediately.
                // Until 'wakenUp' is set to false again in the next round,
                // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                // any attempt to wake up the Selector will fail, too, causing
                // the following 'selector.select(...)' call to block
                // unnecessarily.
                //
                // To fix this problem, we wake up the selector again if wakenUp
                // is true immediately after selector.select(...).
                // It is inefficient in that it wakes up the selector for both
                // the first case (BAD - wake-up required) and the second case
                // (OK - no wake-up required).

                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                processSelectedKeys();
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys(); // ready I/O events are handled here

                // Give tasks a time budget proportional to the time just spent on I/O.
                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}
看一下processSelectedKeys代碼
1 private void processSelectedKeys() { 2 if (selectedKeys != null) { 3 processSelectedKeysOptimized(selectedKeys.flip());//執行這裏 4 } else { 5 processSelectedKeysPlain(selector.selectedKeys()); 6 } 7 }
/**
 * Walks the key array until the first {@code null} entry, clearing each slot and dispatching
 * the key by the type of its attachment. When {@code needsToSelectAgain} is set, the
 * remaining slots are cleared, a new select is done and iteration restarts on the new array.
 *
 * @param selectedKeys array of selected keys, terminated by a {@code null} entry
 */
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) { // channel attachments take this branch
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            for (;;) {
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
                i++;
            }

            selectAgain();
            // Need to flip the optimized selectedKeys to get the right reference to the array
            // and reset the index to -1 which will then set to 0 on the for loop
            // to start over again.
            //
            // See https://github.com/netty/netty/issues/1523
            selectedKeys = this.selectedKeys.flip();
            i = -1;
        }
    }
}
/**
 * Handles one ready key for a channel: closes the channel if the key is invalid, otherwise
 * dispatches read/accept, write and connect readiness to the channel's unsafe object.
 *
 * @param k  the selected key
 * @param ch the channel attached to the key
 */
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // OP_READ and OP_ACCEPT both end up in unsafe.read(); which read() runs depends on
            // the channel's unsafe implementation.
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
NioServerSocketChannel繼承了AbstractNioMessageChannel,因此執行的是AbstractNioMessageChannel的版本
/**
 * Message-oriented read: collects messages through {@code doReadMessages(readBuf)} until it
 * returns {@code 0} or a negative value, auto-read is off, or {@code maxMessagesPerRead} is
 * reached; then fires {@code channelRead} for each message followed by
 * {@code channelReadComplete}. Exceptions raised while reading are fired through the pipeline
 * after the collected messages were delivered.
 */
@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    final ChannelPipeline pipeline = pipeline(); // pipeline of this channel
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            for (;;) {
                int localRead = doReadMessages(readBuf); // subclass hook that produces the messages
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                // stop reading and remove op
                if (!config.isAutoRead()) {
                    break;
                }

                if (readBuf.size() >= maxMessagesPerRead) {
                    break;
                }
            }
        } catch (Throwable t) {
            exception = t;
        }
        setReadPending(false);
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            pipeline.fireChannelRead(readBuf.get(i)); // fire channelRead for each message
        }

        readBuf.clear();
        pipeline.fireChannelReadComplete(); // fire channelReadComplete

        if (exception != null) {
            if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                // ServerChannel should not be closed even on IOException because it can often continue
                // accepting incoming connections. (e.g. too many open files)
                closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
            }

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
而對於NioSocketChannel而言,其繼承自AbstractNioByteChannel,所以調用的AbstractNioByteChannel的read版本以下:
/**
 * Byte-oriented read: repeatedly allocates a buffer, fills it through
 * {@code doReadBytes(byteBuf)} and fires {@code channelRead} with it, until nothing is read,
 * the buffer was not filled, auto-read is off, or {@code maxMessagesPerRead} is reached. It
 * then fires {@code channelReadComplete}, records the total with the allocation handle and
 * calls {@code closeOnRead} when a negative read count was seen.
 */
@Override
public final void read() {
    final ChannelConfig config = config();
    if (!config.isAutoRead() && !isReadPending()) {
        // ChannelConfig.setAutoRead(false) was called in the meantime
        removeReadOp();
        return;
    }

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final int maxMessagesPerRead = config.getMaxMessagesPerRead();
    RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
    if (allocHandle == null) {
        this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
    }

    ByteBuf byteBuf = null;
    int messages = 0;
    boolean close = false;
    try {
        int totalReadAmount = 0; // total number of bytes read in this call
        boolean readPendingReset = false;
        do {
            byteBuf = allocHandle.allocate(allocator);
            int writable = byteBuf.writableBytes(); // bytes the buffer can still accept
            int localReadAmount = doReadBytes(byteBuf); // the actual read; bytes read this iteration
            if (localReadAmount <= 0) { // nothing was read
                // not was read release the buffer
                byteBuf.release();
                byteBuf = null;
                close = localReadAmount < 0;
                break;
            }
            if (!readPendingReset) {
                readPendingReset = true;
                setReadPending(false);
            }
            pipeline.fireChannelRead(byteBuf); // fire channelRead, handing the buffer over
            byteBuf = null;
            // Stop if adding this read to the running total would exceed Integer.MAX_VALUE.
            if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                // Avoid overflow.
                totalReadAmount = Integer.MAX_VALUE;
                break;
            }
            // Update the running total.
            totalReadAmount += localReadAmount;

            // stop reading
            if (!config.isAutoRead()) {
                break; // without auto-read, stop after a single read
            }
            // The buffer was not filled completely, so stop reading for now.
            if (localReadAmount < writable) {
                // Read less than what the buffer can hold,
                // which might mean we drained the recv buffer completely.
                break;
            }
        } while (++ messages < maxMessagesPerRead);

        pipeline.fireChannelReadComplete(); // after the loop, fire channelReadComplete
        allocHandle.record(totalReadAmount);

        if (close) {
            closeOnRead(pipeline);
            close = false;
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!config.isAutoRead() && !isReadPending()) {
            removeReadOp();
        }
    }
}
接着看doReadMessages
1 @Override 2 protected int doReadMessages(List<Object> buf) throws Exception { 3 SocketChannel ch = javaChannel().accept();//建立SocketChannel,accept客戶端 4 5 try { 6 if (ch != null) { 7 buf.add(new NioSocketChannel(this, ch)); 8 return 1; 9 } 10 } catch (Throwable t) { 11 logger.warn("Failed to create a new channel from an accepted socket.", t); 12 13 try { 14 ch.close(); 15 } catch (Throwable t2) { 16 logger.warn("Failed to close a socket.", t2); 17 } 18 } 19 20 return 0; 21 }
執行完doReadMessages以後,針對客戶端的SocketChannel已經建立了,因爲以後還會引起channelRead和channelReadComplete事件,而這些都會致使pipeline中的ServerBootstrapAcceptor的相應方法被調用,來看一下ServerBootstrapAcceptor源碼:
/**
 * Inbound handler that treats every message it reads as a newly accepted child
 * {@link Channel}: it adds the child handler, applies the child options and attributes, and
 * registers the child with the child {@link EventLoopGroup}.
 */
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;

    ServerBootstrapAcceptor(
            EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;
    }

    /**
     * Configures the child channel carried in {@code msg} and registers it with the child
     * group; the child is force-closed if registration fails.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;

        // Add the configured childHandler to the child's pipeline.
        child.pipeline().addLast(childHandler);

        for (Entry<ChannelOption<?>, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // Register the child with the worker EventLoopGroup.
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

    /** Force-closes {@code child} and logs the registration failure. */
    private static void forceClose(Channel child, Throwable t) {
        child.unsafe().closeForcibly();
        logger.warn("Failed to register an accepted channel: " + child, t);
    }

    /**
     * If auto-read is on, turns it off and schedules it back on after one second, then
     * forwards the exception to the next handler.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final ChannelConfig config = ctx.channel().config();
        if (config.isAutoRead()) {
            // stop accept new connections for 1 second to allow the channel to recover
            // See https://github.com/netty/netty/issues/1328
            config.setAutoRead(false);
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    config.setAutoRead(true);
                }
            }, 1, TimeUnit.SECONDS);
        }
        // still let the exceptionCaught event flow through the pipeline to give the user
        // a chance to do something with it
        ctx.fireExceptionCaught(cause);
    }
}
引用一張圖(出自:http://blog.csdn.net/zxhoo/article/details/17532857) 。