netty源碼學習

概述

Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.java

系統架構圖

eventloop.png

啓動過程

咱們首先經過netty官方的demo來分析一下,TelnetServerios

public final class TelnetServer { static final boolean SSL = System.getProperty("ssl") != null; static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8992" : "8023")); public static void main(String[] args) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new TelnetServerInitializer(sslCtx)); b.bind(PORT).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } 

經過上面的代碼,咱們總結一下:git

  • 服務端在啓動時,須要使用到兩個EventLoopGroup,一個是做爲監聽服務端口,用於accept客戶端的鏈接請求,並建立channel的線程池,線程數量通常設爲1便可;另外一個是負責channel的read & write等事件的worker線程池,若是沒有指定初始值大小,默認爲cpu核數*2,詳見源碼MultithreadEventLoopGroup
  • 指定channel類爲NioServerSocketChannel
  • 經過childHandler方法指定業務處理的ChannelHandler

系統監聽

TelnetServer中的bossGroup的線程數量設置爲1,我有個疑問,線程數量若是大於1會怎麼樣?咱們先看看netty相關的系統監聽和服務註冊的源碼。服務的起點在b.bind(PORT).sync().channel().closeFuture().sync(),那麼咱們就線程b.bind(PORT)開始:github

public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); ... if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { ... } } 

上面的三個方法的代碼中,最重要的是initAndRegister()和doBind0兩個方法,下面咱們先來看一下initAndRegister方法:編程

final ChannelFuture initAndRegister() {
        Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } ... ChannelFuture regFuture = config().group().register(channel); ... return regFuture; } 

其中,channelFactory.newChannel()會建立一個NioServerSocketChannel的實例,這個就和咱們的demo中.channel(NioServerSocketChannel.class)就聯繫起來了。咱們重點來看看init(channel)和config().group().register(channel),先來看看init方法,init方法在ServerBootstrap中:設計模式

void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); System.out.println("hanlder names is :"+p.names()); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); System.out.println("hanlder names is :"+p.names()); } 

上面的代碼能夠發現,init主要乾了下面的幾件事:api

  • 初始化option和AttributeKey參數;
  • 獲取到channel對應的pipeline,注意每一個channel的一輩子中都會有且只有一個pipeline,這裏咱們只要知道這個pipeline的類型是:DefaultChannelPipeline,咱們對於pipeline添加了兩行system.out代碼,第一行輸出:hanlder names is :[DefaultChannelPipeline$TailContext#0],;
  • p.addLast(new ChannelInitializer<Channel>()主要是爲了加入新的鏈接處理器,後面的章節會專門來介紹pipeline,加入完新的連接處理器後,咱們的輸出變爲了:hanlder names is :[ServerBootstrap$1#0, DefaultChannelPipeline$TailContext#0];

咱們再來看看config().group().register(channel)相關的代碼,其中config().group()獲取到的group就是demo中的:bossGroup,看一下此group下實現的register源碼:promise

public ChannelFuture register(Channel channel) { return next().register(channel); } 

其中的next()方法會今後group中獲取到一個NioEventLoop,關於建立NioEventLoop的過程及分配線程的細節,你們有興趣的能夠自行研究一下NioEventLoopGroup。接下來,咱們再來看看NioEventLoop的register方法:數據結構

public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; } 

其中promise.channel().unsafe().register方法在AbstractUnsafe類裏面:架構

public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } ... } } 

AbstractChannel.this.eventLoop = eventLoop 這行代碼將此unsafe對象和NioEventLoopGroup分配的NioEventLoop綁定,其實就是將NioServerSocketChannel和它的eventLoop進行綁定,使得此NioServerSocketChannel相關的代碼只能在eventLoop的專屬線程裏執行,這裏也能夠回答了咱們開頭的問題:「TelnetServer中的bossGroup的線程數量設置爲1,我有個疑問,線程數量若是大於1會怎麼樣?」,答案是:線程數量只能設置爲1,由於有且只有一個線程會服務於NioServerSocketChannel,設置多了是浪費。咱們再來看看register0()相關的代碼,注意register0()相關的代碼執行已是在eventLoop的專屬線程裏執行的了:

private void register0(ChannelPromise promise) { try { ... doRegister(); ... pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } ... } 

這裏面比較重要的是doRegister()、isActive(),咱們先來看看doRegister()方法:

protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } ... } } 

javaChannel().register方法調用jdk底層的channel進行註冊,具體邏輯就不深刻下去,咱們再來看看上面的isActive()方法:

