eventloop運行過程:java
在第一次執行任務時建立一個新的線程,而且會調用SingleThreadEventExecutor.this.run()方法,從而讓這個eventloop線程一直運行下去,注意的是跟eventloop綁定的線程來自於ThreadPerTaskExecutor這個類中,在executor.execute這個方法執行時,會建立一個新的線程與eventloop綁定。git
executor.execute(new Runnable() { @Override public void run() {
@Override public void execute(Runnable command) { threadFactory.newThread(command).start(); }
接下來進入到run方法中:github
@Override protected void run() { //循環處理IO事件或者處理線程池中的task任務 for (;;) { try { //判斷實質性select仍是直接處理IO事件和隊列裏面的任務 //當 taskQueue 不爲空時, hasTasks() 就會返回TRUE,那麼 //selectStrategy.calculateStrategy() 的實現裏面就會執行selectSupplier.get() 而get()方法 //裏面會調用 selectNow(); 執行當即返回當前就緒的IO事件的個數,若是存在IO事件,那麼在switch 語 //句中就會直接執行 default, 直接跳出switch語句,若是不存在,就是返回0, 對應於continue,忽略 //這次循環。 //當taskQueue爲空時,就會selectStrategy.calculateStrategy() 就會返回 //SelectStrategy.SELECT, 對用於switch case語句就是執行select()函數,阻塞等待IO事件就緒 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO //說明隊列裏面沒有任務待執行 case SelectStrategy.SELECT: //阻塞執行select(time),每次都會將這個標誌置爲false select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } // fall through default: } cancelledKeys = 0; needsToSelectAgain = false; //這一塊是處理IO事件所花費的時間和處理隊列裏面的任務的時間的比例 final int ioRatio = this.ioRatio; //此時表示處理完IO事件後,執行任務 if (ioRatio == 100) { try { // 處理IO事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // 保證能執行全部的task runAllTasks(); } } else { // 記錄處理IO事件開始的時間 final long ioStartTime = System.nanoTime(); try { // 處理IO事件 processSelectedKeys(); } finally { // Ensure we always run tasks. // 當前時間減去處理IO事件開始的時間就是處理IO事件花費的時間 final long ioTime = System.nanoTime() - ioStartTime; // 執行task的時間taskTime就是ioTime * (100 - ioRatio) / ioRatio //這裏須要注意的是,任務隊列裏面可能沒有執行完 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { //若是shutdown關閉資源 if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { //這種狀況時可被喚醒 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } // If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. //這種狀況時可被喚醒 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } //阻塞執行 //若是第一個定時任務的延遲很是長,好比一個小時,那麼有沒有可能線程一直阻塞在select操做, //只要在這段時間內,有新任務加入,該阻塞就會被釋放,調用了selector.wakeup() int selectedKeys = selector.select(timeoutMillis); selectCnt ++; //這種狀況時可被喚醒 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } //可被喚醒 if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { //發生空循環時須要重建這個selector // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } }
服務端:promise
代碼調用流程以下:app
static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", "8007")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc())); } //p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } }); // Start the server. //入口 ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
/** * Create a new {@link Channel} and bind it. */ //第一步 AbstractBootstrap public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }
/** * Create a new {@link Channel} and bind it. */ //第二步 AbstractBootstrap public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); }
//第三步AbstractBootstrap private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; } @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } //這是與客戶端不一樣的一點 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
//第四步 AbstractBootstrap private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
//第五步 ChannelOutboundInvoker ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
//第六步 AbstractChannel @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }
//第7步 DefaultChannelPipeline @Override public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); }
//第八步 AbstractChannelHandlerContext @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { if (localAddress == null) { throw new NullPointerException("localAddress"); } if (isNotValidPromise(promise, false)) { // cancelled return promise; } final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; }
//第九步 AbstractChannelHandlerContext private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } }
//第10步 ChannelOutboundHandler void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
//第11步 DefaultChannelPipeline @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }
//第12步 Channel void bind(SocketAddress localAddress, ChannelPromise promise);
//第13步 AbstractChannel @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }
//第14步 AbstractChannel protected abstract void doBind(SocketAddress localAddress) throws Exception;
//第15步 NioServerSocketChannel @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }
在ServerSocketChannel註冊成功時,會調用pipeline.fireChannelRegistered(),該事件會在pipeline中傳播,從head開始,尋找inbound類型的handler,來調用channelRegistered方法。在init方法調用時會添加一個handler,它的類型是inbound的,因此會調用它的方法。less
@Override 16 @SuppressWarnings("unchecked") 17 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {//NioServerScoketChannel註冊成功時被調用 18 ChannelPipeline pipeline = ctx.pipeline(); 19 boolean success = false; 20 try { 21 initChannel((C) ctx.channel());//調用initChannel 22 pipeline.remove(this);//初始化Handler只完成初始化工做,初始化完成自後就把本身刪除 23 ctx.fireChannelRegistered();//繼續傳播channelRegistered事件 24 success = true; 25 } catch (Throwable t) { 26 logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); 27 } finally { 28 if (pipeline.context(this) != null) { 29 pipeline.remove(this); 30 } 31 if (!success) { 32 ctx.close(); 33 } 34 } 35 }
該事件還會繼續傳播下去。socket
在綁定時還調用了fireChannelActive方法,這個事件也會在pipeline中傳播下去。ide
pipeline.fireChannelActive()也會調用到unsafe.beginRead()方法。函數
客戶端接入過程:注意serversocket和socket裏面unsafe.read()方法執行是不一樣的oop
NioSocketChannel而言,其繼承自AbstractNioByteChannel
NioServerSocketChannel繼承了AbstractNioMessageChannel
eventloop事件循環裏面收到了一個讀請求->processSelectedKeys();->processSelectedKey(k, (AbstractNioChannel) a);->unsafe.read();->AbstractNioMessageChannel.read->doReadMessages(readBuf)(此處建立了針對客戶端的socketchannel)->pipeline.fireChannelRead();->ServerBootstrapAcceptor.read();->child.pipeline().addLast(childHandler);-> childGroup.register(child);
代碼流程以下:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
@Override protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }