netty源碼解解析(4.0)-10 ChannelPipleline的默認實現--事件傳遞及處理 netty源碼解解析(4.0)-2 Chanel的接口設計 netty源碼解解析(4.0)-8 Ch

  事件觸發、傳遞、處理是DefaultChannelPipleline實現的另外一個核心能力。在前面在章節中粗略地講過了事件的處理流程,本章將會詳細地分析其中的全部關鍵細節。這些關鍵點包括:html

  • 事件觸發接口和對應的ChannelHandler處理方法。
  • inbound事件的傳遞。 
  • outbound事件的傳遞。
  • ChannelHandler的eventExecutor的分配。

  

  事件的觸發方法和處理方法promise

  netty提供了三種觸發事件的方式:經過Channel觸發,經過ChannelPipleline觸發,經過ChannelHandlerContext觸發。ide

  

  Channel觸發oop

  在netty源碼解解析(4.0)-2 Chanel的接口設計這一章中,列出了Channel觸發事件的全部方法。Channel定義的全部事件觸發方法中,都是用來觸發outbound事件的,只有read方法比較特殊,它直接觸發outbound方法,若是能讀到數據則會觸發inbound方法。下面是Channel的事件觸發方法,和ChannelHandler事件處理方法的對應關係。post

  outbound事件this

Channel方法 ChannelOutboundHandler方法
bind bind(ChannelHandlerContext, SocketAddress, ChannelPromise)  
connect connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)
disconnect disconnect(ChannelHandlerContext, ChannelPromise)
close close(ChannelHandlerContext, ChannelPromise)
deregister deregister(ChannelHandlerContext, ChannelPromise)
write write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
flush flush(ChannelHandlerContext ctx)
writeAndFlush 先調用write而後調用flush
read read(ChannelHandlerContext)

  inbound事件url

Channel方法 ChannelInboundHandler方法
read channelRead(ChannelHandlerContext, Object)
channelReadComplete(ChannelHandlerContext)

  Channel經過調用ChannelPipleline的同名方方法觸發事件,如下是AbstractChannel實現的bind的方法spa

1 @Override
2 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
3  return pipeline.bind(localAddress, promise);
4 }

  其餘方法的實現和bind相似。設計

  

  ChannelPipleline觸發netty

  在netty源碼解解析(4.0)-8 ChannelPipeline的設計這一章中,列出了全部觸發事件的方法。 ChannelPipleline的outbound事件的觸發方法和處理方法的對應關係和Channel同樣,這裏就再也不重複羅列。下面是inbound事件的觸發方法和ChannelHandler事件處理方法的對應關係:

  inbound事件

ChannelPipleline方法 ChannelInboundHandler方法
fireChannelRegistered channelRegistered(ChannelHandlerContext) 
fireChannelUnregistered channelUnregistered(ChannelHandlerContext)
fireChannelActive channelActive(ChannelHandlerContext)
fireChannelInactive channelInactive(ChannelHandlerContext)
fireChannelRead channelRead(ChannelHandlerContext, Object)
fireChannelReadComplete channelReadComplete(ChannelHandlerContext)
fireExceptionCaught exceptionCaught(ChannelHandlerContext, Throwable)
fireUserEventTriggered userEventTriggered(ChannelHandlerContext, Object)
fireChannelWritabilityChanged channelWritabilityChanged(ChannelHandlerContext)

   在DefaultChannelPipleline實現中,事件觸發是經過調用AbstractChannelHandlerContext的方法實現的。inbound事件的觸發方式是調用對應的invokeXXX靜態方法。例如: fireChannelRegistered方法會調用invokeChannelRegistered靜態方法:

1 @Override
2 public final ChannelPipeline fireChannelRegistered() {
3      AbstractChannelHandlerContext.invokeChannelRegistered(head);
4      return this;
5 }

  這裏會把鏈表的頭做爲輸入參數,代表inbound事件是從鏈表頭開始處理。其餘inbound事件觸發方法的實現和這個相似。

  outbound事件的觸發方式是調用AbstractChannelHandlerContext的同名方法,例如bind的方法的實現:

