Netty4學習筆記(1)-- ChannelPipeline

Netty4

Netty是一個和MINA相似的Java NIO框架,目前的最新版本是4.0.13,這兩個框架的主要做者好像都是同一個韓國人html

 

Channel

Channel是Netty最核心的接口,一個Channel就是一個聯絡Socket的通道,經過Channel,你能夠對Socket進行各類操做。java

 

ChannelHandler

用Netty編寫網絡程序的時候,你不多直接操縱Channel,而是經過ChannelHandler來間接操縱Channel。apache

 

ChannelPipeline

ChannelPipeline實際上應該叫作ChannelHandlerPipeline,能夠把ChannelPipeline當作是一個ChandlerHandler的鏈表,當須要對Channel進行某種處理的時候,Pipeline負責依次調用每個Handler進行處理。每一個Channel都有一個屬於本身的Pipeline,調用Channel#pipeline()方法能夠得到Channel的Pipeline,調用Pipeline#channel()方法能夠得到Pipeline的Channel。api

ChannelPipeline的方法有不少,其中一部分是用來管理ChannelHandler的,以下面這些:promise

  1. ChannelPipeline addFirst(String name, ChannelHandler handler);  
  2. ChannelPipeline addLast(String name, ChannelHandler handler);  
  3. ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);  
  4. ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);  
  5. ChannelPipeline remove(ChannelHandler handler);  
  6. ChannelHandler remove(String name);  
  7. ChannelHandler removeFirst();  
  8. ChannelHandler removeLast();  
  9. ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);  
  10. ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);  
  11. ChannelHandler first();  
  12. ChannelHandler last();  
  13. ChannelHandler get(String name);  

根據上面的方法,可以大概想象的到Pipeline按照什麼樣的方式組織Handler。網絡

 

ChannelHandlerContext

ChannelPipeline並非直接管理ChannelHandler,而是經過ChannelHandlerContext來間接管理,這一點經過ChannelPipeline的默認實現DefaultChannelPipeline能夠看出來。框架

調用ChannelHandlerContext#channel()方法能夠獲得和Context綁定的Channel,調用ChannelHandlerContext#handler()方法能夠獲得和Context綁定的Handler。ide

 

 

ChannelPipeline和ChannelHandlerContext默認實現函數

DefaultChannelHandlerContext和DefaultChannelPipeline是ChannelHandlerContext和ChannelPipeline的默認實現,下面是它們的部分代碼:this

  1. final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {  
  2.   
  3.     volatile DefaultChannelHandlerContext next;  
  4.     volatile DefaultChannelHandlerContext prev;  
  5.   
  6.     private final boolean inbound;  
  7.     private final boolean outbound;  
  8.     private final AbstractChannel channel;  
  9.     private final DefaultChannelPipeline pipeline;  
  10.     private final String name;  
  11.     private final ChannelHandler handler;  
  12.     private boolean removed;  
  13.   
  14.     // ...  
  15. }  

 

  1. final class DefaultChannelPipeline implements ChannelPipeline {  
  2.     // ...  
  3.   
  4.     final DefaultChannelHandlerContext head;  
  5.     final DefaultChannelHandlerContext tail;  
  6.   
  7.     // ...  
  8. }  


從上面的代碼能夠看出,在DefaultPipeline內部,DefaultChannelHandlerContext組成了一個雙向鏈表:

 

再來看看DefaultChannelPipeline的構造函數:

  1. public DefaultChannelPipeline(AbstractChannel channel) {  
  2.     if (channel == null) {  
  3.         throw new NullPointerException("channel");  
  4.     }  
  5.     this.channel = channel;  
  6.   
  7.     TailHandler tailHandler = new TailHandler();  
  8.     tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);  
  9.   
  10.     HeadHandler headHandler = new HeadHandler(channel.unsafe());  
  11.     head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);  
  12.   
  13.     head.next = tail;  
  14.     tail.prev = head;  
  15. }  


