Netty的ChannelPipline傳播源碼解析

 1、基礎鋪墊

1. JAVA中的基本位運算符

運算符 描述
&
|
~
^ 異或
<< 左移
>> 右移

2. 位運算解釋與實例

&(與)緩存

十進制 二進制
3 0 0 1 1
5 0 1 0 1
& 後結果:1 0 0 0 1

即:對應位都爲 1 時,才爲 1,不然全爲 0。安全

|(或)ide

十進制 二進制
3 0 0 1 1
5 0 1 0 1
| 後結果 :7 0 1 1 1

即:對應位只要有 1 時,即爲 1,不然全爲 0。oop

~(非)學習

十進制 二進制
3 0 0 1 1
~ 後結果:12 1 1 0 0

即:對應位取反。this

異或 ^atom

十進制 二進制
3 0 0 1 1
5 0 1 0 1
^ 後結果:6 0 1 1 0

即:只要對應爲不一樣即爲 1。spa

3. 配合Netty實例

咱們在以往學習Netty中見到過相似於如下代碼:.net

selectionKey.interestOps(interestOps | readInterestOp);

咱們重點關注位運算:interestOps | readInterestOp線程

該行代碼的意思是位運算計算一個數字,該數字包含 | 先後的數字!

//初始化一個值
int interestOps = 0;
//給當前這個值增長一個可讀事件
interestOps |= OP_READ;
//給當前的值增長一個可寫的事件
interestOps |= OP_WRITE;
//判斷當前的事件是否是包含可讀事件 true
boolean isRead = (interestOps & OP_READ) == OP_READ;
//判斷當前的事件是否是不包含可讀事件 false
boolean isRead = (interestOps & OP_READ) == 0;
//剔除可讀事件
interestOps &= ~OP_READ;
//剔除可寫事件
interestOps &= ~OP_WRITE;

2、源碼解析

1. 建立管道

io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
	//建立一個管道上下文  尾部節點
    tail = new TailContext(this);
    //建立一個管道上下文  頭部節點
    head = new HeadContext(this);
	//頭部節點的下一個節點設置爲尾部節點
    head.next = tail;
    //尾部節點的上一個節點設置爲頭部節點
    tail.prev = head;
}

能夠看到,這裏初始化管道的時候,管道內部存在兩個Handler tail和head節點,兩個節點組成雙向鏈表!

watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=

2. 向通道內添加一個Handler處理器

ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }
});

上述代碼再一個Netty開發中是很常見的一個代碼,這裏向通道內添加了一個 ChannelInboundHandlerAdapter,咱們進入到addLast方法:

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

//進入到 addLast
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    ObjectUtil.checkNotNull(handlers, "handlers");

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

//進入到 addLast(executor, null, h);
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //驗證是否重複添加改handler
        checkMultiplicity(handler);
		//將handler封裝爲上下文對象
        newCtx = newContext(group, filterName(name, handler), handler);
        //將該節點添加到雙向鏈表中
        addLast0(newCtx);
        ........................忽略其餘代碼..............
    }
    ........................忽略其餘代碼..............
    return this;
}

