Netty之Channel*

Netty之Channel*

本文內容主要參考<<Netty In Action>> Netty的文檔和源碼,偏筆記向.java

先簡略瞭解一下ChannelPipelineChannelHandler.api

想象一個流水線車間.當組件從流水線頭部進入,穿越流水線,流水線上的工人按順序對組件進行加工,到達流水線尾部時商品組裝完成.數組

能夠將ChannelPipeline當作流水線,ChannelHandler當作流水線工人.源頭的組件當作event,如read,write等等.promise

1.1 Channel

Channel鏈接了網絡套接字或可以進行I/O操做的組件,如read, write, connect, bind.緩存

咱們能夠經過Channel獲取一些信息.安全

  • Channel的當前狀態(如,是否鏈接,是否打開)
  • Channel的配置參數,如buffer的size
  • 支持的I/O操做
  • 處理全部I/O事件的ChannelPipeline和與通道相關的請求

Channel接口定義了一組和ChannelInboundHandlerAPI密切相關的狀態模型.網絡

52896437562

Channel的狀態改變,會生成對應的event.這些event會轉發給ChannelPipeline中的ChannelHandler,handler會對其進行響應.併發

1.2 ChannelHandler生命週期

下面列出了 interface ChannelHandler 定義的生命週期操做, 在 ChannelHandler被添加到 ChannelPipeline 中或者被從 ChannelPipeline 中移除時會調用這些操做。這些方法中的每個都接受一個 ChannelHandlerContext 參數異步

1.3 ChannelInboundHandler 接口

ChannelInboundHandler處理入站數據以及各類狀態變化,當Channel狀態發生改變會調用ChannelInboundHandler中的一些生命週期方法.這些方法與Channel的生命密切相關.socket

入站數據,就是進入socket的數據.下面展現一些該接口的生命週期API

當某個 ChannelInboundHandler的實現重寫 channelRead()方法時,它將負責顯式地
釋放與池化的 ByteBuf 實例相關的內存。 Netty 爲此提供了一個實用方法ReferenceCountUtil.release().

@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ReferenceCountUtil.release(msg);
    }
}

這種方式還挺繁瑣的,Netty提供了一個SimpleChannelInboundHandler,重寫channelRead0()方法,就能夠在調用過程當中會自動釋放資源.

public class SimpleDiscardHandler
    extends SimpleChannelInboundHandler<Object> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
                                    Object msg) {
            // 不用調用ReferenceCountUtil.release(msg)也會釋放資源
    }
}

原理就是這樣,channelRead方法包裝了channelRead0方法.

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

1.4 ChannelOutboundHandler

出站操做和數據將由 ChannelOutboundHandler 處理。它的方法將被 Channel、 ChannelPipeline 以及 ChannelHandlerContext 調用。
ChannelOutboundHandler 的一個強大的功能是能夠按需推遲操做或者事件,這使得能夠經過一些複雜的方法來處理請求。例如, 若是到遠程節點的寫入被暫停了, 那麼你能夠推遲沖刷操做並在稍後繼續。

ChannelPromiseChannelFuture: ChannelOutboundHandler中的大部分方法都須要一個ChannelPromise參數, 以便在操做完成時獲得通知。 ChannelPromiseChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(), 從而使ChannelFuture不可變.

1.5 ChannelHandler適配器

ChannelHandlerAdapter顧名思義,就是handler的適配器.你須要知道什麼是適配器模式,假設有一個A接口,咱們須要A的subclass實現功能,可是B類中正好有咱們須要的功能,不想複製粘貼B中的方法和屬性了,那麼能夠寫一個適配器類Adpter繼承B實現A,這樣一來Adpter是A的子類而且能直接使用B中的方法,這種模式就是適配器模式.

就好比Netty中的SslHandler類,想使用ByteToMessageDecoder中的方法進行解碼,可是必須是ChannelHandler子類對象才能加入到ChannelPipeline中,經過以下簽名和其實現細節(SslHandler實現細節就不貼了)就可以做爲一個Handler去處理消息了.

public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler

下圖是ChannelHandler和Adpter的UML圖示.

ChannelHandlerAdapter提供了一些實用方法isSharable()若是其對應的實現被標註爲 Sharable, 那麼這個方法將返回 true, 表示它能夠被添加到多個 ChannelPipeline中 .

若是想在本身的ChannelHandler中使用這些適配器類,只須要擴展他們,重寫那些想要自定義的方法便可.

1.6 資源管理

