網絡編程 - Netty(ChannelPipeline)

初始化

網絡編程 - Netty(Channel)中,Channel的初始化,也提到了會一併初始化ChannelPipeline,一個Channel對應着一個ChannelPipeline。
代碼在AbstractChannel類中的構造方法:編程

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    // 建立一個NioSocketChannel.NioSocketChannelUnsafe的內部類對象
    unsafe = newUnsafe();
    // 建立一個Pipeline,並把當前的Channel賦值給Pipeline
    pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

ChannelPipeline的類結構圖以下,能夠看出ChannelPipeline接口繼承了ChannelInboundInvoker和ChannelOutboundInvoker接口。
image.png
ChannelInboundInvoker用於入站操做,會調用ChannelPipeline中下一個ChannelInboundHandler的方法,ChannelOutboundInvoker用於出站操做,會調用ChannelPipeline中下一個ChannelOutboundHandler的方法。
DefaultChannelPipeline的構造函數,除了ChannelFuture、Channel、Promise外,還構造了TailContext和HeadContext,再經過next和prev把兩個連接起來。segmentfault

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

TailContext的類結構圖:
image.png
HeadContext的類結構圖:
image.png
能夠看出,HeadContext比TailContext多繼承了ChannelOutboundHandler接口。
TailContext構造方法,設置pipline,並標誌inbound爲true。經過類結構圖,看出這兩個便是ChannelHandler也是ChannelHandlerContext。安全

TailContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, TAIL_NAME, true, false);
    setAddComplete();
}

HeadContext構造方法,設置pipline,並標誌outbound爲true,另外還設置了unsafe,由於出站涉及到於socket交互。網絡

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}

addLast

網絡編程 - Netty(Channel)中,提到了Channel的初始化,即channel = channelFactory.newChannel(),這句執行完後,就執行init(channel)方法。socket

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    // 部分代碼略
}

addLast方法:ide

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 檢查是否安全
        checkMultiplicity(handler);
        // 根據handler建立一個AbstractChannelHandlerContext
        newCtx = newContext(group, filterName(name, handler), handler);
        // 把新的AbstractChannelHandlerContext加入到鏈表倒二
        addLast0(newCtx);

        if (!registered) {
            newCtx.setAddPending();
            // 給pendingHandlerCallbackHead賦值,並傳遞newCtx
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

checkMultiplicity,在網絡編程 - 初識Netty的服務端的例子,EchoServerHandler 是有@ChannelHandler.Sharable註解的,用來標識能被多個channel安全的共享,是線程安全的。若是一個沒有表示線程安全的,又被重複使用,就須要拋異常。函數

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        // 若是沒有@Sharable註解,且已經加入過了,則拋異常
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        // 設置已加入
        h.added = true;
    }
}

根據handler建立一個AbstractChannelHandlerContext。oop

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    // isInbound判斷是不是ChannelInboundHandler類型
    // isOutbound判斷是不是ChannelOutboundHandler類型
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

addLast0,把新的AbstractChannelHandlerContext加入到鏈表倒二,headContext <-> nextContext變成headContext <-> newCtx <-> nextContext。this

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

invokeHandlerAddedIfNeeded

在Channel成功調用doRegister方法註冊後,會調用DefaultChannelPipeline類中的invokeHandlerAddedIfNeeded()方法。
方法調用鏈是DefaultChannelPipeline#invokeHandlerAddedIfNeeded() -> DefaultChannelPipeline#callHandlerAddedForAllHandlers() -> PendingHandlerCallback#execute() -> PendingHandlerCallback#callHandlerAdded0() -> ChannelInitializer#handlerAdded() -> ChannelInitializer#initChannel()。spa

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            // 這個方法,會執行咱們在.handler(new ChannelInitializer<SocketChannel>()裏面的內容
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {            
            exceptionCaught(ctx, cause);
        } finally {
            // 移除ChannelInitializer
            remove(ctx);
        }
        return true;
    }
    return false;
}

private void remove(ChannelHandlerContext ctx) {
    try {
        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
    } finally {
        initMap.remove(ctx);
    }
}

綜上,咱們的雙向鏈表由headContext <-> nextContext變成headContext <-> ChannelInitializer <-> nextContext變成headContext <-> ChannelInitializer <-> newCtx2 <-> newCtx1 <-> nextContext變成headContext <-> newCtx2 <-> newCtx1 <-> nextContext。由於這個時候,ChannelInitializer完成了本身的使命,即執行他的initChannel方法。

關係圖

Channel、ChannelPipeline、ChannelHandler 以及ChannelHandlerContext 之間的關係。綜合這篇和Channel那篇,能夠總結出,Channel初始化時,會建立一個ChannelPipeline,ChannelPipeline內部維護這HeadContext和TailContext,ChannelPipeline會add多個ChannelHandler時,會把ChannelHandler保證成HandlerContext,再與已有的雙向鏈表鏈接起來。
image.png

相關文章
相關標籤/搜索