Netty 框架學習 —— 編解碼器框架


編解碼器

每一個網絡應用程序都必須定義如何解析在兩個節點之間來回傳輸的原始字節,以及如何將其和目標應用程序的數據格式作相互轉換。這種轉換邏輯由編解碼器處理,編解碼器由編碼器和解碼器組成,它們每種均可以將字節流從一種格式轉換爲另外一種格式java

  • 編碼器將消息轉換爲適合於傳輸的格式(最有可能的就是字節流)
  • 解碼器則是將 網絡字節流轉換回應用程序的消息格式

所以,編碼器操做出站數據,而解碼器處理入站數據web

1. 解碼器

在這一節,咱們將研究 Netty 所提供的解碼器類,並提供關於什麼時候以及如何使用它們的具體示例,這些類覆蓋了兩個不一樣的用例:瀏覽器

  • 將字節解碼爲消息 —— ByteToMessageDecoder 和 ReplayingDecoder
  • 將一種消息類型解碼爲另外一種 —— MessageToMessageDecoder

何時會用到解碼器呢?很簡單,每當須要爲 ChannelPipeline 中的下一個 ChannelInboundHandler 轉換入站數據時會用到。此外,得益於 ChannelPipeline 的設計,能夠將多個解碼器連接在一塊兒,以實現任意複雜的轉換邏輯服務器

1.1 抽象類 ByteToMessageDecoder

將字節解碼爲消息是一項常見的任務,Netty 它提供了一個 抽象基類 ByteToMessageDecoder,這個類會對入站數據進行緩衝,直到它準備好處理websocket

下面舉一個如何使用這個類的示例,假設你接收了一個包含簡單 int 的字節流,每一個 int 都須要被單獨處理。在這種狀況下,你須要從入站 ByteBuf 中讀取每一個 int,並將它傳遞給 ChannelPipeline 中的下一個 ChannelInboundHandler。爲了解碼這個字節流,你要擴展 ByteToMessageDecoder 類(須要注意的是,原子類型的 int 在被添加到 List 中時,會被自動裝箱爲 Integer)網絡

// 擴展 ByteToMessageDecoder,以將字節解碼爲特定的格式
public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //檢查是否至少有 4 字節可讀(1 個int的字節長度)
        if (in.readableBytes() >= 4) {
            //從入站 ByteBuf 中讀取一個 int,並將其添加到解碼消息的 List 中
            out.add(in.readInt());
        }
    }
}

雖然 ByteToMessageDecoder 使得能夠很簡單地實現這種模式,可是你可能會發現,在調用 readInt()方法前不得不驗證所輸入的 ByteBuf 是否具備足夠的數據有點繁瑣。下面說的 ReplayingDecoder,它是一個特殊的解碼器,以少許的開銷消除了這個步驟框架

1.2 抽象類 ReplayingDecoder

ReplayingDecoder 擴展了 ByteToMessageDecoder 類,使得咱們沒必要調用 readableBytes() 方法。它經過使用一個自定義的 ByteBuf 實現,ReplayingDecoderByteBuf,包裝傳入的 ByteBuf 實現了這一點,其將在內部執行該調用異步

這個類的完整聲明是:socket

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

類型參數 S 指定了用於狀態管理的類型,其中 Void 表明不須要狀態管理。下述代碼展現了基於 ReplayingDecoder 從新實現的 ToIntegerDecoderide

// 擴展ReplayingDecoder<Void> 以將字節解碼爲消息
public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
    // 傳入的 ByteBuf 是 ReplayingDecoderByteBuf
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 從入站 ByteBuf 中讀取一個 int,並將其添加到解碼消息的 List 中
        out.add(in.readInt());
    }
}

和以前同樣,從 ByteBuf 中提取的int將會被添加到List中。若是沒有足夠的字節可用,這 個 readInt() 方法的實現將會拋出一個 Error,其將在基類中被捕獲並處理。當有更多的數據可供讀取時,該 decode() 方法將會被再次調用

請注意 ReplayingDecoderByteBuf 的下面這些方面:

  • 並非全部的 ByteBuf 操做都被支持,若是調用了一個不被支持的方法,將會拋出一個 UnsupportedOperationException
  • ReplayingDecoder 稍慢於 ByteToMessageDecoder

下面這些類用於處理更加複雜的用例:

  • io.netty.handler.codec.LineBasedFrameDecoder —— 這個類在 Netty 內部也有使用,它使用了行尾控制字符(\n 或者 \r\n)來解析消息數據
  • io.netty.handler.codec.http.HttpObjectDecoder —— HTTP 數據解碼器
1.3 抽象類 MessageToMessageDecoder

在這一節,咱們將解釋如何在兩個消息格式之間進行轉換,例如,從一種 POJO 類型轉換爲另外一種

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

參數類型 I 指定了 decode() 方法的輸入參數 msg 的類型,它是你必須實現的惟一方法