在使用ChannelInboundHandler.channelRead()ChannelOutboundHandler.write()方法處理數據時要避免資源泄露,ByteBuf那篇文章提到過引用計數,當使用完某個ByteBuf以後記得調整引用計數.

Netty提供了一個class ResourceLeakDetector來幫助診斷資源泄露,這可以幫助你判斷應用的運行狀況,可是若是但願提升吞吐量(好比搞一些競賽),關閉內存診斷能夠提升吞吐量.

泄露檢測級別能夠經過將下面的 Java 系統屬性設置爲表中的一個值來定義:
java -Dio.netty.leakDetectionLevel=ADVANCED

若是帶着該 JVM 選項從新啓動你的應用程序,你將看到本身的應用程序最近被泄漏的緩衝
區被訪問的位置。下面是一個典型的由單元測試產生的泄漏報告:

Running io.netty.handler.codec.xml.XmlFrameDecoderTest
15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:
ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 1
#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(
AdvancedLeakAwareByteBuf.java:697)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(
XmlFrameDecoderTest.java:157)
io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(
XmlFrameDecoderTest.java:133)
...

應用程序處理消息釋放資源

消費入站消息釋放資源

@Sharable
public class DiscardInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ReferenceCountUtil.release(msg);// 用於釋放資源的工具類
    }
}

SimpleChannelInboundHandler中的channelRead0()會消費消息以後自動釋放資源.

出站釋放資源

@Sharable
public class DiscardOutboundHandler
                        extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx,
        Object msg, ChannelPromise promise) {
        // 仍是經過util工具類釋放資源
        ReferenceCountUtil.release(msg);
        // 通知ChannelPromise,消息已經處理
        promise.setSuccess();
    }
}

重要的是, 不只要釋放資源,還要通知 ChannelPromise。不然可能會出現 ChannelFutureListener 收不到某個消息已經被處理了的通知的狀況。總之,若是一個消息被消費或者丟棄了, 而且沒有傳遞給 ChannelPipeline 中的下一個ChannelOutboundHandler, 那麼用戶就有責任調用ReferenceCountUtil.release()。若是消息到達了實際的傳輸層, 那麼當它被寫入時或者 Channel 關閉時,都將被自動釋放。

2 ChannelPipelin接口

Channel和ChannelPipeline

每個新建立的 Channel 都將會被分配一個新的 ChannelPipeline。這項關聯是永久性的; Channel 既不能附加另一個 ChannelPipeline,也不能分離其當前的。在 Netty 組件的生命週期中,這是一項固定的操做,不須要開發人員的任何干預。

ChannelHandler和ChannelHandlerContext

根據事件的起源,事件將會被 ChannelInboundHandler 或者 ChannelOutboundHandler 處理。隨後, 經過調用 ChannelHandlerContext 實現,它將被轉發給同一超類型的下一個ChannelHandler。

ChannelHandlerContext使得ChannelHandler可以和它的ChannelPipeline以及其餘的ChannelHandler 交 互 。 ChannelHandler 可 以 通 知 其 所 屬 的 ChannelPipeline 中 的 下 一 個ChannelHandler,甚至能夠動態修改它所屬的ChannelPipeline.

ChannelPipelin和ChannelHandler

這是一個同時具備入站和出站 ChannelHandler 的 ChannelPipeline 的佈局,而且印證了咱們以前的關於 ChannelPipeline 主要由一系列的 ChannelHandler 所組成的說法。 ChannelPipeline 還提供了經過 ChannelPipeline 自己傳播事件的方法。若是一個入站事件被觸發,它將被從 ChannelPipeline 的頭部開始一直被傳播到 Channel Pipeline 的尾端。

你可能會說, 從事件途經 ChannelPipeline 的角度來看, ChannelPipeline 的頭部和尾端取決於該事件是入站的仍是出站的。然而 Netty 老是將 ChannelPipeline 的入站口(圖 的左側)做爲頭部,而將出站口(該圖的右側)做爲尾端。
當你完成了經過調用 ChannelPipeline.add*()方法將入站處理器( ChannelInboundHandler)和 出 站 處 理 器 ( ChannelOutboundHandler ) 混 合 添 加 到 ChannelPipeline 之 後 , 每 一 個ChannelHandler 從頭部到尾端的順序位置正如同咱們方纔所定義它們的同樣。所以,若是你將圖 6-3 中的處理器( ChannelHandler)從左到右進行編號,那麼第一個被入站事件看到的 ChannelHandler 將是1,而第一個被出站事件看到的 ChannelHandler 將是 5。