1     @Override
2     public final ChannelFuture bind(SocketAddress localAddress) {
3         return tail.bind(localAddress);
4     }

  這調用鏈表尾的方法,代表outbind事件從鏈表尾開始處理。其餘outbound事件的觸發方法和這個相似。

 

  ChannelHandlerContext觸發

  Channel的事件觸發方法會調用DefaultChannelPipleline的觸發方法,而DefaultChannelPipleline的觸發方法調用AbstractChannelHandlerContext的觸發方法。因此,不管是Channel仍是ChannelPipleline,他們的事件觸發能力都是AbstractChannelHandlerContext提供的,所以ChannelHandlerContext事件觸發方法和ChannelHandler事件處理方法的對應關係和Channel,ChannelPipleline是同樣的。

  

  三種觸發方法的差別

  Channel只能觸發outbound事件,事件從鏈表的tail開始,傳遞到head。ChannelPipleline既能夠觸發outbound事件,又能觸發inbound事件,outbound事件的處理和Channel觸發同樣,inbound事件的從鏈表的head開始,傳遞到tail。ChannelHandlerContext觸發方式最爲靈活,若是調用ChannelHandlerContext的實例觸發事件,outbound事件從這個實例的節點開始向head方向傳遞,inbound事件從這個實例的節點開始向tail傳遞,此外還能夠調用AbstractChannelHandlerContext提供的靜態方法從鏈表中的任意一個節點開始觸發可處理事件。總結起來就是,Channel和ChannelPipleline觸發的事件只能從鏈表的head或tail節點開始觸發和傳遞事件,而ChannelHanderContext能夠從鏈表中任何一個節點觸發和傳遞事件。

 

  事件的傳遞

  事件傳遞的功能在AbstractChannelHandlerContext,這個類的定義以下:

    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

  inbound事件的觸發和傳遞

  每一個inbound事件的觸發傳遞實現包含3個方法,一個普通方法fireXXX,一個靜態方法invokeXXX, 和一個普通方法invokeXXX。每一次inbound事件傳遞就是一輪fire-invoke-invoe的調用。下面是channelRegisterd事件的相關的代碼。

 1     @Override
 2     public ChannelHandlerContext fireChannelRegistered() {
 3         invokeChannelRegistered(findContextInbound());
 4         return this;
 5     }
 6 
 7     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
 8         EventExecutor executor = next.executor();
 9         if (executor.inEventLoop()) {
10             next.invokeChannelRegistered();
11         } else {
12             executor.execute(new Runnable() {
13                 @Override
14                 public void run() {
15                     next.invokeChannelRegistered();
16                 }
17             });
18         }
19     }
20 
21     private void invokeChannelRegistered() {
22         if (invokeHandler()) {
23             try {
24                 ((ChannelInboundHandler) handler()).channelRegistered(this);
25             } catch (Throwable t) {
26                 notifyHandlerException(t);
27             }
28         } else {
29             fireChannelRegistered();
30         }
31     }

  這三個方法各有不一樣的職責:

  • fireChannelRegistered調用findContextInbound找到鏈表上下一個ChannelInboundHandler類型的節點,並把這個節點做爲參數傳給靜態方法invokeChannelRegistered。
  • 靜態invokeChannelRegistered負責調用普通invokeChannelRegistered方法,並確保這個方法在eventExecutor中調用。
  • 普通invokeChannelRegistered負責調用handler對應的事件處理方法,處理異常。若是這個handler對應的handlerAdded方法沒有完成調用,這handler還不能處理事件,跳過這節點,繼續下一輪fire-invoke-invoke循環。

  在廣泛invoveChannelRegistered中,正常狀況下會調用handler的事件處理方法,這裏是handler的channelRegistered方法。若是事件處理方法沒有調用對應的fire方法,那麼這個事件的傳遞就算終止了。因此事件傳遞還須要handler的配合。

  inbound事件傳遞的關鍵實如今findContextInbound中,這個方法是實現以下:

1     private AbstractChannelHandlerContext findContextInbound() {
2         AbstractChannelHandlerContext ctx = this;
3         do {
4             ctx = ctx.next;
5         } while (!ctx.inbound);
6         return ctx;
7     }

  這裏使用next向後遍歷節點,使用inbound屬性判斷節點持有的handler是否ChannelInboundHandler類型,直到找到一個合適的節點爲止。若是沒找到,則返回最後一個節點。這樣就對鏈表中最後一個節點提出了一些特殊的要求:必須是持有ChannelInboundHandler的handler而且;而且要負責終止事件傳遞。DefaultPipleline.TailContext類的實現就知足了這兩點要求。

  

  outbound事件的觸發和傳遞

  每一個outbound事件的觸發和傳遞包含兩點方法: XXX, invokeXXX。 下面以bind事件爲例看看outbound事件的觸發和傳遞:

 1     @Override
 2     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
 3         if (localAddress == null) {
 4             throw new NullPointerException("localAddress");
 5         }
 6         if (isNotValidPromise(promise, false)) {
 7             // cancelled
 8             return promise;
 9         }
