【Netty】ChannelHandler和codec

1、前言java

  前面學習了Netty的codec框架,下面接着學習ChannelHandler與codec之間的關聯。web

2、ChannelHandler和codec數據庫

  Netty爲不一樣的協議提供了處理器和編解碼器,能夠開箱即用,這些工具支持SSL / TLS和WebSocket,以及經過數據壓縮使得HTTP有更好的性能。緩存

  2.1 使用SSL/TLS保護Netty應用程序安全

  因爲數據隱私很是重要,而SSL和TLS等加密協議用於處理數據隱私,這些協議在其餘協議之上以實現數據安全性。安全網站會使用這種協議,同時,不基於HTTP的應用程序,如安全SMTP(SMTPS)郵件服務甚相當係數據庫系統也會用到這些協議。
服務器

  爲支持SSL/TLS協議,Java提供了javax.net.ssl包,其SSLContext和SSLEngine類使得實現解密和加密變得至關直接容易。而Netty使用SslHandler的ChannelHandler實現,其內部是使用SSLEngine來進行實際工做。websocket

  下圖展現了經過SslHandler的數據流。
網絡

  

  下面代碼展現如何經過ChannelInitializer將SslHandler添加至ChannelPipeline中。 框架

public class SslChannelInitializer extends ChannelInitializer<Channel>{
    private final SslContext context;
    private final boolean startTls;
    
    public SslChannelInitializer(SslContext context,
        boolean startTls) {
        this.context = context;
        this.startTls = startTls;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        SSLEngine engine = context.newEngine(ch.alloc());
        ch.pipeline().addFirst("ssl",
          new SslHandler(engine, startTls));
    }
}

  在大多數狀況下,SslHandler都是ChannelPipeline中的第一個ChannelHandler,只有當全部其餘ChannelHandler將其邏輯應用於數據以後,才能進行加密操做。異步

  2.2 構建Netty的HTTP/HTTPS應用程序

  HTTP/HTTPS是最多見的協議套件之一,許多Web Service API都是基於HTTP/HTTPS的。

  1. HTTP的decoder、encoder、codec

  HTTP基於請求/響應模式:客戶端發送HTTP請求至服務端,服務端響應HTTP請求至客戶端。Netty提供各類編碼器和解碼器以簡化使用此協議。

  下圖展現了HTTP請求的組成部分。

  

  而下圖展現了HTTP響應的組成部分。

  

  下面代碼的HttpPipelineInitializer能夠爲應用提供HTTP支持,僅僅只須要往ChannelPipeline中添加正確的ChannelHandler便可。 

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
    private final boolean client;
    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("decoder", new HttpResponseDecoder());
            pipeline.addLast("encoder", new HttpRequestEncoder());
        } else {
            pipeline.addLast("decoder", new HttpRequestDecoder());
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }
    }
}

  2. HTTP消息聚合

  當管道中的處理器被初始化後,就能夠處理不一樣的消息,但因爲HTTP請求和響應由多部分組成,須要將這些部分匯聚成完整的結果,Netty提供了一個合成器,其能夠消息的不一樣部分組合成FullHttpRequest和FullHttpResponse。因爲消息在完整以前須要被緩存,所以其會帶來小小的開銷,可是不用擔憂消息的碎片化處理。

  下面代碼展現瞭如何自動化裝配消息。

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;
    public HttpAggregatorInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
        pipeline.addLast("aggregator",
            new HttpObjectAggregator(512 * 1024));
    }
}

  3. HTTP壓縮

  使用HTTP時,建議採用壓縮來儘量減小傳輸數據的大小。Netty爲壓縮和解壓縮提供了ChannelHandler的實現,其支持gzip和deflate編碼。

  以下代碼展現瞭如何自動壓縮HTTP消息。  

public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;
    public HttpCompressionInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
            pipeline.addLast("decompressor",
            new HttpContentDecompressor());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
            pipeline.addLast("compressor",
                new HttpContentCompressor());
        }
    }
}

  4. 使用HTTPS

  當混合了SslHandler時就可使用HTTPS,代碼以下。  

public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean isClient;
    public HttpsCodecInitializer(SslContext context, boolean isClient) {
        this.context = context;
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        SSLEngine engine = context.newEngine(ch.alloc());
        pipeline.addFirst("ssl", new SslHandler(engine));
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
    }
}

  5. WebSocket

  Netty爲基於HTTP的應用程序提供了支持,WebSocket解決了長期存在的問題:若是底層使用HTTP協議,是一系列請求-響應交互,那麼如何實時發佈信息。AJAX提供了一些改進,可是數據流的驅動仍然來自客戶端的請求。而WebSocket規範及其實現表明了一種更有效的解決方案,其爲雙向流量提供了一個單一的TCP鏈接,其在客戶端和服務器之間提供真正的雙向數據交換,而且能夠處理任何類型的數據。

  下圖展現了WebSocket協議,開始數據通訊爲純HTTP,以後升級到雙向WebSocket。

  

  爲了應用中添加WebSocket支持,能夠在管道中添加WebSocketFrames,其包含以下類型。

  

  下面代碼展現瞭如何使用WebSocketServerProtocolHandler。  

public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(
            new HttpServerCodec(),
            new HttpObjectAggregator(65536),
            new WebSocketServerProtocolHandler("/websocket"),
            new TextFrameHandler(),
            new BinaryFrameHandler(),
            new ContinuationFrameHandler());
    }
    public static final class TextFrameHandler extends
        SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
            TextWebSocketFrame msg) throws Exception {
            // Handle text frame
        }
}

public static final class BinaryFrameHandler extends
    SimpleChannelInboundHandler<BinaryWebSocketFrame> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        BinaryWebSocketFrame msg) throws Exception {
        // Handle binary frame
    }
}

