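This post is a companion piece to the ChannelHandler and ChannelPipeline article. It reads the source code to see how ChannelHandler, ChannelHandlerContext, and ChannelPipeline are linked to one another and how they run.
In the demo from the previous post, both the Server and the Client add their ChannelHandlers through ChannelPipeline's addLast method during initialization: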
    // Server.java
    // Excerpt
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    NioEventLoopGroup group = new NioEventLoopGroup();
    serverBootstrap.group(group)
            .channel(NioServerSocketChannel.class)
            .localAddress(new InetSocketAddress("localhost", 9999))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // Add the ChannelHandlers
                    socketChannel.pipeline().addLast(new OneChannelOutBoundHandler());
                    socketChannel.pipeline().addLast(new OneChannelInBoundHandler());
                    socketChannel.pipeline().addLast(new TwoChannelInBoundHandler());
                }
            });
In the snippet above, socketChannel.pipeline() returns an instance of DefaultChannelPipeline, which implements the ChannelPipeline interface.
DefaultChannelPipeline implements addLast as follows:
    // DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
After a chain of overloaded calls, execution ends up in the following addLast method:
    // DefaultChannelPipeline.java
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
In this method, newContext wraps the ChannelHandler that was passed in and produces an AbstractChannelHandlerContext. newContext is implemented as follows:
    // DefaultChannelPipeline.java
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
It creates and returns a DefaultChannelHandlerContext. The constructor arguments show that this is the point where the ChannelHandlerContext, the ChannelPipeline (this), and the ChannelHandler become associated with one another.
Finally, here is addLast0:
    // DefaultChannelPipeline.java
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
Two fields of AbstractChannelHandlerContext appear here, prev and next, along with the tail field of DefaultChannelPipeline. The logic looks like an insertion into a doubly linked list. The following snippet shows tail and its counterpart, head:
    // DefaultChannelPipeline.java
    public class DefaultChannelPipeline implements ChannelPipeline {
        final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
        // ......
        protected DefaultChannelPipeline(Channel channel) {
            // ......
            tail = new TailContext(this);
            head = new HeadContext(this);

            head.next = tail;
            tail.prev = head;
        }
        // ......
    }

    // HeadContext (inner class of DefaultChannelPipeline)
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        // ......
        @Override
        public ChannelHandler handler() {
            return this;
        }
        // ......
    }

    // TailContext (inner class of DefaultChannelPipeline)
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        // ......
        @Override
        public ChannelHandler handler() {
            return this;
        }
        // ......
    }
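DefaultChannelPipeline keeps two fields of type AbstractChannelHandlerContext, head and tail, and the classes of both also implement sub-interfaces of ChannelHandler. The constructor links the two into a doubly linked list. Combined with addLast0 above, this shows that adding a ChannelHandler is really an insertion into the doubly linked list that the ChannelPipeline maintains internally.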
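In terms of class structure, DefaultChannelHandlerContext, HeadContext, and TailContext all extend AbstractChannelHandlerContext, which implements the ChannelHandlerContext interface.
So after ChannelHandlers have been added to a ChannelPipeline, its internal structure looks roughly like this:
All ChannelHandlerContexts form a doubly linked list whose first node is HeadContext and whose last node is TailContext. Because both of them implement ChannelHandler interfaces, each one acts as its own handler. Every ChannelHandler that is added gets a newly created DefaultChannelHandlerContext, which is inserted into the list in a defined order.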
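To make the list structure concrete, here is a minimal sketch in plain Java. It is not Netty code: MiniPipeline, Node, names, and namesBackward are hypothetical names invented for this illustration, and handlers are reduced to strings. Only the pointer updates in addLast mirror the addLast0 method shown above.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for DefaultChannelPipeline (hypothetical, not Netty code). */
final class MiniPipeline {
    /** Stand-in for AbstractChannelHandlerContext: a list node that wraps a handler name. */
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    MiniPipeline() {
        // Same wiring as the DefaultChannelPipeline constructor: head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    /** Same pointer updates as addLast0: insert the new node just before tail. */
    MiniPipeline addLast(String name) {
        Node newCtx = new Node(name);
        Node prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    /** Node names from head to tail (the direction inbound events travel). */
    List<String> names() {
        List<String> result = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            result.add(n.name);
        }
        return result;
    }

    /** Node names from tail to head (the direction outbound events travel). */
    List<String> namesBackward() {
        List<String> result = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev) {
            result.add(n.name);
        }
        return result;
    }
}
```

Calling new MiniPipeline().addLast("out1").addLast("in1").addLast("in2") yields the order head, out1, in1, in2, tail when walked forward, which matches the three addLast calls in the Server excerpt.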
AbstractChannelHandlerContext has a field named executionMask, which is assigned in the constructor:
    // AbstractChannelHandlerContext.java
    // Some code omitted
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.executionMask = mask(handlerClass);
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
    // Some code omitted
mask is a static method of the ChannelHandlerMask class:
    // ChannelHandlerMask.java
    // Some code omitted

    /**
     * Return the {@code executionMask}.
     */
    static int mask(Class<? extends ChannelHandler> clazz) {
        // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
        // lookup in the future.
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        if (mask == null) {
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }

    /**
     * Calculate the {@code executionMask}.
     */
    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        int mask = MASK_EXCEPTION_CAUGHT;
        try {
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_INBOUND;

                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_REGISTERED;
                }
                if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_UNREGISTERED;
                }
                if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_ACTIVE;
                }
                if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_INACTIVE;
                }
                if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_CHANNEL_READ;
                }
                if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_READ_COMPLETE;
                }
                if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
                }
                if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                    mask &= ~MASK_USER_EVENT_TRIGGERED;
                }
            }

            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_OUTBOUND;

                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
                if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_CONNECT;
                }
                if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DISCONNECT;
                }
                if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_CLOSE;
                }
                if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                    mask &= ~MASK_DEREGISTER;
                }
                if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                    mask &= ~MASK_READ;
                }
                if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                        Object.class, ChannelPromise.class)) {
                    mask &= ~MASK_WRITE;
                }
                if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                    mask &= ~MASK_FLUSH;
                }
            }

            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        } catch (Exception e) {
            // Should never reach here.
            PlatformDependent.throwException(e);
        }

        return mask;
    }

    @SuppressWarnings("rawtypes")
    private static boolean isSkippable(
            final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                Method m;
                try {
                    m = handlerType.getMethod(methodName, paramTypes);
                } catch (NoSuchMethodException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                            "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                    }
                    return false;
                }
                return m != null && m.isAnnotationPresent(Skip.class);
            }
        });
    }
    // Some code omitted
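The logic is as follows. When a ChannelHandlerContext is created it is bound to one ChannelHandler, and the handler's class is inspected to determine which callbacks it supports. The result is combined with bitwise operations and stored in the context's executionMask field. Note the m.isAnnotationPresent(Skip.class) check: the callbacks in the base classes ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter all carry the @Skip annotation. When a subclass of either adapter overrides a callback, the overriding method no longer carries that annotation, because method annotations are not inherited by overrides, so the handler is treated as supporting that callback.
The mask bit for each callback is defined as follows: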
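The effect of overriding an annotated method can be checked with a few lines of plain Java reflection. The sketch below is not Netty code: the Skip annotation, Adapter, MyHandler, and the two mask constants are hypothetical stand-ins defined locally, and the callbacks take simplified parameters. Like the source above, isSkippable returns false when the method cannot be found.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class SkipDemo {
    /** Hypothetical stand-in for Netty's internal @Skip annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Skip {}

    // Illustrative bit values only; the real constants are listed in ChannelHandlerMask.
    static final int MASK_READ = 1;
    static final int MASK_ACTIVE = 1 << 1;

    /** Stand-in for an adapter base class: every callback is marked @Skip. */
    public static class Adapter {
        @Skip public void channelRead(Object msg) {}
        @Skip public void channelActive() {}
    }

    /** Overrides channelRead only; the overriding method carries no @Skip. */
    public static class MyHandler extends Adapter {
        @Override public void channelRead(Object msg) {}
    }

    /** True if the public method resolved for this type is annotated with @Skip. */
    static boolean isSkippable(Class<?> type, String name, Class<?>... params) {
        try {
            Method m = type.getMethod(name, params);
            return m.isAnnotationPresent(Skip.class);
        } catch (NoSuchMethodException e) {
            return false; // assume the callback cannot be skipped
        }
    }

    /** Start with every bit set, then clear the bit of each skippable callback. */
    static int mask(Class<?> type) {
        int mask = MASK_READ | MASK_ACTIVE;
        if (isSkippable(type, "channelRead", Object.class)) {
            mask &= ~MASK_READ;
        }
        if (isSkippable(type, "channelActive")) {
            mask &= ~MASK_ACTIVE;
        }
        return mask;
    }
}
```

With these definitions, SkipDemo.mask(SkipDemo.Adapter.class) is 0 because both callbacks are skippable, while SkipDemo.mask(SkipDemo.MyHandler.class) keeps only MASK_READ, since getMethod resolves channelRead to the un-annotated override and channelActive to the annotated method inherited from Adapter.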
    // ChannelHandlerMask.java
    final class ChannelHandlerMask {
        // Using to mask which methods must be called for a ChannelHandler.
        static final int MASK_EXCEPTION_CAUGHT = 1;
        static final int MASK_CHANNEL_REGISTERED = 1 << 1;
        static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
        static final int MASK_CHANNEL_ACTIVE = 1 << 3;
        static final int MASK_CHANNEL_INACTIVE = 1 << 4;
        static final int MASK_CHANNEL_READ = 1 << 5;
        static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
        static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
        static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
        static final int MASK_BIND = 1 << 9;
        static final int MASK_CONNECT = 1 << 10;
        static final int MASK_DISCONNECT = 1 << 11;
        static final int MASK_CLOSE = 1 << 12;
        static final int MASK_DEREGISTER = 1 << 13;
        static final int MASK_READ = 1 << 14;
        static final int MASK_WRITE = 1 << 15;
        static final int MASK_FLUSH = 1 << 16;

        static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
                MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
                MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
        private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
        static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
                MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
        private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
    }
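As a quick check of the arithmetic, the sketch below recomputes the two group masks from the bit positions listed above and derives the mask of a handler that supports only channelRead. MaskArithmetic, bits, handles, and onlyChannelRead are hypothetical helpers written for this illustration; only the bit positions come from the source.

```java
final class MaskArithmetic {
    static final int MASK_EXCEPTION_CAUGHT = 1;
    static final int MASK_CHANNEL_READ = 1 << 5;
    static final int MASK_WRITE = 1 << 15;

    // Inbound callbacks occupy bits 1..8, outbound callbacks bits 9..16.
    static final int MASK_ONLY_INBOUND = bits(1, 8);
    static final int MASK_ONLY_OUTBOUND = bits(9, 16);

    /** OR together the single-bit masks for positions from..to (inclusive). */
    static int bits(int from, int to) {
        int mask = 0;
        for (int i = from; i <= to; i++) {
            mask |= 1 << i;
        }
        return mask;
    }

    /** A handler handles an event when its executionMask has that event's bit set. */
    static boolean handles(int executionMask, int eventMask) {
        return (executionMask & eventMask) != 0;
    }

    /** Mask of an inbound handler whose only non-skippable callback is channelRead. */
    static int onlyChannelRead() {
        int mask = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;  // every inbound bit set
        mask &= ~(MASK_ONLY_INBOUND & ~MASK_CHANNEL_READ);     // clear the other inbound bits
        mask &= ~MASK_EXCEPTION_CAUGHT;                        // exceptionCaught is skippable too
        return mask;
    }
}
```

Here MASK_ONLY_INBOUND evaluates to 510, MASK_ONLY_OUTBOUND to 130560, and onlyChannelRead() to 32, which is exactly MASK_CHANNEL_READ. The two group masks share no bits, so a single int can record inbound and outbound support side by side.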
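Taking message reads and writes as examples, let us see how the ChannelHandlers in a ChannelPipeline process messages and events in order.
When a Channel has read messages, it calls ChannelPipeline's fireChannelRead method at the following place: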
    // AbstractNioMessageChannel.java
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        // Code omitted
        @Override
        public void read() {
            // ......
            for (int i = 0; i < size; i ++) {
                readPending = false;
                pipeline.fireChannelRead(readBuf.get(i));
            }
            // ......
        }
        // Code omitted
    }

    // DefaultChannelPipeline.java
    // Code omitted
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    // Code omitted
As shown, fireChannelRead passes head to AbstractChannelHandlerContext's static invokeChannelRead method, so the read event starts at the head of the list.
    // AbstractChannelHandlerContext.java
    // Code omitted
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

    /**
     * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
     * yet. If not return {@code false} and if called or could not detect return {@code true}.
     *
     * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
     * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
     * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
     */
    private boolean invokeHandler() {
        // Store in local variable to reduce volatile reads.
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }
    // Code omitted
Here invokeHandler checks the state of the current ChannelHandler. If the check passes, the handler's channelRead method is called; otherwise fireChannelRead forwards the event to the next ChannelHandler. Since head is a HeadContext, which itself implements the ChannelInboundHandler interface, the method invoked here is HeadContext's channelRead.
    // DefaultChannelPipeline.java
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }
    }
HeadContext does nothing with the message and simply passes it on. Next, look at what ChannelHandlerContext's fireChannelRead does:
    // AbstractChannelHandlerContext.java
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
        return this;
    }

    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }

    private static boolean skipContext(
            AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
        // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
        return (ctx.executionMask & (onlyMask | mask)) == 0 ||
                // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
                // everything to preserve ordering.
                //
                // See https://github.com/netty/netty/issues/10067
                (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
    }
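The logic is this: starting from the current ChannelHandlerContext, walk forward through the doubly linked list until a ChannelHandlerContext that matches the MASK_CHANNEL_READ mask is found. As described earlier, each ChannelHandlerContext stores in executionMask the combined mask bits of every callback its ChannelHandler supports (overrides), so a bitwise AND is enough to locate the nearest context whose handler implements the callback.
The last node of the list is TailContext: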
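The forward search can be sketched in plain Java as follows. This is a simplified model, not Netty's implementation: it ignores the executor comparison and the onlyMask term of skipContext, and InboundSearch, Ctx, and chain are hypothetical names. The last context is assumed to have every inbound bit set, as TailContext implements all inbound callbacks, which is what guarantees that the loop terminates.

```java
final class InboundSearch {
    static final int MASK_CHANNEL_ACTIVE = 1 << 3;
    static final int MASK_CHANNEL_READ = 1 << 5;

    /** Stand-in for AbstractChannelHandlerContext: a name, a mask, and a next pointer. */
    static final class Ctx {
        final String name;
        final int executionMask;
        Ctx next;

        Ctx(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }
    }

    /** Links the given contexts in order and returns the first one. */
    static Ctx chain(Ctx... ctxs) {
        for (int i = 0; i + 1 < ctxs.length; i++) {
            ctxs[i].next = ctxs[i + 1];
        }
        return ctxs[0];
    }

    /**
     * Simplified findContextInbound: move to the next context at least once,
     * and keep moving while the context's mask lacks the requested bit.
     */
    static Ctx findContextInbound(Ctx from, int mask) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }
}
```

For a chain head, a (channelActive only), b (channelRead only), tail (all inbound bits), a read event fired from head skips a and lands on b; fired again from b it reaches tail.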
    // DefaultChannelPipeline.java
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(ctx, msg);
        }

        /**
         * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
         * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
         * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
         */
        protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
            onUnhandledInboundMessage(msg);
            if (logger.isDebugEnabled()) {
                logger.debug("Discarded message pipeline : {}. Channel : {}.",
                             ctx.pipeline().names(), ctx.channel());
            }
        }

        /**
         * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
         * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
         * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
         */
        protected void onUnhandledInboundMessage(Object msg) {
            try {
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    }
As shown, the tail node's channelRead method does not propagate the event any further; it only releases msg.
For writes, we follow the ctx.write call made in OneChannelInBoundHandler's channelReadComplete method:
    // AbstractChannelHandlerContext.java
    // Code omitted
    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        write(msg, false, promise);
        return promise;
    }

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }

        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }
    }

    private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.prev;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
        return ctx;
    }
    // Code omitted
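After the overloaded write methods, findContextOutbound walks the doubly linked list backward, toward head, to find the nearest ChannelHandlerContext whose handler implements write (or, when flushing as well, write or flush), and then calls that context's invokeWrite or invokeWriteAndFlush method.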
    // AbstractChannelHandlerContext.java
    // Code omitted
    void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
    // Code omitted
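As with reads, once the invokeHandler check passes, the ChannelHandler of the located ChannelHandlerContext is called; if the check fails, the write event is passed further toward the head. When the write reaches the head of the list, HeadContext's write method is called: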
    // DefaultChannelPipeline.java
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;
        // Code omitted

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }
        // Code omitted
    }
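In the end the message is written by calling unsafe's write method.
One last observation from the implementation above: when adding ChannelHandlers to a ChannelPipeline, add the ChannelOutboundHandlers before the ChannelInboundHandlers that write messages. ctx.write searches backward from the current context, so a ChannelOutboundHandler placed after the writing ChannelInboundHandler never receives the write event.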
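The ordering rule can be illustrated with a small model of the backward search. This is a sketch under stated assumptions rather than Netty code: WriteOrderDemo, Ctx, and writePath are hypothetical names, a handler counts as outbound when its name starts with "out", every outbound handler is assumed to forward the write by calling ctx.write itself, and "head" stands for HeadContext.

```java
import java.util.ArrayList;
import java.util.List;

final class WriteOrderDemo {
    /** Stand-in for a context: a name, an outbound flag, and a prev pointer. */
    static final class Ctx {
        final String name;
        final boolean outbound;
        final Ctx prev;

        Ctx(String name, boolean outbound, Ctx prev) {
            this.name = name;
            this.outbound = outbound;
            this.prev = prev;
        }
    }

    /**
     * Builds head -> handlers (in addLast order) and returns the outbound
     * contexts that a ctx.write issued by `writer` would pass through.
     */
    static List<String> writePath(String writer, String... handlers) {
        Ctx head = new Ctx("head", true, null);
        Ctx last = head;
        Ctx start = null;
        for (String h : handlers) {
            Ctx ctx = new Ctx(h, h.startsWith("out"), last);
            if (h.equals(writer)) {
                start = ctx;
            }
            last = ctx;
        }
        if (start == null) {
            throw new IllegalArgumentException("unknown handler: " + writer);
        }
        // Like findContextOutbound: begin at prev and only ever move toward head.
        List<String> path = new ArrayList<>();
        for (Ctx ctx = start.prev; ctx != null; ctx = ctx.prev) {
            if (ctx.outbound) {
                path.add(ctx.name);
            }
        }
        return path;
    }
}
```

With the handlers added as out1, in1, in2, a write issued by in1 passes through out1 and then head. With the order in1, out1, the same write goes straight to head and out1 is never invoked.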