10 
11         final AbstractChannelHandlerContext next = findContextOutbound();
12         EventExecutor executor = next.executor();
13         if (executor.inEventLoop()) {
14             next.invokeBind(localAddress, promise);
15         } else {
16             safeExecute(executor, new Runnable() {
17                 @Override
18                 public void run() {
19                     next.invokeBind(localAddress, promise);
20                 }
21             }, promise, null);
22         }
23         return promise;
24     }
25 
26     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
27         if (invokeHandler()) {
28             try {
29                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
30             } catch (Throwable t) {
31                 notifyOutboundHandlerException(t, promise);
32             }
33         } else {
34             bind(localAddress, promise);
35         }
36     }

 

  bind方法調用findContextOutbound找到鏈表上下一個持有ChannelOutboundHandler類型handler的節點,而且確保invokeBind方法在eventExecutor中執行。invokeBind方法負責調用handler對應的事件處理方法,這裏是調用handler的bind方法。handler的bind方法中須要調用節點bind方法,這個事件才能繼續傳遞下去。

  outbound事件傳遞的關鍵實如今findContextOutbound中,這個方法的實現以下:

1     private AbstractChannelHandlerContext findContextOutbound() {
2         AbstractChannelHandlerContext ctx = this;
3         do {
4             ctx = ctx.prev;
5         } while (!ctx.outbound);
6         return ctx;
7     }

 

  這裏使用鏈表的prev向前遍歷,使用outbound屬性判斷節點持有的handler是否ChannelOutboundHandler類型,直到找到一個爲止。若是沒找到,將會返回鏈表頭的節點。這樣對鏈表頭的節點也提出了特殊的要求:它持有的handler必須是ChannelOutboundHandler類型。

  

  鏈表節點持有的handler類型

  在事件的傳遞和處理過程當中,必須把inbound事件交給ChannelInboundChandler類型的handler處理,把outbound事件交給ChannelOutboundChandler類型的handler處理。爲了判斷handler類型,定義了兩個boolean類型的屬性: inbound, outbound。inbound==true表示handler是ChannelInboundHandler類型, outbound==true表示handler是ChannelOutboundHandler類型。這兩個值在AbstractChannelHandlerContext構造方法中初始化,初始化值來自輸入的參數。DefaultChannelHandlerContext在構造方法中把這兩個參數的值傳入。

1     DefaultChannelHandlerContext(
2             DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
3         super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
4         if (handler == null) {
5             throw new NullPointerException("handler");
6         }
7         this.handler = handler;
8     }

  使用isInbound的的值設置inbound,isOutbound的值設置outbound。這兩方法只是簡單的使用了instanceof運算符。

1     private static boolean isInbound(ChannelHandler handler) {
2         return handler instanceof ChannelInboundHandler;
3     }
4 
5     private static boolean isOutbound(ChannelHandler handler) {
6         return handler instanceof ChannelOutboundHandler;
7     }

 

  爲ChannelHandler分配eventExecutor

  把一個channleHandler添加到ChannelPipleline中時,ChannelPipleline會給它分配一個eventExecutor, 它的全部的事件處理方法都會在這個executor中執行。若是使用帶group參數的add方法,executor會從group中取,不然會把channel的eventLoop當成這個handler的executor使用。 從group中分配execuor的操做在建立持有handler的鏈表節點時完成:

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

 

  childExecutor方法負責從group中取出一個executor分配給handler:

 1     private EventExecutor childExecutor(EventExecutorGroup group) {
 2         if (group == null) {
 3             return null;
 4         }
 5         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
 6         if (pinEventExecutor != null && !pinEventExecutor) {
 7             return group.next();
 8         }
 9         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
10         if (childExecutors == null) {
11             // Use size of 4 as most people only use one extra EventExecutor.
12             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
13         }
14         // Pin one of the child executors once and remember it so that the same child executor
15         // is used to fire events for the same channel.
16         EventExecutor childExecutor = childExecutors.get(group);
17         if (childExecutor == null) {
18             childExecutor = group.next();
19             childExecutors.put(group, childExecutor);
20         }
21         return childExecutor;
22     }

  實際的分配操做要稍微複雜一些。取決於channel的ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP設置,若是沒有設置這個選項或設置成true,  表示每一個channelPipleline只能從一個group中分配一個executor,  這是默認行爲,實現代碼是地9行-19行,這種狀況下每個使用了同一個group的handler,都會被分配到同一個executor中。若是把這個選擇設置成false,這是簡單地從group中取出一個executor,實現代碼是地7行,這種狀況下,每個使用了同一個group的handler被均勻地分配到group中的每個executor中。

  若是沒有指定group,會在地3行退出,這裏沒有分配executor。這種狀況會在AbstractChannelHandlerContext的executor方法中獲得妥善處理:

1     @Override
2     public EventExecutor executor() {
3         if (executor == null) {
4             return channel().eventLoop();
5         } else {
6             return executor;
7         }
8     }

  第4行,處理了沒分配executor的狀況,調用channel的eventLoop方法獲得channel的eventLoop。

相關文章
相關標籤/搜索