淺談Netty中的ChannelPipeline

Netty中另外兩個重要組件—— ChannelHandle,ChannelHandleContext,Pipeline。Netty中I/O事件的傳播機制以及數據的過濾和寫出均由它們負責。git

pipeline的初始化

步驟

  • pipeline在建立Channel的時候被建立
    • AbstractChannel中 pipeline = newChannelPipeline()-DefaultChannelPipeline
  • pipeline節點數據結構:ChannelHandlerContext的雙向鏈表
  • pipeline中的兩大哨兵:head和tail

分析

  • pipeline在建立Channel的時候被建立
    • 在服務端Channel和客戶端Channel建立的時候,調用父類AbstractChannel初始化時候會對pipeline進行初始化。
// AbstractChannel(..)
    protected AbstractChannel(Channel parent) {
        ...
        // 建立pipeline
        pipeline = newChannelPipeline();
    }
    // newChannelPipeline()
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    // DefaultChannelPipeline(..)
    protected DefaultChannelPipeline(Channel channel) {
        ...
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
複製代碼
  • 看下HeadContext和TailContext構造方法
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
        ...
        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
        ...    
    }
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
            ...
    }
    abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
      ...
      AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
          this.name = ObjectUtil.checkNotNull(name, "name");
          this.pipeline = pipeline;
          this.executor = executor;
          this.inbound = inbound;
          this.outbound = outbound;
          ...
      }
複製代碼

head與tail它們都會調用父類AbstractChannelHandlerContext構造器去完成初始化,由此咱們能夠預見ChanelPipeline裏面存放的是一個個ChannelHandlerContext,根據DefaultChannelPipeline構造方法咱們能夠知道它們數據結構爲雙向鏈表,根據AbstractChannelHandlerContext構造方法,咱們能夠發現head指定的爲出棧處理,而tail指定的爲入棧處理器。github

  • pipeline中的兩大哨兵:head和tail

pipeline裏面的事件傳播機制咱們接下來驗證,可是咱們能夠推測出入棧從head開始傳播,由於它是出棧處理器,因此它只管往下傳播不作任何處理,一直到tail會結束。出棧從tail開始傳播,由於他是入棧處理器,因此它只管往下傳播事件便可,也不作任何處理。這麼看來對於入棧,從head開始到tail結束;對於出棧偏偏相反,從tail開始到head結束。promise

添加刪除ChannelHandler

步驟

  • 添加ChannelHandler流程
    • 判斷是否重複添加
      • filterName(..)
    • 建立節點並添加至鏈表
    • 回調添加完成事件
      • callHandlerAdded0(..)
  • 刪除ChannelHandler流程
    • 找到節點
    • 鏈表的刪除
    • 回調刪除handler事件

分析

  • 判斷是否重複添加
// filterName(..)
    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }
    // 判斷重名
    private void checkDuplicateName(String name) {
        if (context0(name) != null) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }
    // 找有沒有同名的context
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }
複製代碼
  • 建立節點並添加至鏈表
// 插入到鏈表中tail節點的前面。
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
複製代碼
  • 順着callHandlerAdded0(..)方法一直跟到AbstractChannelHandlerContext的callHandlerAdded(..)
final void callHandlerAdded() throws Exception {
        ...
        if (setAddComplete()) {
            // 調用具體handler的handlerAdded方法
            handler().handlerAdded(this);
        }
    }
複製代碼
  • 刪除ChannelHandler流程
  • 找到節點
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    // 相同堆內地址即爲找到
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }

            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }
複製代碼
  • 鏈表的刪除
private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
複製代碼
  • 回調handler remove方法
final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }
複製代碼

事件和異常的傳播

步驟

  • inBound事件的傳播
    • ChannelRead事件的傳播
    • SimpleInBoundHandler處理器
  • outBound事件的傳播
    • write事件的傳播
  • 異常的傳播
    • 異常觸發鏈

分析

  • inBound事件的傳播分析
// 省略代碼
    ... 
    serverBootstrap
    ...
    .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new Inbound1())
                                .addLast(new InBound2())
                                .addLast(new Inbound3());
                    }
    });
    ...
    public class Inbound1 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("at InBound1: " + msg);
            ctx.fireChannelRead(msg);
        }
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.channel().pipeline().fireChannelRead("hello cj");
        }
    }
    public class Inbound2 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("at InBound2: " + msg);
            ctx.fireChannelRead(msg);
        }

    }
    public class Inbound3 extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("at InBound3: " + msg);
            ctx.fireChannelRead(msg);
        }
    }
複製代碼
  • 咱們來畫一下如今客戶端接入作入棧處理時,客戶端Channel的pipeline中的狀況:

從head開始一直向下一個inboud傳播直到tail結束,也能夠看到ChannelHandlerContext起到的正是中間紐帶的做用, 它能拿到handle也能夠向上獲取到channel與pipeline,一個channel只會有一個pipeline,一個pipeline能夠有多個入棧handler和出棧handler,並且每一個handler都會被ChannelHandlerContext包裹着。事件傳播依賴的ChannelHandlerContext的fire*方法。bash

  • 咱們來運行下代碼驗證下: telnet建立一個客戶端鏈接
  • 控制檯打印

按照咱們上邊說的那樣 InBoud1 -> InBound2 -> InBoud3數據結構

  • outBound事件的傳播分析
public class Outbound1 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("oubound1 write:" + msg);
        ctx.write(msg, promise);
    }

}

public class Outbound2 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("oubound2 write:" + msg);
        ctx.write(msg, promise);
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.executor().schedule(()-> {
            ctx.channel().pipeline().write("hello cj...");
        }, 5, TimeUnit.SECONDS);
    }

}

public class Outbound3 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("oubound3 write:" + msg);
        ctx.write(msg, promise);
    }

}
複製代碼
  • 咱們來畫一下如今向客戶端寫出時間作出棧處理時,客戶端Channel的pipeline中的狀況:

與入棧事件傳遞順序是徹底相反的,也就是從鏈表尾部開始。ide

  • 咱們驗證下結果
  • 異常的傳播
public class Inbound1 extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Inbound1...");
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        throw new RuntimeException("cj test throw caught...");
    }
}
public class Inbound3 extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Inbound2...");
        super.exceptionCaught(ctx, cause);
    }

}
public class Outbound1 extends ChannelOutboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound1...");
        super.exceptionCaught(ctx, cause);
    }

}
public class Outbound2 extends ChannelOutboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound2...");
        super.exceptionCaught(ctx, cause);
    }

}
public class Outbound3 extends ChannelOutboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("Outbound3...");
        super.exceptionCaught(ctx, cause);
    }

}
複製代碼

異常的傳播過程是從head一直遍歷到tail結束,並在tail中將其打印出來。ui

  • 直接驗證下結果
  • 對於ctx.write()和ctx.pipeline().write()有和不一樣
ctx.write("hello cj...");
  ctx.pipeline().write("hello cj...");
複製代碼

ctx.write(..) 咱們按照上面的內容是能夠想到的,ctx.write實際上是直接激活當前節點的下一個節點write,因此它不會從尾部開始向前遍歷全部的outbound,而ctx.pipeline().write(..)咱們看源碼能夠知道,它先調用pipeline的write方法,跟蹤源碼(下圖)能夠發現,他是從tail開始遍歷的,全部的outboud會依次被執行。同理inbound也是如此this

源碼地址

github.com/coderjia061…spa

相關文章
相關標籤/搜索