在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配。若是不匹配, ChannelPipeline 將跳過該ChannelHandler 並前進到下一個,直到它找到和該事件所指望的方向相匹配的爲止。 (固然, ChannelHandler 也能夠同時實現ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)

2.1 修改ChannelPipeline

修改指的是添加或刪除ChannelHandler

代碼示例

ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
// 先添加一個Handler到ChannelPipeline中
pipeline.addLast("handler1", firstHandler);
// 這個Handler放在了first,意味着放在了handler1以前
pipeline.addFirst("handler2", new SecondHandler());
// 這個Handler被放到了last,意味着在handler1以後
pipeline.addLast("handler3", new ThirdHandler());
...
// 經過名稱刪除
pipeline.remove("handler3");
// 經過對象刪除
pipeline.remove(firstHandler);
// 名稱"handler2"替換成名稱"handler4",並切handler2的實例替換成了handler4的實例
pipeline.replace("handler2", "handler4", new ForthHandler());

這種方式很是靈活,按照須要更換或插入handler達到咱們想要的效果.

ChannelHandler的執行和阻塞

一般 ChannelPipeline 中的每個 ChannelHandler 都是經過它的 EventLoop( I/O 線程)來處理傳遞給它的事件的。因此相當重要的是不要阻塞這個線程,由於這會對總體的 I/O 處理產生負面的影響。

但有時可能須要與那些使用阻塞 API 的遺留代碼進行交互。對於這種狀況, ChannelPipeline 有一些接受一個 EventExecutorGroup 的 add()方法。若是一個事件被傳遞給一個自定義的 EventExecutorGroup ,它將被包含在這個 EventExecutorGroup 中的某個 EventExecutor 所處理,從而被從該Channel 自己的 EventLoop 中移除。對於這種用例, Netty 提供了一個叫 DefaultEventExecutorGroup 的默認實現。

pipeline對handler的操做

2.2 ChannelPipeline的出入站api

入站

出站

  • ChannelPipeline 保存了與 Channel 相關聯的 ChannelHandler
  • ChannelPipeline 能夠根據須要,經過添加或者刪除 ChannelHandler 來動態地修改
  • ChannelPipeline 有着豐富的 API 用以被調用,以響應入站和出站事件

3 ChannelHandlerContext接口

每當有ChannelHandler添加到ChannelPipeline中,都會建立ChannelHandlerContext.若是調用ChannelChannelPipeline上的方法,會沿着整個ChannelPipeline傳播,若是調用ChannelHandlerContext上的相同方法,則會從對應的當前ChannelHandler進行傳播.

API

  • ChannelHandlerContextChannelHandler之間的關聯(綁定)是永遠不會改變的,因此緩存對它的引用是安全的;
  • 如同咱們在本節開頭所解釋的同樣,相對於其餘類的同名方法,ChannelHandlerContext的方法將產生更短的事件流, 應該儘量地利用這個特性來得到最大的性能。

3.1 使用CHannelHandlerContext

從ChannelHandlerContext訪問channel

ChannelHandlerContext ctx = ..;
// 獲取channel引用
Channel channel = ctx.channel();
// 經過channel寫入緩衝區
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

從ChannelHandlerContext訪問ChannelPipeline

ChannelHandlerContext ctx = ..;
// 獲取ChannelHandlerContext
ChannelPipeline pipeline = ctx.pipeline();
// 經過ChannelPipeline寫入緩衝區
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8));

有時候咱們不想從頭傳遞數據,想跳過幾個handler,從某個handler開始傳遞數據.咱們必須獲取目標handler以前的handler關聯的ChannelHandlerContext.

ChannelHandlerContext ctx = ..;
// 直接經過ChannelHandlerContext寫數據,發送到下一個handler
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

好了,ChannelHandlerContext的基本使用應該掌握了,可是你真的理解ChannelHandlerContext,ChannelPipeline和Channelhandler之間的關係了嗎.咱們老看一下Netty的源碼.

先看一下AbstractChannelHandlerContext類,這個類像不像雙向鏈表中的一個Node,

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {
        ...
        volatile AbstractChannelHandlerContext next;
        volatile AbstractChannelHandlerContext prev;
        ...
        }

再來看一看DefaultChannelPipeline,ChannelPipeline中擁有ChannelHandlerContext這個節點的head和tail,

並且DefaultChannelPipeline類中並無ChannelHandler成員或handler數組.

public class DefaultChannelPipeline implements ChannelPipeline {
    ...
        
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    ...

因此addFirst向pipeline中添加了handler到底添加到哪了呢.看一下pipeline中的addFirst方法

@Override
    public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
        return addFirst(null, name, handler);
    }

    @Override
    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 檢查handler是否具備複用能力,不重要
            checkMultiplicity(handler);
            // 名稱,不重要.
            name = filterName(name, handler);
