netty源碼分析之pipeline(一)

經過前面的源碼系列文章中的netty reactor線程三部曲,咱們已經知道,netty的reactor線程就像是一個發動機,驅動着整個netty框架的運行,而服務端的綁定新鏈接的創建正是發動機的導火線,將發動機點燃html

netty在服務端端口綁定和新鏈接創建的過程當中會創建相應的channel,而與channel的動做密切相關的是pipeline這個概念,pipeline像是能夠看做是一條流水線,原始的原料(字節流)進來,通過加工,最後輸出java

本文,我將以新鏈接的創建爲例分爲如下幾個部分給你介紹netty中的pipeline是怎麼玩轉起來的react

  • pipeline 初始化
  • pipeline 添加節點
  • pipeline 刪除節點

pipeline 初始化

新鏈接的創建這篇文章中,咱們已經知道了建立NioSocketChannel的時候會將netty的核心組件建立出來spring

channel中的核心組件

pipeline是其中的一員,在下面這段代碼中被建立bootstrap

AbstractChannel緩存

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
複製代碼

AbstractChannel性能優化

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
複製代碼

DefaultChannelPipeline微信

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
複製代碼

pipeline中保存了channel的引用,建立完pipeline以後,整個pipeline是這個樣子的多線程

pipeline默認結構

pipeline中的每一個節點是一個ChannelHandlerContext對象,每一個context節點保存了它包裹的執行器 ChannelHandler 執行操做所須要的上下文,其實就是pipeline,由於pipeline包含了channel的引用,能夠拿到全部的context信息併發

默認狀況下,一條pipeline會有兩個節點,head和tail,後面的文章咱們具體分析這兩個特殊的節點,今天咱們重點放在pipeline

pipeline添加節點

下面是一段很是常見的客戶端代碼

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});
複製代碼

首先,用一個spliter未來源TCP數據包拆包,而後將拆出來的包進行decoder,傳入業務處理器BusinessHandler,業務處理完encoder,輸出

整個pipeline結構以下

pipeline結構

我用兩種顏色區分了一下pipeline中兩種不一樣類型的節點,一個是 ChannelInboundHandler,處理inBound事件,最典型的就是讀取數據流,加工處理;還有一種類型的Handler是 ChannelOutboundHandler, 處理outBound事件,好比當調用writeAndFlush()類方法時,就會通過該種類型的handler

不論是哪一種類型的handler,其外層對象 ChannelHandlerContext 之間都是經過雙向鏈表鏈接,而區分一個 ChannelHandlerContext究竟是in仍是out,在添加節點的時候咱們就能夠看到netty是怎麼處理的

DefaultChannelPipeline

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
複製代碼
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        addLast(executor, null, h);
    }
    return this;
}
複製代碼
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.檢查是否有重複handler
        checkMultiplicity(handler);
        // 2.建立節點
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加節點
        addLast0(newCtx);
    }
   
    // 4.回調用戶方法
    callHandlerAdded0(handler);
    
    return this;
}
複製代碼

這裏簡單地用synchronized方法是爲了防止多線程併發操做pipeline底層的雙向鏈表

咱們仍是逐步分析上面這段代碼

1.檢查是否有重複handler

在用戶代碼添加一條handler的時候,首先會查看該handler有沒有添加過

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}
複製代碼

netty使用一個成員變量added標識一個channel是否已經添加,上面這段代碼很簡單,若是當前要添加的Handler是非共享的,而且已經添加過,那就拋出異常,不然,標識該handler已經添加

因而可知,一個Handler若是是sharable的,就能夠無限次被添加到pipeline中,咱們客戶端代碼若是要讓一個Handler被共用,只須要加一個@Sharable標註便可,以下

@Sharable
public class BusinessHandler {
    
}
複製代碼

而若是Handler是sharable的,通常就經過spring的注入的方式使用,不須要每次都new 一個

isSharable() 方法正是經過該Handler對應的類是否標註@Sharable來實現的

ChannelHandlerAdapter

public boolean isSharable() {
   Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}
複製代碼

這裏也能夠看到,netty爲了性能優化到極致,還使用了ThreadLocal來緩存Handler的狀態,高併發海量鏈接下,每次有新鏈接添加Handler都會建立調用此方法

2.建立節點

回到主流程,看建立上下文這段代碼

newCtx = newContext(group, filterName(name, handler), handler);
複製代碼

這裏咱們須要先分析 filterName(name, handler) 這段代碼,這個函數用於給handler建立一個惟一性的名字

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}
複製代碼

顯然,咱們傳入的name爲null,netty就給咱們生成一個默認的name,不然,檢查是否有重名,檢查經過的話就返回

netty建立默認name的規則爲 簡單類名#0,下面咱們來看些具體是怎麼實現的

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private String generateName(ChannelHandler handler) {
    // 先查看緩存中是否有生成過默認name
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    // 沒有生成過,就生成一個默認name,加入緩存 
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }

    // 生成完了,還要看默認name有沒有衝突
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}
複製代碼

netty使用一個 FastThreadLocal(後面的文章會細說)變量來緩存Handler的類和默認名稱的映射關係,在生成name的時候,首先查看緩存中有沒有生成過默認name(簡單類名#0),若是沒有生成,就調用generateName0()生成默認name,而後加入緩存

接下來還須要檢查name是否和已有的name有衝突,調用context0(),查找pipeline裏面有沒有對應的context

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}
複製代碼

context0()方法鏈表遍歷每個 ChannelHandlerContext,只要發現某個context的名字與待添加的name相同,就返回該context,最後拋出異常,能夠看到,這個實際上是一個線性搜索的過程