這裏總共分爲兩步:

  1. 驗證Handler是否被重複添加

    checkMultiplicity(handler);
    private static void checkMultiplicity(ChannelHandler handler) {
        //驗證是否是 ChannelHandlerAdapter 類型的,若是不是直接忽略
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //若是不是可共享的並且是已經添加過的直接報錯
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            //若是是可共享的或者未添加的,將該handler內的 added屬性設置爲true證實該handler已經被添加
            h.added = true;
        }
    }

    他是如何判斷是否被添加過的呢?

    每個Handler中都存在一個 added屬性,當這個屬性爲true的時候,證實這個Handler已經被添加過了,Netty常規狀況下爲了考慮線程安全問題,是不容許一個Handler被重複的使用的!

    可是咱們有時候會有這樣一個需求,Handler的功能比較相似,並且咱們經過代碼手段,避免了線程安全問題,因此又想重複添加Handler,Netty提供了一個註解 @Sharable註解,當存在該註解的時候,證實這個Handler是能夠被複用的,能夠被重複添加!

    因此,checkMultiplicity方法經過判斷類是否增長了@Sharable註解和added屬性是否爲空來驗證Handle是否違規重複添加了!

    當驗證經過以後,將added設置爲true,證實這個Handler已經被添加過了!

  2. 將Handler封裝爲包裝對象

    newCtx = newContext(group, filterName(name, handler), handler);

    這裏比較難理解的就是這個,咱們進入到newContext方法裏面:

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    進入到 DefaultChannelHandlerContext類的源碼裏面:

    DefaultChannelHandlerContext(
                DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        //調用父類進行掩碼計算
        super(pipeline, executor, name, handler.getClass());
        //保存一個handler
        this.handler = handler;
    }

    這裏除了會保存一個handler還會調用父類,咱們介入到父類裏面:

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                      String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        //標識 是in仍是out
        this.executionMask = mask(handlerClass);
        // 若是由EventLoop或給定的Executor驅動的驅動程序是OrderedEventExecutor的實例,則其順序爲。
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

    這裏會保存一些屬性,這些屬性都是咱們前面講過的,你們自行分析下,咱們重點關注掩碼的計算:

    this.executionMask = mask(handlerClass);
    static int mask(Class<? extends ChannelHandler> clazz) {
        //直接再緩存中取出
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        Integer mask = cache.get(clazz);
        //緩存中不存在
        if (mask == null) {
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }

    先從緩存中取出,若是不存在就調用 mask0(clazz); 方法計算,而後再放進緩存,咱們進入到mask0(clazz);方法:

    private static int mask0(Class<? extends ChannelHandler> handlerType) {
            int mask = MASK_EXCEPTION_CAUGHT;
            try {
                if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
                    // 若是是 ChannelInboundHandler 實例,全部 Inbound 事件置爲 1
                    mask |= MASK_ALL_INBOUND;
                    //判斷是否存在Skip註解   若是催你在這個跳過的註解  就移除這個
                    if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                        mask &= ~MASK_CHANNEL_REGISTERED;
                    }
                    ..................忽略相似的代碼.....................
                }
    
                if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                    mask |= MASK_ALL_OUTBOUND;
    
                    if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                            SocketAddress.class, ChannelPromise.class)) {
                        mask &= ~MASK_BIND;
                    }
                    ..................忽略相似的代碼.....................
                }
            } catch (Exception e) {
                ..................忽略異常的代碼.....................
            }
    
            return mask;
        }

    這會區分兩種狀況,一種是ChannelInboundHandler類型的,一種是ChannelOutboundHandler類型的,兩者邏輯相同,咱們以ChannelInboundHandler爲例:

    首先,再ChannelHandlerMask類裏面定義了不少的預設掩碼值:

    /**
         * 如下是方法表明的掩碼值
         */
        static final int MASK_EXCEPTION_CAUGHT = 1;
        /**
         * channelRegistered方法的掩碼
         */
        static final int MASK_CHANNEL_REGISTERED = 1 << 1;
        /**
         * channelUnregistered方法的掩碼
         */
        static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
    	/**
    	* 後面的以此類推
    	*/
        static final int MASK_CHANNEL_ACTIVE = 1 << 3;
        static final int MASK_CHANNEL_INACTIVE = 1 << 4;
        static final int MASK_CHANNEL_READ = 1 << 5;
        static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
        static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
        static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
        /**
         * bind方法的掩碼
         */
        static final int MASK_BIND = 1 << 9;
        /**
         * connect方法的掩碼
         */
        static final int MASK_CONNECT = 1 << 10;
    	/**
    	* 後面的以此類推
    	*/
        static final int MASK_DISCONNECT = 1 << 11;
        static final int MASK_CLOSE = 1 << 12;
        static final int MASK_DEREGISTER = 1 << 13;
        static final int MASK_READ = 1 << 14;
        static final int MASK_WRITE = 1 << 15;
        static final int MASK_FLUSH = 1 << 16;
    
        /**
         * 包含所有 Inbound方法的掩碼
         */
        private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
                MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
                MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
    
        /**
         * 包含所有 outbound方法的掩碼
         */
        private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
                MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

    咱們回到 mask0方法:

    mask |= MASK_ALL_INBOUND;

    一開始,咱們會直接將一個handler的掩碼計算爲擁有所有方法的掩碼!

    if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
        mask &= ~MASK_CHANNEL_REGISTERED;
    }

    判斷該方法是否存在 @Skip註解,若是存在就排除掉這個掩碼!

    整個邏輯執行完畢後,這個掩碼就只會包含handler中沒有被@Sikp註解註解的方法掩碼!

    有同窗可能疑問,我在書寫handler的時候並無增長@Sikp註解呀! 咱們都知道,實現一個Handler就一定須要繼承 ChannelInboundHandlerAdapter或者ChannelOutboundHandlerAdapter, 咱們隨便挑一個類進去看:

    watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=

    能夠看到,這些方法其實都是被默認添加了的,只不過咱們重寫以後沒添加!如今咱們明白,handler是如何區分你實現了那些方法的了!

    這裏會將handler包裝爲HandlerContext對象,相似於tailContext和HeadContext同樣,此時上下文對象的結構以下:

    watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=

  3. 將HandlerContext添加進pipeline中:

    addLast0(newCtx);
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

    watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=

