在以前介紹Bootstrap的初始化以及啓動過程時,咱們屢次接觸了NioEventLoopGroup這個類,關於這個類的理解,還須要瞭解netty的線程模型。NioEventLoopGroup能夠理解爲一組線程,這些線程每個均可以獨立地處理多個channel產生的io事件。java
咱們看其中一個參數比較多的構造方法,其餘一些參數較少的構造方法使用了一些默認值,使用的默認參數以下:git
接下來,咱們看一下其中的一個經常使用的構造方法,github
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) { super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }
可見,當前類中並無什麼初始化邏輯,直接調用了父類的構造方法,因此咱們接着看父類MultithreadEventLoopGroup的構造方法:設計模式
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); }
一樣,並未作任務處理,直接調用父類構造方法,因此咱們接着看MultithreadEventExecutorGroup構造方法,初始化邏輯的實如今這個類中,api
經過上一小結的分析,咱們知道NioEventLoopGroup的構造方法的主要邏輯的實現是在MultithreadEventExecutorGroup類中,而且在調用構造方法的過程當中加上了一個參數的默認值,即EventExecutorChooserFactory類型參數的默認值DefaultEventExecutorChooserFactory.INSTANCE,這個類以輪詢(roundrobin)的方式從多個線程中依次選出線程用於註冊channel。
總結一下這段代碼的主要步驟:數組
最後給每一個子執行器添加一個監聽器,以監聽子執行器的終止,作一些簿記工做,使得在全部子執行器所有終止後將當前的執行器組終止promise
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 首先是變量的非空檢查以及合法性判斷,
// nThreads在MultithreadEventLoopGroup的構造方法中已經通過一些默認值處理,
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}安全
// 這裏通常都會使用默認值, // ThreadPerTaskExecutor的做用即字面意思,一個任務一個線程 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } // 子執行器的數組,一個子執行器對應一個線程 children = new EventExecutor[nThreads]; // 根據傳入的線程數量建立多個自執行器 // 注意,這裏子執行器建立好後並不會當即運行起來 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { // 若是建立子執行器不成功,那麼須要將已經建立好的子執行器也所有銷燬 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } // 等待因此子執行器中止後在退出 for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // 建立一個子執行器的選擇器,選擇器的做用是從子執行器中選出一個 // 默認使用roundRobin的方式 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; // 給每一個子執行器添加監聽器,在子執行器終止的時候作一些工做 // 每有一個子執行器終止時就將terminatedChildren變量加一 // 當全部子執行器所有終止時,當前這個執行器組就終止了 for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } // 包裝一個不可變的集合 Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet);
}網絡
上面的方法中調用了newChild方法來建立一個子執行器,而這個方法是一個抽象方法,咱們看NioEventLoopGroup類的實現:併發
protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
可見僅僅是簡單地建立了一個NioEventLoop對象。
到這裏,咱們就把NioEventLoopGroup的初始化過程分析完了。咱們不由思考,既然NioEventLoopGroup是一個執行器組,說白了就是一組線程,那這些線程是何時跑起來的呢?若是讀者還有印象,應該能記得咱們在分析Bootstrap創建鏈接過程時,channel初始化以後須要註冊到EventLoopGroup中,實際上是註冊到其中的一個EventLoop上,註冊邏輯最終是在AbstractChannel.AbstractUnsafe.register方法中實現的,其中有一段代碼:
if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
首先調用eventLoop.inEventLoop()判斷執行器的線程與當前線程是不是同一個,若是是則直接執行註冊的代碼,若是不是就調用eventLoop.execute將註冊邏輯封裝成一個任務放到執行器的任務隊列中,接下里咱們就以這個方法爲切入點,探究一會兒執行器線程的啓動過程。
首先,讓咱們來看一下這個方法,這個方法的做用是判斷當前線程與執行器的線程是否同一個線程。
public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
代碼很簡單,就很少說了。
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
方法很簡單,核心邏輯在startThread方法中,
public void execute(Runnable task) { // 非空檢查 if (task == null) { throw new NullPointerException("task"); } // 執行到這裏通常都是外部調用者, boolean inEventLoop = inEventLoop(); // 向任務隊列中添加一個任務 addTask(task); // 若是當前線程不是執行器的線程,那麼須要檢查執行器線程是否已經運行, // 若是還沒在運行,就須要啓動線程 if (!inEventLoop) { startThread(); // 檢查線程是否被關閉 if (isShutdown()) { boolean reject = false; try { // 將剛剛添加的任務移除 if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } // addTaskWakesUp不知道這個變量意義是什麼,NioEventLoop傳進來的是false // 向任務隊列中添加一個空任務,這樣就可以喚醒阻塞的執行器線程 // 有些狀況下執行器線程會阻塞在taskQueue上, // 因此向阻塞隊列中添加一個元素可以喚醒哪些由於隊列空而被阻塞的線程 if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
這個方法的主要做用是維護內部的狀態量state,使用cas指令併發狀況下對狀態量的修改是線程安全的,而且對於狀態量的判斷保證啓動邏輯只被執行一次
private void startThread() { // 狀態量的維護 if (state == ST_NOT_STARTED) { // 這裏使用了jdk中的原子更新器AtomicIntegerFieldUpdater類, // 使用cpu的cas指令保證併發狀況下可以安全地維護狀態量 // 保證只有一個線程可以執行啓動邏輯,保證啓動邏輯只被執行一次 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { // 實際啓動線程的邏輯 doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }
這個方法我就不貼代碼了,說一下它的主要做用:
具體的業務邏輯仍然是在子類中實現的,也就是SingleThreadEventExecutor.run()方法的具體實現。
咱們仍然以NioEventLoop爲例,看一下它實現的run方法。還大概講一下它的主要邏輯:
這裏,我就不貼代碼了,其中比較重要的是對一些併發狀況的考慮和處理,如selector的喚醒時機。接下來,主要看一下對於各類io事件的處理,至於任務隊列以及調度隊列中任務的處理比較簡單,就不展開了。
這個方法會遍歷全部接受到的io事件對應的selectionKey,而後依次處理。
private void processSelectedKeysOptimized() { // 遍歷全部的io事件的SelectionKey for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { // 處理事件 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // 若是須要從新select,那麼把後面的selectionKey所有置0,而後再次調用selectNow方法 if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }
這個方法首先對SelectionKey無效的狀況作了處理,分爲兩種狀況:channel自己無效了;channel仍然是正常的,只不過是被從當前的selector上註銷了,可能在其餘的selector中仍然是正常運行的
接下來,咱們着重分析一下對於四種事件的處理邏輯。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 若是selectionKey是無效的,那麼說明相應的channel是無效的,此時須要關閉這個channel if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 // 只關閉註冊在當前EventLoop上的channel, // 理論上來講,一個channel是能夠註冊到多個Eventloop上的, // SelectionKey無效多是由於channel從當前EventLoop上註銷了, // 可是channel自己依然是正常的,而且註冊在其餘的EventLoop中 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore // 到這裏說明channel已經無效了,關閉它 unsafe.close(unsafe.voidPromise()); return; } // 下面處理正常狀況 try { // 準備好的io事件 int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. // 處理connect事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. // 處理write事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 處理read和accept事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
從代碼中能夠看出,connect事件的處理時經過調用NioUnsafe.finishConnect完成的,咱們看一下AbstractNioUnsafe.finishConnect的實現:
public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { // 是否已經處於鏈接成功的狀態 boolean wasActive = isActive(); // 抽象方法,有子類實現 doFinishConnect(); // 處理future對象,將其標記爲成功 fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } }
能夠看出,主要是經過調用doFinishConnect實現完成鏈接的邏輯,具體到子類中,NioSocketChannel.doFinishConnect的實現是:
protected void doFinishConnect() throws Exception { if (!javaChannel().finishConnect()) { throw new Error(); } }
對於的write事件的處理時經過調用NioUnsafe.forceFlush方法完成,最終的實如今AbstractChannel.AbstractUnsafe.flush0中:
大致上看,這個方法的邏輯比較簡單,可是實際上最複雜也是最核心的寫入邏輯在子類實現的doWrite方法中。因爲本篇的重點在於把NioEventLoop的主幹邏輯梳理一下,因此這裏再也不繼續展開,後面會單獨來分析這一塊的源碼,這裏涉及到netty中對緩衝區的封裝,其中涉及到一些比較複雜的邏輯。
protected void flush0() { // 若是正在寫數據,直接返回 if (inFlush0) { // Avoid re-entrance return; } // 輸出的緩衝區 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(new NotYetConnectedException(), true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false); } } finally { inFlush0 = false; } return; } try { // 將緩衝區的數據寫入到channel中 doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ initialCloseCause = t; close(voidPromise(), t, newClosedChannelException(t), false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { initialCloseCause = t; close(voidPromise(), t2, newClosedChannelException(t), false); } } } finally { inFlush0 = false; } }
乍看會比較奇怪,爲何這兩個事件要放到一塊兒處理呢,他們明明是不一樣的事件。這裏主要仍是考慮到編碼的統一,由於read事件只有NioSocketChannel纔會有,而accept事件只有NioServerSocketChannel纔會有,因此這裏經過抽象方法,讓不一樣的子類去實現各自的邏輯,是的代碼結構上更統一。咱們這裏看一下NioScketChannel的實現,而對於NioServerSocketChannel的實現我會在後續分析netty服務端的啓動過程時在具體講到,即ServerBootstrap的啓動過程。
總結一下這個方法的主要邏輯:
最後還要根據最後一次讀取到的數據量決定是否關閉通道,若是最後一次讀取到的數據量小於0,說明對端已經關閉了輸出,因此這裏須要將輸入關閉,即通道處於半關閉狀態。
public final void read() {
final ChannelConfig config = config();
// 若是通道已經關閉,那麼就不須要再讀取數據,直接返回
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 緩衝分配器
final ByteBufAllocator allocator = config.getAllocator();
// 緩衝分配的處理器,處理緩衝分配,讀取計數等
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null; boolean close = false; try { do { // 分配一個緩衝 byteBuf = allocHandle.allocate(allocator); // 將通道的數據讀取到緩衝中 allocHandle.lastBytesRead(doReadBytes(byteBuf)); // 若是沒有讀取到數據,說明通道中沒有待讀取的數據了, if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. // 由於沒讀取到數據,因此應該釋放緩衝 byteBuf.release(); byteBuf = null; // 若是讀取到的數據量是負數,說明通道已經關閉了 close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } // 更新Handle內部的簿記量 allocHandle.incMessagesRead(1); readPending = false; // 向channel的處理器流水線中觸發一個事件, // 讓取到的數據可以被流水線上的各個ChannelHandler處理 pipeline.fireChannelRead(byteBuf); byteBuf = null; // 這裏根據以下條件判斷是否繼續讀: // 上一次讀取到的數據量大於0,而且讀取到的數據量等於分配的緩衝的最大容量, // 此時說明通道中還有待讀取的數據 } while (allocHandle.continueReading()); // 讀取完成 allocHandle.readComplete(); // 觸發一個讀取完成的事件 pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 // 這裏isAutoRead默認是true, 因此正常狀況下會繼續監聽read事件 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }
}
本篇主要分析了EventLoop的事件監聽以及處理邏輯,此外處理處理io事件,也會處理添加進來的任務和定時調度任務和延遲調度任務。EventLoop就像是整個框架的發動機或者說是心臟,它經過jdk api進而簡介地調用系統調用,不斷地監聽各類io事件,同時對不一樣的io事件分門別類採用不一樣的處理方式,對於read事件則會將網絡io數據讀取到緩衝中,並將讀取到的數據傳遞給用戶的處理器進行鏈式處理。Channelpipeline就像一個流水線同樣,對觸發的的各類事件進行處理。