若是context0(name) != null 成立,說明現有的context裏面已經有了一個默認name,那麼就從 簡單類名#1 往上一直找,直到找到一個惟一的name,好比簡單類名#3

若是用戶代碼在添加Handler的時候指定了一個name,那麼要作到事僅僅爲檢查一下是否有重複

private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

複製代碼

處理完name以後,就進入到建立context的過程,由前面的調用鏈得知,group爲null,所以childExecutor(group)也返回null

DefaultChannelPipeline

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    //..
}

複製代碼

DefaultChannelHandlerContext

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}
複製代碼

構造函數中,DefaultChannelHandlerContext將參數回傳到父類,保存Handler的引用,進入到其父類

AbstractChannelHandlerContext

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}
複製代碼

netty中用兩個字段來表示這個channelHandlerContext屬於inBound仍是outBound,或者二者都是,兩個boolean是經過下面兩個小函數來判斷(見上面一段代碼)

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}
複製代碼

經過instanceof關鍵字根據接口類型來判斷,所以,若是一個Handler實現了兩類接口,那麼他既是一個inBound類型的Handler,又是一個outBound類型的Handler,好比下面這個類

ChannelDuplexHandler

經常使用的,將decode操做和encode操做合併到一塊兒的codec,通常會繼承 MessageToMessageCodec,而MessageToMessageCodec就是繼承ChannelDuplexHandler

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out) throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out) throws Exception;
 }

複製代碼

context 建立完了以後,接下來終於要將建立完畢的context加入到pipeline中去了

3.添加節點

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}
複製代碼

用下面這幅圖可見簡單的表示這段過程,說白了,其實就是一個雙向鏈表的插入操做

添加節點過程

操做完畢,該context就加入到pipeline中

添加節點以後

到這裏,pipeline添加節點的操做就完成了,你能夠根據此思路掌握全部的addxxx()系列方法

4.回調用戶方法

AbstractChannelHandlerContext

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);
    ctx.setAddComplete();
}

複製代碼

到了第四步,pipeline中的新節點添加完成,因而便開始回調用戶代碼 ctx.handler().handlerAdded(ctx);,常見的用戶代碼以下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 節點被添加完畢以後回調到此
        // do something
    }
}
複製代碼

接下來,設置該節點的狀態

AbstractChannelHandlerContext

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}
複製代碼

用cas修改節點的狀態至:REMOVE_COMPLETE(說明該節點已經被移除) 或者 ADD_COMPLETE

pipeline刪除節點

netty 有個最大的特性之一就是Handler可插拔,作到動態編織pipeline,好比在首次創建鏈接的時候,須要經過進行權限認證,在認證經過以後,就能夠將此context移除,下次pipeline在傳播事件的時候就就不會調用到權限認證處理器

下面是權限認證Handler最簡單的實現,第一個數據包傳來的是認證信息,若是校驗經過,就刪除此Handler,不然,直接關閉鏈接

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
}
複製代碼

重點就在 ctx.pipeline().remove(this) 這段代碼

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}

複製代碼

remove操做相比add簡單很多,分爲三個步驟:

1.找到待刪除的節點 2.調整雙向鏈表指針刪除 3.回調用戶函數

1.找到待刪除的節點

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}
複製代碼

這裏爲了找到Handler對應的context,照樣是經過依次遍歷雙向鏈表的方式,直到某一個context的Handler和當前Handler相同,便找到了該節點

2.調整雙向鏈表指針刪除

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.調整雙向鏈表指針刪除
        remove0(ctx);
    }
    // 3.回調用戶函數
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}

複製代碼

經歷的過程要比添加節點要簡單,能夠用下面一幅圖來表示

刪除節點過程

最後的結果爲

刪除節點以後

結合這兩幅圖,能夠很清晰地瞭解權限驗證Handler的工做原理,另外,被刪除的節點由於沒有對象引用到,果過段時間就會被gc自動回收

3.回調用戶函數

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}
複製代碼

到了第三步,pipeline中的節點刪除完成,因而便開始回調用戶代碼 ctx.handler().handlerRemoved(ctx);,常見的代碼以下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 節點被刪除完畢以後回調到此,可作一些資源清理
        // do something
    }
}
複製代碼

最後,將該節點的狀態設置爲removed

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}
複製代碼

removexxx系列的其餘方法族大同小異,你能夠根據上面的思路展開其餘的系列方法,這裏再也不贅述

總結

1.以新鏈接建立爲例,新鏈接建立的過程當中建立channel,而在建立channel的過程當中建立了該channel對應的pipeline,建立完pipeline以後,自動給該pipeline添加了兩個節點,即ChannelHandlerContext,ChannelHandlerContext中有用pipeline和channel全部的上下文信息。

2.pipeline是雙向個鏈表結構,添加和刪除節點均只須要調整鏈表結構

3.pipeline中的每一個節點包着具體的處理器ChannelHandler,節點根據ChannelHandler的類型是ChannelInboundHandler仍是ChannelOutboundHandler來判斷該節點屬於in仍是out或者二者都是

下一篇文章將繼續pipeline的分析,敬請期待!

若是你想系統地學Netty,個人小冊《Netty 入門與實戰:仿寫微信 IM 即時通信系統》能夠幫助你,若是你想系統學習Netty原理,那麼你必定不要錯過個人Netty源碼分析系列視頻:coding.imooc.com/class/230.h…

相關文章
相關標籤/搜索