Netty channelRegisteredChannelActive---源碼分析
通過下面的分析咱們能夠了解netty讀數據的一個過程,以及爲何channelActive方法、channelReadComplete方法會回調ChannelOutboundHandler的read方法。segmentfault
上文說到在HeadContext的channelActive方法中會調用readIfIsAutoRead();該方法一樣會在HeadContext的channelReadComplete(xxx)中調用。 readIfIsAutoRead();源碼以下:多線程
private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } }
channel.config().isAutoRead()能夠經過ChannelOption.AUTO_READ設置。若是設置爲false,那麼channel便不會主動讀數據,除非顯示的調用ChannelHandlerContext的read()ide
AbstractChannel的read()以下oop
@Override public Channel read() { pipeline.read(); return this; }
在read()方法中調用了pipeline的read()方法源碼分析
DefaultChannelPipeline的read()方法this
@Override public final ChannelPipeline read() { tail.read(); return this; }
那麼接下來的重點就是DefaultChannelPipeline的tail及tail.read()方法了。先看一下tail對應的TailContext類,TailContext是DefaultChannelPipeline的內部類線程
DefaultChannelPipelinerest
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ...省略代碼... protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
TailContextnetty
// A special catch-all handler that handles both bytes and messages. final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } @Override public ChannelHandler handler() { return this; } ...省略代碼...
TailContext繼承自AbstractChannelHandlerContext,同時實現了ChannelInboundHandler,也是多重身份。
TailContext的read()方法是繼承自AbstractChannelHandlerContext,TailContext沒有重寫。code
AbstractChannleHandlerContext的read()以下:
@Override public ChannelHandlerContext read() { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeRead(); } else { Runnable task = next.invokeReadTask; if (task == null) { next.invokeReadTask = task = new Runnable() { @Override public void run() { next.invokeRead(); } }; } executor.execute(task); } return this; } private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
其中findContextOutbound()是找到下一個Outbound的ChannelHandlerContext。那麼由tail.read()所表明的含義即是從pipeline中的尾部的最後一個ChannelInboundHandler開始往前查找是Outbound的HandlerContext.
而後該HandlerContext的invokeRead()方法被調用。
如下分析和read過程沒多大關係也能夠跳過
AbstractChannleHandlerContext的read()方法中的
if (executor.inEventLoop()) { next.invokeRead(); } else { Runnable task = next.invokeReadTask; if (task == null) { next.invokeReadTask = task = new Runnable() { @Override public void run() { next.invokeRead(); } }; } executor.execute(task); }
AbstractEventExecutor的inEventLoop()
@Override public boolean inEventLoop() { return inEventLoop(Thread.currentThread()); }
上面代碼的含義是若是調用ChannelHandlerContext read() 所在的線程和executor是同一個線程,那麼直接執行AbstractChannelHandlerContext的invokeRead()方法,不然封裝成任務,放到executor的任務隊列,去等待執行。 這種相似的代碼在netty中很常見,這是netty中不用考慮多線程問題的緣由。netty用這種方式很好的規避了多線程所帶來的問題,很值得咱們借鑑
那麼這個executor怎麼來的呢?看一下AbstractChannelHandlerContext的executor()方法
@Override public EventExecutor executor() { if (executor == null) { return channel().eventLoop(); } else { return executor; } }
若是executor 爲null,就返回channel().eventLoop()。這裏channel().eventLoop()就是每一個channel所對應的EventLoop,專門用來處理IO事件,所以不能被阻塞,不能執行耗時任務,該eventLoop會在channel建立時會和channel綁定,ChannelInboundHandler的channelRegistered()也就會被回調。咱們建立ServerBootstrap是會指定一個WokerGroup例如NioEventLoopGroup,那麼這個eventLoop便會是其中的一員。
那若是executor不爲null,executor是怎麼來的呢?
AbstractChannelHandlerContext的構造方法
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; }
executor是經過構造方法傳進來的。pipeline在添加handler時能夠指定EventExecutorGroup(能夠查看ChannelPipeline接口的API),即是這麼傳進來的,具體的分析過程此處略去(可查看netty 耗時任務如何處理去查看具體分析過程),由於不是此篇文章的重點。
這樣咱們就能處理耗時任務,而不阻塞IO線程了。
第2小節分析到AbstractChannelHandlerContext的invokeRead()方法會被調用,那麼invokeRead()實現了什麼功能?
private void invokeRead() { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).read(this); } catch (Throwable t) { notifyHandlerException(t); } } else { read(); } }
該方法所表達的含義很簡單就是回調ChannelOutboundHandler的read(xxx)方法。若是咱們的自定義的ChannelOutboundHandler繼承自ChannelOutboundHandlerAdapter,而且沒有重寫該方法,或者在重寫的方法中調用了super.read(ctx); 那就會重複調用ChannelHandlerContext的read(),即AbstractChannelHandlerContext的read()方法。這樣read(xxx)回調便會在ChannelHandlerContext的做用下從pipleline的ChannelOutboundHandler中的尾部傳遞到頭部,直到DefaultChannelPipeline的DefaultChannelPipeline的HeadContext.
HeadContext的read(xxx)方法以下,HeadContext自己也是ChannelOutboundHandler
@Override public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }
以NioChannel爲例,unsafe.beginRead();最終會調用到AbstractNioChannel的doBeginRead()方法,其對應的源碼以下:
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
該方法裏就是Java Nio的相關操做,SelectionKey的性趣集中添加OP_READ,最終實現讀數據。