整個過程如上,無非就是指針指向地址的變換,比較簡單,不作深刻分析!

3. 刪除一個處理器

ch.pipeline().remove("xxxxxx")
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}
  1. 尋找處理器Handler的上下文

    getContextOrDie(handler)
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        //尋找handler
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    
    //context(handler);
    @Override
    public final ChannelHandlerContext context(ChannelHandler handler) {
        ObjectUtil.checkNotNull(handler, "handler");
    
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {
    
            if (ctx == null) {
                return null;
            }
    		//循環迭代 判斷是否尋找到這個handler
            if (ctx.handler() == handler) {
                //返回這個handler的上下文對象
                return ctx;
            }
    
            ctx = ctx.next;
        }
    }
  2. 刪除這個處理器

    remove(getContextOrDie(handler));
    private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
        if (ctx == null) {
            return null;
        }
        return (T) remove((AbstractChannelHandlerContext) ctx).handler();
    }
    
    //直接進入到  刪除Handler的主要邏輯
    //(T) remove((AbstractChannelHandlerContext) ctx).handler();
    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        //首先刪除的handler不是tail和尾節點
        assert ctx != head && ctx != tail;
    
        synchronized (this) {
            //刪除上下文對象
            atomicRemoveFromHandlerList(ctx);
    		................忽略....................
    
            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        //回調handlerRemoved方法
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

    首先咱們關注 atomicRemoveFromHandlerList(ctx);

    private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
        //獲取該節點的上級節點
        AbstractChannelHandlerContext prev = ctx.prev;
        //獲取該節點的下級節點
        AbstractChannelHandlerContext next = ctx.next;
        //重建指針位置
        prev.next = next;
        next.prev = prev;
    }

    指針位置重建以後,咱們回調handlerRemoved方法

    callHandlerRemoved0(ctx);

至此咱們就完成了pipeline的建立、添加、刪除的源碼解析!

4. 管道事件傳播

咱們前面見到過不少的事件傳播代碼,咱們以 channelRegistered 方法的事件回調爲例:

io.netty.channel.AbstractChannel.AbstractUnsafe#register0

//通知管道  傳播channelRegistered事件
// 觸發 channelRegistered 事件
pipeline.fireChannelRegistered();

咱們進入到改行代碼的源碼:

@Override
public final ChannelPipeline fireChannelRegistered() {
    //執行註冊方法  從head方法
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

咱們能夠看到,這裏使用了 next.invokeChannelRegistered();方法 咱們依舊按照同步方法進行分析!

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            //如今調用的HeadContext的handler
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

咱們如今進入到了headContext,因此咱們進入到: io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered

@Override
public void channelRegistered(ChannelHandlerContext ctx) {
    invokeHandlerAddedIfNeeded();
    //向下傳播事件
    ctx.fireChannelRegistered();
}

這一段代碼除了執行Head的invokeHandlerAddedIfNeeded方法以外,還又一次傳播了channelRegistered事件,咱們進入到 ctx.fireChannelRegistered();:

@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
    return this;
}

咱們若是想要向下傳播,咱們首先應該找到下一個節點是誰才能傳播,Netty這裏調用了findContextInbound(MASK_CHANNEL_REGISTERED)查找下一個節點,我咱們先關注如下參數 MASK_CHANNEL_REGISTERED, 他是channelRegistered方法的掩碼, 咱們進入到 findContextInbound方法源碼:

private AbstractChannelHandlerContext findContextInbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        //獲取下一個inbun事件
        ctx = ctx.next;
        //只要和掩碼&運算後不爲0的都是 inbunt事件
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}

從當前節點向下尋找,只要 掩碼計算包含這個方法,就證實該context包含channelRegistered方法,就直接返回!

尋找到了handler以後,就開始調用了:

invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}
next.invokeChannelRegistered();

具體邏輯就和上面分析的一致了,調用該handler的ChannelRegistered方法!

傳播某一個事件,就會使用哪一個事件的掩碼,從當前節點向下尋找,知道對應的Handler以後,回調對應的方法!

關於管道的傳播,你明白了嗎?

提一個問題, 觀察如下兩種傳播方式有何不一樣:

ctx.fireChannelRegistered();
ctx.pipeline().fireChannelRegistered();

 

相關文章
相關標籤/搜索