java netty之ServerBootstrap的啓動

經過前面的幾篇文章,對整個netty部分的架構已經運行原理都有了必定的瞭解,那麼這篇文章來分析一個常常用到的類:ServerBootstrap,通常對於服務器端的編程它用到的都還算是比較的多。。看一看它的初始化,以及它的運行原理。。。java

首先咱們仍是引入一段代碼,經過分析這段代碼來分析ServerBootstrap的運行。。。git

[java]  view plain  copy
 
  1. EventLoopGroup bossGroup = new NioEventLoopGroup();   //這個是用於serversocketchannel的eventloop  
  2.         EventLoopGroup workerGroup = new NioEventLoopGroup();    //這個是用於處理accept到的channel  
  3.         try {  
  4.             ServerBootstrap b = new ServerBootstrap();    //構建serverbootstrap對象  
  5.             b.group(bossGroup, workerGroup);   //設置時間循環對象,前者用來處理accept事件,後者用於處理已經創建的鏈接的io  
  6.             b.channel(NioServerSocketChannel.class);   //用它來創建新accept的鏈接,用於構造serversocketchannel的工廠類  
  7.             b.childHandler(new ChannelInitializer<SocketChannel>(){      //爲accept channel的pipeline預添加的inboundhandler  
  8.                 @Override     //當新鏈接accept的時候,這個方法會調用  
  9.                 protected void initChannel(SocketChannel ch) throws Exception {  
  10.                     // TODO Auto-generated method stub  
  11.                     ch.pipeline().addLast(new MyChannelHandler());   //爲當前的channel的pipeline添加自定義的處理函數  
  12.                 }  
  13.                   
  14.             });  
  15.             //bind方法會建立一個serverchannel,而且會將當前的channel註冊到eventloop上面,  
  16.             //會爲其綁定本地端口,並對其進行初始化,爲其的pipeline加一些默認的handler  
  17.             ChannelFuture f = b.bind(80).sync();      
  18.             f.channel().closeFuture().sync();  //至關於在這裏阻塞,直到serverchannel關閉  
  19.         } finally {  
  20.             bossGroup.shutdownGracefully();  
  21.             workerGroup.shutdownGracefully();  
  22.         }  

這段代碼在前面的文章也有用到,基本上其意思也都在上面的註釋中說的比較清楚了,那麼咱們接下來具體的分析其中的方法調用,首先是ServerBootstrap的group方法:github

[java]  view plain  copy
 
  1. //這裏parent用於執行server的accept時間事件,child纔是用於執行獲取的channel鏈接的事件  
  2. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {  
  3.     super.group(parentGroup);  
  4.     if (childGroup == null) {  
  5.         throw new NullPointerException("childGroup");  
  6.     }  
  7.     if (this.childGroup != null) {  
  8.         throw new IllegalStateException("childGroup set already");  
  9.     }  
  10.     this.childGroup = childGroup;  
  11.     return this;  
  12. }  

這個方法是用來設置eventloopgroup,首先調用了父類的group方法(abstractbootstrap),就不將父類的方法列出來了,其實意思都差很少,eventloopgroup屬性的值。。。編程

 

好了,接下來咱們再來看一下channel方法:bootstrap

[java]  view plain  copy
 
  1. //構造serversocketchannel factory  
  2. public B channel(Class<? extends C> channelClass) {  
  3.     if (channelClass == null) {  
  4.         throw new NullPointerException("channelClass");  
  5.     }  
  6.     return channelFactory(new BootstrapChannelFactory<C>(channelClass));  //構造工廠類  
  7. }  
  8.   
  9. /** 
  10.  * {@link ChannelFactory} which is used to create {@link Channel} instances from 
  11.  * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)} 
  12.  * is not working for you because of some more complex needs. If your {@link Channel} implementation 
  13.  * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for 
  14.  * simplify your code. 
  15.  */  
  16. @SuppressWarnings("unchecked")  
  17. public B channelFactory(ChannelFactory<? extends C> channelFactory) {  
  18.     if (channelFactory == null) {  
  19.         throw new NullPointerException("channelFactory");  
  20.     }  
  21.     if (this.channelFactory != null) {  
  22.         throw new IllegalStateException("channelFactory set already");  
  23.     }  
  24.   
  25.     this.channelFactory = channelFactory;   //設置  
  26.     return (B) this;  
  27. }  