public boolean isActive() { return javaChannel().socket().isBound(); } 

判斷端口是否綁定,由於咱們如今還沒綁定,因此這裏會返回false。接下來,咱們再來回頭看以前提到的AbstractBootstrap的doBind0()方法:

private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } 

上面代碼中的channel.bind會調用到AbstractChannel的bind方法:

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } 

繼續來看DefaultChannelPipeline中的bind方法:

public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } 

tail的類型是TailContext,咱們來看看它裏面的bind方法:

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { safeExecute(executor, new Runnable() { @Override public void run() { next.invokeBind(localAddress, promise); } }, promise, null); } return promise; } 

上面的代碼中的next類型爲HeadContext,由於已經在eventLoop裏面,因此會直接執行next.invokeBind(localAddress, promise),源碼以下:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { if (invokeHandler()) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } else { bind(localAddress, promise); } } 

((ChannelOutboundHandler) handler()).bind方法,咱們再來看看這個hanlder的bind方法:

public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } 

又調到了unsafe裏面的方法,咱們繼續分析:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... boolean wasActive = isActive(); try { doBind(localAddress); } ... if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } 

核心代碼是doBind方法的調用,它在NioServerSocketChannel中,咱們來繼續分析:

protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } } 

doBind方法裏面就開始調用jdk的相關綁定端口的底層代碼,到此咱們nioserver的啓動流程就已經分析完畢,咱們來總結一下:

  • ServerBootstrap bossGroup的線程數設置爲1是最好的,由於在netty中任何channel的eventloop只能有一個;
  • ServerBootstap在啓動過程當中有兩個比較重要的流程分析,分別是:initAndRegister()和doBind0兩個方法,其中initAndRegister實現NioServerSocketChannel的建立、參數的初始化、eventloop的初始化和channel的綁定、業務的handler註冊到pipeline中;doBind0主要是調用jdk底層進行端口監聽;
  • 下圖是以時序圖的方式作的一個流程總結; 

啓動過程當中涉及到的設計模式總結:

  • 工廠方法+反射:NioServerSocketChannel類對象的建立使用了工廠方法+反射的機制,使得netty在架構上能夠支持Channel接口的實現類的擴展;
  • Future模式:netty中的ChannelFuture和ChannelPromise都是Future模式的使用和擴展;

ChannelPipeline

在前面的server啓動分析時,咱們就遇到了ChannelPipeline,這個章節咱們着重介紹一下ChannelPipeline。首先咱們來看一下ChannelPipeline的類結構關係圖: channelpipeline.jpg 如上圖所示,ChannelPipeline的繼承關係比較簡單,咱們實際使用的pipeline對象都是DefaultChannelPipeline類的對象。咱們在來看一張pipeline和其它重要對象的關係圖: channelpipe.png 由上面的圖片上能夠看出,如下幾點:

  • 在netty中,每個channel都有且只有一個ChannelPipeline爲之提供服務;
  • DefaultChannelPipeline中有兩個固定的ContextHandler存在,一個是head(HeadContext),一個是tail(TailContext);
  • 咱們須要添加的業務處理Context會添加到head和tail之間,並造成一個雙向鏈表;

咱們先提個問題,爲何要有雙向鏈表,難道單向的鏈表不能夠嗎?咱們先來看看DefaultChannelPipeline中的構造方法源碼:

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } 

DefaultChannelPipeline在初始化的時候,會建立兩個context,一個爲tail,一個爲head,tail和head組成雙向鏈表結構,後續業務添加的context/handler對,都會加入到這個雙向鏈表結構裏面。咱們先來看一下TailContext的源碼:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super(pipeline, null, TAIL_NAME, true, false); setAddComplete(); } } 

上面的代碼中,主要是調用了父類的構造方法:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.inbound = inbound; this.outbound = outbound; // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; } 

注意,tail的outbound標誌是false,inbound是true,從字面意義來理解,tail是用來處理inbound事件的,它不能處理outbound相關的事件。但真實的狀況卻並不徹底是這樣,head會是一個例外。head和tail它們既是HandlerContext的同時,又是HandlerContext關聯的hanlder,來看一下代碼:

public ChannelHandler handler() { return this; } 

咱們再來看看HeadContext的源碼:

final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } } 

