Netty源碼分析之服務端啓動過程

1、首先來看一段服務端的示例代碼:java

 1 public class NettyTestServer {
 2     public void bind(int port) throws Exception{
 3         EventLoopGroup bossgroup = new NioEventLoopGroup();//建立BOSS線程組  4         EventLoopGroup workgroup = new NioEventLoopGroup();//建立WORK線程組  5         try{
 6             ServerBootstrap b = new ServerBootstrap();
 7             b.group(bossgroup,workgroup)//綁定BOSS和WORK線程組  8                     .channel(NioServerSocketChannel.class)//設置channel類型,服務端用的是NioServerSocketChannel  9                     .option(ChannelOption.SO_BACKLOG,100) //設置channel的配置選項 10                     .handler(new LoggingHandler(LogLevel.INFO))//設置NioServerSocketChannel的Handler 11                     .childHandler(new ChannelInitializer<SocketChannel>() {//設置childHandler,做爲新建的NioSocketChannel的初始化Handler 12                         @Override//當新建的與客戶端通訊的NioSocketChannel被註冊到EventLoop成功時,該方法會被調用,用於添加業務Handler 13                         protected void initChannel(SocketChannel ch) throws Exception {
14                             ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
15                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
16                             ch.pipeline().addLast(new StringDecoder());
17                             ch.pipeline().addLast(new EchoServerHandler());
18                         }
19                     });
20             ChannelFuture f = b.bind(port).sync();//同步等待綁定結束 21             f.channel().closeFuture().sync();//同步等待關閉 22         }finally {
23             bossgroup.shutdownGracefully();
24             workgroup.shutdownGracefully();
25         }
26     }
27     public static  void main(String[] args) throws  Exception{
28         int port = 8082;
29         new NettyTestServer().bind(port);
30     }
31 }
32 @ChannelHandler.Sharable
33 class EchoServerHandler extends ChannelInboundHandlerAdapter{
34     int count = 0;
35 
36     @Override
37     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
38         String body = (String)msg;
39         System.out.println("This is" + ++count + "times receive client:[" + body + "]");
40         body += "$_";
41         ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
42         ctx.writeAndFlush(echo);
43         ctx.fireChannelRead("my name is chenyang");
44     }
45 
46     @Override
47     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
48         cause.printStackTrace();
49         ctx.close();
50     }
51 }

2、首先來看一下ServerBootstrap類,顧名思義,它是一個服務端啓動類,用於幫助用戶快速配置、啓動服務端服務。先來看一下該類的主要成員定義:git

 1 /**
 2  * {@link Bootstrap} sub-class which allows easy bootstrap of {@link ServerChannel}
 3  *
 4  */
 5 public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
 6 
 7     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
 8     //如下都是針對NioSocketChannel的
 9     private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
10     private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
11     private volatile EventLoopGroup childGroup;
12     private volatile ChannelHandler childHandler;

可見,ServerBootstrap是AbstractBootstrap的子類,AbstractBootstrap的成員主要有:github

 1 /**
 2  * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
 3  * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
 4  *
 5  * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
 6  * transports such as datagram (UDP).</p>
 7  */
 8 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
 9     //如下都是針對服務端NioServerSocketChannel的
10     volatile EventLoopGroup group;
11     private volatile ChannelFactory<? extends C> channelFactory;
12     private volatile SocketAddress localAddress;
13     private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
14     private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
15     private volatile ChannelHandler handler;

用一張圖說明兩個類之間的關係以下(原圖出自:http://blog.csdn.net/zxhoo/article/details/17532857)。bootstrap

 

總結以下: ServerBootstrap比AbstractBootstrap多了4個Part,其中AbstractBootstrap的成員用於設置服務端NioServerSocketChannel(包括所使用的線程組、使用的channel工廠類、使用的Handler以及地址和選項信息等), ServerBootstrap的4個成員用於設置爲有新鏈接時新建的NioSocketChannel。promise

3、ServerBootstrap配置源碼解釋app

 1)b.group(bossgroup,workgroup)less

 1     /**
 2      * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
 3      * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
 4      * {@link Channel}'s.
 5      */
 6     public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
 7         super.group(parentGroup);//設置BOSS線程組(在AbstractBootstrap中)  8         if (childGroup == null) {
 9             throw new NullPointerException("childGroup");
10         }
11         if (this.childGroup != null) {
12             throw new IllegalStateException("childGroup set already");
13         }
14         this.childGroup = childGroup;//設置WORK線程組 15         return this;
16     }