咱們將編寫一個 IntegerToStringDecoder 解碼器來擴展 MessageToMessageDecoder,它的 decode() 方法會把 Integer 參數轉換爲 String 表示。和以前同樣,解碼的 String 將被添加到傳出的 List 中,並轉發給下一個 ChannelInboundHandler

public class IntegerToStringDecoder extends MessageToMessageEncoder<Integer> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        //將 Integer 消息轉換爲它的 String 表示,並將其添加到輸出的 List 中
        out.add(String.valueOf(msg));
    }
}
1.4 TooLongFrameException

因爲 Netty 是一個異步框架,因此須要在字節能夠解碼以前在內存中緩衝它們。所以,不能讓解碼器緩衝大量的數據以致於耗盡可用的內存。爲了解除這個常見的顧慮,Netty 提供了 TooLongFrameException 類,其將由解碼器在幀超出指定的大小限制時拋出

爲了不這種狀況,你能夠設置一個最大字節數的閾值,若是超出該閾值,則會致使拋出一個 TooLongFrameException(隨後會被 ChannelHandler.exceptionCaught() 方法捕獲)。而後,如何處理該異常則徹底取決於該解碼器的用戶。某些協議(如 HTTP)可能容許你返回一個特殊的響應。而在其餘的狀況下,惟一的選擇可能就是關閉對應的鏈接

下面的示例使用 TooLongFrameException 來通知 ChannelPipeline 中的其餘 ChannelHandler 發生了幀大小溢出的。須要注意的是,若是你正在使用一個可變幀大小的協議,那麼這種保護措施將是尤其重要的

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
    public static final int MAX_FRAME_SIZE = 1024;
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        // 檢查緩衝區是否有超過 MAX_FRAME_SIZE 個字節
        if (readable > MAX_FRAME_SIZE) {
            // 跳過全部的可讀字節,拋出 TooLongFrameException 並通知 ChannelHandler
            in.skipBytes(readable);
            throw new TooLongFrameException("Frame too big!");
        }
        //do something
    }
}

2. 編碼器

編碼器實現了 ChannelOutboundHandler,並將出站數據從一種格式轉換爲另外一種格式,和咱們方纔學習的解碼器的功能正好相反。Netty 提供了一組類,用於幫助你編寫具備如下功能的編碼器:

  • 將消息編碼爲字節
  • 將消息編碼爲消息
2.1 抽象類 MessageToByteEncoder

前面咱們看到了如何使用 ByteToMessageDecoder 來將字節轉換爲消息,如今咱們使用 MessageToByteEncoder 來作逆向的事情

這個類只有一個方法,而解碼器有兩個。緣由是解碼器一般須要在 Channel 關閉以後產生最後一個消息(所以也就有了 decodeLast() 方法。顯然這不適用於編碼器的場景 —— 在鏈接被關閉以後仍然產生一個消息是毫無心義的

下述代碼展現了 ShortToByteEncoder,其接受一個 Short 類型的實例做爲消息,將它編碼爲Short的原子類型值,並將它寫入 ByteBuf 中,其將隨後被轉發給 ChannelPipeline 中的 下一個 ChannelOutboundHandler。每一個傳出的 Short 值都將會佔用 ByteBuf 中的 2 字節。

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        // 將 Short 寫入 ByteBuf
        out.writeShort(msg);
    }
}
2.2 抽象類 MessageToMessageEncoder

MessageToMessageEncoder 類的 encode() 方法提供了將入站數據從一個消息格式解碼爲另外一種

下述代碼使用 IntegerToStringEncoder 擴展了 MessageToMessageEncoder,編碼器將每一個出站 Integer 的 String 表示添加到了該 List 中

public class IntegerToStringEncoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

抽象的編解碼器類

雖然咱們一直將解碼器和編碼器做爲單獨的實體討論,可是你有時將會發如今同一個類中管理入站和出站數據和消息的轉換是頗有用的。Netty 的抽象編解碼器類正好用於這個目的,由於它們每一個都將捆綁一個解碼器/編碼器對,以處理咱們一直在學習的這兩種類型的操做。正如同你可能已經猜測到的,這些類同時實現了 ChannelInboundHandler 和 ChannelOutboundHandler 接口

爲何咱們並無一直優先於單獨的解碼器和編碼器使用這些複合類呢?由於經過儘量地將這兩種功能分開,最大化了代碼的可重用性和可擴展性,這是 Netty 設計的一個基本原則

1. 抽象類 ByteToMessageCodec

讓咱們來研究這樣的一個場景:咱們須要將字節解碼爲某種形式的消息,多是 POJO,隨後再次對它進行編碼。ByteToMessageCodec 將爲咱們處理好這一切,由於它結合了 ByteToMessageDecoder 以及它的逆向 —— MessageToByteEncoder