public static final class ContinuationFrameHandler extends
    SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        ContinuationWebSocketFrame msg) throws Exception {
            // Handle continuation frame
        }
    }
}

  而爲了保證WebSocket的安全性,能夠將SslHandler做爲管道中的第一個處理器。

  2.3 空閒鏈接和超時

  檢測空閒鏈接和超時對於及時釋放資源相當重要,Netty提供了幾個處理器進行處理,如IdleStateHandler、ReadTimeoutHandler、WriteTimeoutHandler等。

  首先看看IdleStateHandler的緣由,若是60S內沒有接受或發送數據,如何接收通知,可使用向遠程對等體發送心跳消息來獲取通知; 若是沒有響應,則鏈接被關閉。

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(
            new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        pipeline.addLast(new HeartbeatHandler());
    }
    public static final class HeartbeatHandler
        extends ChannelStateHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
                "HEARTBEAT", CharsetUtil.ISO_8859_1));
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx,
            Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                .addListener(
                    ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

  2.4 解碼限定的和基於長度的協議

  1. 限定的協議

  限定消息協議使用指定的字符來標記稱爲幀的消息或消息段的開始或結束位置,如如SMTP,POP3,IMAP和Telnet均使用該協議。以下的兩個解碼器用於處理限定的和基於長度的協議。

  

  下圖顯示瞭如何以行尾序列\ r \ n(回車+換行)分隔符來處理幀。

  

  以下代碼展現瞭如何使用LineBasedFrameDecoder。 

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
        pipeline.addLast(new FrameHandler());
    }
    public static final class FrameHandler
        extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
            ByteBuf msg) throws Exception {
            // Do something with the data extracted from the frame
        }
    }
}

  DelimiterBasedFrameDecoder類能夠自定義字符來處理數據。

  2. 基於長度的協議

  基於長度的協議經過在幀的頭部對其長度進行編碼來定義幀,而不是經過用特殊的分隔符標記其結尾,以下兩個類用於處理該協議。

  

  下圖展現了FixedLengthFrameDecoder的操做,每幀由8個字節構成。

  

  對於可變長度的幀,可使用LengthFieldBasedFrameDecoder進行處理,其從頭部字段肯定幀長度,並從數據流中提取指定的字節數。

  下圖展現了長度字段在頭部的偏移量0而且具備2字節。

  

  2.5 寫大數據

  因爲網絡緣由,異步框架中的寫入大量數據將存在問題,由於寫入操做是非阻塞的,在寫入完成時或者全部的數據都沒有被寫出就會通知ChannelFuture,此時,若不中止寫入數據,可能會形成內存溢出,所以,在寫入大量數據時,須要處理與遠程對等體鏈接緩慢可能致使內存釋放延遲的狀況。FileRegion支持零拷貝文件傳輸的通道。

  下面是FileRegion的使用示例。  

FileInputStream in = new FileInputStream(file);
FileRegion region = new DefaultFileRegion(
    in.getChannel(), 0, file.length());
channel.writeAndFlush(region).addListener(
    new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future)
        throws Exception {
        if (!future.isSuccess()) {
            Throwable cause = future.cause();
            // Do something
        }
    }
});

  此示例僅適用於文件內容的直接傳輸,不包括應用程序對數據的任何處理,若須要將數據從文件系統複製到用戶內存,可使用ChunkedWriteHandler,它支持異步編寫大型數據流,而不會致使高內存消耗。

  其核心是ChunkedInput<B>接口,B代碼readChunk的返回類型,對於ChunkedInput<B>,有以下的四種實現。

  

  以下是ChunkedStream的使用示例。  

public class ChunkedWriteHandlerInitializer
    extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;
    
    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslCtx.createEngine());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new WriteStreamHandler());
    }
    
    public final class WriteStreamHandler
        extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(
            new ChunkedStream(new FileInputStream(file)));
        }
    }
}

  2.6 序列化數據

  JDK提供了ObjectOutputStream和ObjectInputStream對POJO的原始數據類型和圖形進行序列化和反序列化,但其並不高效。

  1. JDK序列化

  當應用程序必須與使用ObjectOutputStream和ObjectInputStream與對等體進行交互時,首先應該考慮兼容性,此時JDK的序列化是正確的選擇。下圖展現了Netty與JDK進行互操做的序列化類。

  

  2. JBOSS序列化

  JBOSS比JDK序列化快三倍,而且更爲緊湊,Netty使用以下兩個編解碼器來支持JBOSS。

  

  以下代碼展現瞭如何使用JBOSS進行序列化操做。 

public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;
    public MarshallingInitializer(
        UnmarshallerProvider unmarshallerProvider,
        MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        pipeline.addLast(new ObjectHandler());
    }
    public static final class ObjectHandler
        extends SimpleChannelInboundHandler<Serializable> {
        @Override
        public void channelRead0(
            ChannelHandlerContext channelHandlerContext,
            Serializable serializable) throws Exception {
            // Do something
        }
    }
}

  3. Protobuf序列化

  Netty還可以使用協議緩衝區進行序列化操做,其由谷歌開發,協議緩衝區以緊湊和高效的方式對結構化數據進行編碼和解碼,下圖顯示了Netty爲支持protobuf而實現的ChannelHandler。

  

  下面示例展現瞭如何使用protobuf。 

public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;
    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new ProtobufDecoder(lite));
        pipeline.addLast(new ObjectHandler());
    }
    public static final class ObjectHandler
        extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
            // Do something with the object
        }
    }
}

3、總結

  本篇博文講解了如何保證Netty應用程序的安全性,以及如何處理不一樣協議的數據,以及編解碼器和ChannelHandler之間的關係,以及如何把編解碼器添加至管道中進行處理。也謝謝各位園友的觀看~

相關文章
相關標籤/搜索