2) .channel(NioServerSocketChannel.class)socket

 1     /**
 2      * The {@link Class} which is used to create {@link Channel} instances from.
 3      * You either use this or {@link #channelFactory(ChannelFactory)} if your
 4      * {@link Channel} implementation has no no-args constructor.
 5      */
 6     public B channel(Class<? extends C> channelClass) {
 7         if (channelClass == null) {
 8             throw new NullPointerException("channelClass");
 9         }
10         return channelFactory(new BootstrapChannelFactory<C>(channelClass));//設置channel工廠 11     }

channelFactory方法就是用來設置channel工廠的,這裏的工廠就是BootstrapChannelFactory(是一個泛型類)。ide

 1     /**
 2      * {@link ChannelFactory} which is used to create {@link Channel} instances from
 3      * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
 4      * is not working for you because of some more complex needs. If your {@link Channel} implementation
 5      * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
 6      * simplify your code.
 7      */
 8     @SuppressWarnings("unchecked")
 9     public B channelFactory(ChannelFactory<? extends C> channelFactory) {
10         if (channelFactory == null) {
11             throw new NullPointerException("channelFactory");
12         }
13         if (this.channelFactory != null) {
14             throw new IllegalStateException("channelFactory set already");
15         }
16 
17         this.channelFactory = channelFactory;//設置channel工廠 18         return (B) this;
19     }

下面就是channel工廠類的實現,構造函數傳入一個channel類型(針對服務端也就是NioServerSocketChannel.class),BootstrapChannelFactory工廠類提供的newChannel方法將使用反射建立對應的channel。用於channel的建立通常只在啓動的時候進行,所以使用反射不會形成性能的問題。函數

 1     private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
 2         private final Class<? extends T> clazz;
 3 
 4         BootstrapChannelFactory(Class<? extends T> clazz) {
 5             this.clazz = clazz;
 6         }
 7 
 8         @Override
 9         public T newChannel() {//須要建立channel的時候,次方法將被調用 10             try {
11                 return clazz.newInstance();//反射建立對應channel 12             } catch (Throwable t) {
13                 throw new ChannelException("Unable to create Channel from class " + clazz, t);
14             }
15         }
16 
17         @Override
18         public String toString() {
19             return StringUtil.simpleClassName(clazz) + ".class";
20         }
21     }

3) .option(ChannelOption.SO_BACKLOG,100)

    用來設置channel的選項,好比設置BackLog的大小等。

 1     /**
 2      * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
 3      * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
 4      */
 5     @SuppressWarnings("unchecked")
 6     public <T> B option(ChannelOption<T> option, T value) {
 7         if (option == null) {
 8             throw new NullPointerException("option");
 9         }
10         if (value == null) {
11             synchronized (options) {
12                 options.remove(option);
13             }
14         } else {
15             synchronized (options) {
16                 options.put(option, value);
17             }
18         }
19         return (B) this;
20     }

4) .handler(new LoggingHandler(LogLevel.INFO))

     用於設置服務端NioServerSocketChannel的Handler。

 1     /**
 2      * the {@link ChannelHandler} to use for serving the requests.
 3      */
 4     @SuppressWarnings("unchecked")
 5     public B handler(ChannelHandler handler) {
 6         if (handler == null) {
 7             throw new NullPointerException("handler");
 8         }
 9         this.handler = handler;//設置的是父類AbstractBootstrap裏的成員,也就是該handler是被NioServerSocketChannel使用 10         return (B) this;
11     }

 5) .childHandler(new ChannelInitializer<SocketChannel>() {

       必定要分清.handler和.childHandler的區別,首先,二者都是設置一個Handler,可是,前者設置的Handler是屬於服務端NioServerSocketChannel的,然後者設置的Handler是屬於每個新建的NioSocketChannel的(每當有一個來自客戶端的鏈接時,否會建立一個新的NioSocketChannel)。

 1    /**
 2      * Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
 3      */
 4     public ServerBootstrap childHandler(ChannelHandler childHandler) {
 5         if (childHandler == null) {
 6             throw new NullPointerException("childHandler");
 7         }
 8         this.childHandler = childHandler;
 9         return this;
10     }

      至此,ServerBootstrap的配置完成,其實有人可能會很好奇,爲何不直接在ServerBootstrap的構造函數中一步完成這些初始化配置操做,這樣作雖然能夠,可是這會致使ServerBootstrap構造函數的參數過多,而是用Builder模式(也就是ServerBootstrap目前採用的模式,能夠參見<<effective java>>)則能夠有效的解決構造方法參數過多的問題。

4、bind流程

1)一切從bind開始  ChannelFuture f = b.bind(port).sync();

1    /**
2      * Create a new {@link Channel} and bind it.
3      */
4     public ChannelFuture bind(int inetPort) {
5         return bind(new InetSocketAddress(inetPort));
6     }

繼續深刻bind

 1     /**
 2      * Create a new {@link Channel} and bind it.
 3      */
 4     public ChannelFuture bind(SocketAddress localAddress) {
 5         validate();
 6         if (localAddress == null) {
 7             throw new NullPointerException("localAddress");
 8         }
 9         return doBind(localAddress);
10     }

繼續攝入doBind

 1  private ChannelFuture doBind(final SocketAddress localAddress) {
 2         final ChannelFuture regFuture = initAndRegister();//初始化並註冊一個channel  3         final Channel channel = regFuture.channel();
 4         if (regFuture.cause() != null) {
 5             return regFuture;
 6         }
 7         //等待註冊成功
 8         if (regFuture.isDone()) {
 9             // At this point we know that the registration was complete and successful.
10             ChannelPromise promise = channel.newPromise();
11  doBind0(regFuture, channel, localAddress, promise);//執行channel.bind() 12             return promise;
13         } else {
14             // Registration future is almost always fulfilled already, but just in case it's not.
15             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
16             regFuture.addListener(new ChannelFutureListener() {
17                 @Override
18                 public void operationComplete(ChannelFuture future) throws Exception {
19                     Throwable cause = future.cause();
20                     if (cause != null) {
21                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
22                         // IllegalStateException once we try to access the EventLoop of the Channel.
23                         promise.setFailure(cause);
24                     } else {
25                         // Registration was successful, so set the correct executor to use.
26                         // See https://github.com/netty/netty/issues/2586
27                         promise.executor = channel.eventLoop();
28                     }
29                     doBind0(regFuture, channel, localAddress, promise);
30                 }
31             });
32             return promise;
33         }
34     }

doBind中最重要的一步就是調用initAndRegister方法了,它會初始化並註冊一個channel,直接看源碼吧。

 1  final ChannelFuture initAndRegister() {
 2         final Channel channel = channelFactory().newChannel();//還記得前面咱們設置過channel工廠麼,終於排上用場了  3         try {
 4  init(channel);//初始化channel(就是NioServerSocketChannel)  5         } catch (Throwable t) {
 6             channel.unsafe().closeForcibly();
 7             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
 8             return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
 9         }
10 
11         ChannelFuture regFuture = group().register(channel);//向EventLoopGroup中註冊一個channel 12         if (regFuture.cause() != null) {
13             if (channel.isRegistered()) {
14                 channel.close();
15             } else {
16                 channel.unsafe().closeForcibly();
17             }
18         }
19 
20         // If we are here and the promise is not failed, it's one of the following cases:
21         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
22         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
23         // 2) If we attempted registration from the other thread, the registration request has been successfully
24         //    added to the event loop's task queue for later execution.
25         //    i.e. It's safe to attempt bind() or connect() now:
26         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
27         //         because register(), bind(), and connect() are all bound to the same thread.
28 
29         return regFuture;
30     }

 先來看一下init方法

 1 @Override
 2     void init(Channel channel) throws Exception {
 3         final Map<ChannelOption<?>, Object> options = options();
 4         synchronized (options) {
 5  channel.config().setOptions(options);//設置以前配置的channel選項  6         }
 7 
 8         final Map<AttributeKey<?>, Object> attrs = attrs();
 9         synchronized (attrs) {
10             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
11                 @SuppressWarnings("unchecked")
12                 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
13  channel.attr(key).set(e.getValue());//設置以前配置的屬性 14             }
15         }
16 
17         ChannelPipeline p = channel.pipeline();//獲取channel綁定的pipeline(pipeline實在channel建立的時候建立並綁定的) 18         if (handler() != null) {//若是用戶配置過Handler 19  p.addLast(handler());//爲NioServerSocketChannel綁定的pipeline添加Handler 20         }
21         //開始準備child用到的4個part,由於接下來就要使用它們。
22         final EventLoopGroup currentChildGroup = childGroup;
23         final ChannelHandler currentChildHandler = childHandler;
24         final Entry<ChannelOption<?>, Object>[] currentChildOptions;
25         final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
26         synchronized (childOptions) {
27             currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
28         }
29         synchronized (childAttrs) {
30             currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
31         }
32         //爲NioServerSocketChannel的pipeline添加一個初始化Handler,當NioServerSocketChannel在EventLoop註冊成功時,該handler的init方法將被調用
33         p.addLast(new ChannelInitializer<Channel>() {
34             @Override
35             public void initChannel(Channel ch) throws Exception {
36                 ch.pipeline().addLast(new ServerBootstrapAcceptor(//爲NioServerSocketChannel的pipeline添加ServerBootstrapAcceptor處理器
//該Handler主要用來將新建立的NioSocketChannel註冊到EventLoopGroup中
37 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 38 } 39 }); 40 }

init執行以後,接下來看一下注冊過程(ChannelFuture regFuture = group().register(channel); 注意,這裏的group是以前設置的BOSS EventLoopGroup)

1    @Override
2     public ChannelFuture register(Channel channel) {
3         return next().register(channel);//首先使用next()在BOSS EventLoopGroup中選出下一個EventLoop,而後執行註冊 4     }
1     @Override
2     public ChannelFuture register(Channel channel) {
3         return register(channel, new DefaultChannelPromise(channel, this));
4     }
 1   @Override
 2     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
 3         if (channel == null) {
 4             throw new NullPointerException("channel");
 5         }
 6         if (promise == null) {
 7             throw new NullPointerException("promise");
 8         }
 9 
10         channel.unsafe().register(this, promise);//unsafe執行的都是實際的操做 11         return promise;
12     }
 1 @Override
 2         public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 3             if (eventLoop == null) {
 4                 throw new NullPointerException("eventLoop");
 5             }
 6             if (isRegistered()) {
 7                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
 8                 return;
 9             }
10             if (!isCompatible(eventLoop)) {
11                 promise.setFailure(
12                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
13                 return;
14             }
15 
16             AbstractChannel.this.eventLoop = eventLoop;//綁定爲該channel選的的EventLoop 17             //必須保證註冊是由該EventLoop發起的,不然會單獨封裝成一個Task,由該EventLoop執行
18             if (eventLoop.inEventLoop()) {
19  register0(promise);//註冊 20             } else {
21                 try {
22                     eventLoop.execute(new OneTimeTask() {
23                         @Override
24                         public void run() {
25                             register0(promise);
26                         }
27                     });
28                 } catch (Throwable t) {
29                     logger.warn(
30                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
31                             AbstractChannel.this, t);
32                     closeForcibly();
33                     closeFuture.setClosed();
34                     safeSetFailure(promise, t);
35                 }
36             }
37         }
 1 private void register0(ChannelPromise promise) {
 2             try {
 3                 // check if the channel is still open as it could be closed in the mean time when the register
 4                 // call was outside of the eventLoop
 5                 if (!promise.setUncancellable() || !ensureOpen(promise)) {
 6                     return;
 7                 }
 8                 boolean firstRegistration = neverRegistered;
 9  doRegister();//最底層的註冊調用 10                 neverRegistered = false;
11                 registered = true;
12  safeSetSuccess(promise);//設置註冊結果爲成功 13  pipeline.fireChannelRegistered();//發起pipeline調用fireChannelRegistered(head.fireChannelRegistered) 14                 // Only fire a channelActive if the channel has never been registered. This prevents firing
15                 // multiple channel actives if the channel is deregistered and re-registered.
16                 if (firstRegistration && isActive()) {//若是是首次註冊,並且channel已經處於Active狀態(若是是服務端,表示listen成功,若是是客戶端,即是connect成功) 17  pipeline.fireChannelActive();//發起pipeline的fireChannelActive 18                 }
19             } catch (Throwable t) {
20                 // Close the channel directly to avoid FD leak.
21                 closeForcibly();
22                 closeFuture.setClosed();
23                 safeSetFailure(promise, t);
24             }
25         }

doRegister會完成在EventLoop的Selector上的註冊任務。

 1  @Override
 2     protected void doRegister() throws Exception {
 3         boolean selected = false;
 4         for (;;) {
 5             try {
 6                 selectionKey = javaChannel().register(eventLoop().selector, 0, this);//注意,此時op位爲0,channel還不能監聽讀寫事件  7                 return;
 8             } catch (CancelledKeyException e) {
 9                 if (!selected) {
10                     // Force the Selector to select now as the "canceled" SelectionKey may still be
11                     // cached and not removed because no Select.select(..) operation was called yet.
12                     eventLoop().selectNow();
13                     selected = true;
14                 } else {
15                     // We forced a select operation on the selector before but the SelectionKey is still cached
16                     // for whatever reason. JDK bug ?
17                     throw e;
18                 }
19             }
20         }
21     }

由上可知,註冊成功後,NioServerSocketChannel還不能監聽讀寫事件,那麼何時回開始監聽呢?因爲註冊成功以後,會進行pipeline.fireChannelRegistered()調用,該事件會在NioServerSocketChannel的pipeline中傳播(從head開始,逐步findContextInbound),這會致使Inbound類型的Handler的channelRegistered方法被調用。還記得在init方法中爲NioServerSocketChannel添加的ChannelInitializer的Handler嗎,它也是一個InboundHandler,看一下他的實現:

 1 @Sharable
 2 public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
 3 
 4     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
 5 
 6     /**
 7      * This method will be called once the {@link Channel} was registered. After the method returns this instance
 8      * will be removed from the {@link ChannelPipeline} of the {@link Channel}.
 9      *
10      * @param ch            the {@link Channel} which was registered.
11      * @throws Exception    is thrown if an error occurs. In that case the {@link Channel} will be closed.
12      */
13     protected abstract void initChannel(C ch) throws Exception;//抽象方法,由子類實現 14 
15     @Override
16     @SuppressWarnings("unchecked")
17     public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {//該方法會在NioServerScoketChannel註冊成功時被調用 18         ChannelPipeline pipeline = ctx.pipeline();
19         boolean success = false;
20         try {
21  initChannel((C) ctx.channel());//調用initChannel 22             pipeline.remove(this);//初始化Handler只完成初始化工做,初始化完成自後就把本身刪除 23             ctx.fireChannelRegistered();//繼續傳播channelRegistered事件 24             success = true;
25         } catch (Throwable t) {
26             logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
27         } finally {
28             if (pipeline.context(this) != null) {
29                 pipeline.remove(this);
30             }
31             if (!success) {
32                 ctx.close();
33             }
34         }
35     }
36 }

在重複貼一次代碼,看一下initChannel裏面是什麼

1     p.addLast(new ChannelInitializer<Channel>() {
2             @Override
3             public void initChannel(Channel ch) throws Exception {//被channelRegistered調用 4                 ch.pipeline().addLast(new ServerBootstrapAcceptor(
5                         currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
6             }
7         }

能夠看到,initChannel只是向pipeline中添加了ServerBootstrapAcceptor類型的Handler。

可是這仍是沒有看到給NioServerSocketChannel註冊讀寫事件的地方,繼續看以前的register0代碼,它還會調用pipleline的fireChannelActive方法,看一下該方方法的代碼:

 1     @Override
 2     public ChannelPipeline fireChannelActive() {
 3  head.fireChannelActive();//將ChannelActive事件在pipeline中傳播  4         //若是channel被配置成自動可讀的,那麼久發起讀事件
 5         if (channel.config().isAutoRead()) {
 6  channel.read();//pipeline.read()-->tail.read()-->*****-->head.read()-->unsafe.beginRead()  7         }
 8 
 9         return this;
10     }
 1     @Override
 2     public ChannelHandlerContext fireChannelActive() {//head的fireChannelActive()  3         final AbstractChannelHandlerContext next = findContextInbound();//尋找下一個Inbound類型的Context  4         EventExecutor executor = next.executor();
 5         if (executor.inEventLoop()) {
 6  next.invokeChannelActive();//調用Context中的Handler的channelActive方法  7         } else {
 8             executor.execute(new OneTimeTask() {
 9                 @Override
10                 public void run() {
11                     next.invokeChannelActive();
12                 }
13             });
14         }
15         return this;
16     }

看一下beginRead實現:

 1         @Override
 2         public final void beginRead() {
 3             if (!isActive()) {
 4                 return;
 5             }
 6 
 7             try {
 8  doBeginRead();//真正的註冊讀事件  9             } catch (final Exception e) {
10                 invokeLater(new OneTimeTask() {
11                     @Override
12                     public void run() {
13                         pipeline.fireExceptionCaught(e);
14                     }
15                 });
16                 close(voidPromise());
17             }
18         }
 1 @Override
 2     protected void doBeginRead() throws Exception {
 3         // Channel.read() or ChannelHandlerContext.read() was called
 4         if (inputShutdown) {
 5             return;
 6         }
 7 
 8         final SelectionKey selectionKey = this.selectionKey;
 9         if (!selectionKey.isValid()) {
10             return;
11         }
12 
13         readPending = true;
14 
15         final int interestOps = selectionKey.interestOps();
16         if ((interestOps & readInterestOp) == 0) {
17             selectionKey.interestOps(interestOps | readInterestOp);//真正的註冊讀事件 18         }
19     }

5、客戶端接入過程

接下來看看,當一個客戶端鏈接進來時,都發生了什麼。

1)首先從事件的源頭看起,下面是EventLoop的事件循環

 1    @Override
 2     protected void run() {
 3         for (;;) {
 4             boolean oldWakenUp = wakenUp.getAndSet(false);
 5             try {
 6                 if (hasTasks()) {
 7                     selectNow();
 8                 } else {
 9  select(oldWakenUp);//調用selector.select() 10 
11                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
12                     // before calling 'selector.wakeup()' to reduce the wake-up
13                     // overhead. (Selector.wakeup() is an expensive operation.)
14                     //
15                     // However, there is a race condition in this approach.
16                     // The race condition is triggered when 'wakenUp' is set to
17                     // true too early.
18                     //
19                     // 'wakenUp' is set to true too early if:
20                     // 1) Selector is waken up between 'wakenUp.set(false)' and
21                     //    'selector.select(...)'. (BAD)
22                     // 2) Selector is waken up between 'selector.select(...)' and
23                     //    'if (wakenUp.get()) { ... }'. (OK)
24                     //
25                     // In the first case, 'wakenUp' is set to true and the
26                     // following 'selector.select(...)' will wake up immediately.
27                     // Until 'wakenUp' is set to false again in the next round,
28                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
29                     // any attempt to wake up the Selector will fail, too, causing
30                     // the following 'selector.select(...)' call to block
31                     // unnecessarily.
32                     //
33                     // To fix this problem, we wake up the selector again if wakenUp
34                     // is true immediately after selector.select(...).
35                     // It is inefficient in that it wakes up the selector for both
36                     // the first case (BAD - wake-up required) and the second case
37                     // (OK - no wake-up required).
38 
39                     if (wakenUp.get()) {
40                         selector.wakeup();
41                     }
42                 }
43 
44                 cancelledKeys = 0;
45                 needsToSelectAgain = false;
46                 final int ioRatio = this.ioRatio;
47                 if (ioRatio == 100) {
48                     processSelectedKeys();
49                     runAllTasks();
50                 } else {
51                     final long ioStartTime = System.nanoTime();
52 
53  processSelectedKeys();//有事件發生時,執行這裏 54 
55                     final long ioTime = System.nanoTime() - ioStartTime;
56                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
57                 }
58 
59                 if (isShuttingDown()) {
60                     closeAll();
61                     if (confirmShutdown()) {
62                         break;
63                     }
64                 }
65             } catch (Throwable t) {
66                 logger.warn("Unexpected exception in the selector loop.", t);
67 
68                 // Prevent possible consecutive immediate failures that lead to
69                 // excessive CPU consumption.
70                 try {
71                     Thread.sleep(1000);
72                 } catch (InterruptedException e) {
73                     // Ignore.
74                 }
75             }
76         }
77     }

看一下processSelectedKeys代碼

1   private void processSelectedKeys() {
2         if (selectedKeys != null) {
3  processSelectedKeysOptimized(selectedKeys.flip());//執行這裏 4         } else {
5             processSelectedKeysPlain(selector.selectedKeys());
6         }
7     }
 1  private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
 2         for (int i = 0;; i ++) {
 3             final SelectionKey k = selectedKeys[i];
 4             if (k == null) {
 5                 break;
 6             }
 7             // null out entry in the array to allow to have it GC'ed once the Channel close
 8             // See https://github.com/netty/netty/issues/2363
 9             selectedKeys[i] = null;
10 
11             final Object a = k.attachment();
12 
13             if (a instanceof AbstractNioChannel) {//由於是NioServerSocketChannel,因此執行這裏 14                 processSelectedKey(k, (AbstractNioChannel) a);
15             } else {
16                 @SuppressWarnings("unchecked")
17                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
18                 processSelectedKey(k, task);
19             }
20 
21             if (needsToSelectAgain) {
22                 // null out entries in the array to allow to have it GC'ed once the Channel close
23                 // See https://github.com/netty/netty/issues/2363
24                 for (;;) {
25                     if (selectedKeys[i] == null) {
26                         break;
27                     }
28                     selectedKeys[i] = null;
29                     i++;
30                 }
31 
32                 selectAgain();
33                 // Need to flip the optimized selectedKeys to get the right reference to the array
34                 // and reset the index to -1 which will then set to 0 on the for loop
35                 // to start over again.
36                 //
37                 // See https://github.com/netty/netty/issues/1523
38                 selectedKeys = this.selectedKeys.flip();
39                 i = -1;
40             }
41         }
42     }
 1 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
 2         final NioUnsafe unsafe = ch.unsafe();
 3         if (!k.isValid()) {
 4             // close the channel if the key is not valid anymore
 5             unsafe.close(unsafe.voidPromise());
 6             return;
 7         }
 8 
 9         try {
10             int readyOps = k.readyOps();
11             // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
12             // to a spin loop
13             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
14  unsafe.read();//由於是ACCEPT事件,因此執行這裏(這裏的read會由於NioServerSocketChannel和NioSocketChannel不一樣) 15                 if (!ch.isOpen()) {
16                     // Connection already closed - no need to handle write.
17                     return;
18                 }
19             }
20             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
21                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
22                 ch.unsafe().forceFlush();
23             }
24             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
25                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
26                 // See https://github.com/netty/netty/issues/924
27                 int ops = k.interestOps();
28                 ops &= ~SelectionKey.OP_CONNECT;
29                 k.interestOps(ops);
30 
31                 unsafe.finishConnect();
32             }
33         } catch (CancelledKeyException ignored) {
34             unsafe.close(unsafe.voidPromise());
35         }
36     }

 NioServerSocketChannel繼承了AbstractNioMessageChannel,因此執行的是AbstractNioMessageChannel的版本

 1   @Override
 2         public void read() {
 3             assert eventLoop().inEventLoop();
 4             final ChannelConfig config = config();
 5             if (!config.isAutoRead() && !isReadPending()) {
 6                 // ChannelConfig.setAutoRead(false) was called in the meantime
 7                 removeReadOp();
 8                 return;
 9             }
10 
11             final int maxMessagesPerRead = config.getMaxMessagesPerRead();
12             final ChannelPipeline pipeline = pipeline();//獲取服務端NioServerSocketChannel的pipeline 13             boolean closed = false;
14             Throwable exception = null;
15             try {
16                 try {
17                     for (;;) {
18                         int localRead = doReadMessages(readBuf);//執行這裏 19                         if (localRead == 0) {
20                             break;
21                         }
22                         if (localRead < 0) {
23                             closed = true;
24                             break;
25                         }
26 
27                         // stop reading and remove op
28                         if (!config.isAutoRead()) {
29                             break;
30                         }
31 
32                         if (readBuf.size() >= maxMessagesPerRead) {
33                             break;
34                         }
35                     }
36                 } catch (Throwable t) {
37                     exception = t;
38                 }
39                 setReadPending(false);
40                 int size = readBuf.size();
41                 for (int i = 0; i < size; i ++) {
42  pipeline.fireChannelRead(readBuf.get(i));//引起ChannelRead 43                 }
44 
45                 readBuf.clear();
46  pipeline.fireChannelReadComplete();//引起channelReadComplete 47 
48                 if (exception != null) {
49                     if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
50                         // ServerChannel should not be closed even on IOException because it can often continue
51                         // accepting incoming connections. (e.g. too many open files)
52                         closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
53                     }
54 
55                     pipeline.fireExceptionCaught(exception);
56                 }
57 
58                 if (closed) {
59                     if (isOpen()) {
60                         close(voidPromise());
61                     }
62                 }
63             } finally {
64                 // Check if there is a readPending which was not processed yet.
65                 // This could be for two reasons:
66                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
67                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
68                 //
69                 // See https://github.com/netty/netty/issues/2254
70                 if (!config.isAutoRead() && !isReadPending()) {
71                     removeReadOp();
72                 }
73             }
74         }

