歡迎關注公衆號:【 愛編程】
若是有須要後臺回覆 2019贈送 1T的學習資料哦!!
前文再續,書接上一回【NioEventLoop】。
在研究NioEventLoop執行過程的時候,檢測IO事件(包括新鏈接),處理IO事件,執行全部任務三個過程。其中檢測IO事件中經過持有的selector去輪詢事件,檢測出新鏈接。這裏複用同一段代碼。java
在開始分析前,先了解一下Channel的設計git
頂層Channel接口定義了socket事件如讀、寫、鏈接、綁定等事件,並使用AbstractChannel做爲骨架實現了這些方法。查看器成員變量,發現大多數通用的組件,都被定義在這裏github
第二層AbstractNioChannel定義了以NIO,即Selector的方式進行讀寫事件的監聽。其成員變量保存了selector相關的一些屬性。算法
第三層內容比較多,定義了服務端channel(左邊繼承了AbstractNioMessageChannel的NioServerSocketChannel)以及客戶端channel(右邊繼承了AbstractNioByteChannel的NioSocketChannel)。編程
本文開始探索一下Netty是如何接入新鏈接?主要分爲四個部分promise
1.檢測新鏈接
2.建立NioSocketChannel
3.分配線程和註冊Selector
4.向Selector註冊讀事件
Netty服務端在啓動的時候會綁定一個bossGroup,即NioEventLoop,在bind()
綁定端口的時候註冊accept(新鏈接接入)事件。掃描到該事件後,便處理。所以入口從:NioEventLoop#processSelectedKeys()
開始。app
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //省略代碼 // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop //若是當前NioEventLoop是workGroup 則多是OP_READ,bossGroup是OP_ACCEPT if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //新鏈接接入以及讀事件處理入口 unsafe.read(); } }
關鍵的新鏈接接入以及讀事件處理入口unsafe.read();
socket
a).這裏的unsafe
是在Channel建立過程的時候,調用了父類AbstractChannel#AbstractChannel()
的構造方法,和pipeline
一塊兒初始化的。tcp
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
服務端:
unsafe 爲NioServerSockeChanne
l的父類AbstractNioMessageChannel#newUnsafe()建立,能夠看到對應的是AbstractNioMessageChannel的內部類NioMessageUnsafe
;ide
客戶端:
unsafe爲NioSocketChannel
的的父類AbstractNioUnsafe#newUnsafe()建立的話,它對應的是AbstractNioByteChannel的內部類NioByteUnsafe
b).unsafe.read()
NioMessageUnsafe.read()
中主要的操做以下:
1.循環調用jdk底層的代碼建立channel,並用netty的NioSocketChannel包裝起來,表明新鏈接成功接入一個通道。
2.將全部獲取到的channel存儲到一個容器當中,檢測接入的鏈接數,默認是一次接16個鏈接
3.遍歷容器中的channel,依次調用方法fireChannelRead,4.fireChannelReadComplete,fireExceptionCaught來觸發對應的傳播事件。
private final class NioMessageUnsafe extends AbstractNioUnsafe { //臨時存儲讀到的鏈接 private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //服務端接入速率處理器 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { //while循環調用doReadMessages()建立新鏈接對象 do { //獲取jdk底層的channel,並加入readBuf容器 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } //把讀到的鏈接作一個累加totalMessages,默認最多累計讀取16個鏈接,結束循環 allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } //觸發readBuf容器內全部的傳播事件:ChannelRead 讀事件 int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } //清空容器 readBuf.clear(); allocHandle.readComplete(); //觸發傳播事件:ChannelReadComplete,全部的讀事件完成 pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); //觸發傳播事件:exceptionCaught,觸發異常 pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
而這一段關鍵代碼邏輯中 int localRead = doReadMessages(readBuf);
它建立jdk底層channel而且用NioSocketChannel包裝起來,將該channel添加到傳入的容器保存起來,同時返回一個計數。
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //將jdk底層的channel封裝到netty的channel,並存儲到傳入的容器當中 //this爲服務端channel buf.add(new NioSocketChannel(this, ch)); //成功和建立 客戶端接入的一條通道,並返回 return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
經過檢測IO事件輪詢新鏈接,當前成功檢測到鏈接接入事件以後,會調用NioServerSocketChannel#doReadMessages()
方法,進行建立NioSocketChannel
,即客戶端channel的過程。
下面就來了解一下NioSocketChannel的主要工做:
.查看原代碼作了兩件事,調用父類構造方法,實例化一個NioSocketChannelConfig。
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); //實例化一個NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()); }
1)、查看NioSocketChannel父類構造方法,主要是保存客戶端註冊的讀事件、channel爲成員變量,以及設置阻塞模式爲非阻塞。
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); //實例化一個NioSocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket()); } protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { //傳入感興趣的讀事件:客戶端channel的讀事件 super(parent, ch, SelectionKey.OP_READ); } protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //保存客戶端channel爲成員變量 this.ch = ch; //保存感興趣的讀事件爲成員變量 this.readInterestOp = readInterestOp; try { //配置阻塞模式爲非阻塞 ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
最後調用父類的構造方法,是設置該客戶端channel對應的服務端channel,以及channel的id和兩大組件unsafe和pipeline
protected AbstractChannel(Channel parent) { //parent爲建立次客戶端channel的服務端channel(服務端啓動過程當中經過反射建立的) this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
2)、再看NioSocketChannelConfig實例化。主要是保存了javaSocket,而且經過setTcpNoDelay(true);
禁止了tcp的Nagle算法,目的是爲了儘可能讓小的數據包整合成大的發送出去,下降延時.
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); calculateMaxBytesPerGatheringWrite(); } public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } //保存socket this.javaSocket = javaSocket; // Enable TCP_NODELAY by default if possible. if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { //禁止Nagle算法,目的是爲了讓小的數據包儘可能集合成大的數據包發送出去 setTcpNoDelay(true); } catch (Exception e) { // Ignore. } } }
服務端啓動初始化的時候ServerBootstrap#init()
,主要作了一些參數的配置。其中對於childGroup,childOptions,childAttrs,childHandler
等參數被進行了單獨配置。做爲參數和ServerBootstrapAcceptor
一塊兒,被看成一個特殊的handle,封裝到pipeline中。ServerBootstrapAcceptor
中的eventLoop
爲workGroup
。
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { //省略了不少代碼............. @Override void init(Channel channel) throws Exception { //配置AbstractBootstrap.option final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } //配置AbstractBootstrap.attr final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } //配置pipeline ChannelPipeline p = channel.pipeline(); //獲取ServerBootstrapAcceptor配置參數 final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); //配置AbstractBootstrap.handler ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { //配置ServerBootstrapAcceptor,做爲Handle緊跟HeadContext pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); } //省略了不少代碼............. }
可見,整個服務端pipeline的結構以下圖所示。bossGroup
控制IO事件的檢測與處理,整個bossGroup
對應的pipeline只包括頭(HeadContext
)尾(TailContext
)以及中部的ServerBootstrap.ServerBootstrapAcceptor
。
當新鏈接接入的時候AbstractNioMessageChannel.NioMessageUnsafe#read()
方法被調用,最終調用fireChannelRead()
,方法來觸發下一個Handler的channelRead
方法。而這個Handler正是ServerBootstrapAcceptor
它是ServerBootstrap的內部類,同時繼承自ChannelInboundHandlerAdapter
。也是一個ChannelInboundHandler
。其中channelRead主要作了如下幾件事。
1.爲客戶端channel的pipeline添加childHandler
2.設置客戶端TCP相關屬性childOptions和自定義屬性childAttrs
3.workGroup選擇NioEventLoop並註冊Selector
1)、爲客戶端channel的pipeline添加childHandler
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; private final Runnable enableAutoReadTask; ServerBootstrapAcceptor( final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; //省略了一些代碼。。。。。 @Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { //該channel爲客戶端接入時建立的channel final Channel child = (Channel) msg; //添加childHandler child.pipeline().addLast(childHandler); //設置TCP相關屬性:childOptions setChannelOptions(child, childOptions, logger); //設置自定義屬性:childAttrs for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { //選擇NioEventLoop並註冊Selector childGroup.register(child) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } //省略了一些代碼。。。。。 }
客戶端channel
的pipeline添加childHandler
,在服務端EchoServer建立流程中,childHandler的時候,使用了ChannelInitializer
的一個自定義實例。而且覆蓋了其initChannel
方法,改方法獲取到pipeline並添加具體的Handler。查看ChannelInitializer
具體的添加邏輯,handlerAdded
方法。其實在initChannel
邏輯中,首先是回調到用戶代碼執行initChannel
,用戶代碼執行添加Handler的添加操做,以後將ChannelInitializer本身從pipeline中刪除。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. //初始化Channel if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { //回調到用戶代碼 initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { //刪除自己 pipeline.remove(this); } } return true; } return false; } }
2)、設置客戶端TCP相關屬性childOptions和自定義屬性childAttrs
這點在ServerBootstrapAcceptor#init()
方法中已經體現
3)、workGroup選擇NioEventLoop並註冊Selector
這要從AbstractBootstrap#initAndRegister()
方法開始,而後跟蹤源碼會來到AbstractUnsafe#register()
方法
protected abstract class AbstractUnsafe implements Unsafe { //省略了一些代碼。。。。。 @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } //省略了一些代碼。。。。。 }
最後調用AbstractNioUnsafe#doRegister()
方法經過jdk的javaChannel().register
完成註冊功能。
protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { //省略了一些代碼。。。。。 @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } } //省略了一些代碼。。。。。 }
a)、入口:ServerBootstrap.ServerBootstrapAcceptor#channelRead()#childGroup.register()
;
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
b)、實際上調用了AbstractChannel.AbstractUnsafe#register0()
,觸發了通道激活事件;
//觸發通道激活事件,調用HeadContent的 pipeline.fireChannelActive();
c)、pipeline
的頭部開始,即DefaultChannelPipeline.HeadContext#channelActive()
從而觸發了readIfIsAutoRead()
;
@Override public void channelActive(ChannelHandlerContext ctx) { ctx.fireChannelActive(); readIfIsAutoRead(); }
d)、讀事件將從尾部的TailContent#read()被觸發,從而依次執行ctx.read(),從尾部開始,每一個outboundHandler的read()事件都被觸發。直到頭部。
@Override public final ChannelPipeline read() { tail.read(); return this; } @Override public ChannelHandlerContext read() { //獲取最近的outboundhandler final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); //並依次執行其read方法 if (executor.inEventLoop()) { next.invokeRead(); } else { Tasks tasks = next.invokeTasks; if (tasks == null) { next.invokeTasks = tasks = new Tasks(next); } executor.execute(tasks.invokeReadTask); } return this; }
e)、進入頭部HeadContext#read(),而且最終更改了selectionKey,向selector註冊了讀事件
HeadContext#read()
@Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
AbstractChannel#beginRead()
@Override public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }
AbstractNioMessageChannel#doBeginRead
@Override protected void doBeginRead() throws Exception { if (inputShutdown) { return; } super.doBeginRead(); }
AbstractNioChannel#doBeginRead()
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
參考文章:
Jorgezhong
Netty如何接入新鏈接基本流程如上所述,若是有誤,還望各位指正。建議先從前兩篇看起比較好理解點。
若是對 Java、大數據感興趣請長按二維碼關注一波,我會努力帶給大家價值。以爲對你哪怕有一丁點幫助的請幫忙點個贊或者轉發哦。
關注公衆號【愛編碼】,回覆2019有相關資料哦。