head的inbound標誌是true,outbound的標誌是false,按照以前的說法,head就只能處理outbound相關的事件的,但事實上不是這樣的:咱們能夠發現一個head和tail實現細節的不一樣:head同時實現了ChannelOutboundHandler和ChannelInboundHandler接口,而tail只實現了ChannelInboundHandler接口。下面以一個inbound事件來進行分析一下:先來看DefaultPipeline中的fireChannelRegistered():

public final ChannelPipeline fireChannelRegistered() { AbstractChannelHandlerContext.invokeChannelRegistered(head); return this; } 

方法調用了AbstractChannelHandlerContext的靜態方法,並將head做爲參數:

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } 

上面的代碼將會在eventloop下調用head的invokeChannelRegistered,咱們再來看看:

private void invokeChannelRegistered() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } } 

上面的方法會調用到head的channelRegistered方法裏面,咱們暫時分析到這裏,代碼分析的結論與咱們剛剛的分析判斷是一致的:head既能夠處理inbound事件,也能夠處理outbound事件。

inbound & outbound事件

咱們剛剛分析的ChannelRegistered,就是一個典型的inbound事件。下面咱們來分析一下inbound和outbound事件。下圖是來自於netty官網關於inbound和outbound事件順序的圖示。由圖可知:

  • inbound事件通常是來源於socket.read方法;
  • outbound事件來源於上層業務的調用,通常會調用到socket.write方法;
  • inbound和outbound事件的處理方向相反,但都會沿着各自的方向單向傳播;
I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                    | +---------------------------------------------------+---------------+ | ChannelPipeline | | | \|/ | | +---------------------+ +-----------+----------+ | | | Inbound Handler N | | Outbound Handler 1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler N-1 | | Outbound Handler 2 | | | +----------+----------+ +-----------+----------+ | | /|\ . | | . . | | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| | [ method call] [method call] | | . . | | . \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 2 | | Outbound Handler M-1 | | | +----------+----------+ +-----------+----------+ | | /|\ | | | | \|/ | | +----------+----------+ +-----------+----------+ | | | Inbound Handler 1 | | Outbound Handler M | | | +----------+----------+ +-----------+----------+ | | /|\ | | +---------------+-----------------------------------+---------------+ | \|/ +---------------+-----------------------------------+---------------+ | | | | | [ Socket.read() ] [ Socket.write() ] | | | | Netty Internal I/O Threads (Transport Implementation) | +-------------------------------------------------------------------+ 

inbound事件

咱們來詳細的分析一下inbound事件相關的源碼。首先,咱們來看看inbound事件有哪些:

fireChannelRegistered; fireChannelUnregistered; fireChannelActive; fireChannelInactive; fireChannelRead(Object msg); fireChannelReadComplete; fireUserEventTriggered(Object event) fireChannelWritabilityChanged; fireExceptionCaught(Throwable cause); 

inbound事件共用9個事件,它們都是以fire...開頭。咱們來簡單看一下fireChannelRead相關的流程代碼,流程的起點是在NioByteUnsafe的read方法:

public final void read() {
            ...
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); ... allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } ... } 

每次從底層的socket裏面讀取到內容,netty都會調用pipeline的fireChannelRead方法,此方法就是咱們剛剛看到的inbound事件裏面的方法:

public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } 

上面的pipeline代碼會調用到AbstractChannelHandlerContext的invokeChannelRead方法並將head和讀取到的msg傳遞過去,咱們再來看看invokeChannelRead:

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } 

上面的方法會先調用head的invokeChannelRead方法,進入head中進行處理:

private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } 

流程進入到head的channelRead方法,咱們來看看:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.fireChannelRead(msg); } 

上面的代碼中的ctx仍是head自己,咱們來看看head的fireChannelRead方法:

public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; } 

上面的方法中會經過咱們看看已經看到過的invokeChannelRead方法,調用到head的下一個的處理inbound事件的Context中去,後面代碼咱們便不展開。咱們總結一下inbound相關事件的處理:

  • inbound事件通常是來源於socket的read方法;
  • netty目前的inbound事件一共有9種;
  • netty的inbound事件在pipeline中方法的起點是以fire...()開頭的方法,inbound事件會從head節點開始向後傳遞並處理;

outbound事件

咱們再來看看outbound的事件有哪些,outbound的事件比inbound事件會複雜一些,由於它的外部調用接口會比較多,可是抽象一下,就是下面這幾種事件:

bind; connect; disconnect; close; deregister; read; write; flush; 

outbound的事件入口也在pipeline的公共方法裏,例如write的流程調用:

public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); } 