而對於NioSocketChannel而言,其繼承自AbstractNioByteChannel,所以調用的AbstractNioByteChannel的read版本以下:

 1  @Override
 2         public final void read() {
 3             final ChannelConfig config = config();
 4             if (!config.isAutoRead() && !isReadPending()) {
 5                 // ChannelConfig.setAutoRead(false) was called in the meantime
 6                 removeReadOp();
 7                 return;
 8             }
 9 
10             final ChannelPipeline pipeline = pipeline();
11             final ByteBufAllocator allocator = config.getAllocator();
12             final int maxMessagesPerRead = config.getMaxMessagesPerRead();
13             RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
14             if (allocHandle == null) {
15                 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
16             }
17 
18             ByteBuf byteBuf = null;
19             int messages = 0;
20             boolean close = false;
21             try {
22                 int totalReadAmount = 0;//讀到的總長度 23                 boolean readPendingReset = false;
24                 do {
25                     byteBuf = allocHandle.allocate(allocator);
26                     int writable = byteBuf.writableBytes();//獲取bytebuf還能夠寫入的字節數 27                     int localReadAmount = doReadBytes(byteBuf);//真正的讀取,localReadAmount本次讀取的實際長度 28                     if (localReadAmount <= 0) {//什麼都沒有讀到 29                         // not was read release the buffer
30                         byteBuf.release();
31                         byteBuf = null;
32                         close = localReadAmount < 0;
33                         break;//跳出循環 34                     }
35                     if (!readPendingReset) {
36                         readPendingReset = true;
37                         setReadPending(false);
38                     }
39  pipeline.fireChannelRead(byteBuf);//發起調用channelRead,將bytebuf傳過去 40                     byteBuf = null;
41                     //若是當前讀到的總長度+本次讀到的總長度已經大於Integer類型的最大值
42                     if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
43                         // Avoid overflow.
44                         totalReadAmount = Integer.MAX_VALUE;
45                         break;//跳出循環 46                     }
47                     //更新總長度
48                     totalReadAmount += localReadAmount;
49 
50                     // stop reading
51                     if (!config.isAutoRead()) {
52                         break;//若是不是自動讀取,那麼讀取一次以後就自動中止了 53                     }
54                     //若是本次讀取的大小沒有把bytebuf填滿,那麼說明數據已經所有讀取了
55                     if (localReadAmount < writable) {
56                         // Read less than what the buffer can hold,
57                         // which might mean we drained the recv buffer completely.
58                         break;//跳出循環 59                     }
60                 } while (++ messages < maxMessagesPerRead);
61 
62  pipeline.fireChannelReadComplete();//跳出循環後,引起channelReadComplete 63                 allocHandle.record(totalReadAmount);
64 
65                 if (close) {
66                     closeOnRead(pipeline);
67                     close = false;
68                 }
69             } catch (Throwable t) {
70                 handleReadException(pipeline, byteBuf, t, close);
71             } finally {
72                 // Check if there is a readPending which was not processed yet.
73                 // This could be for two reasons:
74                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
75                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
76                 //
77                 // See https://github.com/netty/netty/issues/2254
78                 if (!config.isAutoRead() && !isReadPending()) {
79                     removeReadOp();
80                 }
81             }
82         }

       接着看doMessages

 1  @Override
 2     protected int doReadMessages(List<Object> buf) throws Exception {
 3         SocketChannel ch = javaChannel().accept();//建立SocketChannel,accept客戶端  4 
 5         try {
 6             if (ch != null) {
 7                 buf.add(new NioSocketChannel(this, ch));
 8                 return 1;
 9             }
10         } catch (Throwable t) {
11             logger.warn("Failed to create a new channel from an accepted socket.", t);
12 
13             try {
14                 ch.close();
15             } catch (Throwable t2) {
16                 logger.warn("Failed to close a socket.", t2);
17             }
18         }
19 
20         return 0;
21     }

執行完doReadMessages以後,針對客戶端的SocketChannel已經建立了,因爲以後還會引起channelRead和channelReadComplete事件,而這些都會致使pipeline中的ServerBootstrapAcceptor的相應方法被調用,來看一下ServerBootstrapAcceptor源碼:

 1 private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
 2 
 3         private final EventLoopGroup childGroup;
 4         private final ChannelHandler childHandler;
 5         private final Entry<ChannelOption<?>, Object>[] childOptions;
 6         private final Entry<AttributeKey<?>, Object>[] childAttrs;
 7 
 8         ServerBootstrapAcceptor(
 9                 EventLoopGroup childGroup, ChannelHandler childHandler,
10                 Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
11             this.childGroup = childGroup;
12             this.childHandler = childHandler;
13             this.childOptions = childOptions;
14             this.childAttrs = childAttrs;
15         }
16 
17         @Override
18         @SuppressWarnings("unchecked")
19         public void channelRead(ChannelHandlerContext ctx, Object msg) {
20             final Channel child = (Channel) msg;
21 
22  child.pipeline().addLast(childHandler);//將最開始配置的childHandler添加到SocketChannel的pipeline中,這個Handler也是一個初始化Handler,原理和服務端的一致 23 
24             for (Entry<ChannelOption<?>, Object> e: childOptions) {
25                 try {
26                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
27                         logger.warn("Unknown channel option: " + e);
28                     }
29                 } catch (Throwable t) {
30                     logger.warn("Failed to set a channel option: " + child, t);
31                 }
32             }
33 
34             for (Entry<AttributeKey<?>, Object> e: childAttrs) {
35                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
36             }
37 
38             try {
39                 childGroup.register(child).addListener(new ChannelFutureListener() {//將SocketChannel註冊到WORK EventLoopGroup中,註冊過程與服務端相似,此處再也不講解 40                     @Override
41                     public void operationComplete(ChannelFuture future) throws Exception {
42                         if (!future.isSuccess()) {
43                             forceClose(child, future.cause());
44                         }
45                     }
46                 });
47             } catch (Throwable t) {
48                 forceClose(child, t);
49             }
50         }
51 
52         private static void forceClose(Channel child, Throwable t) {
53             child.unsafe().closeForcibly();
54             logger.warn("Failed to register an accepted channel: " + child, t);
55         }
56 
57         @Override
58         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
59             final ChannelConfig config = ctx.channel().config();
60             if (config.isAutoRead()) {
61                 // stop accept new connections for 1 second to allow the channel to recover
62                 // See https://github.com/netty/netty/issues/1328
63                 config.setAutoRead(false);
64                 ctx.channel().eventLoop().schedule(new Runnable() {
65                     @Override
66                     public void run() {
67                        config.setAutoRead(true);
68                     }
69                 }, 1, TimeUnit.SECONDS);
70             }
71             // still let the exceptionCaught event flow through the pipeline to give the user
72             // a chance to do something with it
73             ctx.fireExceptionCaught(cause);
74         }
75     }

 

引用一張圖(出自:http://blog.csdn.net/zxhoo/article/details/17532857) 。

相關文章
相關標籤/搜索