本文接着前兩篇文章來說,主要講服務端類剩下的部分,咱們仍是來先看看服務端的代碼html
/** * Created by chenhao on 2019/9/4. */ public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new SimpleServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在前面兩篇博文中從源碼的角度分析了以下幾行代碼主要作了哪些工做。java
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new SimpleServerInitializer()) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);
本篇博文將從源碼的角度分析ChannelFuture f = b.bind(8888).sync()
的內部實現。這樣就完成了Netty服務器端啓動過程的源碼分析。promise
AbstractBootstrap.java服務器
public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }
咱們接着看重載的bindsocket
public ChannelFuture bind(SocketAddress localAddress) { validate();//相關參數的檢查 if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress);//下面將分析 }
該函數主要看兩點:validate()和doBind(localAddress)ide
//函數功能:檢查相關參數是否設置了 @SuppressWarnings("unchecked") public B validate() { if (group == null) {//這裏的group指的是:b.group(bossGroup, workerGroup)代碼中的bossGroup throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return (B) this; }
該方法主要檢查了兩個參數,一個是group,一個是channelFactory,在這裏能夠想想這兩個參數是在哪裏以及什麼時候被賦值的?答案是在以下代碼塊中被賦值的,其中是將bossGroup賦值給了group,將BootstrapChannelFactory賦值給了channelFactory.函數
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)
doBind方法的源代碼以下:oop
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel();//2 if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
doBind這個函數是咱們要分析的重點,這個函數的主要工做有以下幾點:源碼分析
一、經過initAndRegister()方法獲得一個ChannelFuture的實例regFuture。post
二、經過regFuture.cause()方法判斷是否在執行initAndRegister方法時產生來異常。若是產生來異常,則直接返回,若是沒有產生異常則進行第3步。
三、經過regFuture.isDone()來判斷initAndRegister方法是否執行完畢,若是執行完畢來返回true,而後調用doBind0進行socket綁定。若是沒有執行完畢則返回false進行第4步。
四、regFuture會添加一個ChannelFutureListener監聽,當initAndRegister執行完成時,調用operationComplete方法並執行doBind0進行socket綁定。
第三、4點想幹的事就是一個:調用doBind0方法進行socket綁定。
下面將分紅4部分對每行代碼具體作了哪些工做進行詳細分析。
該方法的具體代碼以下:
final ChannelFuture initAndRegister() { //結論:這裏的channel爲一個NioServerSocketChannel對象,具體分析見後面 final Channel channel = channelFactory().newChannel();//1 try { init(channel);//2 } catch (Throwable t) { channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = group().register(channel);//3 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
經過函數名以及內部調用的函數能夠猜想該函數幹了兩件事情:
一、初始化一個Channel,要想初始化,確定要先獲得一個Channel。
final Channel channel = channelFactory().newChannel();//1 init(channel);//2
二、將Channel進行註冊。
ChannelFuture regFuture = group().register(channel);//3
下面咱們將分析這幾行代碼內部幹來些什麼。
在上一篇文章中(Netty源碼分析 (二)----- ServerBootstrap)分析中,咱們知道b.channel(NioServerSocketChannel.class)的功能爲:設置父類屬性channelFactory 爲: BootstrapChannelFactory類的對象。其中這裏BootstrapChannelFactory對象中包括一個clazz屬性爲:NioServerSocketChannel.class
所以,final Channel channel = channelFactory().newChannel();就是調用的BootstrapChannelFactory類中的newChannel()方法,該方法的具體內容爲:
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; BootstrapChannelFactory(Class<? extends T> clazz) { this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } @Override public String toString() { return StringUtil.simpleClassName(clazz) + ".class"; } }
看到這個類,咱們能夠獲得的結論:final Channel channel = channelFactory().newChannel();
這行代碼的做用爲經過反射產生來一個NioServerSocketChannel類的實例。
下面將看下NioServerSocketChannel類的構造函數作了哪些工做。
NioServerSocketChannel類的繼承體系結構以下:
其無參構造函數以下:
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); }
無參構造函數中SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()
。
函數newSocket的功能爲:利用SelectorProvider產生一個SocketChannelImpl對象。
private static ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException e) { throw new ChannelException( "Failed to open a server socket.", e); } } public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); }
無參構造函數經過newSocket函數產生了一個SocketChannelImpl對象
而後調用了以下構造函數,咱們繼續看
public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } //父類AbstractNioMessageChannel的構造函數 protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } //父類 AbstractNioChannel的構造函數 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT try { ch.configureBlocking(false);//設置當前的ServerSocketChannel爲非阻塞的 } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } } //父類AbstractChannel的構造函數 protected AbstractChannel(Channel parent) { this.parent = parent; unsafe = newUnsafe(); pipeline = new DefaultChannelPipeline(this); }
new NioServerSocketChannel()產生一個實例對象時,調用上面這麼多構造函數主要乾了兩件事情:
一、產生來一個SocketChannelImpl類的實例,設置到ch屬性中,並設置爲非阻塞的。
this.ch = ch;
ch.configureBlocking(false);
二、設置了config屬性
config = new NioServerSocketChannelConfig(this, javaChannel().socket()
三、設置SelectionKey.OP_ACCEPT事件
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
四、設置unsafe屬性
@Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
主要做用爲:用來負責底層的connect、register、read和write等操做。
五、設置pipeline屬性
pipeline = new DefaultChannelPipeline(this);
每一個Channel都有本身的pipeline,當有請求事件發生時,pipeline負責調用相應的hander進行處理。
這些屬性在後面都會用到,至於NioServerSocketChannel 對象中的unsafe、pipeline屬性的具體實現後面進行分析。
結論:final Channel channel = channelFactory().newChannel();這行代碼的做用爲經過反射產生來一個NioServerSocketChannel類的實例,其中這個NioServerSocketChannel類對象有這樣幾個屬性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline
init方法的具體代碼以下:
@Override void init(Channel channel) throws Exception { //一、設置新接入channel的option final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options);//NioServerSocketChannelConfig } //二、設置新接入channel的attr final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //三、設置handler到pipeline上 ChannelPipeline p = channel.pipeline(); if (handler() != null) {//這裏的handler()返回的就是第二部分.handler(new SimpleServerHandler())所設置的SimpleServerHandler p.addLast(handler()); } final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } //p.addLast()向serverChannel的流水線處理器中加入了一個ServerBootstrapAcceptor,從名字上就能夠看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件循環器 p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }
該函數的功能爲:
一、設置channel的options
若是沒有設置,則options爲空,該屬性在ServerBootstrap類中的定義以下
Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
options可能以下:
public <T> boolean setOption(ChannelOption<T> option, T value) { validate(option, value); if (option == CONNECT_TIMEOUT_MILLIS) { setConnectTimeoutMillis((Integer) value); } else if (option == MAX_MESSAGES_PER_READ) { setMaxMessagesPerRead((Integer) value); } else if (option == WRITE_SPIN_COUNT) { setWriteSpinCount((Integer) value); } else if (option == ALLOCATOR) { setAllocator((ByteBufAllocator) value); } else if (option == RCVBUF_ALLOCATOR) { setRecvByteBufAllocator((RecvByteBufAllocator) value); } else if (option == AUTO_READ) { setAutoRead((Boolean) value); } else if (option == AUTO_CLOSE) { setAutoClose((Boolean) value); } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) { setWriteBufferHighWaterMark((Integer) value); } else if (option == WRITE_BUFFER_LOW_WATER_MARK) { setWriteBufferLowWaterMark((Integer) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); } else { return false; } return true; }
二、設置channel的attrs
若是沒有設置,則attrs爲空,該屬性在ServerBootstrap類中的定義以下
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
三、設置handler到channel的pipeline上
其中,這裏的handler爲:在博文(Netty源碼分析 (二)----- ServerBootstrap)中分析的經過b.handler(new SimpleServerHandler())
所設置的SimpleServerHandler對象
四、在pipeline上添加來一個ChannelInitializer對象,其中重寫來initChannel方法。該方法經過p.addLast()向serverChannel的流水線處理器中加入了一個 ServerBootstrapAcceptor,
從名字上就能夠看出來,這是一個接入器,專門接受新請求,把新的請求扔給某個事件循環器
看到這裏,咱們發現其實init只是初始化了一些基本的配置和屬性,以及在pipeline上加入了一個接入器,用來專門接受新鏈接,並無啓動服務.
回到 initAndRegister 方法中,繼續看 config().group().register(channel) 這行代碼,config 方法返回了 ServerBootstrapConfig,這個 ServerBootstrapConfig 調用了 group 方法,實際上就是 bossGroup。bossGroup 調用了 register 方法。
前面的分析咱們知道group爲:NioEvenLoopGroup,其繼承MultithreadEventLoopGroup,該類中的register方法以下:
@Override public ChannelFuture register(Channel channel) { return next().register(channel);//調用了NioEvenLoop對象中的register方法,NioEventLoop extends SingleThreadEventLoop }
next()方法的代碼以下,其功能爲選擇下一個NioEventLoop對象。
@Override public EventExecutor next() { return chooser.next();//調用MultithreadEventExecutorGroup中的next方法 }
根據線程個數nThreads是否爲2的冪次方來選擇chooser,其中這兩個chooser爲: PowerOfTwoEventExecutorChooser、GenericEventExecutorChooser,這兩個chooser功能都是同樣,只是求餘的方式不同。
next()方法返回的是一個NioEvenLoop對象
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[childIndex.getAndIncrement() & children.length - 1];//利用2的N次方法的特色,使用&求餘更快。 } } private final class GenericEventExecutorChooser implements EventExecutorChooser { @Override public EventExecutor next() { return children[Math.abs(childIndex.getAndIncrement() % children.length)]; } }
結論:因爲NioEventLoopGroup中維護着多個NioEventLoop,next方法回調用chooser策略找到下一個NioEventLoop,並執行該對象的register方法進行註冊。
因爲NioEventLoop extends SingleThreadEventLoop,NioEventLoop沒有重寫該方法,所以看 SingleThreadEventLoop類中的register方法
@Override public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; }
在本博文第1部分的NioServerSocketChannel實例化中設置來unsafe屬性,具體是調用以下的方法來設置的,所以這裏的channel.unsafe()
就是NioMessageUnsafe實例。
@Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); }
channel.unsafe().register(this, promise)這行代碼調用的是AbstractUnsafe類中的register方法,具體代碼以下:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } //判斷該channel是否已經被註冊到EventLoop中 if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } //1 將eventLoop設置在NioServerSocketChannel上 AbstractChannel.this.eventLoop = eventLoop; //判斷當前線程是否爲該EventLoop中擁有的線程,若是是,則直接註冊,若是不是,則添加一個任務到該線程中 if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { //重點 @Override public void run() { register0(promise);//分析 } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
上面的重點是register0(promise)方法。基本邏輯爲:
一、經過調用eventLoop.inEventLoop()方法判斷當前線程是否爲該EventLoop中擁有的線程,若是是,則直接註冊,若是不是,說明該EventLoop在等待並無執行權,則進行第二步。
AbstractEventExecutor.java
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
SingleThreadEventExecutor.java
@Override public boolean inEventLoop(Thread thread) { return thread == this.thread; }
二、既然該EventLoop中的線程此時沒有執行權,可是咱們能夠提交一個任務到該線程中,等該EventLoop的線程有執行權的時候就天然而然的會執行此任務,而該任務負責調用register0方法,這樣也就達到了調用register0方法的目的。
下面看register0這個方法,具體代碼以下:
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } doRegister(); registered = true; safeSetSuccess(promise); //執行完,控制檯輸出:channelRegistered pipeline.fireChannelRegistered(); if (isActive()) { //分析 pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
在上面的代碼中,是經過調用doRegister()方法完成NioServerSocketChannel的註冊,該方法的具體代碼以下:
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } protected SelectableChannel javaChannel() { return ch; }
在本博文的第1部分的NioServerSocketChannel的實例化分析中,咱們知道這裏的javaChannel()方法返回的ch爲實例化NioServerSocketChannel時產生的一個SocketChannelImpl類的實例,並設置爲非阻塞的,具體見本博文的第1部分。
selectionKey = javaChannel().register(eventLoop().selector, 0, this);就完成了ServerSocketChannel註冊到Selector中。
回顧下,這裏的eventLoop().selector是什麼?答案是:KQueueSelectorImpl對象。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); } private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //...省略了一部分代碼 return selector; }
ServerSocketChannel註冊完以後,接着執行pipeline.fireChannelRegistered方法。
public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(this.head); return this; }
咱們看到invokeChannelRegistered(this.head)傳的參數是head,這個head咱們再下一篇文章中講,繼續往下看
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
看next.invokeChannelRegistered();
private void invokeChannelRegistered() { if (this.invokeHandler()) { try { ((ChannelInboundHandler)this.handler()).channelRegistered(this); } catch (Throwable var2) { this.notifyHandlerException(var2); } } else { this.fireChannelRegistered(); } }
接着看看this.handler(),實際上就是head的handler()
public ChannelHandler handler() { return this; }
返回的是this,那接着看 head中的channelRegistered(this)
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); }
繼續看ctx.fireChannelRegistered();
public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(this.findContextInbound()); return this; }
咱們看看this.findContextInbound()
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while(!ctx.inbound); return ctx; }
咱們看到 ctx = ctx.next; 其實是從head開始找,找到第一個 inbound 的hander
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRegistered(); } }); } }
最後執行next.invokeChannelRegistered();
pipeline中維護了handler鏈表,還記得以前.handler(new SimpleServerHandler())初始化的handler在本博文的第1.2部分的分析中介紹了此handler被添加到此pipeline中了,經過遍歷鏈表,執行InBound類型handler的channelRegistered方法
所以執行到這裏,咱們的控制檯就回輸出:channelRegistered,這行信息。
到這裏,咱們就將doBind方法final ChannelFuture regFuture = initAndRegister();給分析完了,獲得的結論以下:
一、經過反射產生了一個NioServerSocketChannle對象。
二、完成了初始化
三、將NioServerSocketChannel進行了註冊。
接下來咱們分析doBind方法的剩餘部分代碼主要作了什麼,
源代碼以下:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1 final Channel channel = regFuture.channel();//2 if (regFuture.cause() != null) { return regFuture; } final ChannelPromise promise; if (regFuture.isDone()) { promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); } else { // Registration future is almost always fulfilled already, but just in case it's not. promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { doBind0(regFuture, channel, localAddress, promise); } }); } return promise; }
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
該函數主要是提交了一個Runnable任務到NioEventLoop線程中來進行處理。,這裏先看一下NioEventLoop類的execute方法
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop();//判斷當前線程是否爲該NioEventLoop所關聯的線程,若是是,則添加任務到任務隊列中,若是不是,則先啓動線程,而後添加任務到任務隊列中去 if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); //若是 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
當提交的任務被線程執行後,則會執行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)這行代碼,這行代碼完成的功能爲:實現channel與端口的綁定。
具體以下:
AbstractChannel.java @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
在該方法中直接調用了pipeline的bind方法,這裏的pipeline時DefaultChannelPipeline的實例。
DefaultChannelPipeline.java @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
在上面方法中直接調用了TailContext實例tail的bind方法,tail在下一篇博文中有詳細的介紹。繼續看tail實例的bind方法
AbstractChannelHandlerContext.java @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { //...省略有效性檢查 final AbstractChannelHandlerContext next = findContextOutbound();// EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
此上面bind函數中的這行代碼:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任務就是在pipeline所持有的以AbstractChannelHandlerContext爲節點的雙向鏈表中從尾節點tail開始向前尋找第一個outbound=true的handler節點。
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
在 DefaultChannelPipeline 的構造器中, 會實例化兩個對象: head 和 tail, 並造成了雙向鏈表的頭和尾。 head 是 HeadContext 的實例, 它實現了 ChannelOutboundHandler 接口和ChannelInboundHandler 接口, 而且它的 outbound 字段爲 true.而tail 是 TailContext 的實例,它實現了ChannelInboundHandler 接口,而且其outbound 字段爲 false,inbound 字段爲true。 基於此在如上的bind函數中調用了 findContextOutbound方法 找到的 AbstractChannelHandlerContext 對象其實就是 head.
繼續看,在pipelie的雙向鏈表中找到第一個outbound=true的AbstractChannelHandlerContext節點head後,而後調用此節點的invokeConnect方法,該方法的代碼以下:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
HeadContext類中的handler()方法代碼以下:
@Override public ChannelHandler handler() { return this; }
該方法返回的是其自己,這是由於HeadContext因爲其繼承AbstractChannelHandlerContext以及實現了ChannelHandler接口使其具備Context和Handler雙重特性。
繼續看,看HeadContext類中的bind方法,代碼以下:
@Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
unsafe這個字段是在HeadContext構造函數中被初始化的,以下:
HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); }
而此構造函數中的pipeline.channel().unsafe()這行代碼返回的就是在本博文前面研究NioServerSocketChannel這個類的構造函數中所初始化的一個實例,以下:
unsafe = newUnsafe();//newUnsafe()方法返回的是NioMessageUnsafe對象。
接下來看NioMessageUnsafe類中的bind方法(準確來講:該方法在AbstractUnsafe中),該類bind具體方法代碼以下:
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { //...省略了部分代碼 boolean wasActive = isActive(); try { doBind(localAddress);//核心代碼 } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
上面的核心代碼就是:doBind(localAddress);須要注意的是,此doBind方法是在NioServerSocketChannel類中的doBind方法,不是其餘類中的。
NioServerSocketChannel類中的doBind方法代碼以下:
@Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }
上面方法中javaChannel()方法返回的是NioServerSocketChannel實例初始化時所產生的Java NIO ServerSocketChannel實例(更具體點爲ServerSocketChannelImple實例)。 等價於語句serverSocketChannel.socket().bind(localAddress)完成了指定端口的綁定,這樣就開始監聽此端口。綁定端口成功後,是這裏調用了咱們自定義handler的channelActive方法,在綁定以前,isActive()方法返回false,綁定以後返回true。
@Override public boolean isActive() { return javaChannel().socket().isBound(); }
這樣,就進入了以下的if條件的代碼塊中
if (!wasActive && isActive()) { invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); } }); } private void invokeLater(Runnable task) { try { //省略了部分代碼 eventLoop().execute(task); } catch (RejectedExecutionException e) { logger.warn("Can't invoke task later as EventLoop rejected it", e); } }
進而開始執行 pipeline.fireChannelActive();這行代碼 ,這行代碼的具體調用鏈以下所示:
DefaultChannelPipeline.java
@Override public ChannelPipeline fireChannelActive() { head.fireChannelActive(); if (channel.config().isAutoRead()) { channel.read(); } return this; } @Override public ChannelHandlerContext fireChannelActive() { final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelActive(); } }); } return this; } private void invokeChannelActive() { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } }
最後,咱們來作下總結,netty啓動一個服務所通過的流程
1.設置啓動類參數,最重要的就是設置channel
2.建立server對應的channel,建立各大組件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
3.init 初始化這個 NioServerSocketChannel,設置一些attr,option,以及設置子channel的attr,option,給server的channel添加新channel接入器,並觸發addHandler事件
4.config().group().register(channel) 經過 ServerBootstrap 的 bossGroup 根據group長度取模獲得NioEventLoop ,將 NioServerSocketChannel 註冊到 NioEventLoop 中的 selector 上,而後觸發 channelRegistered事件
5.調用到jdk底層作端口綁定,並觸發active事件