上面的方法會調用到tail的writeAndFlush方法裏面。關於write流程的分析,後面會有專門的章節分析,在此不展開了。

異常事件

經過上面的分析,咱們都瞭解了inbound和outbound事件相關處理細節,那麼在處理inbound和outbound事件時,若是處理邏輯遇到了異常,ChannelPipeline是如何處理的?咱們接下來便來分析一下ChannelPipeline裏關於異常的處理。按下面三種狀況,異常事件的處理狀況是不一樣的:

  • inbound事件;
  • outbound事件,且須要ChannelPromise模式回調通知的方法;
  • outbound事件,但不須要ChannelPromise模式回調通知的方法;

其中,第一和第三兩種狀況處理方式相同。咱們先來看看inbound異常事件的處理。

inbound異常事件

咱們選擇channelActive來分析,首先來看DefaultPipeline中的fireChannelActive:

public final ChannelPipeline fireChannelActive() { AbstractChannelHandlerContext.invokeChannelActive(head); return this; } 

咱們再接着往下看:

static void invokeChannelActive(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelActive(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelActive(); } }); } } 

上面的靜態方法中,會直接進入到next.invokeChannelActive(),此時的ChannelHandlerContext爲head:

private void invokeChannelActive() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); } } 

在上面的代碼中,咱們假設在try{}模塊內拋出了異常,流程便走到了notifyHandlerException:

private void notifyHandlerException(Throwable cause) { ... invokeExceptionCaught(cause); } 

直接看重點的invokeExceptionCaught:

private void invokeExceptionCaught(final Throwable cause) { if (invokeHandler()) { try { handler().exceptionCaught(this, cause); } catch (Throwable error) { ... } } ... } 

上面的代碼會調用到Context對應的handler的exceptionCaught方法,目前咱們的context仍是head:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } 

再接着看AbstractChannelHandlerContext的方法:

public ChannelHandlerContext fireExceptionCaught(final Throwable cause) { invokeExceptionCaught(next, cause); return this; } 

注意上面方法中的next,它是head的next節點,咱們再來看看invokeExceptionCaught:

static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) { ObjectUtil.checkNotNull(cause, "cause"); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeExceptionCaught(cause); } ... } 

上面的代碼會調用next(下一個Context)的invokeExceptionCaught方法,最終會調用到能處理異常的hanlder,而後終止,netty建議咱們將異常處理的Context做爲最後一個,也就是tail前面的一個。若是沒有能處理此異常的hanlder,那麼最後會走到tail中的處理方法。

inbound異常事件總結:

  • 異常事件也是在ChanelPipeline上進行傳遞,傳遞順序爲由前向後;
  • 通常會將tail前一個Context做爲異常事件的處理節點,如沒有,則會在tail中進行處理;
  • outbound異常事件(不須要回調通知ChannelPromise),與inbound事件的處理邏輯徹底一致;

outbound異常事件(ChannelPromise)

關於outbound異常事件(ChannelPromise)的處理流程並非在鏈表上進行傳遞處理的,它由於須要通知到ChannelPromise,所以,它的代碼最終會走到PromiseNotificationUtil方法中:

public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) { if (!p.tryFailure(cause) && logger != null) { Throwable err = p.cause(); if (err == null) { logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause); } else { logger.warn( "Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}", p, ThrowableUtil.stackTraceToString(err), cause); } } } 

上面的代碼若是調用通知promise成功,則返回,不然打印日誌。

outbound異常事件(ChannelPromise)總結:

  • 處理流程簡單,直接通知ChannePromise,並不會在ChannelPipeline上進行傳遞;

最後,咱們總結一下inbound和outbound事件:

  • inbound事件通常是來源於socket的read方法;
  • netty目前的inbound事件一共有9種;
  • netty的inbound事件在pipeline中方法的起點是以fire...()開頭的方法,inbound事件會從head節點開始向後傳遞並處理;
  • outbound事件通常從pipeline中的方法開始,而後會調用到tail中的方法,而後向前傳遞並處理,最終會通過head,調用到socket的操做;
  • netty目前的outbound事件一共有8種;
  • pipeline的雙向鏈表數據結構是爲了支持inbound和outbound兩種事件的傳遞;

