public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); }
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 首先是變量的非空檢查以及合法性判斷,
// nThreads在MultithreadEventLoopGroup的構造方法中已經通過一些默認值處理,
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
// 這裏通常都會使用默認值, // ThreadPerTaskExecutor的做用即字面意思,一個任務一個線程 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 子執行器的數組,一個子執行器對應一個線程 children = new EventExecutor[nThreads]; // 根據傳入的線程數量建立多個自執行器 // 注意,這裏子執行器建立好後並不會當即運行起來 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 若是建立子執行器不成功,那麼須要將已經建立好的子執行器也所有銷燬 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } // 等待因此子執行器中止後在退出 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 建立一個子執行器的選擇器,選擇器的做用是從子執行器中選出一個 // 默認使用roundRobin的方式 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; // 給每一個子執行器添加監聽器,在子執行器終止的時候作一些工做 // 每有一個子執行器終止時就將terminatedChildren變量加一 // 當全部子執行器所有終止時,當前這個執行器組就終止了 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 包裝一個不可變的集合 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);
protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
public void execute(Runnable task) { // 非空檢查 if (task == null) { throw new NullPointerException("task"); } // 執行到這裏通常都是外部調用者, boolean inEventLoop = inEventLoop(); // 向任務隊列中添加一個任務 addTask(task); // 若是當前線程不是執行器的線程,那麼須要檢查執行器線程是否已經運行, // 若是還沒在運行,就須要啓動線程 if (!inEventLoop) { startThread(); // 檢查線程是否被關閉 if (isShutdown()) { boolean reject = false; try { // 將剛剛添加的任務移除 if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } // addTaskWakesUp不知道這個變量意義是什麼,NioEventLoop傳進來的是false // 向任務隊列中添加一個空任務,這樣就可以喚醒阻塞的執行器線程 // 有些狀況下執行器線程會阻塞在taskQueue上, // 因此向阻塞隊列中添加一個元素可以喚醒哪些由於隊列空而被阻塞的線程 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
private void startThread() { // 狀態量的維護 if (state == ST_NOT_STARTED) { // 這裏使用了jdk中的原子更新器AtomicIntegerFieldUpdater類, // 使用cpu的cas指令保證併發狀況下可以安全地維護狀態量 // 保證只有一個線程可以執行啓動邏輯,保證啓動邏輯只被執行一次 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { // 實際啓動線程的邏輯 doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
private void processSelectedKeysOptimized() { // 遍歷全部的io事件的SelectionKey for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { // 處理事件 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // 若是須要從新select,那麼把後面的selectionKey所有置0,而後再次調用selectNow方法 if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 若是selectionKey是無效的,那麼說明相應的channel是無效的,此時須要關閉這個channel if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 // 只關閉註冊在當前EventLoop上的channel, // 理論上來講,一個channel是能夠註冊到多個Eventloop上的, // SelectionKey無效多是由於channel從當前EventLoop上註銷了, // 可是channel自己依然是正常的,而且註冊在其餘的EventLoop中 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore // 到這裏說明channel已經無效了,關閉它 unsafe.close(unsafe.voidPromise()); return; } // 下面處理正常狀況 try { // 準備好的io事件 int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. // 處理connect事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. // 處理write事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 處理read和accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { // 是否已經處於鏈接成功的狀態 boolean wasActive = isActive(); // 抽象方法,有子類實現 doFinishConnect(); // 處理future對象,將其標記爲成功 fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
protected void doFinishConnect() throws Exception { if (!javaChannel().finishConnect()) { throw new Error(); } }
protected void flush0() { // 若是正在寫數據,直接返回 if (inFlush0) { // Avoid re-entrance return; } // 輸出的緩衝區 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(new NotYetConnectedException(), true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false); } } finally { inFlush0 = false; } return; } try { // 將緩衝區的數據寫入到channel中 doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ initialCloseCause = t; close(voidPromise(), t, newClosedChannelException(t), false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { initialCloseCause = t; close(voidPromise(), t2, newClosedChannelException(t), false); } } } finally { inFlush0 = false; } }
public final void read() {
final ChannelConfig config = config();
// 若是通道已經關閉,那麼就不須要再讀取數據,直接返回
if (shouldBreakReadReady(config)) {
final ChannelPipeline pipeline = pipeline();
// 緩衝分配器
final ByteBufAllocator allocator = config.getAllocator();
// 緩衝分配的處理器,處理緩衝分配,讀取計數等
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = null; boolean close = false; try { do { // 分配一個緩衝 byteBuf = allocHandle.allocate(allocator); // 將通道的數據讀取到緩衝中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 若是沒有讀取到數據,說明通道中沒有待讀取的數據了, if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. // 由於沒讀取到數據,因此應該釋放緩衝 byteBuf.release(); byteBuf = null; // 若是讀取到的數據量是負數,說明通道已經關閉了 close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } // 更新Handle內部的簿記量 allocHandle.incMessagesRead(1); readPending = false; // 向channel的處理器流水線中觸發一個事件, // 讓取到的數據可以被流水線上的各個ChannelHandler處理 pipeline.fireChannelRead(byteBuf); byteBuf = null; // 這裏根據以下條件判斷是否繼續讀: // 上一次讀取到的數據量大於0,而且讀取到的數據量等於分配的緩衝的最大容量, // 此時說明通道中還有待讀取的數據 } while (allocHandle.continueReading()); // 讀取完成 allocHandle.readComplete(); // 觸發一個讀取完成的事件 pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 // 這裏isAutoRead默認是true, 因此正常狀況下會繼續監聽read事件 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
本篇主要分析了EventLoop的事件監聽以及處理邏輯,此外處理處理io事件,也會處理添加進來的任務和定時調度任務和延遲調度任務。EventLoop就像是整個框架的發動機或者說是心臟,它經過jdk api進而簡介地調用系統調用,不斷地監聽各類io事件,同時對不一樣的io事件分門別類採用不一樣的處理方式,對於read事件則會將網絡io數據讀取到緩衝中,並將讀取到的數據傳遞給用戶的處理器進行鏈式處理。Channelpipeline就像一個流水線同樣,對觸發的的各類事件進行處理。