五分鐘就能看懂pipeline模型 -Netty 源碼解析

netty源碼解析系列

1、pipeline介紹

1. 什麼是pipeline

     pipeline 有管道,流水線的意思,最先使用在 Unix 操做系統中,可讓不一樣功能的程序相互通信,使軟件更加」高內聚,低耦合」,它以一種」鏈式模型」來串起不一樣的程序或組件,使它們組成一條直線的工做流。promise

2. Netty的ChannelPipeline

     ChannelPipeline 是處理或攔截channel的進站事件和出站事件的雙向鏈表,事件在ChannelPipeline中流動和傳遞,能夠增長或刪除ChannelHandler來實現對不一樣業務邏輯的處理。通俗的說,ChannelPipeline是工廠裏的流水線,ChannelHandler是流水線上的工人。
     ChannelPipeline在建立Channel時會自動建立,每一個Channel都擁有本身的ChannelPipeline緩存

3. Netty I/O事件的處理過程

     如圖所示,入站事件是由 I/O線程被動觸發,由入站處理器按自下而上的方向處理,在中途能夠被攔截丟棄,出站事件由用戶 handler主動觸發,由出站處理器按自上而下的方向處理

2、ChannelHandlerContext

1. 什麼是ChannelHandlerContext

     ChannelHandlerContext是將ChannelHandlerChannelPipeline關聯起來的上下文環境,每添加一個handler都會建立ChannelHandlerContext實例,管理ChannelHandlerChannelPipeline中的傳播流向。安全

2. ChannelHandlerContext和ChannelPipeline以及ChannelHandler之間的關係

     ChannelPipeline依賴於Channel的建立而自動建立,保存了channel,將全部handler組織起來,至關於工廠的流水線。
     ChannelHandler擁有獨立功能邏輯,能夠註冊到多個ChannelPipeline,是不保存channel的,至關於工廠的工人。
     ChannelHandlerContext是關聯ChannelHandlerChannelPipeline的上下文環境,保存了ChannelPipeline,控制ChannelHandlerChannelPipeline中的傳播流向,至關於流水線上的小組長。bash

3、傳播Inbound事件

1. Inbound事件有哪些?

     (1) channelRegistered 註冊事件, channel註冊到 EventLoop上後調用,例如服務崗啓動時, pipeline.fireChannelRegistered();
     (2) channelUnregistered 註銷事件, channelEventLoop上註銷後調用,例如關閉鏈接成功後, pipeline.fireChannelUnregistered();      (3) channelActive 激活事件,綁定端口成功後調用, pipeline.fireChannelActive();
     (4) channelInactive 非激活事件,鏈接關閉後調用, pipeline.fireChannelInactive();      (5) channelRead 讀事件, channel有數據時調用, pipeline.fireChannelRead();
     (6) channelReadComplete 讀完事件, channel讀完以後調用, pipeline.fireChannelReadComplete();
     (7) channelWritabilityChanged 可寫狀態變動事件,當一個 Channel的可寫的狀態發生改變的時候執行,能夠保證寫的操做不要太快,防止 OOMpipeline.fireChannelWritabilityChanged();
     (8) userEventTriggered 用戶事件觸發,例如心跳檢測, ctx.fireUserEventTriggered(evt);
     (9) exceptionCaught 異常事件 說明:咱們能夠看出, Inbound事件都是由 I/O線程觸發,用戶實現部分關注的事件被動調用
     說明 : 咱們能夠看出, Inbound事件都是由 I/O線程觸發,用戶實現部分關注的事件被動調用

2. 添加讀事件

     從前面《Netty 源碼解析-服務端啓動流程解析》《Netty 源碼解析-客戶端鏈接接入及讀I/O解析》咱們知道,當有新鏈接接入時,咱們執行註冊流程,註冊成功後,會調用channelRegistered,咱們從這個方法開始併發

public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
          initChannel((C) ctx.channel());
          ctx.pipeline().remove(this);
          ctx.fireChannelRegistered();
}
複製代碼

     initChannel是在服務啓動時配置的參數childHandler重寫了父類方法ide

private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("initChannel");
        ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
        ch.pipeline().addLast(new IOHandler());
    }
}
複製代碼

     咱們回憶一下,pipeline是在哪裏建立的oop

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}
複製代碼

     當建立channel時會自動建立pipelinepost

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
複製代碼

     在這裏會建立兩個默認的handler,一個InboundHandler --> TailContext,一個OutboundHandler --> HeadContext
     再看addLast方法ui