該方法主要是用於構造用於產生channel的工廠類,在咱們這段代碼說白了就是用於實例化serversocketchannel的工廠類。。。promise

 

接下來咱們再來看一下childHandler方法:服務器

[java]  view plain  copy
 
  1. //設置childHandler,這個是當有channel accept以後爲其添加的handler  
  2. public ServerBootstrap childHandler(ChannelHandler childHandler) {  
  3.     if (childHandler == null) {  
  4.         throw new NullPointerException("childHandler");  
  5.     }  
  6.     this.childHandler = childHandler;  
  7.     return this;  
  8. }  

這個很簡單吧,就是一個賦值,具體說他有什麼用,前面的註釋有說明,不過之後的分析會說明它有什麼用的。。。架構

 

 

接下來咱們來看一下bind方法,這個比較重要吧:socket

[java]  view plain  copy
 
  1. //最終將會建立serverchannel,而後會將其綁定到這個地址,而後對其進行初始化  
  2. public ChannelFuture bind(int inetPort) {  
  3.     return bind(new InetSocketAddress(inetPort));  
  4. }  

好吧,接下來再來看bind方法:ide

[java]  view plain  copy
 
  1. public ChannelFuture bind(SocketAddress localAddress) {  
  2.     validate();  
  3.     if (localAddress == null) {  
  4.         throw new NullPointerException("localAddress");  
  5.     }  
  6.     return doBind(localAddress);  
  7. }  

好吧,再來看看doBind方法:

[java]  view plain  copy
 
  1. private ChannelFuture doBind(final SocketAddress localAddress) {  
  2.     final ChannelFuture regPromise = initAndRegister();   //在這裏建立serverchanel,並對其進行初始化,並將其註冊到eventloop當中去  
  3.     final Channel channel = regPromise.channel();  
  4.     final ChannelPromise promise = channel.newPromise();  
  5.     if (regPromise.isDone()) {  
  6.         doBind0(regPromise, channel, localAddress, promise);   //將當前的serverchannel綁定地址  
  7.     } else {  
  8.         regPromise.addListener(new ChannelFutureListener() {  
  9.             @Override  
  10.             public void operationComplete(ChannelFuture future) throws Exception {  
  11.                 doBind0(future, channel, localAddress, promise);  
  12.             }  
  13.         });  
  14.     }  
  15.   
  16.     return promise;  
  17. }  

這裏調用了一個比較重要的方法:initAndRegister,咱們來看看它的定義:

[java]  view plain  copy
 
  1. //建立初始化以及註冊serverchanel  
  2.     final ChannelFuture initAndRegister() {  
  3.         //利用工廠類建立channel  
  4.         final Channel channel = channelFactory().newChannel();  
  5.         try {  
  6.             init(channel);  //init函數留給了後面來實現,用於初始化channel,例如爲其的pipeline加上handler  
  7.         } catch (Throwable t) {  
  8.             channel.unsafe().closeForcibly();  
  9.             return channel.newFailedFuture(t);  
  10.         }  
  11.   
  12.         ChannelPromise regPromise = channel.newPromise();  
  13.         group().register(channel, regPromise);  //將當前建立的serverchannel註冊到eventloop上面去  
  14.         if (regPromise.cause() != null) {  
  15.             if (channel.isRegistered()) {  
  16.                 channel.close();  
  17.             } else {  
  18.                 channel.unsafe().closeForcibly();  
  19.             }  
  20.         }  
  21.   
  22.         // If we are here and the promise is not failed, it's one of the following cases:  
  23.         // 1) If we attempted registration from the event loop, the registration has been completed at this point.  
  24.         //    i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.  
  25.         // 2) If we attempted registration from the other thread, the registration request has been successfully  
  26.         //    added to the event loop's task queue for later execution.  
  27.         //    i.e. It's safe to attempt bind() or connect() now:  
  28.         //         because bind() or connect() will be executed *after* the scheduled registration task is executed  
  29.         //         because register(), bind(), and connect() are all bound to the same thread.  
  30.   
  31.         return regPromise;  
  32.     }  