// 這個方法建立了DefaultChannelHandlerContext,handler是其一個成員屬性
// 你如今應該明白了上面說的添加handler會建立handlerContext了吧
            newCtx = newContext(group, name, handler);
// 這個方法
            addFirst0(newCtx);
// 這個方法是調整pipeline中HandlerContext的指針,
// 就是更新HandlerContext鏈表節點之間的位置
private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

簡單總結一下,pipeline擁有context(自己像一個鏈表的節點)組成的節點的雙向鏈表首尾,能夠看作pipeline擁有一個context鏈表,context擁有成員handler,這即是三者之間的關係.實際上,handler做爲消息處理的主要組件,實現了和pipeline的解耦,咱們能夠只有一個handler,可是被封裝進不一樣的context可以被不一樣的pipeline使用.

3.2 handler和context高級用法

緩存ChannelHandlerContext引用

@Sharable
public class WriteHandler extends ChannelHandlerAdapter {
    private ChannelHandlerContext ctx;
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }
    public void send(String msg) {
        ctx.writeAndFlush(msg);
    }
}

由於一個 ChannelHandler 能夠從屬於多個 ChannelPipeline,因此它也能夠綁定到多個 ChannelHandlerContext 實例。 對於這種用法指在多個ChannelPipeline 中共享同一個 ChannelHandler, 對應的 ChannelHandler 必需要使用@Sharable 註解標註; 不然,試圖將它添加到多個 ChannelPipeline 時將會觸發異常。

@Sharable錯誤用法

@Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
    private int count;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        count++;
        System.out.println("channelRead(...) called the "
            + count + " time");
        ctx.fireChannelRead(msg);
    }
}

這段代碼的問題在於它擁有狀態 , 即用於跟蹤方法調用次數的實例變量count。將這個類的一個實例添加到ChannelPipeline將極有可能在它被多個併發的Channel訪問時致使問題。(固然,這個簡單的問題能夠經過使channelRead()方法變爲同步方法來修正。)

總之,只應該在肯定了你的 ChannelHandler 是線程安全的時才使用@Sharable 註解。

4.1 入站異常處理

處理入站事件的過程當中有異常被拋出,那麼它將從它在ChannelInboundHandler裏被觸發的那一點開始流經 ChannelPipeline。要想處理這種類型的入站異常,你須要在你的 ChannelInboundHandler 實現中重寫下面的方法。

public void exceptionCaught(
ChannelHandlerContext ctx, Throwable cause) throws Exception 
// 基本處理方式
public class InboundExceptionHandler extends        ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                    Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

由於異常將會繼續按照入站方向流動(就像全部的入站事件同樣), 因此實現了前面所示邏輯的 ChannelInboundHandler 一般位於 ChannelPipeline 的最後。這確保了全部的入站異常都老是會被處理,不管它們可能會發生在ChannelPipeline 中的什麼位置。

  • ChannelHandler.exceptionCaught()的默認實現是簡單地將當前異常轉發給ChannelPipeline 中的下一個 ChannelHandler;

  • 若是異常到達了 ChannelPipeline 的尾端,它將會被記錄爲未被處理;
  • 要想定義自定義的處理邏輯,你須要重寫 exceptionCaught()方法。而後你須要決定是否須要將該異常傳播出去。

4.2 出站異常處理

  • 每一個出站操做都將返回一個 ChannelFuture。 註冊到 ChannelFuture 的 ChannelFutureListener 將在操做完成時被通知該操做是成功了仍是出錯了。
  • 幾乎全部的 ChannelOutboundHandler 上的方法都會傳入一個 ChannelPromise
    的實例。做爲 ChannelFuture 的子類, ChannelPromise 也能夠被分配用於異步通
    知的監聽器。可是, ChannelPromise 還具備提供當即通知的可寫方法:
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);

1.添加ChannelFutureListener到ChannelFuture

ChannelFuture future = channel.write(someMessage);
    future.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture f) {
            if (!f.isSuccess()) {
                f.cause().printStackTrace();
                f.channel().close();
            }
         }
    });

2.添加ChannelFutureListener到ChannelPromise

public class OutboundExceptionHandler extends           ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
        ChannelPromise promise) {
            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) {
                    if (!f.isSuccess()) {
                        f.cause().printStackTrace();
                        f.channel().close();
                    }
                }
        });
    }
}
相關文章
相關標籤/搜索