能夠看到,DefaultChinnelPipeline內部使用了兩個特殊的Handler來表示Handler鏈的頭和尾:

 

ChannelHandler的種類

從上面DefaultChannelHandlerContext代碼能夠知道,Handler實際上分爲兩種,Inbound和Outbound,這一點也能夠從ChannelHandler接口的子接口獲得證實:

  1. public interface ChannelInboundHandler extends ChannelHandler {  
  2.   // ...  
  3. }  
  4.   
  5. public interface ChannelOutboundHandler extends ChannelHandler {  
  6.   // ...  
  7. }  

 

 

事件的傳播

爲了搞清楚事件如何在Pipeline裏傳播,讓咱們從Channel的抽象子類AbstractChannel開始,下面是AbstractChannel#write()方法的實現:

  1. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {  
  2.     // ...  
  3.     @Override  
  4.     public Channel write(Object msg) {  
  5.         return pipeline.write(msg);  
  6.     }  
  7.     // ...  
  8. }  


AbstractChannel直接調用了Pipeline的write()方法:

 

再看DefaultChannelPipeline的write()方法實現:

  1. final class DefaultChannelPipeline implements ChannelPipeline {  
  2.     // ...  
  3.     @Override  
  4.     public ChannelFuture write(Object msg) {  
  5.         return tail.write(msg);  
  6.     }  
  7.     // ...  
  8. }  


由於write是個outbound事件,因此DefaultChannelPipeline直接找到tail部分的context,調用其write()方法:

 

接着看DefaultChannelHandlerContext的write()方法:

  1. final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {  
  2.     // ...  
  3.     @Override  
  4.     public ChannelFuture write(Object msg) {  
  5.         return write(msg, newPromise());  
  6.     }  
  7.   
  8.     @Override  
  9.     public ChannelFuture write(final Object msg, final ChannelPromise promise) {  
  10.         if (msg == null) {  
  11.             throw new NullPointerException("msg");  
  12.         }  
  13.   
  14.         validatePromise(promise, true);  
  15.   
  16.         write(msg, false, promise);  
  17.   
  18.         return promise;  
  19.     }  
  20.   
  21.     private void write(Object msg, boolean flush, ChannelPromise promise) {  
  22.         DefaultChannelHandlerContext next = findContextOutbound();  
  23.         next.invokeWrite(msg, promise);  
  24.         if (flush) {  
  25.             next.invokeFlush();  
  26.         }  
  27.     }  
  28.   
  29.     private DefaultChannelHandlerContext findContextOutbound() {  
  30.         DefaultChannelHandlerContext ctx = this;  
  31.         do {  
  32.             ctx = ctx.prev;  
  33.         } while (!ctx.outbound);  
  34.         return ctx;  
  35.     }  
  36.   
  37.     private void invokeWrite(Object msg, ChannelPromise promise) {  
  38.         try {  
  39.             ((ChannelOutboundHandler) handler).write(this, msg, promise);  
  40.         } catch (Throwable t) {  
  41.             notifyOutboundHandlerException(t, promise);  
  42.         }  
  43.     }  
  44.   
  45.     // ...  
  46. }  


context的write()方法沿着context鏈往前找,直至找到一個outbound類型的context爲止,而後調用其invokeWrite()方法:

 

invokeWrite()接着調用handler的write()方法:

最後看看ChannelOutboundHandlerAdapter的write()方法實現:

  1. public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {  
  2.     // ...  
  3.     @Override  
  4.     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {  
  5.         ctx.write(msg, promise);  
  6.     }  
  7.     // ...  
  8. }  


默認的實現調用了context的write()方法而不作任何處理,這樣write事件就沿着outbound鏈繼續傳播:

 

可見,Pipeline的事件傳播,是靠Pipeline,Context和Handler共同協做完成的。

 

參考資料

netty.io

Netty in Action

Intercepting Filter

相關文章
相關標籤/搜索