上文咱們闡述了Netty的Reactor模型。在Reactor模型的第二階段,Netty會處理各類io事件。對於客戶端的各類請求就是在這個階段去處理的。本文便來分析一個新的鏈接是如何被處理的。java
代碼的入口就從read方法開始。這裏的unsafe的類型是NioMessageUnsafe,在服務端啓動時就肯定下來了。 react
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } }
咱們省去部分代碼,read方法邏輯很是簡單。就是一個讀出加處理的過程promise
public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; do { //讀取消息 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //循環處理消息 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); //觸發讀取完畢事件 pipeline.fireChannelReadComplete(); }
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
在doReadMessages首先accept一個新鏈接,因爲在一階段的時候已經有io事件產生了,因此這裏不會等待而是當即接受一個新鏈接並用SocketChannel表示。數據結構
接着又構造出了一個NioSocketChannel將java的channel封裝成netty本身的channel並添加到list中,咱們點進去看看。socket
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); }
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
最終咱們到了AbstractChannel的類中,發現NioSocketChannel的創建會建立unsafe和pipeline。這裏咱們看下具體類型ide
unsafe的具體類型是由子類io.netty.channel.socket.nio.NioSocketChannel#newUnsafe決定的oop
protected AbstractNioUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); }
pipeline則是默認的DefaultChannelPipeline源碼分析
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
這裏咱們便引出了pipeline的概念,看上述代碼便知pipeline的數據結構是一個雙向鏈表。咱們也能夠把它想象成一個責任鏈或者更直白點就是流水線。任何鏈接請求都會經過pipeline處理最終返回到客戶端。this
如今顯得鏈接已經封裝成channel並添加到list中了,如今咱們再看下消息處理spa
int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); }
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
消息處理實際就是pipeline鏈式執行handle的過程。那麼對於服務端的channel,他在接受新鏈接的時候先執行那個handle呢。服務端處理新鏈接的pipeline中,已經自動添加了一個pipeline處理器 ServerBootstrapAcceptor
因此咱們先看下ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) { //1.泛型轉換新鏈接建立的channel final Channel child = (Channel) msg; //2.設置channel的handler child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //channel綁定到一個raector線程上 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
1.將剛剛建立的channel泛型轉換出來
2.調用用戶代碼的childHandler屬性,注意,這裏只是添加了一個ChannelInitializer,相應的初始化還未運行,
3.註冊該channel,將該channel綁定到一個reactor線程,後續關於這個channel的事件,任務都是由該reactor線程處理。
如今咱們點進註冊的代碼
public ChannelFuture register(Channel channel) { return next().register(channel); }
public EventLoop next() { return (EventLoop) super.next(); }
next方法返回的是一個reactor線程,咱們看下netty是如何挑選線程的。點擊super.next
public EventExecutor next() { return chooser.next(); }
這裏出現一個chooser表明的是一個選擇策略,下面直接上代碼了
chooser = chooserFactory.newChooser(children);
public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }
netty根據線程數量的奇偶性 會選擇出不一樣的選擇策略。二者惟一的區別就是一個是與運算,一個是取餘
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTowEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } }
在咱們肯定一個reactor線程以後,咱們便開始了註冊的流程
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
註冊的核心代碼即是register0了
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
爲channel添加Handler,這裏將添加handler任務包裝成Task
private final class PendingHandlerAddedTask extends PendingHandlerCallback { PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx); } @Override public void run() { callHandlerAdded0(ctx); } @Override void execute() { EventExecutor executor = ctx.executor(); if (executor.inEventLoop()) { callHandlerAdded0(ctx); } else { try { executor.execute(this); } catch (RejectedExecutionException e) { if (logger.isWarnEnabled()) { logger.warn( "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.", executor, ctx.name(), e); } remove0(ctx); ctx.setRemoved(); } } } }
最終調用io.netty.channel.ChannelInitializer#handlerAdded
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); }
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { initChannel(ctx); } }
這也就是咱們的用戶代碼
到這裏其實已經接近尾聲了。可是咱們的channel目前仍是沒法使用的。由於他並無註冊他感興趣的事件。他如今是一個沒有夢想的channel。因此咱們看下channel激活的具體邏輯
private void invokeChannelActive() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); } } public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } } public Channel read() { pipeline.read(); return this; } public final ChannelPipeline read() { tail.read(); return this; } ....... protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
最終在io.netty.channel.nio.AbstractNioChannel#doBeginRead中設置selectionKey對讀事件感興趣。
以上即是netty對新鏈接的處理
參考
https://www.jianshu.com/p/0242b1d4dd21 【netty源碼分析之新鏈接接入全解析】