代碼仍是很簡單,並且也相對比較好理解,無非就是利用前面說到過的channel工廠類來建立一個serversocketchannel,而後調用init方法對這個剛剛生成的channel進行一些初始化的操做,而後在調用eventloopgroup的register方法,將當前這個channel的註冊到group上,那麼之後這個channel的事件都在這個group上面執行,說白了也就是一些accept。、。。

 

好,咱們先來看看這個init方法吧:

[java]  view plain  copy
 
  1. @Override  
  2. //初始化chanel,當用channel factory構造channel之後,會調用這個函數來初始化,說白了就是爲當前的channel的pipeline加入一些handler  
  3. void init(Channel channel) throws Exception {  
  4.     //先初始化一些配置  
  5.     final Map<ChannelOption<?>, Object> options = options();  
  6.     synchronized (options) {  
  7.         channel.config().setOptions(options);  
  8.     }  
  9.    //初始化一些屬性  
  10.     final Map<AttributeKey<?>, Object> attrs = attrs();  
  11.     synchronized (attrs) {  
  12.         for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {  
  13.             @SuppressWarnings("unchecked")  
  14.             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();  
  15.             channel.attr(key).set(e.getValue());  
  16.         }  
  17.     }  
  18.   
  19.     //獲取當前channel的pipeline  
  20.     ChannelPipeline p = channel.pipeline();  
  21.     if (handler() != null) {  
  22.         p.addLast(handler());  
  23.     }  
  24.   
  25.     final EventLoopGroup currentChildGroup = childGroup;  
  26.     final ChannelHandler currentChildHandler = childHandler;  
  27.     final Entry<ChannelOption<?>, Object>[] currentChildOptions;  
  28.     final Entry<AttributeKey<?>, Object>[] currentChildAttrs;  
  29.     synchronized (childOptions) {  
  30.         currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));  
  31.     }  
  32.     synchronized (childAttrs) {  
  33.         currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));  
  34.     }  
  35.   
  36.     p.addLast(new ChannelInitializer<Channel>() {  
  37.         @Override  
  38.         public void initChannel(Channel ch) throws Exception {  
  39.             //這是一個inboundher,將其加入到serverchannel的pipeline上面去  
  40.             ch.pipeline().addLast(new ServerBootstrapAcceptor(  
  41.                     currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  
  42.         }  
  43.     });  
  44. }  

代碼仍是相對很簡單,首先初始化一些配置參數,而後初始化屬性,最後還要爲當前的channel的pipeline添加一個handler,這個handler用來當channel註冊到eventloop上面以後對其進行一些初始化,咱們仍是來看看channelInitalizer的定義吧:

[java]  view plain  copy
 
  1. public abstract class ChannelInitializer<C extends Channel> extends ChannelStateHandlerAdapter {  
  2.   
  3.     private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);  
  4.   
  5.     /** 
  6.      * This method will be called once the {@link Channel} was registered. After the method returns this instance 
  7.      * will be removed from the {@link ChannelPipeline} of the {@link Channel}. 
  8.      * 
  9.      * @param ch            the {@link Channel} which was registered. 
  10.      * @throws Exception    is thrown if an error occours. In that case the {@link Channel} will be closed. 
  11.      */  
  12.     protected abstract void initChannel(C ch) throws Exception;  
  13.   
  14.     @SuppressWarnings("unchecked")  
  15.     @Override  
  16.     public final void channelRegistered(ChannelHandlerContext ctx)  
  17.             throws Exception {  
  18.         boolean removed = false;  
  19.         boolean success = false;  
  20.         try {  
  21.             //調用用戶定義的init函數對當前的channel進行初始化  
  22.             initChannel((C) ctx.channel());  
  23.             ctx.pipeline().remove(this);  
  24.             removed = true;  
  25.             ctx.fireChannelRegistered();  
  26.             success = true;  
  27.         } catch (Throwable t) {  
  28.             logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);  
  29.         } finally {  
  30.             if (!removed) {  
  31.                 ctx.pipeline().remove(this);  
  32.             }  
  33.             if (!success) {  
  34.                 ctx.close();  
  35.             }  
  36.         }  
  37.     }  
  38.   
  39.     @Override  
  40.     public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {  
  41.         ctx.fireInboundBufferUpdated();  
  42.     }  
  43. }  