ChannelPipeline小結

  • ChannelPipeline的底層數據結構是一個雙向鏈表結構,雙向鏈表從數據結構上即支持了inbound的outbound兩種事件的流轉;
  • 每一個channel都會建立惟一的ChannelPipeline爲之服務;
  • inbound事件的起點是head、outbound事件的起點是tail;
  • ChannelPipeline能夠支持Context&Handler動態的添加和刪除;
  • 異常事件的處理分爲inbound異常事件處理、outbound異常事件處理且須要通知ChannelPromise和outbound異常事件但無需通知ChannelPromise三種狀況。其中第一種和第三種,都須要在ChannelPipeline上從前到後進行傳遞;第二種直接回調通知ChannelPromise便可;

涉及到的設計模式總結:

  • 管道模型(pipeline):在netty中,全部inbound和outbound事件的傳遞都離不開pipeline,它的pipeline模型的底層是一個雙向鏈表的數據結構,每一個鏈表的節點表明一個對應事件的handler,當事件傳遞到某個節點時,先判斷是否應該處理,最後向下一個節點傳遞,能夠支持handler的熱插拔;

write流程

由於write的流程相對比較複雜,在此咱們單獨拿一個章節來進行分析。首先,咱們來拿netty4中的telnet的demo來講明netty4的write流程:

  • 涉及到的類:TelnetClient、AbstractChannel、DefaultChannelPipeline、TailContext、AbstractChannelHandlerContext、SingleThreadEventExecutor、NioEventLoop、AbstractEventExecutor、AbstractChannelHandlerContext.WriteAndFlushTask、

  • 流程順序是:TelnetClient -> AbstractChannel -> DefaultChannelPipeline -> TailContext(AbstractChannelHandlerContext) -> NioEventLoop (SingleThreadEventExecutor) ->NioEventLoop(run方法) -> AbstractEventExecutor(safeExecute方法) -> WriteAndFlushTask(run方法) -> AbstractChannelHandlerContext(hanlder爲StringEncoder) -> StringEncoder(write方法) -> HeadContext(invokeWrite方法) -> NioSocketChannelUnsafe(write)

流程的起點在TelnetClient,咱們來看一下源碼:

lastWriteFuture = ch.writeAndFlush(line + "\r\n"); 

其中的ch爲NioSocketChannel,telnetclient直接調用了NioSocketChannel的父類AbstractChannel(不是直接的父類)中的writeAndFlush方法,代碼以下:

public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); } 

上面的方法比較簡單,直接調用了DefaultChannelPipeline的writeAndFlush方法,也就是outbound事件開始在pipeline中傳遞:

public final ChannelFuture writeAndFlush(Object msg) { return tail.writeAndFlush(msg); } 

上面的方法調用了TailContext的writeAndFlush方法,實際上是TailContext的父類AbstractChannelHandlerContext中的方法:

public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } write(msg, true, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } } 

上面的最後一個方法中,會被調用兩次。第一次調用時,第一次的next的ChannelHandlerContext對應的context爲handler對應爲io.netty.handler.codec.string.StringEncoder的context,context和handler的對應關係爲一對一。首先由於executor.inEventLoop() = false,也就是當前線程和channel的專屬負責讀寫的線程不是同一個線程,因此會先走到else中的邏輯裏面,先建立一個WriteAndFlushTask類型的task,而後調用safeExecute方法:

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { executor.execute(runnable); } catch (Throwable cause) { try { promise.setFailure(cause); } finally { if (msg != null) { ReferenceCountUtil.release(msg); } } } } 

safeExecute會調用NioEventLoop(SingleThreadEventExecutor)裏的execute方法:

public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } 

上面的代碼重點在於addTask方法,咱們來看一下細節:

protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (!offerTask(task)) { reject(task); } } final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); } 

上面的代碼顯示了,以前生成的task會最終存進類型爲: 的taskQueue中LinkedBlockingQueue中,到此爲止,業務線程已經將write的操做任務經過隊列移交給了NioEventLoop的線程,那麼咱們再來看看NioEventLoop是如何處理上面的task任務的:

protected void run() { for (;;) { try { ... if (ioRatio == 100) { ... } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } ... } } 

上面代碼中最核心的處理以前task的地方是經過runAllTasks方法,咱們再來看看runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); ... for (;;) { safeExecute(task); ... task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; } protected static void safeExecute(Runnable task) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception. Task: {}", task, t); } } 

上段代碼經過調用父類AbstractEventExecutor的safeExecute()方法,最終調用到了在以前生成的WriteAndFlushTask的run方法,咱們來看一下在WriteAndFlushTask中的代碼流程:

