-java
一、因爲篇幅過長難以發佈,因此本章節接着上一節來的,上一章節爲【原理剖析(第 010 篇)Netty之服務端啓動工做原理分析(上)】; 二、那麼本章節就繼續分析Netty的服務端啓動,分析Netty的源碼版本爲:netty-netty-4.1.22.Final;
詳見 原理剖析(第 010 篇)Netty之服務端啓動工做原理分析(上)git
上一章節,咱們主要分析了一下線程管理組對象是如何被實例化的,而且還了解到了每一個線程管理組都有一個子線程數組來處理任務;
那麼接下來咱們就直接從4.6開始分析了:github
一、源碼: // NettyServer.java // 將 Boss、Worker 設置到 ServerBootstrap 服務端引導類中 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // 指定通道類型爲NioServerSocketChannel,一種異步模式,OIO阻塞模式爲OioServerSocketChannel .localAddress("localhost", port)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端鏈接。 .childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行全部的鏈接請求 @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new PacketHeadDecoder()); ch.pipeline().addLast(new PacketBodyDecoder()); ch.pipeline().addLast(new PacketHeadEncoder()); ch.pipeline().addLast(new PacketBodyEncoder()); ch.pipeline().addLast(new PacketHandler()); } }); 二、主要爲後序的通訊設置了一些配置參數而已,指定構建的Channel爲NioServerSocketChannel,說明須要啓動的是服務端Netty; 然後面的服務端Channel實例化,就是須要經過這個參數反射實例化獲得; 三、同時還設置childHandler,這個childHandler也是有順序的,服務端讀數據時執行的順序是PacketHeadDecoder、PacketBodyDecoder、PacketHandler; 而服務端寫數據時執行的順序是PacketHandler、PacketBodyEncoder、PacketHeadEncoder; 因此在書寫方式你們千萬別寫錯了,按照本示例代碼的方式書寫便可;
一、源碼: // NettyServer.java // 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,而後服務器等待通道關閉,由於使用sync(),因此關閉操做也會被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync(); 二、這裏其實沒什麼好看的,接下來咱們就主要看看這個bind()方法主要乾了些啥,就這麼簡簡單單一句代碼就把服務端給啓動起來了,有點神氣了;
一、源碼: // AbstractBootstrap.java /** * Create a new {@link Channel} and bind it. */ public ChannelFuture bind() { validate(); SocketAddress localAddress = this.localAddress; if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } return doBind(localAddress); // 建立一個Channel,而且綁定它 } // AbstractBootstrap.java private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); // 初始化和註冊 // 執行到此,服務端大概完成了如下幾件事情: // 一、實例化NioServerSocketChannel,併爲Channel配備了pipeline、config、unsafe對象; // 二、將多個handler添加至pipeline雙向鏈表中,而且等待Channel註冊成功後須要給每一個handler觸發添加或者移除事件; // 三、將NioServerSocketChannel註冊到NioEventLoop的多路複用器上; final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } // 既然NioServerSocketChannel的Channel綁定到了多路複用器上,那麼接下來就是綁定地址,綁完地址就能夠正式進行通訊了 if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } 二、大體一看,原來doBind方法主要乾了兩件事情,initAndRegister與doBind0; 三、initAndRegister主要作的事情就是初始化服務端Channel,而且將服務端Channel註冊到bossGroup子線程的多路複用器上; 四、doBind0則主要完成服務端啓動的最後一步,綁定地址,綁定完後就能夠正式進行通訊了;
一、源碼: // AbstractBootstrap.java final ChannelFuture initAndRegister() { Channel channel = null; try { // 反射調用clazz.getConstructor().newInstance()實例化類 // 同時也實例化了Channel,若是是服務端的話則爲NioServerSocketChannel實例化對象 // 在實例化NioServerSocketChannel的構造方法中,也爲每一個Channel建立了一個管道屬性對象DefaultChannelPipeline=pipeline對象 // 在實例化NioServerSocketChannel的構造方法中,也爲每一個Channel建立了一個配置屬性對象NioServerSocketChannelConfig=config對象 // 在實例化NioServerSocketChannel的構造方法中,也爲每一個Channel建立了一個unsafe屬性對象NioMessageUnsafe=unsafe對象 channel = channelFactory.newChannel(); // 調用ReflectiveChannelFactory的newChannel方法 // 初始化剛剛被實例化的channel init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // config().group()=bossGroup或parentGroup,而後利用parentGroup去註冊NioServerSocketChannel=channel ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; } 二、逐行分析後會發現,首先經過反射實例化服務端channel對象,而後將服務端channel初始化一下; 三、而後調用bossGroup的註冊方法,將服務端channel做爲參數傳入; 四、至此,方法名也代表該段代碼的意圖,實例化並初始化服務端Channel,而後註冊到bossGroup子線程的多路複用器上;
一、源碼: // ServerBootstrap.java @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } // 服務端ServerSocketChannel的管道對象,Channel實例化的時候就被建立出來了 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } ChannelInitializer<Channel> tempHandler = new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { System.out.println("initAndRegister.init.initChannel-->ch.eventLoop().execute"); pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }; // 這裏我將addLast的參數剝離出來了,方便查看閱讀 p.addLast(tempHandler); } // DefaultChannelPipeline.java @Override public final ChannelPipeline addLast(ChannelHandler... handlers) { return addLast(null, handlers); } // DefaultChannelPipeline.java @Override public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) { if (handlers == null) { throw new NullPointerException("handlers"); } for (ChannelHandler h: handlers) { if (h == null) { break; } addLast(executor, null, h); } return this; } // DefaultChannelPipeline.java @Override public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; // 這裏加了synchronized關鍵字,所以說addLast的新增動做都是線程安全的 // 而後再細看一下其它的方法,只要涉及到的handler的增刪改動做的方法,那些方法的代碼塊都是通過synchronized修飾了,保證操做過程當中線程安全 synchronized (this) { // 檢查handler的一些基本信息,若不是被Sharable註解過的話,並且已經被添加到其餘pipeline時則會拋出異常 checkMultiplicity(handler); // 經過一系列參數的封裝,最後封裝成DefaultChannelHandlerContext對象 newCtx = newContext(group, filterName(name, handler), handler); // 將newCtx添加到倒數第二的位置,即tail的前面一個位置 // 這裏的pipeline中的handler的構成方式是一個雙向鏈表式的結構 addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventloop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. // 該addLast方法可能會被其它各個地方調用,可是又爲了保證handler的線程安全,則採用了synchronized來保證addLast的線程安全 // 在Channel未註冊到多路複用器以前,registered確定爲false,那麼則把須要添加的handler封裝成AbstractChannelHandlerContext對象, // 而後調用setAddPending方法,pengding意味着在未來的某個時刻調用,那到底在什麼時刻被調用呢? // 英文解釋中提到一旦Channel註冊成功了的話則會被調用,因此Channel後續註冊完畢,再調用ChannelHandler.handlerAdded if (!registered) { newCtx.setAddPending(); // 將newCtx追加到PendingHandlerCallback單向鏈表的隊尾,以便未來回調時用到 callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run() { callHandlerAdded0(newCtx); } }); return this; } } // 若是能順利執行到這裏來的話,則代表Channel已經註冊到了NioEventLoop的多路複用器上面了 // 而後接下來的就是觸發調用newCtx的ChannelHandler.handlerAdded方法 callHandlerAdded0(newCtx); return this; } // DefaultChannelPipeline.java private void addLast0(AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; // 將目前雙向鏈表tail的前驅結點找出來命名爲prev newCtx.prev = prev; // 將新的結點的前驅結點指向prev newCtx.next = tail; // 將新的結點的後驅結點指向tail prev.next = newCtx; // 將prev的後驅結點指向新的結點 tail.prev = newCtx; // 將tail的前驅結點指向新的結點 // 就這樣,將新的結點經過一系列的指針指向,順利的將新結點插到了tail的前面, // 也就是鏈表中倒數第2個結點的位置,原鏈表中倒數第2個結點變成倒數第3個結點 } // DefaultChannelPipeline.java private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) { assert !registered; // 根據added布爾值封裝成PendingHandlerAddedTask、PendingHandlerRemovedTask對象 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx); PendingHandlerCallback pending = pendingHandlerCallbackHead; if (pending == null) { // 首次添加時則直接賦值而後返回 pendingHandlerCallbackHead = task; } else { // 非首次賦值的話,那麼經過while循環找到隊尾,而後將隊尾的next指向賦上task對象 // Find the tail of the linked-list. while (pending.next != null) { pending = pending.next; // 不停的尋找鏈表中的下一個結點 } // 當pending.next爲空說明已經找到了隊尾結點,而後將隊尾的next指向賦上task對象 pending.next = task; } } 二、其實初始化服務端Channel也作了蠻多的事情,事情再多也只是p.addLast(tempHandler)這句代碼乾的事情多; 三、主要完成了服務端Channel中管道對象pipeline添加handler的操做,添加過程當中主要有如下幾點: • 添加的過程當中是由synchronized關鍵字來保證線程安全的; • 將傳入的handler數組依次循環封裝成AbstractChannelHandlerContext對象添加到管道鎖維護的handler鏈表中; • 當未註冊成功時pipeline還維護了一個用後後序觸發調用newCtx的單向鏈表對象pendingHandlerCallbackHead; • 當註冊成功後,後序會迭代pendingHandlerCallbackHead對象依次執行全部任務的run方法; • 當註冊成功後,還會觸發調用這些newCtx的一些方法,主要是newCtx的ChannelHandler.handlerAdded方法; 四、講到這裏,initAndRegister總算講了一半了,接下來咱們就要看看被實例化的服務端channel是如何註冊到多路複用器上的;
一、源碼: // MultithreadEventLoopGroup.java @Override public ChannelFuture register(Channel channel) { // next()對象實際上是NioEventLoopGroup內部中的children[]屬性中的其中一個,經過必定規則挑選一個NioEventLoop // 那麼也就是說咱們最終調用的是NioEventLoop來實現註冊channel return next().register(channel); // 從另一個層面來說,咱們要想註冊一個Channel,那麼就能夠直接調用NioEventLoopGroup父類中的register(Channel)便可註冊Channel, // 而且會按照必定的規則順序經過next()挑選一個NioEventLoop並將Channel綁定到它上面 // 若是NioEventLoopGroup爲bossGroup的話,那麼該方法註冊的確定是NioServerSocketChannel對象 // 若是NioEventLoopGroup爲workerGroup的話,那麼該方法註冊的確定是ServerSocketChannel對象 } // SingleThreadEventLoop.java @Override public ChannelFuture register(Channel channel) { // 當前this對象是屬於children[]屬性中的其中一個 // 將傳入的Channel與當前對象this一塊兒封裝成DefaultChannelPromise對象 // 而後再調用當前對象的register(ChannelPromise)註冊方法 return register(new DefaultChannelPromise(channel, this)); } // SingleThreadEventLoop.java @Override public ChannelFuture register(final ChannelPromise promise) { // 校驗當前傳參是否爲空,原則上既然是不可能爲空的,由於上一個步驟是經過new出來的一個對象 ObjectUtil.checkNotNull(promise, "promise"); // promise.channel()其實就是上面new DefaultChannelPromise(channel, this)經過封裝後又取出這個channel對象 // promise.channel().unsafe()而每一個Channel都有一個unsafe對象,對於NioServerSocketChannel來講NioMessageUnsafe=unsafe // 當前this對象是屬於children[]屬性中的其中一個 promise.channel().unsafe().register(this, promise); return promise; } // AbstractUnsafe.java @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // eventLoop對象是屬於children[]屬性中的其中一個 // 而當前類又是Channel的一個抽象類AbstractChannel,也是NioServerSocketChannel的父類 if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // 這裏的 this.eventLoop 就是Children[i]中的一個,也就是具體執行任務的線程封裝對象 AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { // 若是對象eventLoop中的線程對象和當前線程比對是同樣的話 register0(promise); // 那麼則直接調用註冊方法register0 } else { try { // 比對的結果若是不同,十有八九都是該eventLoop的線程還未啓動, // 所以利用eventLoop的execute將register0(promise)方法做爲任務添加到任務隊列中,並啓動線程來執行任務 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); // 而服務端Channel的註冊,走的是該else分支,由於線程都還沒建立,eventLoop.inEventLoop()確定就是false結果 } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } // SingleThreadEventExecutor.java /** * 向任務隊列中添加任務task。 * * @param task */ @Override public void execute(Runnable task) { if (task == null) { // 若是傳入的task任務爲空,則直接拋空指針異常,此方法嚴格控制傳入參數必須非空 throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); // 判斷要添加的任務的這個線程,是否是和正在運行的nioEventLoop的處於同一個線程? if (inEventLoop) { // 若是是,則說明就是當前線程正在添加task任務,那麼則直接調用addTask方法添加到隊列中 addTask(task); // 添加task任務 } else { startThread(); // 若是不是當前線程,則看看實例化的對象nioEventLoop父類中state字段是否標識有新建線程,沒有的話則利用線程池新建立一個線程,有的話則不用理會了 addTask(task); // 添加task任務 // 防止意外狀況,還須要判斷下是否被關閉掉,若是被關閉掉的話,則將剛剛添加的任務刪除掉並採起拒絕策略直接拋出RejectedExecutionException異常 if (isShutdown() && removeTask(task)) { reject(); // 拒絕策略直接拋出RejectedExecutionException異常 } } // addTaskWakesUp:添加任務時須要喚醒標誌,默認值爲false,經過構造方法傳進來的也是false // wakesUpForTask(task):不是NonWakeupRunnable類型的task則返回true,意思就是隻要不是NonWakeupRunnable類型的task,都須要喚醒阻塞操做 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } 二、經過一路跟蹤config().group().register(channel)該方法進去,最後會發現,源碼會調用一個register0(promise)的代碼來進行註冊; 三、可是跳出來一看,細細回味config().group().register(channel)這段代碼,能夠得出這樣的一個結論: 若之後你們想註冊channel的話,直接經過線程管理組調用register方法,傳入想要註冊的channel對象便可; 四、固然還有一點請你們留意,execute(Runnable task)能夠隨意調用添加任務,若是線程已啓動則直接添加,未啓動的話則先啓動線程再添加任務; 五、那麼咱們仍是先儘快進入register0(promise)看看到底是如何註冊channel的;
一、源碼: // AbstractUnsafe.java private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); // 調用Channel的註冊方法,讓Channel的子類AbstractNioChannel來實現註冊 // 執行到此,說明Channel已經註冊到了多路複用器上,而且也沒有拋出什麼異常,那麼接下來就賦值變量代表已經註冊成功 neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); // 會回調initAndRegister中init方法的p.addLast的initChannel回調 safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { // 檢測Channel是否處於活躍狀態,這裏調用的是底層的socket的活躍狀態 if (firstRegistration) { pipeline.fireChannelActive(); // 這裏也是註冊成功後會僅僅只會被調用一次 } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); // 設置Channel的讀事件 } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } // AbstractNioChannel.java @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { // 自旋式的死循環,若是正常操做不出現異常的話,那麼則會一直嘗試將Channel註冊到多路複用器selector上面 try { // eventLoop()對象是屬於children[]屬性中的其中一個,children是NioEventLoop類型的對象 // 而前面也瞭解到過,在實例化每一個children的時候,會爲每一個children建立一個多路複用器selector與unwrappedSelector selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 若是將Channel註冊到了多路複用器上的成功且沒有拋什麼異常的話,則返回跳出循環 return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } // DefaultChannelPipeline.java final void invokeHandlerAddedIfNeeded() { assert channel.eventLoop().inEventLoop(); if (firstRegistration) { // pipeline標識是否已註冊,默認值爲true firstRegistration = false; // 立刻置位false,告訴你們該方法只會被調用一次 // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers, // that were added before the registration was done. // 到此爲止,咱們已經將Channel註冊到了NioEventLoop的多路複用器上,那麼接下來是時候回調Handler被添加進來 callHandlerAddedForAllHandlers(); } } // DefaultChannelPipeline.java private void callHandlerAddedForAllHandlers() { final PendingHandlerCallback pendingHandlerCallbackHead; synchronized (this) { assert !registered; // 測試registered是否爲false,由於該方法已經代表只會被調用一次,因此這裏就嚴格判斷 // This Channel itself was registered. registered = true; // 並且當registered設置爲true後,就不會再改變該值的狀態 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead; // Null out so it can be GC'ed. this.pendingHandlerCallbackHead = null; } // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside // the EventLoop. PendingHandlerCallback task = pendingHandlerCallbackHead; // 經過while循環,單向鏈表一個個回調task的execute,該回調添加的就回調添加,該回調移除的則回調移除 while (task != null) { task.execute(); task = task.next; } } 二、看完register0(promise)是否是以爲,原來服務端channel的註冊是這麼簡單,最後就是調用javaChannel().register(...)這個方法一下,而後就這麼稀裏糊塗的註冊到多路複用器上了; 三、在註冊完之際,還會找到以前的單向鏈表對象pendingHandlerCallbackHead,而且依依回調task.execute方法; 四、而後觸發fireChannelRegistered註冊成功事件,告知上層說咱們的服務端channel已經註冊成功了,你們請知悉一下; 五、最後經過beginRead設置服務端的讀事件標誌,就是說服務端的channel僅對讀事件感興趣; 六、至此initAndRegister這塊算是講完了,那麼接下來就看看最後一個步驟綁定ip地址,完成通訊前的最後一步;
一、源碼: // AbstractBootstrap.java private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. // 服務端啓動最後一個步驟,綁完地址就能夠正式進行通訊了 channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { // 服務端channel直接調用bind方法進行綁定地址 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } // AbstractChannel.java @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } // DefaultChannelPipeline.java @Override public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } // AbstractChannelHandlerContext.java @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } // AbstractChannelHandlerContext.java private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } } // HeadContext.java @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } // AbstractUnsafe.java @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } // NioServerSocketChannel.java @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } 二、通過這麼一路調用,其實最終會發現,綁定地址也是經過javaChannel().bind(...)這麼簡短的一句話就搞定了; 而前面的註冊到多路複用器上調用的是javaChannel().register(...)一句簡短代碼; 從而可得出這麼一個結論:只要關係到channel的註冊綁定,最終核心底層都是調用這個channel的bind和register方法; 三、至此,服務端的啓動流程算是完結了。。
最後咱們來總結下,經過分析Netty的服務端啓動,通過的流程以下: • 建立兩個線程管理組,以及實例化每一個線程管理組的子線程數組children[]; • 設置啓動類參數,好比channel、localAddress、childHandler等參數; • 反射實例化NioServerSocketChannel,建立ChannelId、unsafe、pipeline等對象; • 初始化NioServerSocketChannel,設置attr、option,添加新的handler到服務端pipeline管道中; • 調用JDK底層作ServerSocketChannel註冊到多路複用器上,而且註冊成功後回調pipeline管道中的單向鏈表依次執行task任務; • 調用JDK底層作NioServerSocketChannel綁定端口,並觸發active事件;
https://gitee.com/ylimhhmily/SpringCloudTutorial.gitsegmentfault
SpringCloudTutorial交流QQ羣: 235322432數組
SpringCloudTutorial交流微信羣: 微信溝通羣二維碼圖片連接promise
歡迎關注,您的確定是對我最大的支持!!!安全