它有一個channelRegistered方法,這個方法是在當前pipeline所屬的channel註冊到eventloop上面以後會激活的方法,它則是調用了用戶自定義的函數來初始化channel,而後在將當前handler移除。。。也就是執行

[java]  view plain  copy
 
  1. ch.pipeline().addLast(new ServerBootstrapAcceptor(  
  2.                        currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));  

這裏又爲當前的serversocketchannel添加了另一個handler,來看看該類型的定義吧:

[java]  view plain  copy
 
  1. private static class ServerBootstrapAcceptor  
  2.         extends ChannelStateHandlerAdapter implements ChannelInboundMessageHandler<Channel> {  
  3.   
  4.     private final EventLoopGroup childGroup;  
  5.     private final ChannelHandler childHandler;  
  6.     private final Entry<ChannelOption<?>, Object>[] childOptions;  
  7.     private final Entry<AttributeKey<?>, Object>[] childAttrs;  
  8.   
  9.     @SuppressWarnings("unchecked")  
  10.     ServerBootstrapAcceptor(  
  11.             EventLoopGroup childGroup, ChannelHandler childHandler,  
  12.             Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {  
  13.         this.childGroup = childGroup;  //這個是用於管理accept的channel的eventloop  
  14.         this.childHandler = childHandler;  
  15.         this.childOptions = childOptions;  
  16.         this.childAttrs = childAttrs;  
  17.     }  
  18.   
  19.     @Override  
  20.     public MessageBuf<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {  
  21.         return Unpooled.messageBuffer();  
  22.     }  
  23.   
  24.     @Override  
  25.     @SuppressWarnings("unchecked")  
  26.     //當有數據進來的時候,會調用這個方法來處理數據,這裏進來的數據就是accept的channel  
  27.     public void inboundBufferUpdated(ChannelHandlerContext ctx) {  
  28.         MessageBuf<Channel> in = ctx.inboundMessageBuffer(); //獲取buf  
  29.         for (;;) {  
  30.             Channel child = in.poll();  
  31.             if (child == null) {  
  32.                 break;  
  33.             }  
  34.   
  35.             child.pipeline().addLast(childHandler);   //爲accept的channel的pipeline加入用戶定義的初始化handler  
  36.   
  37.             for (Entry<ChannelOption<?>, Object> e: childOptions) {  
  38.                 try {  
  39.                     if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {  
  40.                         logger.warn("Unknown channel option: " + e);  
  41.                     }  
  42.                 } catch (Throwable t) {  
  43.                     logger.warn("Failed to set a channel option: " + child, t);  
  44.                 }  
  45.             }  
  46.   
  47.             for (Entry<AttributeKey<?>, Object> e: childAttrs) {  
  48.                 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());  
  49.             }  
  50.   
  51.             try {  
  52.                 childGroup.register(child);   //將當前accept的channel註冊到eventloop  
  53.             } catch (Throwable t) {  
  54.                 child.unsafe().closeForcibly();  
  55.                 logger.warn("Failed to register an accepted channel: " + child, t);  
  56.             }  
  57.         }  
  58.     }  
  59.   
  60.     @Override  
  61.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
  62.         final ChannelConfig config = ctx.channel().config();  
  63.         if (config.isAutoRead()) {  
  64.             // stop accept new connections for 1 second to allow the channel to recover  
  65.             // See https://github.com/netty/netty/issues/1328  
  66.             config.setAutoRead(false);  
  67.             ctx.channel().eventLoop().schedule(new Runnable() {  
  68.                 @Override  
  69.                 public void run() {  
  70.                    config.setAutoRead(true);  
  71.                 }  
  72.             }, 1, TimeUnit.SECONDS);  
  73.         }  
  74.         // still let the exceptionCaught event flow through the pipeline to give the user  
  75.         // a chance to do something with it  
  76.         ctx.fireExceptionCaught(cause);  
  77.     }  
  78. }  