@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
複製代碼

     在這裏生成一個handler名字,生成規則由handler類名加 」#0」this

@Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    …
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, generateName(h), h);
    }
    return this;
}
複製代碼
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);
        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }
    return this;
}
複製代碼

     因爲pipeline是線程非安全的,經過加鎖來保證併發訪問的安全,進行handler名稱重複性校驗,將handler包裝成DefaultChannelHandlerContext,最後再添加到pipeline

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}
複製代碼

     這裏分三步
     (1)DefaultChannelHandlerContext進行重複性校驗,若是DefaultChannelHandlerContext不是能夠在多個pipeline中共享的,且已經被添加到pipeline中,則拋出異常
     (2) 修改pipeline中的指針
        添加IdleStateHandler以前
          HeadContext --> IOChannelInitialize --> TailContext

        添加IdleStateHandler以後
          HeadContext --> IOChannelInitialize --> IdleStateHandler --> TailContext

     (3)handler名和DefaultChannelHandlerContext創建映射關係
     (4) 回調handler添加完成監聽事件
     最後刪除IOChannelInitialize

     最後事件鏈上的順序爲:
         HeadContext --> IdleStateHandler --> IOHandler --> TailContext

3. pipeline.fireChannelRead()事件解析

     在這裏咱們選一個比較典型的讀事件解析,其餘事件流程基本相似

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {	
	…
	if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    		unsafe.read();
	}
	…
}
複製代碼

     當boss線程監聽到讀事件,會調用**unsafe.read()**方法

@Override
public final void read() {
	…
	pipeline.fireChannelRead(byteBuf);
	…
}
複製代碼

     入站事件從head開始,tail結束

@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
複製代碼
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}
複製代碼

     查找pipeline中下一個Inbound事件

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
複製代碼

     HeadContext的下一個 Inbound事件是 IdleStateHandler

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
複製代碼
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}
複製代碼

     將這個channel讀事件標識爲true,並傳到下一個handler

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    super.channelRead(ctx, msg);
    System.out.println(msg.toString());
}
複製代碼

     這裏執行IOHandler重寫的channelRead()方法,並調用父類channelRead方法

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}
複製代碼

     繼續調用事件鏈上的下一個handler

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}
複製代碼

     這裏會調用TailContextRead方法,釋放msg緩存
     總結:傳播Inbound事件是從HeadContext節點往上傳播,一直到TailContext節點結束

4、傳播Outbound事件

1. Outbound事件有哪些?

     (1) bind 事件,綁定端口
     (2) close事件,關閉channel
     (3) connect事件,用於客戶端,鏈接一個遠程機器
     (4) disconnect事件,用於客戶端,關閉遠程鏈接
     (5) deregister事件,用於客戶端,在執行斷開鏈接 disconnect操做後調用,將 channelEventLoop中註銷
     (6) read事件,用於新接入鏈接時,註冊成功多路複用器上後,修改監聽爲 OP_READ操做位
     (7) write事件,向通道寫數據
     (8) flush事件,將通道排隊的數據刷新到遠程機器上

2. 解析write事件

ByteBuf resp = Unpooled.copiedBuffer("hello".getBytes());
	ctx.channel().write(resp);
複製代碼

     咱們在項目中像上面這樣直接調用write寫數據,並不能直接寫進channel,而是寫到緩衝區,還要調用flush方法才能將數據刷進channel,或者直接調用writeAndFlush
     在這裏咱們選擇比較典型的write事件來解析Outbound流程,其餘事件流程相似

@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}
複製代碼

     經過上下文綁定的channel直接調用write方法,調用channel相對應的事件鏈上的handler

@Override
public ChannelFuture write(Object msg) {
    return tail.write(msg);
}
複製代碼

     寫事件是從tailhead調用,和讀事件恰好相反

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
複製代碼
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
	...
	 write(msg, false, promise);
	...
}
複製代碼
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
	...
}
...
}
複製代碼

     通過屢次跳轉,獲取上一個Ounbound事件鏈的handler

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}
複製代碼

     IdleStateHandler既是 Inbound事件,又是 Outbound事件
     繼續跳轉到上一個 handler

     上一個是 HeadContext處理

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
複製代碼
@Override
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ...
    outboundBuffer.addMessage(msg, size, promise);
...
}
複製代碼

     從這裏咱們看到,最終是把數據丟到了緩衝區,自此nettypipeline模型咱們解析完畢
     有關inbound事件和outbound事件的傳輸, 可經過下圖進行概括:

以爲對您有幫助請點 "贊"

相關文章
相關標籤/搜索