public final void run() { try { // Check for null as it may be set to null if the channel is closed already if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { ctx.pipeline.decrementPendingOutboundBytes(size); } write(ctx, msg, promise); } finally { // Set to null so the GC can collect them directly ctx = null; msg = null; promise = null; handle.recycle(this); } } protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ctx.invokeWrite(msg, promise); } public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { super.write(ctx, msg, promise); ctx.invokeFlush(); } 

上面的代碼在WriteAndFlushTask及它的父類中,最終會執行這行代碼:ctx.invokeWrite(msg, promise),又調用回了AbstractChannelHandlerContext(hanlder爲StringEncoder),咱們來分析一下:

private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { System.out.println("not invoke write."); write(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } 

在上面的代碼中最終會執行到((ChannelOutboundHandler) handler()).write(this, msg, promise),也就是StringEncoder的write方法:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { CodecOutputList out = null; try { if (acceptOutboundMessage(msg)) { out = CodecOutputList.newInstance(); @SuppressWarnings("unchecked") I cast = (I) msg; try { encode(ctx, cast, out); } } ... } finally { if (out != null) { final int sizeMinusOne = out.size() - 1; if (sizeMinusOne == 0) { ctx.write(out.get(0), promise); } ... } } } 

上面的代碼主要是對string進行編碼,而後再調用ctx的write方法,此刻的ctx爲StringEncoder對應的context,咱們再來分析一下context的write方法:

public ChannelFuture write(final Object msg, final ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return promise; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } write(msg, false, promise); return promise; } private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } } 

咱們又回到了以前分析過的write方法,只不過此次的next的類型爲HeadContext,已是write的最後一個context了,代碼最終會執行到next.invokeWrite(m, promise),咱們來繼續分析:

private void invokeWrite(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); } else { write(msg, promise); } } private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } 

上面的兩個方法最終會執行((ChannelOutboundHandler) handler()).write(this, msg, promise),由於如今的context是HeadContext,那麼咱們來看看HeadContext的Handler()會是什麼?

public ChannelHandler handler() { return this; } public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); } 

原來HeadContext的Handler()就是它本身,代碼會調用到unsafe的write方法,unsafe的類型爲:NioSocketChannelUnsafe,咱們再來看看進入到unsafe中的代碼:

public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ... outboundBuffer.addMessage(msg, size, promise); } 

上面的代碼將msg信息存入到outboundBuffer中,咱們以前在研究WriteAndFlushTask的run方法時,最後還有一個flush操做,當將msg信息存入到outbondBuffer後,unsafe中的flush方法會被調用,咱們來看一下:

public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); } protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; ... try { doWrite(outboundBuffer); } catch (Throwable t) { ... } finally { inFlush0 = false; } } 

上面的方法,最終會調用此unsafe的doWrite方法:

protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } // Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); } 

最終代碼將由unsafe的doWrite方法來調用jdk的nio相關操做。

write流程小結:

經過分析netty4的源碼及流程,咱們總結以下:

  • netty4中的最終write的線程是channel的worker線程,與read線程爲同一個線程;
  • 每一個channel在它的生命週期內,有且只有一個worker線程爲它服務;
  • write操做的流程正如咱們上面總結的順序:TelnetClient -> AbstractChannel -> DefaultChannelPipeline -> TailContext(AbstractChannelHandlerContext) -> NioEventLoop (SingleThreadEventExecutor) ->NioEventLoop(run方法) -> AbstractEventExecutor(safeExecute方法) -> WriteAndFlushTask(run方法) -> AbstractChannelHandlerContext(hanlder爲StringEncoder) -> StringEncoder(write方法) -> HeadContext(invokeWrite方法) -> NioSocketChannelUnsafe(write);
  • 下面的時序圖詳細的總結了netty4裏面的write流程 

 

小結

netty小結

在本文中,咱們前後分析了:netty服務啓動流程、netty的信息流轉通道channelPipeline機制、並詳細的分析了netty4的write流程。咱們如今給本次分享作一個小結:

  • netty極其簡化了nio的編程複雜度;
  • bossGroup的線程數設置爲1是最好,在netty的eventloop架構下,一個channel只能被同一個thread服務;
  • 一個channel會有惟一的一個ChannelPipeline,ChannelPipeline的核心是一個雙向鏈表結構。inbound事件從head開始,outbound事件從tail開始,其它的業務context都在head和tail之間,按照順序處理;
  • netty的inbound和outbound事件最終都會在channel的惟一的eventloop架構下按順序執行;
相關文章
相關標籤/搜索