主要是有一個比較重要的方法,inboundBufferUpdated,這個方法是在有數據進來的時候會調用的,用於處理進來的數據,也就是accept到的channel,這裏就知道咱們定義的chidHandler的用處了吧,netty會將這個handler直接加入到剛剛accept到的channel的pipeline上面去。。。最後還要講當前accept到的channel註冊到child eventloop上面去,這裏也就完徹底全的明白了最開始定義的兩個eventloopgroup的做用了。。。

 

好了,serversocketchannel的init以及register差很少了,而後會調用doBind0方法,將當前的serversocketchannel綁定到一個本地端口,

[java]  view plain  copy
 
  1. //將chanel綁定到一個本地地址  
  2.     private static void doBind0(  
  3.             final ChannelFuture regFuture, final Channel channel,  
  4.             final SocketAddress localAddress, final ChannelPromise promise) {  
  5.   
  6.         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up  
  7.         // the pipeline in its channelRegistered() implementation.  
  8.   
  9.         channel.eventLoop().execute(new Runnable() {  
  10.             @Override  
  11.             //匿名內部類想要訪問外面的參數,那麼外面的參數必須是要final的才行  
  12.             public void run() {  
  13.                 if (regFuture.isSuccess()) {  
  14.                     //調用channel的bind方法,將當前的channl綁定到一個本地地址,實際上是調用的是pipeline的bind方法,可是最終又是調用的當前  
  15.                     //channel的unsafe對象的bind方法  
  16.                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
  17.                 } else {  
  18.                     promise.setFailure(regFuture.cause());  
  19.                 }  
  20.             }  
  21.         });  
  22.     }  

其實這裏調用bind方法最終仍是調用serversocketchannel的unsafe對象的bind方法。。。。

 

 

到這裏,整個serverbootstrap 就算初始化完成了,並且也能夠開始運行了。。。

[java]  view plain  copy
 
  1. b.childHandler(new ChannelInitializer<SocketChannel>(){      //爲accept channel的pipeline預添加的inboundhandler  
  2.                 @Override     //當新鏈接accept的時候,這個方法會調用  
  3.                 protected void initChannel(SocketChannel ch) throws Exception {  
  4.                     // TODO Auto-generated method stub  
  5.                     ch.pipeline().addLast(new MyChannelHandler());   //爲當前的channel的pipeline添加自定義的處理函數  
  6.                 }  
  7.                   
  8.             });  

這段代碼的意思是對於剛剛accept到的channel,將會在它的pipeline上面添加handler,這個handler的用處主要是就是用戶自定義的initChannel方法,就是初始化這個channel,說白了就是爲它的pipeline上面添加本身定義的handler。。。

 

 

這樣整個serverbootstrap是怎麼運行的也就差很少了。。。

剛開始接觸到netty的時候以爲這裏一頭霧水,經過這段時間對其代碼的閱讀,總算搞懂了其整個運行的原理,並且以爲其設計仍是很漂亮的,雖然有的時候會以爲有那麼一點點的繁瑣。。。。

整個運行過程總結爲一下幾個步驟:

(1)建立用於兩個eventloopgroup對象,一個用於管理serversocketchannel,一個用於管理accept到的channel

(2)建立serverbootstrap對象,

(3)設置eventloopgroup

(4)建立用於構建用到的channel的工廠類

(5)設置childhandler,它的主要功能主要是用戶定義代碼來初始化accept到的channel

(6)建立serversocketchannel,並對它進行初始化,綁定端口,以及register,併爲serversocketchannel的pipeline設置默認的handler

 

經過這幾個步驟,整個serverbootstrap也就算是運行起來了。。。

相關文章
相關標籤/搜索