Netty 系列目錄(http://www.javashuo.com/article/p-hskusway-em.html)html
ServerBootstap 建立時序圖以下:java
(1) 綁定 Reactor 線程池ios
服務端一般會構建兩個線程池,bossGroup 負責接收鏈接,childGroup 負責處理網絡 IO 以及系統 Task。git
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); // 省略... this.childGroup = childGroup; return this; } // bossGroup 由父類的 AbstractBootstrap 初始化 public B group(EventLoopGroup group) { // 省略... this.group = group; return self(); }
(2) 綁定 Channelgithub
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } // bind 方法時會用反射建立對應的 channel public B channelFactory(ChannelFactory<? extends C> channelFactory) { // 省略... this.channelFactory = channelFactory; return self(); }
(3) 綁定 ChannelHandlerpromise
// NioServerSocketChannel 使用,處理客戶端鏈接 .handler(new LoggerHandler()) // NioSocketChannel 使用,處理網絡 IO .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandler()); } })
這兩個 handler 的區別以下圖:網絡
下面看一下 ServerBootstrap 是如何綁定端口socket
private ChannelFuture doBind(final SocketAddress localAddress) { // 1. 建立 Channel 並將 Channel 註冊到 NioEventLoop 上 final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // 2. 若是 channel 註冊到 NioEventLoop 上成功,則綁定端口啓動服務 ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // 3. 若是未註冊成功,則綁定 ChannelFutureListener final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // channel 註冊到 NioEventLoop 上失敗 promise.setFailure(cause); } else { // channel 註冊到 NioEventLoop 上失敗則綁定端口 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
doBind 主要完成二件事:一是 Channel 的初始化並註冊到 NioEventLoop 上;二是綁定端口,啓動服務。ide
final ChannelFuture initAndRegister() { Channel channel = null; try { // 1. 建立 NioServerSocketChannel channel = channelFactory.newChannel(); // 2. 配置 channel 的參數 init(channel); } catch (Throwable t) { // 省略... } // 3. 將 channel 註冊到 NioEventLoop 上 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
channelFactory.newChannel() 其實是用反射調用 NioServerSocketChannel 的無參構造器。oop
// 建立 NIO 底層的 ServerSocketChannel 對象 public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } // NioServerSocketChannel 須要註冊 OP_ACCEPT 事件 public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } // 設置成非阻塞模式,並註冊感興趣的事件 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; ch.configureBlocking(false); } // 建立 channel 是建立對應的 pipeline protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
ServerSocketChannel 建立爲何要用 provider.openServerSocketChannel()?
private static ServerSocketChannel newSocket(SelectorProvider provider) { // ServerSocketChannel.open() 每秒建立 5000 個鏈接時性能會下將 1% // https://github.com/netty/netty/issues/2308 return provider.openServerSocketChannel(); }
init 方法配置 channel 的 socket 參數和附加屬性,並配置相應的 handler。
void init(Channel channel) throws Exception { // 1. 配置 Socket 參數和 NioServerSocketChannel 的附加屬性 final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); // 2. 配置 NioServerSocketChannel 的 handler if (handler != null) { pipeline.addLast(handler); } // 3. 將接收客戶端的 ServerBootstrapAcceptor 註冊到 pipeline 中 // 注意:此時 channel 還未綁定到 eventLoop 上,直接調用會拋出空指針異常 ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
ChannelInitializer 的執行
config().group().register(channel)
NioServerSocketChannel 是如何註冊到 NioEventLoopGroup 是的詳見:http://www.javashuo.com/article/p-yrhrzohe-dx.html
NioServerSocketChannel 註冊到 eventLoop 後就會啓動 NioEventLoop 線程,專門處理對應 channel 的網絡 IO 事件。
// NioEventLoop 啓動 protected void run() { // 省略... processSelectedKeys(); } // Netty 對 NIO Selector 進行了優化,默認不開啓(processSelectedKeysPlain) private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
processSelectedKeysPlain 負責處理註冊在 selector 上的 channel
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { // 處理網絡 IO processSelectedKey(k, (AbstractNioChannel) a); } else { // 用戶自定義的 Task NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } // 省略... } } // 分別處理 OP_CONNECT、OP_WRITE、OP_READ、OP_ACCEPT 事件 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { // 省略... final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); int readyOps = k.readyOps(); // OP_CONNECT if ((readyOps & SelectionKey.OP_CONNECT) != 0) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } // OP_WRITE if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); } // OP_READ、OP_ACCEPT if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } }
能夠看到 channel 的網絡 IO 其實都是由 unsafe 類在處理,NioServerSocketChannel 的 unsafe 是在其父類 AbstractNioMessageChannel 中調用 newUnsafe() 初始化的。
// AbstractNioMessageChannel.NioMessageUnsafe#read public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { do { // 1. 接收客戶端的 NioSocketChannel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } // 2. 觸發 pipeline 的 fireChannelRead、fireChannelReadComplete、fireExceptionCaught int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } }
NioMessageUnsafe 的 read 方法完成了二件事:一是接收客戶端的 NioSocketChannel;二是觸發 pipeline 的 fireChannelRead 事件完成 channel 的初始化工做 ,若有異常則觸發 fireExceptionCaught。真正接收客戶端請求的操做則委託給了子類 NioServerSocketChannel 的 doReadMessages 方法完成。
// 調用 NIO 底層接收客戶鏈接 protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } return 0; }
至此,NioServerSocketChannel 已經將請求的 NioSocketChannel 接收過來,但還未完成 channel 的初始化工做,如 handler 綁定,參數配置等。
上文提到 NioServerSocketChannel 在初始化的時候會綁定 ServerBootstrapAcceptor,這個 handler 完成了 channel 的初始化工做。NioServerSocketChannel 的 Pipeline 以下圖:
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; // 1. NioSocketChannel 綁定 handler 和相關配置參數 child.pipeline().addLast(childHandler); // 2. 配置 Socket 的 TCP 參數和附加屬性 setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } // 3. NioSocketChannel 註冊到 eventLoop 上 try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
天天用心記錄一點點。內容也許不重要,但習慣很重要!