任何的請求/響應協議均可以做爲使用 ByteToMessageCodec 的理想選擇。例如,在某個 SMTP 的實現中,編解碼器將讀取傳入字節,並將它們解碼爲一個自定義的消息類型,如 SmtpRequest。而在接收端,當一個響應被建立時,將會產生一個 SmtpResponse,其將被編碼回字節以便進行傳輸

2. 抽象類 MessageToMessageCodec

經過使用 MessageToMessageCodec,咱們能夠在一個單個的類中實現該轉換的往返過程。MessageToMessageCodec 是一個參數化的類,定義以下:

public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>

decode() 方法是將 INBOUND_IN 類型的消息轉換爲 OUTBOUND_IN 類型的消息,而 encode() 方法則進行它的逆向操做。將 INBOUND_IN 類型的消息看做是經過網絡發送的類型, 而將 OUTBOUND_IN 類型的消息看做是應用程序所處理的類型,將可能有所裨益

WebSocket 協議

下面關於 MessageToMessageCodec 的示例引用了一個新出的 WebSocket 協議,這個協議能實現 Web 瀏覽器和服務器之間的全雙向通訊

咱們的 WebSocketConvertHandler 在參數化 MessageToMessageCodec 時將使用 INBOUND_IN 類型的 WebSocketFrame,以及 OUTBOUND_IN 類型的 MyWebSocketFrame,後者是 WebSocketConvertHandler 自己的一個靜態嵌套類

public class WebSocketConvertHandler 
        extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {


    @Override
    protected void encode(ChannelHandlerContext ctx, MyWebSocketFrame msg, List<Object> out) throws Exception {
        // 實例化一個指定子類型的 WebSocketFrame
        ByteBuf payload = msg.getData().duplicate().retain();
        switch (msg.getType()) {
            case BINARY:
                out.add(new BinaryWebSocketFrame(payload));
                break;
            case TEXT:
                out.add(new TextWebSocketFrame(payload));
                break;
            case CLOSE:
                out.add(new CloseWebSocketFrame(true, 0, payload));
                break;
            case CONTINUATION:
                out.add(new ContinuationWebSocketFrame(payload));
                break;
            case PONG:
                out.add(new PongWebSocketFrame(payload));
                break;
            case PING:
                out.add(new PingWebSocketFrame(payload));
                break;
            default:
                throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    // 將 WebSocketFrame 解碼爲 MyWebSocketFrame,並設置 FrameType
    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg, List<Object> out) throws Exception {
        ByteBuf paload = msg.content().duplicate().retain();
        if (msg instanceof  BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY, paload));
        } else
        if (msg instanceof  CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE, paload));
        } else
        if (msg instanceof  PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING, paload));
        } else
        if (msg instanceof  PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG, paload));
        } else
        if (msg instanceof  TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT, paload));
        } else
        if (msg instanceof  ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CONTINUATION, paload));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    public static final class MyWebSocketFrame {
        public enum FrameType {
            BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION
        }
        private final FrameType type;
        private final ByteBuf data;

        public MyWebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }

        public FrameType getType() {
            return type;
        }

        public ByteBuf getData() {
            return data;
        }
    }
}

3. CombinedChannelDuplexHandler 類

正如咱們前面所提到的,結合一個解碼器和編碼器可能會對可重用性形成影響。可是,有一 種方法既可以避免這種懲罰,又不會犧牲將一個解碼器和一個編碼器做爲一個單獨的單元部署所 帶來的便利性。CombinedChannelDuplexHandler 提供了這個解決方案,其聲明爲:

public class CombinedChannelDuplexHandler
	<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>

這個類充當了 ChannelInboundHandler 和 ChannelOutboundHandler(該類的類型參數 I 和 O)的容器。經過提供分別繼承瞭解碼器類和編碼器類的類型,咱們能夠實現一個編解碼器,而又沒必要直接擴展抽象的編解碼器類

首先,讓咱們研究下述代碼,該實現擴展了 ByteToMessageDecoder,由於它要從 ByteBuf 讀取字符

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

這裏的 decode() 方法一次將從 ByteBuf 中提取 2 字節,並將它們做爲 char 寫入到 List 中,其將會被自動裝箱爲 Character 對象

下述代碼將 Character 轉換回字節。這個類擴展了 MessageToByteEncoder,由於它須要將 char 消息編碼到 ByteBuf 中。這是經過直接寫入 ByteBuf 作到的

public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out) throws Exception {
        out.writeChar(msg);
    }
}

既然咱們有了解碼器和編碼器,咱們能夠結合它們來構建一個編解碼器

// 經過該解碼器和編碼器實現參數化CombinedByteCharCodec
public class CombinedChannelDuplexHandler extends
        io.netty.channel.CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedChannelDuplexHandler() {
        // 將委託實例傳遞給父類
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}
相關文章
相關標籤/搜索